class MiGA::Daemon
MiGA Daemons handling job submissions.
#
def check_datasets
Traverse datasets, and returns boolean indicating if at any reference
#
# Walk over every dataset in the project and queue a +:d+ job for each one
# whose status is +:incomplete+ and that still has a pending preprocessing
# step. Unless the log is being shown, also report progress via +advance+.
#
# Returns +true+ if at least one of the datasets queued in this pass is a
# reference dataset (+ds.ref?+), +false+ otherwise.
def check_datasets
  l_say(2, 'Checking datasets')
  found_ref = false
  project.each_dataset do |dataset|
    pending = dataset.status == :incomplete &&
              !dataset.next_preprocessing(false).nil?
    next unless pending

    found_ref = true if dataset.ref?
    queue_job(:d, dataset)
  end
  return found_ref if show_log?

  total = project.dataset_names.count
  outstanding = jobs_to_run.size + jobs_running.size
  # The maintenance job is not counted against the datasets
  outstanding -= 1 unless get_job(:maintenance).nil?
  advance('Datasets:', total - outstanding, total, false)
  miga_say if outstanding.zero?
  found_ref
end
def check_project
Check if all reference datasets are pre-processed. If yes, check the
#
# Queue the project-level job (+:p+) when the project has datasets, all of
# them are done with preprocessing, and the project reports a next task.
# Returns +nil+ when nothing is queued.
def check_project
  l_say(2, 'Checking project')

  # Skip projects without datasets, and projects with datasets that are
  # still being preprocessed
  ready = !project.dataset_names.empty? && project.done_preprocessing?
  return nil unless ready

  # Queue project-level job only if there is something left to run
  pending_task = project.next_task(nil, false)
  return nil if pending_task.nil?

  queue_job(:p)
end
def daemon_first_loop
#
# Run once at launch: print the launch banner and the configuration
# options, restore any previously saved status, and force-queue the
# maintenance job.
def daemon_first_loop
  rule = '-----------------------------------'
  say rule
  say 'MiGA:%s launched' % project.name
  say rule
  miga_say "Saving log to: #{output_file}" unless show_log?
  say 'Configuration options:'
  say @runopts.to_s
  load_status
  # +true+ forces queueing even for daemons set to shut down when done
  queue_maintenance(true)
end
def daemon_home(project)
Daemon's home inside the MiGA::Project +project+ or a String with the
#
# Location of the daemon's home for +project+.
#
# When +project+ is a String it is returned as is; otherwise the result is
# the +daemon+ folder inside +project.path+.
def daemon_home(project)
  case project
  when String then project
  else File.join(project.path, 'daemon')
  end
end
def daemon_home
#
# Daemon's home for the current project, as resolved by the class-level
# +daemon_home+.
def daemon_home
  owner = self.class
  owner.daemon_home(project)
end
def daemon_loop
#
# Run one iteration of the daemon: reload the project, queue pending
# work, flush the queue, periodically probe running jobs and queue
# maintenance, save the status, and sleep for +latency+.
#
# Returns +false+ (after cleaning up) when the daemon is set to shut down
# when done and there are no running or queued jobs; +true+ otherwise.
def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project

  # Project-level work is only checked when +check_datasets+ returns a
  # falsy value
  check_project unless check_datasets

  if shutdown_when_done?
    outstanding = jobs_running.size + jobs_to_run.size
    if outstanding.zero?
      say 'Nothing else to do, shutting down'
      exit_cleanup
      return false
    end
  end

  flush!

  # Every 12th loop probe running jobs; maintenance is queued on a multiple
  # of that period determined by +skip_maintenance+
  if (loop_i % 12).zero?
    purge!
    queue_maintenance if (loop_i % (12 * (skip_maintenance + 1))).zero?
  end

  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end
def daemon_name
#
# Name of the daemon: the project name prefixed with +MiGA:+.
def daemon_name
  format('MiGA:%s', project.name)
end
def exit_cleanup
#
# Remove the saved status file (+status.json+) from the daemon's home.
# Uses +rm_f+, so a missing file is not an error.
def exit_cleanup
  status_file = File.join(daemon_home, 'status.json')
  FileUtils.rm_f(status_file)
