module PWN::Plugins::DefectDojo
def self.authors
def self.authors st.pentest@0dayinc.com>
def self.engagement_create(opts = {})
def self.engagement_create(opts = {}) obj] bj[:api_version] ptions w/ optional params set to default values = true opts[:name] tion] = opts[:description] pe] ? (http_body[:engagement_type] = opts[:engagement_type]) : (http_body[:engagement_type] = 'CI/CD') tus].to_s.strip.chomp.scrub , 'On Hold', '' n Progress' http_body[:status] = 'In Progress') : (http_body[:status] = status) status not implemented for #engagement_create - use #engagement_update instead' ngagement status: #{opts[:status]}. Options for this method are 'In Progress' || 'On Hold'" e the resource_uri for the lead username ts[:lead_username].to_s.strip.chomp.scrub ser_list(dd_obj: dd_obj) 'v1' _object = user_list[:objects].select do |user| ] == lead_username = user_by_username_object.first[:resource_uri] 'v2' _object = user_list[:results].select do |user| ] == lead_username er return 1 result so we should be good here = user_by_username_object.first[:id] e the resource_uri for the product name s[:product_name].to_s.strip.chomp.scrub f.product_list(dd_obj: dd_obj) 'v1' object = product_list[:objects].select do |prod| product_name er return 1 result so we should be good here ct] = product_by_name_object.first[:resource_uri] 'v2' object = product_list[:results].select do |prod| product_name er return 1 result so we should be good here ct] = product_by_name_object.first[:id] rategy] = opts[:test_strategy] e the resource_uri orchestration, build_server, and scm_server ne = opts[:orchestration_engine].to_s.strip.chomp.scrub ration_engine] = tool_configuration_resource_uri_by_name( : orchestration_engine s[:build_server].to_s.strip.chomp.scrub erver] = tool_configuration_resource_uri_by_name( : build_server :scm_server].to_s.strip.chomp.scrub code_management_server] = tool_configuration_resource_uri_by_name( : scm_server e (http_body[:api_test] = true) : (http_body[:api_test] = false) e (http_body[:pen_test] = true) : (http_body[:pen_test] = false) e ] ? (http_body[:threat_model] = true) : (http_body[:threat_model] = false) e ? (http_body[:check_list] = true) : (http_body[:check_list] = false) .now.strftime('%Y-%m-%d') ted] ? (http_body[:first_contacted] = opts[:first_contacted]) : (http_body[:first_contacted] = Time.now.strftime('%Y-%m-%d')) .now.strftime('%Y-%m-%d') ] ? (http_body[:target_start] = opts[:target_start]) : (http_body[:target_start] = Time.now.strftime('%Y-%m-%d')) .now.strftime('%Y-%m-%d') ? (http_body[:target_end] = opts[:target_end]) : (http_body[:target_end] = Time.now.strftime('%Y-%m-%d')) e sting] = false gements/', st, body => e
def self.engagement_list(opts = {})
def self.engagement_list(opts = {}) obj] call = "engagements/#{opts[:id].to_i}") : (rest_call = 'engagements') ll( call taining the post-authenticated DefectDojo REST API token e, symbolize_names: true) => e
def self.finding_list(opts = {})
def self.finding_list(opts = {}) obj] call = "findings/#{opts[:id].to_i}") : (rest_call = 'findings') ll( call taining the post-authenticated DefectDojo REST API token e, symbolize_names: true) => e
def self.help
def self.help .login( - url of DefectDojo Server', required - api version to use v1 || v2', uired - username to AuthN w/ api v1)', onal - defect dojo api key (will prompt if nil)', al - proxy all traffic through MITM proxy (defaults to nil)' {self}.product_list( red dd_obj returned from #login_v1 method', - retrieve single product by id, otherwise return all' = #{self}.engagement_list( red dd_obj returned from #login_v1 method', - retrieve single engagement by id, otherwise return all' e_response = #{self}.engagement_create( red - dd_obj returned from #login_v1 method', d - name of the engagement', optional - description of engagement', e: 'optional - type of engagement Interactive||CI/CD (defaults to CI/CD)', nal - status of the engagement In Progress || On Hold (defaults to In Progress)', 'required - username of lead to tie to engagement', 'required - product name in which to create engagement', 'required - URL of test strategy documentation (e.g. OWASP ASVS URL)', engine: 'optional - name of orchestration engine tied to CI/CD engagement', 'optional - name of build server tied to CI/CD engagement', ptional - name of SCM server tied to CI/CD engagement', ional - boolean to set an engagement as an api assessment (defaults to false)', ional - boolean to set an engagement as a manual penetration test (defaults to false)', 'optional - boolean to set an engagement as a threat model (defaults to false)', ptional - boolean to set an engagement as a checkbox assessment (defaults to false)', d: 'optional - date of engagement request e.g. 2018-06-18 (Defaults to current day)', 'optional - date to start enagement e.g. 2018-06-19 (Defaults to current day)', ptional - date of engagement completion e.g. 2018-06-20 (Defaults to current day)' lf}.test_list( red dd_obj returned from #login_v1 method', - retrieve single test by id, otherwise return all' nse = #{self}.importscan( red - dd_obj returned from #login_v1 method', e: 'required - name of engagement to associate w/ scan', quired - type of scan importing (see <DEFECTDOJO_URL>/admin/dojo/test_type/ for listing)', d - path of scan results file', 'required - username of lead to tie to scan', l - comma-delimited list of tag names to tie to scan', ty: 'optional - minimum finding severity Info||Low||Medium||High||Critical (Defaults to Info)', tional - date in which scan was kicked off (defaults to now)', ional - flag finding as verified by a tester (defaults to false)', _groups: 'optional - flag to create finding groups (defaults to false)', ings_product_scope: 'optional - flag to close old findings from engagement (defaults to false)', ings: 'optional - flag to close old findings, regardless of engagement (defaults to false)', 'optional - flag to push findings to JIRA (defaults to false)' ponse = #{self}.reimportscan( red - dd_obj returned from #login_v1 method', e: 'required - name of engagement to associate w/ scan', quired - type of scan importing (see <DEFECTDOJO_URL>/admin/dojo/test_type/ for listing)', d - path of scan results file', l - comma-delimited list of tag names to tie to scan for unique test resource_uri retrival', uri: 'optional - alternative to tag names to know which test to reimport', ty: 'optional - minimum finding severity Info||Low||Medium||High||Critical (Defaults to Info)', tional - date in which scan was kicked off (defaults to now)', ional - flag finding as verified by a tester (defaults to false)', _groups: 'optional - flag to create finding groups (defaults to false)', ings_product_scope: 'optional - flag to close old findings from engagement (defaults to false)', ings: 'optional - flag to close old findings, regardless of engagement (defaults to false)', 'optional - flag to push findings to JIRA (defaults to false)' {self}.finding_list( red dd_obj returned from #login_v1 method', - retrieve single finding by id, otherwise return all' lf}.user_list( red dd_obj returned from #login_v1 method', - retrieve single user by id, otherwise return all' on_list = #{self}.tool_configuration_list( red dd_obj returned from #login_v1 method', - retrieve single test by id, otherwise return all' red dd_obj returned from #login_v1 or #login_v2 method'
def self.importscan(opts = {})
def self.importscan(opts = {}) obj] bj[:api_version] ptions w/ optional params set to default values = true e the resource_uri for the engagement name opts[:engagement_name].to_s.strip.chomp.scrub self.engagement_list(dd_obj: dd_obj) 'v1' me_object = engagement_list[:objects].select do |engagement| me] == engagement_name er return 1 result so we should be good here ement] = engagement_by_name_object.first[:resource_uri] 'v2' me_object = engagement_list[:results].select do |engagement| me] == engagement_name er return 1 result so we should be good here ement] = engagement_by_name_object.first[:id] pe] = opts[:scan_type].to_s.strip.chomp.scrub oad file to remote host rt] = true File.new(opts[:file].to_s.strip.chomp.scrub, 'rb') if File.exist?(opts[:file].to_s.strip.chomp.scrub) e the resource_uri for the lead username ts[:lead_username].to_s.strip.chomp.scrub ser_list(dd_obj: dd_obj) 'v1' _object = user_list[:objects].select do |user| ] == lead_username er return 1 result so we should be good here = user_by_username_object.first[:resource_uri] 'v2' _object = user_list[:results].select do |user| ] == lead_username er return 1 result so we should be good here = user_by_username_object.first[:id] opts[:tags].to_s.strip.chomp.scrub.delete("\s").split(',') if opts[:tags] opts[:minimum_severity].to_s.strip.chomp.scrub.downcase.capitalize ity Low', 'Medium', 'High', 'Critical' nfo' == '' ? (http_body[:minimum_severity] = 'Info') : (http_body[:minimum_severity] = minimum_severity) inimum severity: #{opts[:minimum_severity]}. Options are Info||Low||Medium||High||Critical'" .now.strftime('%Y-%m-%d') (http_body[:scan_date] = opts[:scan_date]) : (http_body[:scan_date] = Time.now.strftime('%Y-%m-%d')) e (http_body[:verified] = true) : (http_body[:verified] = false) ng_groups] ? (http_body[:create_finding_groups_for_all_findings] = true) : (http_body[:create_finding_groups_for_all_findings] = false) ndings_product_scope] ? (http_body[:close_old_findings_product_scope] = true) : (http_body[:close_old_findings_product_scope] = false) ndings] = true if opts[:close_old_findings_product_scope] ndings] ? (http_body[:close_old_findings] = true) : (http_body[:close_old_findings] = false) ] ? (http_body[:push_to_jira] = true) : (http_body[:push_to_jira] = false) -scan/' scan/' if api_version == 'v1' ath, st, body => e
def self.login(opts = {})
def self.login(opts = {}) ? (api_version = opts[:api_version]) : (api_version = 'v2') sername].to_s.scrub i_key].to_s.scrub gins::AuthenticationHelper.mask_password(prompt: 'API Key') if opts[:api_key].nil? y] er] = "Token #{api_key}" er] = "ApiKey #{username}:#{api_key}" if api_version == 'v1' roxy n] = api_version n] = 'v1' if api_version == 'v1' => e
def self.logout(opts = {})
def self.logout(opts = {}) obj] ging out...') Session if Possible via API Call => e
def self.product_list(opts = {})
def self.product_list(opts = {}) obj] call = "products/#{opts[:id].to_i}") : (rest_call = 'products') ll( call taining the post-authenticated DefectDojo REST API token e, symbolize_names: true) => e
def self.reimportscan(opts = {})
def self.reimportscan(opts = {}) obj] bj[:api_version] ptions w/ optional params set to default values = true e the resource_uri for the engagement name opts[:engagement_name].to_s.strip.chomp.scrub self.engagement_list(dd_obj: dd_obj) 'v1' me_object = engagement_list[:objects].select do |engagement| me] == engagement_name er return 1 result so we should be good here rce_uri = engagement_by_name_object.first[:resource_uri] 'v2' me_object = engagement_list[:results].select do |engagement| me] == engagement_name er return 1 result so we should be good here rce_uri = engagement_by_name_object.first[:id] n_type for test resource_uri since the scan_type should never change pe] = opts[:scan_type].to_s.strip.chomp.scrub oad file to remote host rt] = true File.new(opts[:file].to_s.strip.chomp.scrub, 'rb') if File.exist?(opts[:file].to_s.strip.chomp.scrub) e the resource_uri for the test we're looking to remimport est_list(dd_obj: dd_obj) 'v1' ent_object = test_list[:objects].select do |test| nt] == engagement_resource_uri 'v2' ent_object = test_list[:results].select do |test| nt] == engagement_resource_uri opts[:tags].to_s.strip.chomp.scrub.delete("\s").split(',') if opts[:tags] opts[:test_resource_uri] if opts[:test_resource_uri] opts[:minimum_severity].to_s.strip.chomp.scrub.downcase.capitalize ity Low', 'Medium', 'High', 'Critical' nfo' == '' ? (http_body[:minimum_severity] = 'Info') : (http_body[:minimum_severity] = minimum_severity) inimum severity: #{opts[:minimum_severity]}. Options are Info||Low||Medium||High||Critical'" .now.strftime('%Y-%m-%d') (http_body[:scan_date] = opts[:scan_date]) : (http_body[:scan_date] = Time.now.strftime('%Y/%m/%d')) e (http_body[:verified] = true) : (http_body[:verified] = false) ng_groups] ? (http_body[:create_finding_groups_for_all_findings] = true) : (http_body[:create_finding_groups_for_all_findings] = false) ndings_product_scope] ? (http_body[:close_old_findings_product_scope] = true) : (http_body[:close_old_findings_product_scope] = false) ndings] = true if opts[:close_old_findings_product_scope] ndings] ? (http_body[:close_old_findings] = true) : (http_body[:close_old_findings] = false) ] ? (http_body[:push_to_jira] = true) : (http_body[:push_to_jira] = false) rt-scan/' rtscan/' if api_version == 'v1' ath, st, body => e
def self.rest_call(opts = {})
def self.rest_call(opts = {}) are huge and require long timeouts...defaulting to 9 mins. 40 bj] est_call].to_s.scrub ? (http_method = opts[:http_method].to_s.scrub.to_sym) : (http_method = :get) ms] ttp_body] lication/json; charset=UTF-8' j[:api_version] #{url}/api/#{api_version}".to_s.scrub Plugins::TransparentBrowser.open(browser_type: :rest) ::Plugins::TransparentBrowser.open( rest, proxy] er_obj[:browser] _client::Request equest.execute( _api_uri}/#{rest_call}", content_type, : dd_obj[:authz_header], s se, t_timeout, equest_timeout (:multipart) ame="tags[]" to name="tags" to allow for multi-tag submission could just used payload = http_body t_client::Payload::Multipart.new(http_body) multipart.headers['Content-Type'] ged = multipart.to_s.gsub( osition: form-data; name="tags[]"', osition: form-data; name="tags"' ent::Payload::Base.new(multipart_massaged) to_s body.to_json equest.execute( _api_uri}/#{rest_call}", content_type, : dd_obj[:authz_header] d, se, t_timeout, equest_timeout ror("Unsupported HTTP Method #{http_method} for #{self} Plugin") ceptionWithResponse => e ime('%Y-%m-%d %H:%M:%S.%N %z') f}" d_api_uri}/#{rest_call}" ams.inspect}" Y: #{http_body.inspect}" if http_body ponse}\n\n\n" SystemExit, Interrupt => e obj) unless dd_obj.nil?
def self.test_list(opts = {})
def self.test_list(opts = {}) obj] call = "tests/#{opts[:id].to_i}") : (rest_call = 'tests') ll( call taining the post-authenticated DefectDojo REST API token e, symbolize_names: true) => e
def self.tool_configuration_list(opts = {})
def self.tool_configuration_list(opts = {}) obj] call = "tool_configurations/#{opts[:id].to_i}") : (rest_call = 'tool_configurations') ll( call taining the post-authenticated DefectDojo REST API token e, symbolize_names: true) => e
def self.tool_configuration_resource_uri_by_name(opts = {})
def self.tool_configuration_resource_uri_by_name(opts = {}) bj] j[:api_version] opts[:tool_config_name].to_s.scrub list = self.tool_configuration_list(dd_obj: dd_obj) v1' n_by_name_object = tool_configuration_list[:objects].select do |tool_configuration| ion[:name] == tool_config_name v2' n_by_name_object = tool_configuration_list[:results].select do |tool_configuration| ion[:name] == tool_config_name by_name_object.first[:resource_uri] if api_version == 'v1' by_name_object.first[:id] if api_version == 'v2' SystemExit, Interrupt => e obj) unless dd_obj.nil?
def self.user_list(opts = {})
def self.user_list(opts = {}) obj] call = "users/#{opts[:id].to_i}") : (rest_call = 'users') ll( call taining the post-authenticated DefectDojo REST API token e, symbolize_names: true) => e