class CanvasSync::JobBatches::ChainBuilder
def <<(new_job)
def <<(new_job) insert_at(-1, new_job) end
def [](key)
def [](key) if key.is_a?(Class) get_sub_chain(key) else @base_job[key] end end
def _job_type_definitions
def _job_type_definitions @job_type_definitions ||= {} end
def apply_block(&blk)
def apply_block(&blk) return unless blk.present? instance_exec(&blk) end
def build(job, *args, &blk)
def build(job, *args, &blk) new(job).tap do |ch| ch[:parameters] = args ch.apply_block(&blk) end end
def build_job_hash(job, *params, &blk)
def build_job_hash(job, *params, &blk) hsh = { job: job, parameters: params, } self.class.new(hsh).apply_block(&blk) if blk.present? hsh end
def enqueue_job(job_def)
> UI will map Batches to Chain-steps using the chain_step_id. UI will add entries for any Chain-steps that were not tied to a Batch
> Using server-middleware, if a Chain-job (has chain_id and chain_step_id) creates a Batch, tag the Batch w/ the chain_id and chain_step_id
> Chain-jobs will be supplied chain_id and chain_step_id metadata
> If augmented with Chain data, the above will be annotated with Chain-related info and will be able to show Jobs defined in the Chain
> [DONE] Tree view w/o Chain will only show Parent/Current batches and Job Counts
Augment Batch tree-view with Chain data
TODO: Add a Chain progress web View
def enqueue_job(job_def) job_class = job_def[:job].constantize job_options = job_def[:parameters] || [] # Legacy Support if job_def[:options] job_options << {} unless job_options[-1].is_a?(Hash) job_options[-1].merge!(job_def[:options]) end if job_class.respond_to? :perform_async job_class.perform_async(*job_options) else job_class.perform_later(*job_options) end end
def find_matching_jobs(search_job, parent_job = self.base_job)
def find_matching_jobs(search_job, parent_job = self.base_job) return to_enum(:find_matching_jobs, search_job, parent_job) unless block_given? sub_jobs = self.class.get_chain_parameter(parent_job) sub_jobs.each_with_index do |sub_job, i| if sub_job[:job].to_s == search_job.to_s yield [sub_job, parent_job, i] elsif self.class._job_type_definitions[sub_job[:job].to_s] find_matching_jobs(search_job, sub_job) { |item| yield item } end end end
def find_parent_job(job_def)
def find_parent_job(job_def) iterate_job_tree do |job, path| return path[-1] if job == job_def end nil end
def get_chain_parameter(job_def, raise_error: true)
def get_chain_parameter(job_def, raise_error: true) unless _job_type_definitions[job_def[:job].to_s].present? raise "Job Type #{job_def[:job].to_s} does not accept a sub-chain" if raise_error return nil end key = _job_type_definitions[job_def[:job].to_s][:chain_parameter] mapper = ParamsMapper.new(job_def[:parameters]) mapper[key] ||= [] end
def get_sub_chain(sub_type)
def get_sub_chain(sub_type) matching_jobs = find_matching_jobs(sub_type).to_a raise "Found multiple \"#{sub_type}\" jobs in the chain" if matching_jobs.count > 1 return nil if matching_jobs.count == 0 job = matching_jobs[0][0] job = self.class.new(job) unless job.is_a?(ChainBuilder) job end
def initialize(base_type = SerialBatchJob)
def initialize(base_type = SerialBatchJob) if base_type.is_a?(Hash) @base_job = base_type else @base_job = { job: base_type, parameters: [], } end end
def insert(new_jobs, *args, **kwargs, &blk)
def insert(new_jobs, *args, **kwargs, &blk) if new_jobs.is_a?(Class) || new_jobs.is_a?(String) job_kwargs = kwargs.except(*VALID_PLACEMENT_PARAMETERS) args << job_kwargs if job_kwargs.present? new_jobs = build_job_hash(new_jobs, *args, &blk) kwargs = kwargs.slice(*VALID_PLACEMENT_PARAMETERS) else invalid_params = kwargs.keys - VALID_PLACEMENT_PARAMETERS raise "Invalid placement parameters: #{invalid_params.map(&:to_s).join(', ')}" if invalid_params.present? raise "At most one placement parameter may be provided" if kwargs.values.compact.length > 1 raise "Unexpected number of arguments" if args.length > 0 end new_jobs = [new_jobs] unless new_jobs.is_a?(Array) if !kwargs.present? insert_at(-1, new_jobs) else placement = kwargs.keys[0] relative_to = kwargs.values[0] matching_jobs = find_matching_jobs(relative_to).to_a raise "Could not find a \"#{relative_to}\" job in the chain" if matching_jobs.count == 0 raise "Found multiple \"#{relative_to}\" jobs in the chain" if matching_jobs.count > 1 relative_job, parent_job, sub_index = matching_jobs[0] needed_parent_type = placement == :with ? ConcurrentBatchJob : SerialBatchJob chain = self.class.get_chain_parameter(parent_job) if parent_job[:job] != needed_parent_type old_job = chain[sub_index] parent_job = chain[sub_index] = { job: needed_parent_type, parameters: [], } sub_index = 0 chain = self.class.get_chain_parameter(parent_job) chain << old_job end if placement == :with chain.insert(-1, *new_jobs) else sub_index += 1 if placement == :after chain.insert(sub_index, *new_jobs) end end end
def insert_at(position, new_jobs, *args, &blk)
def insert_at(position, new_jobs, *args, &blk) chain = self.class.get_chain_parameter(base_job) if new_jobs.is_a?(Class) || new_jobs.is_a?(String) new_jobs = build_job_hash(new_jobs, *args, &blk) elsif args.length > 0 raise "Unexpected number of arguments" end new_jobs = [new_jobs] unless new_jobs.is_a?(Array) chain.insert(position, *new_jobs) end
def iterate_job_tree(root: self.base_job, path: [], &blk)
def iterate_job_tree(root: self.base_job, path: [], &blk) blk.call(root, path) if self.class._job_type_definitions[root[:job]] sub_jobs = self.class.get_chain_parameter(root) sub_jobs.each_with_index do |sub_job, i| iterate_job_tree(root: sub_job, path: [*path, root], &blk) end end end
def merge_options(job, options)
def merge_options(job, options) find_matching_jobs(job).each do |j, parent, index| j[:options] ||= {} j[:options].deep_merge!(options) end end
def normalize!(job_def = self.base_job)
def normalize!(job_def = self.base_job) if job_def.is_a?(ChainBuilder) job_def.normalize! else job_def[:job] = job_def[:job].to_s if (chain = self.class.get_chain_parameter(job_def, raise_error: false)).present? chain.map! { |sub_job| normalize!(sub_job) } end job_def end end
def params
def params ParamsMapper.new(self[:parameters]) end
def process!
def process! normalize! self.class.enqueue_job(base_job) end
def register_chain_job(job_class, chain_parameter, **options)
def register_chain_job(job_class, chain_parameter, **options) _job_type_definitions[job_class.to_s] = { **options, chain_parameter: chain_parameter, } end