end
def flush!
Remove finished jobs from the internal queue and launch as many as
#
# Remove finished jobs from the internal list of running jobs, reorder the
# pending queue by priority, and launch as many pending jobs as +next_host+
# allows.
#
# A running job is considered ongoing when:
# - +d+ jobs: the dataset is set and still has a next preprocessing step
# - +p+ jobs: the project still has a next task
# - any other job: +add_result+ for the job returns +nil+ on the dataset
#   (or on the project, for jobs without dataset)
def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end

  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)

  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  #
  # Sorting by priority alone is not guaranteed to be stable, which could
  # undo the rotation above within a priority class. The position after
  # rotation is used as tiebreaker so that order is preserved. The array is
  # updated in place (+replace+) so existing references remain valid.
  ranked = @jobs_to_run.each_with_index.sort_by do |job, position|
    priority =
      if job[:ds].nil?
        1
      elsif job[:ds_name] =~ /^qG_/
        2
      else
        3
      end
    [priority, position]
  end
  @jobs_to_run.replace(ranked.map(&:first))

  # Launch as many +jobs_to_run+ as possible
  while (hostk = next_host)
    break if jobs_to_run.empty?

    launch_job(@jobs_to_run.shift, hostk)
  end
end
def get_job(job, ds = nil)
Get the task with key symbol +job+ in dataset +ds+. For project-wide tasks
#
# Find the first queued or running job (queued jobs are searched first)
# with key +job+. When +ds+ is +nil+ only jobs without dataset match;
# otherwise only jobs whose dataset has the same name as +ds+ match.
# Returns the job Hash, or +nil+ if there is no match.
def get_job(job, ds = nil)
  matcher =
    if ds.nil?
      ->(entry) { entry[:ds].nil? && entry[:job] == job }
    else
      lambda do |entry|
        !entry[:ds].nil? && entry[:ds].name == ds.name && entry[:job] == job
      end
    end
  (jobs_to_run + jobs_running).find(&matcher)
end
def initialize(project, json = nil)
is used. In either case, missing variables are used as defined in
definition in json format. Otherwise, the project-stored daemon definition
to wake the daemon. If passed, +json+ must be the path to a daemon
Initialize an inactive daemon for the MiGA::Project +project+. See #daemon
#
# Set up the daemon for +project+ with empty job queues.
#
# +json+ is the path to the daemon definition; when not given, the file
# +daemon/daemon.json+ inside the project path is used. If the file
# +.miga_daemon.json+ exists under +ENV['MIGA_HOME']+, it is passed to the
# parser as +default:+. Every parsed key/value pair is registered through
# +runopts+ before the format is updated with +update_format_0+.
def initialize(project, json = nil)
  @project = project
  @runopts = {}

  json ||= File.join(project.path, 'daemon/daemon.json')
  default_json = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  fallback = File.exist?(default_json) ? default_json : nil

  settings = MiGA::Json.parse(json, default: fallback)
  settings.each do |key, value|
    runopts(key, value)
  end
  update_format_0

  @jobs_to_run = []
  @jobs_running = []
end
def job_cmd(to_run)
Construct the command for the given job definition with current
#
# Build the command to execute the job described by the Hash +to_run+,
# filling the configured +:cmd+ template with the job's values.
#
# The job is treated as project-wide when +to_run[:ds]+ is +nil+, and as a
# dataset job otherwise. As a side effect, the log folder
# +daemon/<job>+ inside the project path is created if missing.
def job_cmd(to_run)
  dataset = to_run[:ds]
  what = dataset.nil? ? :project : :dataset

  # Variables rendered through the +:var+ template
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, what),
    'CORES' => ppn(what),
    'MIGA' => MiGA::MiGA.root_path
  }
  vars['DATASET'] = dataset.name unless dataset.nil?

  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)

  script = MiGA::MiGA.script_path(
    to_run[:job], miga: vars['MIGA'], project: project
  )
  rendered_vars = vars.map do |key, value|
    runopts(:var).miga_variables(key: key, value: value)
  end
  task_name = to_run[:task_name]

  var_hsh = {
    script: script,
    vars: rendered_vars.join(runopts_for(:varsep, what)),
    cpus: ppn(what),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: task_name,
    # Same name with anything other than letters, digits, or underscores
    # replaced by dashes
    task_name_simple: task_name.gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, what).miga_variables(var_hsh)
end
def l_say(level, *msg)
#
# Pass +msg+ to +say+ only when the current verbosity is at least +level+.
def l_say(level, *msg)
  return unless verbosity >= level

  say(*msg)
end
def launch_job(job, hostk = nil)
#
# Launch the job described by the Hash +job+ and register the outcome.
#
# For +ssh+ and +bash+ daemons the command is spawned and detached; for
# +ssh+ the host index +hostk+ is stored in the job and the host name is
# substituted into the command first. For any other type the command is
# run and its (chomped) output is taken as the job's pid.
#
# Jobs without a usable pid (+nil+, empty, or +0+) are returned to the
# pending queue; all others are appended to the running jobs.
def launch_job(job, hostk = nil)
  unregistered = [nil, '', 0]

  # Execute job
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG "CMD: #{job[:cmd]}"
  run_type = runopts(:type)
  if %w[ssh bash].include?(run_type)
    if run_type == 'ssh'
      # Remote job: bind it to the given host
      job[:hostk] = hostk
      job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    end
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless unregistered.include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end

  # Check if registered
  if unregistered.include?(job[:pid])
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    job_host = " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" if job[:hostk]
    say "Spawned pid:#{job[:pid]}#{job_host} for #{job[:task_name]}"
  end
