module PWN::Plugins::DefectDojo

def self.authors

def self.authors
st.pentest@0dayinc.com>

def self.engagement_create(opts = {})

def self.engagement_create(opts = {})
obj]
bj[:api_version]
ptions w/ optional params set to default values

 = true
 opts[:name]
tion] = opts[:description]
pe] ? (http_body[:engagement_type] = opts[:engagement_type]) : (http_body[:engagement_type] = 'CI/CD')
tus].to_s.strip.chomp.scrub
, 'On Hold', ''
n Progress'
http_body[:status] = 'In Progress') : (http_body[:status] = status)
 status not implemented for #engagement_create - use #engagement_update instead'
ngagement status: #{opts[:status]}.  Options for this method are 'In Progress' || 'On Hold'"
e the resource_uri for the lead username
ts[:lead_username].to_s.strip.chomp.scrub
ser_list(dd_obj: dd_obj)
'v1'
_object = user_list[:objects].select do |user|
] == lead_username
 = user_by_username_object.first[:resource_uri]
'v2'
_object = user_list[:results].select do |user|
] == lead_username
er return 1 result so we should be good here
 = user_by_username_object.first[:id]
e the resource_uri for the product name
s[:product_name].to_s.strip.chomp.scrub
f.product_list(dd_obj: dd_obj)
'v1'
object = product_list[:objects].select do |prod|
 product_name
er return 1 result so we should be good here
ct] = product_by_name_object.first[:resource_uri]
'v2'
object = product_list[:results].select do |prod|
 product_name
er return 1 result so we should be good here
ct] = product_by_name_object.first[:id]
rategy] = opts[:test_strategy]
e the resource_uri orchestration, build_server, and scm_server
ne = opts[:orchestration_engine].to_s.strip.chomp.scrub
ration_engine] = tool_configuration_resource_uri_by_name(
: orchestration_engine
s[:build_server].to_s.strip.chomp.scrub
erver] = tool_configuration_resource_uri_by_name(
: build_server
:scm_server].to_s.strip.chomp.scrub
code_management_server] = tool_configuration_resource_uri_by_name(
: scm_server
e
(http_body[:api_test] = true) : (http_body[:api_test] = false)
e
(http_body[:pen_test] = true) : (http_body[:pen_test] = false)
e
] ? (http_body[:threat_model] = true) : (http_body[:threat_model] = false)
e
? (http_body[:check_list] = true) : (http_body[:check_list] = false)
.now.strftime('%Y-%m-%d')
ted] ? (http_body[:first_contacted] = opts[:first_contacted]) : (http_body[:first_contacted] = Time.now.strftime('%Y-%m-%d'))
.now.strftime('%Y-%m-%d')
] ? (http_body[:target_start] = opts[:target_start]) : (http_body[:target_start] = Time.now.strftime('%Y-%m-%d'))
.now.strftime('%Y-%m-%d')
? (http_body[:target_end] = opts[:target_end]) : (http_body[:target_end] = Time.now.strftime('%Y-%m-%d'))
e
sting] = false
gements/',
st,
body
 => e

def self.engagement_list(opts = {})

def self.engagement_list(opts = {})
obj]
call = "engagements/#{opts[:id].to_i}") : (rest_call = 'engagements')
ll(
call
taining the post-authenticated DefectDojo REST API token
e, symbolize_names: true)
 => e

def self.finding_list(opts = {})

def self.finding_list(opts = {})
obj]
call = "findings/#{opts[:id].to_i}") : (rest_call = 'findings')
ll(
call
taining the post-authenticated DefectDojo REST API token
e, symbolize_names: true)
 => e

def self.help

