class Concurrent::SerializedExecution
Ensures that the jobs passed to it are run in a serialized order, never running at the same time.
def call_job(job)
# Hands +job+ over to its own executor, which then runs it through #work.
# When the executor refuses the post by raising RejectedExecutionError, the
# job is run directly by the caller instead.
#
# @param job [Job] the job to dispatch; it must respond to `executor`
# @return [nil] when the executor accepted the job; otherwise the value of
#   the direct #work call, or of `log` if that call raised
def call_job(job)
  did_it_run = begin
    job.executor.post { work(job) }
    true
  rescue RejectedExecutionError
    false
  end

  # TODO: not the best idea to run it myself
  return if did_it_run

  begin
    work(job)
  rescue StandardError => error
    # Let it fail: the error from the direct run is only logged at DEBUG
    # level and is not re-raised to the caller.
    log DEBUG, error
  end
end
def initialize()
# Builds a new instance: runs the parent initializer with no arguments and
# then sets up the initial state by calling #ns_initialize inside
# `synchronize`.
def initialize
  super()
  synchronize do
    ns_initialize
  end
end
def ns_initialize
# Sets the initial state: no job is currently being executed and the stash
# of waiting jobs is empty. Invoked by #initialize inside `synchronize`.
#
# @return [Array] the new, empty stash
def ns_initialize
  # Flag that is true while one of the posted jobs is in flight.
  @being_executed = false
  # Jobs waiting for the in-flight job to finish, in posting order.
  @stash = []
end
def post(executor, *args, &task)
Raises:
- (ArgumentError) - if no task is given
Returns:
- (Boolean) - `true` if the task is queued, `false` if the executor is not running
Other tags:
- Yield: - the asynchronous task to perform
Parameters:
- args (Array<Object>) -- zero or more arguments to be passed to the task
- executor (Executor) -- to be used for this job
# Submits a single task to be run on +executor+, serialized with the other
# jobs posted to this object. Delegates to #posts with a one-element list.
#
# @param executor [Executor] to be used for this job
# @param args [Array<Object>] zero or more arguments to be passed to the task
# @yield the asynchronous task to perform
# @return [Boolean] `true` once the task has been handed to #posts
# @raise [ArgumentError] if no task is given
def post(executor, *args, &task)
  # Fail fast as documented: without this check a missing block would be
  # queued as a nil task.
  raise ArgumentError, 'no block given' unless task

  posts [[executor, args, task]]
  true
end
def posts(posts)
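- posts (Array<Array(Executor, Array<Object>, Proc)>) -- array of triplets where the first element is the executor, the second is the array of arguments for the task, and the third is the task (Proc)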
# Queues several jobs at once, keeping them in the given order. If no job is
# currently being executed, the first new job is dispatched through
# #call_job and the remaining ones are stashed; otherwise all of them are
# stashed.
#
# @param posts [Array<Array>] triplets of executor, argument array and task
# @return [true, nil] `nil` when +posts+ is empty, `true` otherwise
def posts(posts)
  # NOTE: an overflow check used to be sketched here and is kept disabled:
  # if can_overflow?
  #   raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
  # end

  return nil if posts.empty?

  jobs = posts.map { |executor, args, task| Job.new(executor, args, task) }
  head, *tail = jobs

  job_to_post = synchronize do
    if @being_executed
      # Something is already in flight: everything waits its turn.
      @stash.concat(jobs)
      nil
    else
      # Claim the execution slot; only the first job is dispatched now.
      @being_executed = true
      @stash.concat(tail)
      head
    end
  end

  # Dispatch after the synchronize block has returned.
  call_job(job_to_post) if job_to_post
  true
end
def work(job)
# Runs +job+ and afterwards, whether or not it raised, takes the next
# stashed job (if any) and dispatches it through #call_job. When the stash
# is empty the "being executed" flag is cleared instead.
#
# @param job [#call] the job to run now
# @return [Object] whatever `job.call` returns
def work(job)
  job.call
ensure
  next_job = synchronize do
    upcoming = @stash.shift
    # Nothing left to run: release the execution slot.
    @being_executed = false unless upcoming
    upcoming
  end

  # TODO: maybe be able to tell caching pool to just enqueue this job,
  # because the current one ends at the end of this block
  call_job(next_job) if next_job
end