class MiGA::Daemon

MiGA Daemons handling job submissions.

def check_datasets

Traverse datasets, and return a Boolean indicating whether any reference
datasets are incomplete.

def check_datasets
  l_say(2, 'Checking datasets')
  o = false
  project.each_dataset do |ds|
    next unless ds.status == :incomplete
    next if ds.next_preprocessing(false).nil?
    o = true if ds.ref?
    queue_job(:d, ds)
  end
  unless show_log?
    n = project.dataset_names.count
    k = jobs_to_run.size + jobs_running.size
    k -= 1 unless get_job(:maintenance).nil?
    advance('Datasets:', n - k, n, false)
    miga_say if k == 0
  end
  o
end
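
A minimal sketch of driving this check by hand, assuming a loaded project and default daemon settings (the project path is hypothetical):

p = MiGA::Project.load('/data/my_project') # hypothetical path
d = MiGA::Daemon.new(p)
d.check_datasets # => true if any reference dataset still needs preprocessing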

def check_project

Check if all reference datasets are pre-processed. If yes, check the
project-level tasks.

def check_project
  l_say(2, 'Checking project')
  # Ignore task if the project has no datasets
  return if project.dataset_names.empty?
  # Double-check if all datasets are ready
  return unless project.done_preprocessing?
  # Queue project-level job
  to_run = project.next_task(nil, false)
  queue_job(:p) unless to_run.nil?
end

def daemon_first_loop

Run only in the first loop.

def daemon_first_loop
  say '-----------------------------------'
  say 'MiGA:%s launched' % project.name
  say '-----------------------------------'
  miga_say "Saving log to: #{output_file}" unless show_log?
  say 'Configuration options:'
  say @runopts.to_s
  load_status
  queue_maintenance(true)
end

def daemon_home(project)

Daemon's home inside the MiGA::Project +project+ or a String with the
full path to the project's 'daemon' folder.

def daemon_home(project)
  return project if project.is_a? String
  File.join(project.path, 'daemon')
end
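
A quick illustration of both accepted argument types (the string path is hypothetical):

MiGA::Daemon.daemon_home(project)                   # => File.join(project.path, 'daemon')
MiGA::Daemon.daemon_home('/data/my_project/daemon') # => '/data/my_project/daemon', returned as-is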

def daemon_home

Path to the daemon home.

def daemon_home
  self.class.daemon_home(project)
end

def daemon_loop

Run one loop step. Returns a Boolean indicating if the loop should continue.

def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project
  check_datasets or check_project
  if shutdown_when_done? && (jobs_running.size + jobs_to_run.size).zero?
    say 'Nothing else to do, shutting down'
    exit_cleanup
    return false
  end
  flush!
  if (loop_i % 12).zero?
    purge!
    queue_maintenance if (loop_i % (12 * (skip_maintenance + 1))).zero?
  end
  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end
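
MiGA ships its own daemon runner; as an illustration only, an outer loop built on the two methods above could look like:

d.daemon_first_loop
loop { break unless d.daemon_loop } # each pass sleeps for #latency seconds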

def daemon_name

Name of the daemon.

def daemon_name
  "MiGA:#{project.name}"
end

def exit_cleanup

Remove temporary files on completion.

def exit_cleanup
  FileUtils.rm_f(File.join(daemon_home, 'status.json'))
end

def flush!

Remove finished jobs from the internal queue and launch as many as
possible, respecting #maxjobs or #nodelist (if set).

def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  @jobs_to_run.sort_by! do |job|
    job[:ds].nil? ? 1 : job[:ds_name] =~ /^qG_/ ? 2 : 3
  end
  # Launch as many +jobs_to_run+ as possible
  while (hostk = next_host)
    break if jobs_to_run.empty?
    launch_job(@jobs_to_run.shift, hostk)
  end
end
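
The priority key used above can be checked on toy job hashes (names made up; only :ds and :ds_name matter here):

jobs = [
  { ds_name: 'qG_query1',    ds: Object.new }, # MiGA Online query
  { ds_name: 'genome_a',     ds: Object.new }, # regular dataset
  { ds_name: 'miga-project', ds: nil }         # project-wide task
]
jobs.sort_by! { |j| j[:ds].nil? ? 1 : j[:ds_name] =~ /^qG_/ ? 2 : 3 }
jobs.map { |j| j[:ds_name] } # => ["miga-project", "qG_query1", "genome_a"]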

def get_job(job, ds = nil)

Get the task with key symbol +job+ in dataset +ds+. For project-wide tasks,
let +ds+ be nil.

def get_job(job, ds = nil)
  (jobs_to_run + jobs_running).find do |j|
    if ds.nil?
      j[:ds].nil? && j[:job] == job
    else
      !j[:ds].nil? && j[:ds].name == ds.name && j[:job] == job
    end
  end
end
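
Typical lookups, assuming +d+ is a daemon and +ds+ a dataset of its project (the task keys are examples):

d.get_job(:maintenance) # project-wide task => job Hash or nil
d.get_job(:d, ds)       # dataset task queued by #check_datasets => job Hash or nil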

def initialize(project, json = nil)

Initialize an inactive daemon for the MiGA::Project +project+. See #daemon
to wake the daemon. If passed, +json+ must be the path to a daemon
definition in JSON format. Otherwise, the project-stored daemon definition
is used. In either case, missing variables are used as defined in
~/.miga_daemon.json.

def initialize(project, json = nil)
  @project = project
  @runopts = {}
  json ||= File.join(project.path, 'daemon/daemon.json')
  default_json = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  MiGA::Json.parse(
    json, default: File.exist?(default_json) ? default_json : nil
  ).each { |k, v| runopts(k, v) }
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
end
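
A construction sketch; the paths are hypothetical and MiGA::Project.load is assumed to return a loaded project:

p = MiGA::Project.load('/data/my_project')
d = MiGA::Daemon.new(p)                          # uses daemon/daemon.json in the project
d = MiGA::Daemon.new(p, '/etc/miga/daemon.json') # explicit daemon definition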

def job_cmd(to_run)

Construct the command for the given job definition with current
daemon settings.

def job_cmd(to_run)
  what = to_run[:ds].nil? ? :project : :dataset
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, what),
    'CORES' => ppn(what),
    'MIGA' => MiGA::MiGA.root_path
  }
  vars['DATASET'] = to_run[:ds].name unless to_run[:ds].nil?
  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)
  var_hsh = {
    script: MiGA::MiGA.script_path(
              to_run[:job], miga: vars['MIGA'], project: project
            ),
    vars: vars.map do |k, v|
            runopts(:var).miga_variables(key: k, value: v)
          end.join(runopts_for(:varsep, what)),
    cpus: ppn(what),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: to_run[:task_name],
    task_name_simple: to_run[:task_name].gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, what).miga_variables(var_hsh)
end
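
The runopts templates consumed here use {{variable}} placeholders (see #update_format_0). As a rough sketch only, with a simplified stand-in 'cmd' template (not the shipped default) and hypothetical values:

template = '{{vars}} {{script}} > {{log}} 2>&1'
template.miga_variables(
  vars:   'PROJECT=/data/my_project DATASET=genome_a',
  script: '/path/to/scripts/trimmed_reads.bash',
  log:    '/data/my_project/daemon/trimmed_reads/genome_a.log'
)
# => the filled-in shell command, ready to be spawned or submitted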

def l_say(level, *msg)

Send +msg+ to +say+ as long as +level+ is at most +verbosity+.

def l_say(level, *msg)
  say(*msg) if verbosity >= level
end
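
For example, with verbosity set to 2 (a hypothetical setting), the first call prints and the second is suppressed:

d.l_say(2, 'Checking datasets') # printed when verbosity >= 2
d.l_say(3, 'Daemon loop start') # suppressed unless verbosity >= 3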

def launch_job(job, hostk = nil)

Launch the job described by the Hash +job+ on the +hostk+-th host.

def launch_job(job, hostk = nil)
  # Execute job
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG "CMD: #{job[:cmd]}"
  case runopts(:type)
  when 'ssh'
    # Remote job
    job[:hostk] = hostk
    job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless [nil, '', 0].include?(job[:pid])
  when 'bash'
    # Local job
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless [nil, '', 0].include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end
  # Check if registered
  if [nil, '', 0].include? job[:pid]
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    job_host = " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" if job[:hostk]
    say "Spawned pid:#{job[:pid]}#{job_host} for #{job[:task_name]}"
  end
end

def load_status

Load the status of a previous instance.

def load_status
  f_path = File.join(daemon_home, 'status.json')
  return unless File.size? f_path
  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(f_path)
  status.each_key do |i|
    status[i].map! do |j|
      j.tap do |k|
        unless k[:ds].nil? || k[:ds_name] == 'miga-project'
          k[:ds] = project.dataset(k[:ds_name])
        end
        k[:job] = k[:job].to_sym unless k[:job].nil?
      end
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run  = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end

def next_host

In SSH daemons, retrieve the host index of an available node, or nil if none
is available. In any other daemon, return true as long as #maxjobs has not
been reached.

def next_host
  return jobs_running.size < maxjobs if runopts(:type) != 'ssh'
  allk = (0..nodelist.size - 1).to_a
  busyk = jobs_running.map { |k| k[:hostk] }
  (allk - busyk).first
end
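
The ssh branch can be illustrated with toy data (node names are made up):

nodelist = ['node01', 'node02', 'node03']
busyk    = [0, 2]                           # host indexes already in use
((0..nodelist.size - 1).to_a - busyk).first # => 1, the next free host index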

def path

Alias to +project.path+ for compatibility with lairs.

def path
  project.path
end

def purge!

Remove dead jobs.

def purge!
  say 'Probing running jobs'
  @jobs_running.select! do |job|
    MiGA::MiGA.run_cmd(
      runopts(:alive).miga_variables(pid: job[:pid]), return: :output
    ).chomp.to_i == 1
  end
end

def queue_job(job, ds = nil)

Add the task to the internal queue with symbol key +job+. If the task is
dataset-specific, +ds+ specifies the dataset. To submit jobs to the
scheduler (or to bash or ssh), see #flush!

def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
  to_run = { ds: ds, ds_name: ds_name, job: job, task_name: task_name }
  say 'Queueing %s:%s' % [to_run[:ds_name], to_run[:job]]
  @jobs_to_run << to_run
end
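
The queued Hash has the shape below (values illustrative, for a project named 'my_project'):

d.queue_job(:d, ds)
d.jobs_to_run.last
# => { ds: ds, ds_name: 'genome_a', job: :d, task_name: 'my_project:d:genome_a' }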

def queue_maintenance(force = false)

Queue maintenance tasks as an analysis job.

def queue_maintenance(force = false)
  return if bypass_maintenance? || (!force && shutdown_when_done?)
  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end

def reload_project

Reload the project's metadata.

def reload_project
  l_say(2, 'Reloading project')
  project.load
end

def save_status

Report status in a JSON file.

def save_status
  l_say(2, 'Saving current status')
  MiGA::Json.generate(
    { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run },
    File.join(daemon_home, 'status.json')
  )
end
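
Schematically, daemon/status.json then holds both queues (entries illustrative; running jobs also carry cmd, pid, and, for ssh daemons, hostk):

{
  jobs_running: [{ ds_name: 'genome_a', job: :d, task_name: 'my_project:d:genome_a', pid: 12345 }],
  jobs_to_run:  [{ ds_name: 'miga-project', job: :p, task_name: 'my_project:p:miga-project' }]
}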

def say(*msg)

Same as +l_say+ with +level = 1+.

def say(*msg)
  super(logfh, *msg) if verbosity >= 1
end

def update_format_0

Update from daemon JSON format 0 to the latest version.

def update_format_0
  {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }.each do |k, v|
    if !runopts(k).nil? && runopts(k) =~ /%(\d+\$)?[ds]/
      runopts(
        k, runopts(k).gsub(/%(\d+\$)?d/, '%\\1s') % v.map { |i| "{{#{i}}}" }
      )
    end
  end
  runopts(:format_version, 1)
end
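
A standalone illustration of the rewrite performed here, using a hypothetical format-0 'alive' command:

old = 'ps -p %1$d | tail -n +2 | wc -l'
new = old.gsub(/%(\d+\$)?d/, '%\\1s') % ['pid'].map { |i| "{{#{i}}}" }
new # => "ps -p {{pid}} | tail -n +2 | wc -l"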