end
def load_status
#
# Restore the job queues from +status.json+ in the daemon's home, if the
# file exists and is not empty. Dataset references are reloaded from the
# project by name, job keys are converted to symbols, and running jobs are
# probed with +purge!+.
def load_status
  f_path = File.join(daemon_home, 'status.json')
  return unless File.size? f_path

  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(f_path)
  status.each_value do |queue|
    queue.each do |entry|
      # Re-link the dataset, except for entries without dataset or
      # entries named as the project
      if !entry[:ds].nil? && entry[:ds_name] != 'miga-project'
        entry[:ds] = project.dataset(entry[:ds_name])
      end
      entry[:job] = entry[:job].to_sym unless entry[:job].nil?
    end
  end

  @jobs_running = status[:jobs_running]
  @jobs_to_run = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
def next_host
In SSH daemons, retrieve the host index of an available node, nil if none.
#
# Availability of resources to launch a new job.
#
# In +ssh+ daemons, returns the index in +nodelist+ of the first node that
# is not assigned to a running job, or +nil+ if all are busy. In any other
# daemon type, returns a boolean: whether the number of running jobs is
# below +maxjobs+.
def next_host
  return jobs_running.size < maxjobs unless runopts(:type) == 'ssh'

  busy = jobs_running.map { |job| job[:hostk] }
  free = Array(0...nodelist.size) - busy
  free.first
end
def path
#
# Path of the daemon, which is the path of its project.
def path
  owner = project
  owner.path
end
def purge!
#
# Probe each running job with the configured +:alive+ command and keep
# only those for which the command's output, read as an integer, is 1.
def purge!
  say 'Probing running jobs'
  @jobs_running.select! do |job|
    probe = runopts(:alive).miga_variables(pid: job[:pid])
    reply = MiGA::MiGA.run_cmd(probe, return: :output)
    reply.chomp.to_i == 1
  end
end
def queue_job(job, ds = nil)
dataset-specific, +ds+ specifies the dataset. To submit jobs to the
Add the task to the internal queue with symbol key +job+. If the task is
#
# Add a job with key +job+ to the pending queue, unless an equivalent job
# is already queued or running (see +get_job+), in which case +nil+ is
# returned. +ds+ is the dataset for dataset-specific jobs; jobs without
# dataset are labeled +miga-project+.
#
# The task name is built from the first 10 characters of the project
# name, the job key, and the dataset label.
def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?

  ds_name = ds.nil? ? 'miga-project' : ds.name
  prefix = project.metadata[:name][0..9]
  entry = {
    ds: ds,
    ds_name: ds_name,
    job: job,
    task_name: "#{prefix}:#{job}:#{ds_name}"
  }
  say format('Queueing %s:%s', entry[:ds_name], entry[:job])
  @jobs_to_run << entry
end
def queue_maintenance(force = false)
#
# Queue the project-wide +:maintenance+ job.
#
# Nothing is queued when maintenance is bypassed, nor when the daemon is
# set to shut down when done, unless +force+ is true.
def queue_maintenance(force = false)
  return if bypass_maintenance?
  return if !force && shutdown_when_done?

  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end
def reload_project
#
# Reload the project by calling +load+ on it, logging at verbosity 2.
def reload_project
  l_say(2, 'Reloading project')
  current = project
  current.load
end
def save_status
#
# Write the running and pending job queues to +status.json+ in the
# daemon's home.
def save_status
  l_say(2, 'Saving current status')
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  MiGA::Json.generate(snapshot, File.join(daemon_home, 'status.json'))
end
def say(*msg)
#
# Forward +msg+ to the parent implementation, with +logfh+ as first
# argument, only when verbosity is at least 1.
def say(*msg)
  return unless verbosity >= 1

  super(logfh, *msg)
end
def update_format_0
#
# Convert the +:cmd+, +:var+, +:alive+, and +:kill+ options from
# printf-style templates (+%s+, +%d+, +%1$s+, ...) to named placeholders
# (+{{name}}+), and set +:format_version+ to 1. Options that are not set
# or have no printf-style directives are left untouched.
def update_format_0
  legacy_fields = {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }
  legacy_fields.each do |option, fields|
    template = runopts(option)
    next if template.nil?
    next unless template =~ /%(\d+\$)?[ds]/

    placeholders = fields.map { |field| "{{#{field}}}" }
    # Turn +%d+ directives into +%s+ so the placeholders can be inserted
    as_strings = template.gsub(/%(\d+\$)?d/, '%\\1s')
    runopts(option, as_strings % placeholders)
  end
  runopts(:format_version, 1)
end