def self.help
.login(
 - url of DefectDojo Server',
required - api version to use v1 || v2',
uired - username to AuthN w/ api v1)',
onal - defect dojo api key (will prompt if nil)',
al - proxy all traffic through MITM proxy (defaults to nil)'
{self}.product_list(
red dd_obj returned from #login_v1 method',
- retrieve single product by id, otherwise return all'
= #{self}.engagement_list(
red dd_obj returned from #login_v1 method',
- retrieve single engagement by id, otherwise return all'
e_response = #{self}.engagement_create(
red - dd_obj returned from #login_v1 method',
d - name of the engagement',
optional - description of engagement',
e: 'optional - type of engagement Interactive||CI/CD (defaults to CI/CD)',
nal - status of the engagement In Progress || On Hold (defaults to In Progress)',
 'required - username of lead to tie to engagement',
'required - product name in which to create engagement',
 'required - URL of test strategy documentation (e.g. OWASP ASVS URL)',
engine: 'optional - name of orchestration engine tied to CI/CD engagement',
'optional - name of build server tied to CI/CD engagement',
ptional - name of SCM server tied to CI/CD engagement',
ional - boolean to set an engagement as an api assessment (defaults to false)',
ional - boolean to set an engagement as a manual penetration test (defaults to false)',
'optional - boolean to set an engagement as a threat model (defaults to false)',
ptional - boolean to set an engagement as a checkbox assessment (defaults to false)',
d: 'optional - date of engagement request e.g. 2018-06-18 (Defaults to current day)',
'optional - date to start enagement e.g. 2018-06-19 (Defaults to current day)',
ptional - date of engagement completion e.g. 2018-06-20 (Defaults to current day)'
lf}.test_list(
red dd_obj returned from #login_v1 method',
- retrieve single test by id, otherwise return all'
nse = #{self}.importscan(
red - dd_obj returned from #login_v1 method',
e: 'required - name of engagement to associate w/ scan',
quired - type of scan importing (see <DEFECTDOJO_URL>/admin/dojo/test_type/ for listing)',
d - path of scan results file',
 'required - username of lead to tie to scan',
l - comma-delimited list of tag names to tie to scan',
ty: 'optional - minimum finding severity Info||Low||Medium||High||Critical (Defaults to Info)',
tional - date in which scan was kicked off (defaults to now)',
ional - flag finding as verified by a tester (defaults to false)',
_groups: 'optional - flag to create finding groups (defaults to false)',
ings_product_scope: 'optional - flag to close old findings from engagement (defaults to false)',
ings: 'optional - flag to close old findings, regardless of engagement (defaults to false)',
'optional - flag to push findings to JIRA (defaults to false)'
ponse = #{self}.reimportscan(
red - dd_obj returned from #login_v1 method',
e: 'required - name of engagement to associate w/ scan',
quired - type of scan importing (see <DEFECTDOJO_URL>/admin/dojo/test_type/ for listing)',
d - path of scan results file',
l - comma-delimited list of tag names to tie to scan for unique test resource_uri retrival',
uri: 'optional - alternative to tag names to know which test to reimport',
ty: 'optional - minimum finding severity Info||Low||Medium||High||Critical (Defaults to Info)',
tional - date in which scan was kicked off (defaults to now)',
ional - flag finding as verified by a tester (defaults to false)',
_groups: 'optional - flag to create finding groups (defaults to false)',
ings_product_scope: 'optional - flag to close old findings from engagement (defaults to false)',
ings: 'optional - flag to close old findings, regardless of engagement (defaults to false)',
'optional - flag to push findings to JIRA (defaults to false)'
{self}.finding_list(
red dd_obj returned from #login_v1 method',
- retrieve single finding by id, otherwise return all'
lf}.user_list(
red dd_obj returned from #login_v1 method',
- retrieve single user by id, otherwise return all'
on_list = #{self}.tool_configuration_list(
red dd_obj returned from #login_v1 method',
- retrieve single test by id, otherwise return all'
red dd_obj returned from #login_v1 or #login_v2 method'

def self.importscan(opts = {})

def self.importscan(opts = {})
obj]
bj[:api_version]
ptions w/ optional params set to default values

 = true
e the resource_uri for the engagement name
opts[:engagement_name].to_s.strip.chomp.scrub
self.engagement_list(dd_obj: dd_obj)
'v1'
me_object = engagement_list[:objects].select do |engagement|
me] == engagement_name
er return 1 result so we should be good here
ement] = engagement_by_name_object.first[:resource_uri]
'v2'
me_object = engagement_list[:results].select do |engagement|
me] == engagement_name
er return 1 result so we should be good here
ement] = engagement_by_name_object.first[:id]
pe] = opts[:scan_type].to_s.strip.chomp.scrub
oad file to remote host
rt] = true
 File.new(opts[:file].to_s.strip.chomp.scrub, 'rb') if File.exist?(opts[:file].to_s.strip.chomp.scrub)
