# frozen_string_literal: true

require "sidekiq/fetch"
require "sidekiq/job_logger"
require "sidekiq/job_retry"
require "sidekiq/profiler"

module Sidekiq
  ##
  # The Processor is a standalone thread which:
  #
  # 1. fetches a job from Redis
  # 2. executes the job
  #   a. instantiate the job class
  #   b. run the middleware chain
  #   c. call #perform
  #
  # A Processor can exit due to shutdown or due to
  # an error during job execution.
  #
  # If an error occurs in the job execution, the
  # Processor calls the Manager to create a new one
  # to replace itself and exits.
  #
  class Processor
    include Sidekiq::Component

    attr_reader :thread
    attr_reader :job
    attr_reader :capsule

    # capsule - the Sidekiq::Capsule this processor belongs to; also used
    #           as the Component config.
    # block   - callback invoked when this processor's thread exits
    #           (see #run); receives (self) or (self, exception).
    def initialize(capsule, &block)
      @config = @capsule = capsule
      @callback = block
      @down = false
      @done = false
      @job = nil
      @thread = nil
      @reloader = Sidekiq.default_configuration[:reloader]
      @job_logger = (capsule.config[:job_logger] || Sidekiq::JobLogger).new(capsule.config)
      @retrier = Sidekiq::JobRetry.new(capsule)
    end

    # Signal the processor to stop after the current job finishes.
    # If +wait+ is true, block until the thread exits.
    def terminate(wait = false)
      @done = true
      return unless @thread
      @thread.value if wait
    end

    # Forcibly stop the processor by raising Sidekiq::Shutdown in its thread.
    def kill(wait = false)
      @done = true
      return unless @thread
      # unlike the other actors, terminate does not wait
      # for the thread to finish because we don't know how
      # long the job will take to finish. Instead we
      # provide a `kill` method to call after the shutdown
      # timeout passes.
      @thread.raise ::Sidekiq::Shutdown
      @thread.value if wait
    end

    def stopping?
      @done
    end

    # Spawn the processing thread (idempotent).
    def start
      @thread ||= safe_thread("#{config.name}/processor", &method(:run))
    end

    private

    # Main loop: fetch and process jobs until shutdown, then notify the
    # Manager via @callback. An unexpected exception is passed along so the
    # Manager can replace this processor.
    def run
      # By setting this thread-local, Sidekiq.redis will access +Sidekiq::Capsule#redis_pool+
      # instead of the global pool in +Sidekiq::Config#redis_pool+.
      Thread.current[:sidekiq_capsule] = @capsule
      process_one until @done
      @callback.call(self)
    rescue Sidekiq::Shutdown
      @callback.call(self)
    rescue Exception => ex
      @callback.call(self, ex)
    end

    def process_one(&block)
      @job = fetch
      process(@job) if @job
      @job = nil
    end

    # Pull one unit of work from the fetcher. Logs recovery when Redis
    # comes back after an outage; fetch errors are throttled via
    # handle_fetch_exception and return nil.
    def get_one
      uow = capsule.fetcher.retrieve_work
      if @down
        logger.info { "Redis is online, #{::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @down} sec downtime" }
        @down = nil
      end
      uow
    rescue Sidekiq::Shutdown
    rescue => ex
      handle_fetch_exception(ex)
    end

    # Fetch a job but requeue it (and return nil) if we were told to shut
    # down while the fetch was in flight.
    def fetch
      j = get_one
      if j && @done
        j.requeue
        nil
      else
        j
      end
    end

    # Record the start of a Redis outage (once) and report the error, then
    # sleep briefly so we don't spin on a dead connection.
    def handle_fetch_exception(ex)
      unless @down
        @down = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
        handle_exception(ex)
      end
      sleep(1)
      nil
    end

    # Wrap job execution with the profiler only when the job opts in via
    # its "profile" attribute.
    def profile(job, &block)
      return yield unless job["profile"]

      Sidekiq::Profiler.new(config).call(job, &block)
    end

    def dispatch(job_hash, queue, jobstr)
      # since middleware can mutate the job hash
      # we need to clone it to report the original
      # job structure to the Web UI
      # or to push back to redis when retrying.
      # To avoid costly and, most of the time, useless cloning here,
      # we pass original String of JSON to respected methods
      # to re-parse it there if we need access to the original, untouched job
      @job_logger.prepare(job_hash) do
        @retrier.global(jobstr, queue) do
          @job_logger.call(job_hash, queue) do
            stats(jobstr, queue) do
              profile(job_hash) do
                # Rails 5 requires a Reloader to wrap code execution. In order to
                # constantize the worker and instantiate an instance, we have to call
                # the Reloader. It handles code loading, db connection management, etc.
                # Effectively this block denotes a "unit of work" to Rails.
                @reloader.call do
                  klass = Object.const_get(job_hash["class"])
                  instance = klass.new
                  instance.jid = job_hash["jid"]
                  instance._context = self
                  @retrier.local(instance, jobstr, queue) do
                    yield instance
                  end
                end
              end
            end
          end
        end
      end
    end

    IGNORE_SHUTDOWN_INTERRUPTS = {Sidekiq::Shutdown => :never}
    private_constant :IGNORE_SHUTDOWN_INTERRUPTS
    ALLOW_SHUTDOWN_INTERRUPTS = {Sidekiq::Shutdown => :immediate}
    private_constant :ALLOW_SHUTDOWN_INTERRUPTS

    # Execute one unit of work: parse the payload, run the job through the
    # dispatch/middleware stack, and acknowledge it unless it must be
    # retried or rescued later.
    def process(uow)
      jobstr = uow.job
      queue = uow.queue_name

      # Treat malformed JSON as a special case: job goes straight to the morgue.
      job_hash = nil
      begin
        job_hash = Sidekiq.load_json(jobstr)
      rescue => ex
        now = Time.now.to_f
        redis do |conn|
          conn.multi do |xa|
            xa.zadd("dead", now.to_s, jobstr)
            xa.zremrangebyscore("dead", "-inf", now - @capsule.config[:dead_timeout_in_seconds])
            xa.zremrangebyrank("dead", 0, -@capsule.config[:dead_max_jobs])
          end
        end
        handle_exception(ex, {context: "Invalid JSON for job", jobstr: jobstr})
        return uow.acknowledge
      end

      ack = false
      Thread.handle_interrupt(IGNORE_SHUTDOWN_INTERRUPTS) do
        Thread.handle_interrupt(ALLOW_SHUTDOWN_INTERRUPTS) do
          dispatch(job_hash, queue, jobstr) do |instance|
            config.server_middleware.invoke(instance, job_hash, queue) do
              execute_job(instance, job_hash["args"])
            end
          end
          ack = true
        rescue Sidekiq::Shutdown
          # Had to force kill this job because it didn't finish
          # within the timeout. Don't acknowledge the work since
          # we didn't properly finish it.
        rescue Sidekiq::JobRetry::Skip => s
          # Skip means we handled this error elsewhere. We don't
          # need to log or report the error.
          ack = true
          raise s
        rescue Sidekiq::JobRetry::Handled => h
          # this is the common case: job raised error and Sidekiq::JobRetry::Handled
          # signals that we created a retry successfully. We can acknowledge the job.
          ack = true
          e = h.cause || h
          handle_exception(e, {context: "Job raised exception", job: job_hash})
          raise e
        rescue Exception => ex
          # Unexpected error! This is very bad and indicates an exception that got past
          # the retry subsystem (e.g. network partition). We won't acknowledge the job
          # so it can be rescued when using Sidekiq Pro.
          handle_exception(ex, {context: "Internal exception!", job: job_hash, jobstr: jobstr})
          raise ex
        end
      ensure
        if ack
          uow.acknowledge
        end
      end
    end

    def execute_job(instance, cloned_args)
      instance.perform(*cloned_args)
    end

    # Ruby doesn't provide atomic counters out of the box so we'll
    # implement something simple ourselves.
    # https://bugs.ruby-lang.org/issues/14706
    class Counter
      def initialize
        @value = 0
        @lock = Mutex.new
      end

      def incr(amount = 1)
        @lock.synchronize { @value += amount }
      end

      # Return the current count and zero it atomically.
      def reset
        @lock.synchronize {
          val = @value
          @value = 0
          val
        }
      end
    end

    # jruby's Hash implementation is not threadsafe, so we wrap it in a mutex here
    class SharedWorkState
      def initialize
        @work_state = {}
        @lock = Mutex.new
      end

      def set(tid, hash)
        @lock.synchronize { @work_state[tid] = hash }
      end

      def delete(tid)
        @lock.synchronize { @work_state.delete(tid) }
      end

      def dup
        @lock.synchronize { @work_state.dup }
      end

      def size
        @lock.synchronize { @work_state.size }
      end

      def clear
        @lock.synchronize { @work_state.clear }
      end
    end

    PROCESSED = Counter.new
    FAILURE = Counter.new
    WORK_STATE = SharedWorkState.new

    # Track per-thread work state and processed/failure counters around the
    # execution of one job (yield).
    def stats(jobstr, queue)
      WORK_STATE.set(tid, {queue: queue, payload: jobstr, run_at: Time.now.to_i})

      begin
        yield
      rescue Exception
        FAILURE.incr
        raise
      ensure
        WORK_STATE.delete(tid)
        PROCESSED.incr
      end
    end
  end
end