# frozen_string_literal: true
# Until jenkins_api_client is Updated
require 'jenkins_api_client2'
module PWN
module Plugins
# This plugin is used to interact w/ the Jenkins API and can be
# used to carry out tasks when certain events occur w/in Jenkins.
module Jenkins
@@logger = PWN::Plugins::PWNLogger.create
# Supported Method Parameters::
# PWN::Plugins::Jenkins.connect(
# ip: 'required host/ip of Jenkins Server',
# port: 'optional tcp port (defaults to 8080),
# username: 'optional username (functionality will be limited if ommitted)',
# api_key: 'optional api_key (functionality will be limited if ommitted)'
# identity_file: 'optional ssh private key path to AuthN w/ Jenkins PREFERRED over username/api_key',
# ssl: 'optional connect over TLS (defaults to true),
# proxy: 'optional debug proxy rest api requests to jenkins (e.g. "http://127.0.0.1:8080")''
# )
public_class_method def self.connect(opts = {})
ip = opts[:ip]
port = opts[:port].to_i ||= 8888
username = opts[:username].to_s.scrub
base_jenkins_api_uri = "https://#{ip}/ase/services".to_s.scrub
api_key = opts[:api_key].to_s.scrub
identity_file = opts[:identity_file].to_s.scrub
ssl_bool = true if opts[:ssl] ||= false
if opts[:proxy]
proxy = URI(opts[:proxy])
proxy_protocol = proxy.scheme
proxy_ip = proxy.host
proxy_port = proxy.port
end
@@logger.info("Logging into Jenkins Server: #{ip}")
if username == '' && api_key == ''
if identity_file == ''
jenkins_obj = JenkinsApi::Client.new(
server_ip: ip,
server_port: port,
follow_redirects: true,
ssl: ssl_bool,
proxy_protocol: proxy_protocol,
proxy_ip: proxy_ip,
proxy_port: proxy_port
)
else
jenkins_obj = JenkinsApi::Client.new(
server_ip: ip,
server_port: port,
identity_file: identity_file,
follow_redirects: true,
ssl: ssl_bool,
proxy_protocol: proxy_protocol,
proxy_ip: proxy_ip,
proxy_port: proxy_port
)
end
else
api_key = PWN::Plugins::AuthenticationHelper.mask_password if api_key == ''
jenkins_obj = JenkinsApi::Client.new(
server_ip: ip,
server_port: port,
username: username,
password: api_key,
follow_redirects: true,
ssl: ssl_bool,
proxy_protocol: proxy_protocol,
proxy_ip: proxy_ip,
proxy_port: proxy_port
)
end
jenkins_obj.system.wait_for_ready
jenkins_obj
rescue StandardError => e
raise e
end
# PWN::Plugins::Jenkins.create_user(
# jenkins_obj: 'required - jenkins_obj returned from #connect method',
# username: 'required - user to create',
# password: 'required - password for new user'
# fullname: 'required - full name of new user'
# email: 'required - email address of new user'
# )
public_class_method def self.create_user(opts = {})
jenkins_obj = opts[:jenkins_obj]
username = opts[:username].to_s.scrub
password = opts[:password].to_s.scrub
password = PWN::Plugins::AuthenticationHelper.mask_password if password == ''
fullname = opts[:fullname].to_s.scrub
email = opts[:email].to_s.scrub
post_body = {
'username' => username,
'password1' => password,
'password2' => password,
'fullname' => fullname,
'email' => email,
'json' => {
'username' => username,
'password1' => password,
'password2' => password,
'fullname' => fullname,
'email' => email
}.to_json
}
@@logger.info("Creating #{username}...")
resp = jenkins_obj.api_post_request(
'/securityRealm/createAccountByAdmin',
post_body
)
resp == '302'
# rescue JenkinsApi::Exceptions::UserAlreadyExists => e
# @@logger.warn("Jenkins view: #{view_name} already exists")
# return e.class
rescue StandardError => e
raise e
end
# PWN::Plugins::Jenkins.create_ssh_credential(
# jenkins_obj: 'required - jenkins_obj returned from #connect method',
# username: 'required - username for new credential'
# private_key_path: 'required - path of private ssh key for new credential'
# key_passphrase: 'optional - private key passphrase for new credential'
# credential_id: 'optional but recommended - useful when creating userland jobs',
# description: 'optional - description of new credential'
# domain: 'optional - defaults to GLOBAL',
# scope: 'optional - GLOBAL or SYSTEM (defaults to GLOBAL)'
# )
public_class_method def self.create_ssh_credential(opts = {})
jenkins_obj = opts[:jenkins_obj]
username = opts[:username].to_s.scrub
private_key_path = opts[:private_key_path].to_s.strip.chomp.scrub
key_passphrase = opts[:key_passphrase].to_s.scrub
credential_id = opts[:credential_id].to_s.scrub
description = opts[:description].to_s.scrub
if opts[:domain].to_s.strip.chomp.scrub == 'GLOBAL' || opts[:domain].nil?
uri_path = '/credentials/store/system/domain/_/createCredentials'
else
domain = opts[:domain].to_s.strip.chomp.scrub
uri_path = "/credentials/store/system/domain/#{domain}/createCredentials"
end
if opts[:scope].to_s.strip.chomp.scrub == 'SYSTEM'
scope = 'SYSTEM'
else
scope = 'GLOBAL'
end
if credential_id == ''
post_body = {
'json' => {
'' => '0',
'credentials' => {
'scope' => scope,
'username' => username,
'privateKeySource' => {
'stapler-class' => 'com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey$DirectEntryPrivateKeySource',
'privateKey' => File.read(private_key_path)
},
'passphrase' => key_passphrase,
'description' => description,
'stapler-class' => 'com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey'
}
}.to_json
}
else
post_body = {
'json' => {
'' => '0',
'credentials' => {
'scope' => scope,
'username' => username,
'privateKeySource' => {
'stapler-class' => 'com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey$DirectEntryPrivateKeySource',
'privateKey' => File.read(private_key_path)
},
'passphrase' => key_passphrase,
'id' => credential_id,
'description' => description,
'stapler-class' => 'com.cloudbees.jenkins.plugins.sshcredentials.impl.BasicSSHUserPrivateKey'
}
}.to_json
}
end
resp = jenkins_obj.api_post_request(
uri_path,
post_body
)
resp == '302'
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.get_all_job_git_repos(
# jenkins_obj: 'required jenkins_obj returned from #connect method'
# )
public_class_method def self.get_all_job_git_repos(opts = {})
jenkins_obj = opts[:jenkins_obj]
@@logger.info('Retrieving a List of Git Repos from Every Job...')
git_repo_arr = []
jenkins_obj.job.list_all_with_details.each do |job|
this_config = Nokogiri::XML(jenkins_obj.job.get_config(job['name']))
this_git_repo = this_config.xpath('//scm/userRemoteConfigs/hudson.plugins.git.UserRemoteConfig/url').text
this_git_branch = this_config.xpath('//scm/branches/hudson.plugins.git.BranchSpec/name').text
next if this_git_repo == ''
# Obtain all jobs' git repos
job_git_repo = {}
job_git_repo[:name] = job['name']
job_git_repo[:url] = job['url']
job_git_repo[:job_state] = job['color']
job_git_repo[:git_repo] = this_git_repo
job_git_repo[:git_branch] = this_git_branch
job_git_repo[:config_xml_response] = this_config
git_repo_arr.push(job_git_repo)
end
git_repo_arr
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.list_nested_jobs(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# view_path: 'required view path to list jobs'
# )
public_class_method def self.list_nested_jobs(opts = {})
jenkins_obj = opts[:jenkins_obj]
view_path = opts[:view_path].to_s.scrub
nested_view_resp = jenkins_obj.api_get_request(view_path)
nested_view_resp['jobs']
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.list_nested_views(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# view_path: 'required view path list sub-views'
# )
public_class_method def self.list_nested_views(opts = {})
jenkins_obj = opts[:jenkins_obj]
view_path = opts[:view_path].to_s.scrub
nested_view_resp = jenkins_obj.api_get_request(view_path)
nested_view_resp['views']
rescue StandardError => e
raise e
end
# PWN::Plugins::Jenkins.create_nested_view(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# view_path: 'required view path create',
# create_in_view_path: 'optional creates nested view within an existing nested view, defaults to / views'
# )
public_class_method def self.create_nested_view(opts = {})
jenkins_obj = opts[:jenkins_obj]
view_name = opts[:view_name].to_s.scrub
create_in_view_path = opts[:create_in_view_path].to_s.scrub
# TODO: pass parameter for modes and use case statement to build dynamically post_body
# mode = 'hudson.plugins.nested_view.NestedView' # Requires Jenkins Nested View Plugin to Work Properly
mode = 'hudson.model.ListView'
post_body = {
'name' => view_name,
'mode' => mode,
'json' => {
'name' => view_name,
'mode' => mode
}.to_json
}
root_view_paths_arr = [
'',
'/'
]
if root_view_paths_arr.include?(create_in_view_path)
@@logger.info('Creating Nested View in /...')
resp = jenkins_obj.api_post_request(
'/createView',
post_body
)
else
@@logger.info("Creating Nested View in #{create_in_view_path}...")
# Example view_path would be '/view/Projects/PROJECT_NAME/view/RELEASES'
# This is taken out of the Jenkins URI when residing in the view in which
# you want to create your view...simply drop the domain name.
resp = jenkins_obj.api_post_request(
"#{create_in_view_path}/createView",
post_body
)
end
resp == '302'
rescue JenkinsApi::Exceptions::ViewAlreadyExists => e
@@logger.warn("Jenkins view: #{view_name} already exists")
e.class
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.add_job_to_nested_view(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# view_path: 'required view path associate job',
# job_name: 'required view path attach to a view',
# )
def self.add_job_to_nested_view(opts = {})
jenkins_obj = opts[:jenkins_obj]
view_path = opts[:view_path].to_s.scrub
job_name = opts[:job_name].to_s.scrub
jenkins_obj.api_post_request("#{view_path}/addJobToView?name=#{job_name}")
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.copy_job_no_fail_on_exist(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# existing_job_name: 'required existing job to copt to new job',
# new_job_name: 'required name of new job'
# )
public_class_method def self.copy_job_no_fail_on_exist(opts = {})
jenkins_obj = opts[:jenkins_obj]
existing_job_name = opts[:existing_job_name]
new_job_name = opts[:new_job_name]
copy_job_resp = jenkins_obj.job.copy(existing_job_name, new_job_name)
rescue JenkinsApi::Exceptions::JobAlreadyExists => e
@@logger.warn("Jenkins job: #{new_job_name} already exists")
e.class
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.disable_jobs_by_regex(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# regex: 'required regex pattern for matching jobs to disable e.g. :regex => "^M[0-9]"',
# )
public_class_method def self.disable_jobs_by_regex(opts = {})
jenkins_obj = opts[:jenkins_obj]
regex = opts[:regex].to_s.scrub
jenkins_obj.job.list_all_with_details.each do |job|
job_name = job['name']
if job_name.match?(/#{regex}/)
@@logger.info("Disabling #{job_name}")
jenkins_obj.job.disable(job_name)
end
end
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.delete_jobs_by_regex(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# regex: 'required regex pattern for matching jobs to disable e.g. :regex => "^M[0-9]"',
# )
public_class_method def self.delete_jobs_by_regex(opts = {})
jenkins_obj = opts[:jenkins_obj]
regex = opts[:regex].to_s.scrub
jenkins_obj.job.list_all_with_details.each do |job|
job_name = job['name']
if job_name.match?(/#{regex}/)
@@logger.info("Deleting #{job_name}")
jenkins_obj.job.delete(job_name)
end
end
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.clear_build_queue(
# jenkins_obj: 'required jenkins_obj returned from #connect method',
# )
public_class_method def self.clear_build_queue(opts = {})
jenkins_obj = opts[:jenkins_obj]
jenkins_obj.queue.list.each do |job_name|
@@logger.info("Clearing #{job_name} Build from Queue")
jenkins_obj.job.stop_build(job_name)
end
rescue StandardError => e
raise e
end
# Supported Method Parameters::
# PWN::Plugins::Jenkins.disconnect(
# jenkins_obj: 'required jenkins_obj returned from #connect method'
# )
public_class_method def self.disconnect(opts = {})
jenkins_obj = opts[:jenkins_obj]
@@logger.info('Disconnecting from Jenkins...')
jenkins_obj = nil
'complete'
rescue StandardError => e
raise e
end
# Author(s):: 0day Inc. <request.pentest@0dayinc.com>
public_class_method def self.authors
"AUTHOR(S):
0day Inc. <request.pentest@0dayinc.com>
"
end
# Display Usage for this Module
public_class_method def self.help
puts %{USAGE:
jenkins_obj = #{self}.connect(
ip: 'required host/ip of Jenkins Server',
port: 'optional tcp port (defaults to 8080),
username: 'optional username (functionality will be limited if ommitted)',
api_key: 'optional api_key (functionality will be limited if ommitted)',
identity_file: 'optional ssh private key path to AuthN w/ Jenkins PREFERRED over username/api_key',
ssl: 'optional connect over TLS (defaults to true),
proxy: 'optional debug proxy rest api requests to jenkins (e.g. "http://127.0.0.1:8080")''
)
puts jenkins_obj.public_methods
#{self}.create_user(
jenkins_obj: 'required - jenkins_obj returned from #connect method',
username: 'required - user to create',
password: 'optional - password for new user (will prompt if nil)'
fullname: 'required - full name of new user'
email: 'required - email address of new user'
)
#{self}.create_ssh_credential(
jenkins_obj: 'required - jenkins_obj returned from #connect method',
username: 'required - username for new credential'
private_key_path: 'required - path of private ssh key for new credential'
key_passphrase: 'optional - private key passphrase for new credential'
credential_id: 'optional but recommended - useful when creating userland jobs',
description: 'optional - description of new credential'
domain: 'optional - defaults to GLOBAL',
scope: 'optional - GLOBAL or SYSTEM (defaults to GLOBAL)'
)
git_repo_arr = #{self}.get_all_job_git_repos(
jenkins_obj: 'required jenkins_obj returned from connect method'
)
git_repo_branches = #{self}.get_all_git_repo_branches_by_commit_date(
jenkins_obj: 'required jenkins_obj returned from #connect method',
job_name: 'required jenkins job name',
git_url: 'required git url for git_repo'
)
nested_jobs_arr = #{self}.list_nested_jobs(
jenkins_obj: 'required jenkins_obj returned from #connect method',
view_path: 'required view path list jobs'
)
nested_views_arr = #{self}.list_nested_views(
jenkins_obj: 'required jenkins_obj returned from #connect method',
view_path: 'required view path list sub-views'
)
view_created_bool = #{self}.create_nested_view(
jenkins_obj: 'required jenkins_obj returned from #connect method',
view_path: 'required view path create',
create_in_view_path: 'optional creates nested view within an existing nested view, defaults to / views'
)
add_job_to_nested_view_resp = #{self}.add_job_to_nested_view(
jenkins_obj: 'required jenkins_obj returned from #connect method',
view_path: 'required view path associate job',
job_name: 'required view path attach to a view',
)
copy_job_resp = #{self}.copy_job_no_fail_on_exist(
jenkins_obj: 'required jenkins_obj returned from #connect method',
existing_job_name: 'required existing job to copt to new job',
new_job_name: 'required name of new job'
)
#{self}.disable_jobs_by_regex(
jenkins_obj: 'required jenkins_obj returned from #connect method',
regex: 'required regex pattern for matching jobs to disable e.g. :regex => "^M[0-9]"',
)
#{self}.delete_job_by_regex(
jenkins_obj: 'required jenkins_obj returned from #connect method',
regex: 'required regex pattern for matching jobs to disable e.g. :regex => "^M[0-9]"',
)
#{self}.clear_build_queue(
jenkins_obj: 'required jenkins_obj returned from #connect method',
)
#{self}.disconnect(
jenkins_obj: 'required jenkins_obj returned from connect method'
)
#{self}.authors
}
end
end
end
end