e the resource_uri for the lead username
ts[:lead_username].to_s.strip.chomp.scrub
ser_list(dd_obj: dd_obj)
'v1'
_object = user_list[:objects].select do |user|
] == lead_username
er return 1 result so we should be good here
 = user_by_username_object.first[:resource_uri]
'v2'
_object = user_list[:results].select do |user|
] == lead_username
er return 1 result so we should be good here
 = user_by_username_object.first[:id]
 opts[:tags].to_s.strip.chomp.scrub.delete("\s").split(',') if opts[:tags]
 opts[:minimum_severity].to_s.strip.chomp.scrub.downcase.capitalize
ity
Low', 'Medium', 'High', 'Critical'
nfo'
 == '' ? (http_body[:minimum_severity] = 'Info') : (http_body[:minimum_severity] = minimum_severity)
inimum severity: #{opts[:minimum_severity]}.  Options are Info||Low||Medium||High||Critical'"
.now.strftime('%Y-%m-%d')
 (http_body[:scan_date] = opts[:scan_date]) : (http_body[:scan_date] = Time.now.strftime('%Y-%m-%d'))
e
(http_body[:verified] = true) : (http_body[:verified] = false)
ng_groups] ? (http_body[:create_finding_groups_for_all_findings] = true) : (http_body[:create_finding_groups_for_all_findings] = false)
ndings_product_scope] ? (http_body[:close_old_findings_product_scope] = true) : (http_body[:close_old_findings_product_scope] = false)
ndings] = true if opts[:close_old_findings_product_scope]
ndings] ? (http_body[:close_old_findings] = true) : (http_body[:close_old_findings] = false)
] ? (http_body[:push_to_jira] = true) : (http_body[:push_to_jira] = false)
-scan/'
scan/' if api_version == 'v1'
ath,
st,
body
 => e

def self.login(opts = {})

def self.login(opts = {})
 ? (api_version = opts[:api_version]) : (api_version = 'v2')
sername].to_s.scrub
i_key].to_s.scrub
gins::AuthenticationHelper.mask_password(prompt: 'API Key') if opts[:api_key].nil?
y]

er] = "Token #{api_key}"
er] = "ApiKey #{username}:#{api_key}" if api_version == 'v1'
roxy
n] = api_version
n] = 'v1' if api_version == 'v1'
 => e

def self.logout(opts = {})

def self.logout(opts = {})
obj]
ging out...')
Session if Possible via API Call
 => e

def self.product_list(opts = {})

def self.product_list(opts = {})
obj]
call = "products/#{opts[:id].to_i}") : (rest_call = 'products')
ll(
call
taining the post-authenticated DefectDojo REST API token
e, symbolize_names: true)
 => e

def self.reimportscan(opts = {})

def self.reimportscan(opts = {})
obj]
bj[:api_version]
ptions w/ optional params set to default values

 = true
e the resource_uri for the engagement name
opts[:engagement_name].to_s.strip.chomp.scrub
self.engagement_list(dd_obj: dd_obj)
'v1'
me_object = engagement_list[:objects].select do |engagement|
me] == engagement_name
er return 1 result so we should be good here
rce_uri = engagement_by_name_object.first[:resource_uri]
'v2'
me_object = engagement_list[:results].select do |engagement|
me] == engagement_name
er return 1 result so we should be good here
rce_uri = engagement_by_name_object.first[:id]
n_type for test resource_uri since the scan_type should never change
pe] = opts[:scan_type].to_s.strip.chomp.scrub
oad file to remote host
rt] = true
 File.new(opts[:file].to_s.strip.chomp.scrub, 'rb') if File.exist?(opts[:file].to_s.strip.chomp.scrub)
e the resource_uri for the test we're looking to remimport
est_list(dd_obj: dd_obj)
'v1'
ent_object = test_list[:objects].select do |test|
nt] == engagement_resource_uri
'v2'
ent_object = test_list[:results].select do |test|
nt] == engagement_resource_uri
 opts[:tags].to_s.strip.chomp.scrub.delete("\s").split(',') if opts[:tags]
 opts[:test_resource_uri] if opts[:test_resource_uri]
 opts[:minimum_severity].to_s.strip.chomp.scrub.downcase.capitalize
ity
Low', 'Medium', 'High', 'Critical'
nfo'
 == '' ? (http_body[:minimum_severity] = 'Info') : (http_body[:minimum_severity] = minimum_severity)
inimum severity: #{opts[:minimum_severity]}.  Options are Info||Low||Medium||High||Critical'"
.now.strftime('%Y-%m-%d')
 (http_body[:scan_date] = opts[:scan_date]) : (http_body[:scan_date] = Time.now.strftime('%Y/%m/%d'))
e
(http_body[:verified] = true) : (http_body[:verified] = false)
ng_groups] ? (http_body[:create_finding_groups_for_all_findings] = true) : (http_body[:create_finding_groups_for_all_findings] = false)
ndings_product_scope] ? (http_body[:close_old_findings_product_scope] = true) : (http_body[:close_old_findings_product_scope] = false)
ndings] = true if opts[:close_old_findings_product_scope]
ndings] ? (http_body[:close_old_findings] = true) : (http_body[:close_old_findings] = false)
] ? (http_body[:push_to_jira] = true) : (http_body[:push_to_jira] = false)
rt-scan/'
rtscan/' if api_version == 'v1'
ath,
st,
body
 => e

def self.rest_call(opts = {})

def self.rest_call(opts = {})
 are huge and require long timeouts...defaulting to 9 mins.
40
bj]
est_call].to_s.scrub
? (http_method = opts[:http_method].to_s.scrub.to_sym) : (http_method = :get)
ms]
ttp_body]
lication/json; charset=UTF-8'
j[:api_version]
#{url}/api/#{api_version}".to_s.scrub
Plugins::TransparentBrowser.open(browser_type: :rest)
::Plugins::TransparentBrowser.open(
rest,
proxy]
er_obj[:browser]
_client::Request
equest.execute(
_api_uri}/#{rest_call}",
 content_type,
: dd_obj[:authz_header],
s
se,
t_timeout,
equest_timeout
(:multipart)
ame="tags[]" to name="tags" to allow for multi-tag submission
could just used payload = http_body
t_client::Payload::Multipart.new(http_body)
multipart.headers['Content-Type']
ged = multipart.to_s.gsub(
osition: form-data; name="tags[]"',
osition: form-data; name="tags"'
ent::Payload::Base.new(multipart_massaged)
to_s
body.to_json
equest.execute(
_api_uri}/#{rest_call}",
 content_type,
: dd_obj[:authz_header]
d,
se,
t_timeout,
equest_timeout
ror("Unsupported HTTP Method #{http_method} for #{self} Plugin")
ceptionWithResponse => e
ime('%Y-%m-%d %H:%M:%S.%N %z')
f}"
d_api_uri}/#{rest_call}"
ams.inspect}"
Y: #{http_body.inspect}" if http_body
ponse}\n\n\n"
 SystemExit, Interrupt => e
obj) unless dd_obj.nil?

def self.test_list(opts = {})

def self.test_list(opts = {})
obj]
call = "tests/#{opts[:id].to_i}") : (rest_call = 'tests')
ll(
call
taining the post-authenticated DefectDojo REST API token
e, symbolize_names: true)
 => e

def self.tool_configuration_list(opts = {})

def self.tool_configuration_list(opts = {})
obj]
call = "tool_configurations/#{opts[:id].to_i}") : (rest_call = 'tool_configurations')
ll(
call
taining the post-authenticated DefectDojo REST API token
e, symbolize_names: true)
 => e

def self.tool_configuration_resource_uri_by_name(opts = {})

def self.tool_configuration_resource_uri_by_name(opts = {})
bj]
j[:api_version]
opts[:tool_config_name].to_s.scrub
list = self.tool_configuration_list(dd_obj: dd_obj)
v1'
n_by_name_object = tool_configuration_list[:objects].select do |tool_configuration|
ion[:name] == tool_config_name
v2'
n_by_name_object = tool_configuration_list[:results].select do |tool_configuration|
ion[:name] == tool_config_name
by_name_object.first[:resource_uri] if api_version == 'v1'
by_name_object.first[:id] if api_version == 'v2'
 SystemExit, Interrupt => e
obj) unless dd_obj.nil?

def self.user_list(opts = {})

def self.user_list(opts = {})
obj]
call = "users/#{opts[:id].to_i}") : (rest_call = 'users')
ll(
call
taining the post-authenticated DefectDojo REST API token
e, symbolize_names: true)
 => e