# frozen_string_literal: truemoduleSidekiqUniqueJobs# Lock manager class that handles all the various locks## @author Mikael Henriksson <mikael@mhenrixon.com>classLocksmith# rubocop:disable Metrics/ClassLength# includes "SidekiqUniqueJobs::Connection"# @!parse include SidekiqUniqueJobs::ConnectionincludeSidekiqUniqueJobs::Connection# includes "SidekiqUniqueJobs::Logging"# @!parse include SidekiqUniqueJobs::LoggingincludeSidekiqUniqueJobs::Logging# includes "SidekiqUniqueJobs::Timing"# @!parse include SidekiqUniqueJobs::TimingincludeSidekiqUniqueJobs::Timing# includes "SidekiqUniqueJobs::Script::Caller"# @!parse include SidekiqUniqueJobs::Script::CallerincludeSidekiqUniqueJobs::Script::Caller# includes "SidekiqUniqueJobs::JSON"# @!parse include SidekiqUniqueJobs::JSONincludeSidekiqUniqueJobs::JSON## @return [Float] used to take into consideration the inaccuracy of redis timestampsCLOCK_DRIFT_FACTOR=0.01## @!attribute [r] key# @return [Key] the key used for lockingattr_reader:key## @!attribute [r] job_id# @return [String] a sidekiq JIDattr_reader:job_id## @!attribute [r] config# @return [LockConfig] the configuration for this lockattr_reader:config## @!attribute [r] item# @return [Hash] a sidekiq job hashattr_reader:item## Initialize a new Locksmith instance## @param [Hash] item a Sidekiq job hash# @option item [Integer] :lock_ttl the configured expiration# @option item [String] :jid the sidekiq job id# @option item [String] :unique_digest the unique digest (See: {LockDigest#lock_digest})# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection#definitialize(item,redis_pool=nil)@item=item@key=Key.new(item[LOCK_DIGEST]||item[UNIQUE_DIGEST])# fallback until can be removed@job_id=item[JID]@config=LockConfig.new(item)@redis_pool=redis_poolend## Deletes the lock unless it has a pttl set##defdeletereturnifconfig.pttl.positive?delete!end## Deletes the lock regardless of if it has a pttl set#defdelete!call_script(:delete,key.to_a,[job_id,config.pttl,config.type,config.limit]).positive?end## Create a lock for the Sidekiq job## @return [String] the Sidekiq job_id that was locked/queued#deflock(&block)redis(redis_pool)do|conn|returnlock_async(conn,&block)ifblock_given?lock_sync(conn)doreturnjob_idendendend## Removes the lock keys from Redis if locked by the provided jid/token## @return [false] unless locked?# @return [String] Sidekiq job_id (jid) if successful#defunlock(conn=nil)returnfalseunlesslocked?(conn)unlock!(conn)end## Removes the lock keys from Redis## @return [false] unless locked?# @return [String] Sidekiq job_id (jid) if successful#defunlock!(conn=nil)call_script(:unlock,key.to_a,argv,conn)end# Checks if this instance is considered locked## @return [true, false] true when the :LOCKED hash contains the job_id#deflocked?(conn=nil)returntaken?(conn)ifconnredis{|rcon|taken?(rcon)}end## Nicely formatted string with information about self### @return [String]#defto_s"Locksmith##{object_id}(digest=#{key} job_id=#{job_id}, locked=#{locked?})"end## @see to_s#definspectto_send## Compare this locksmith with another## @param [Locksmith] other the locksmith to compare with## @return [true, false]#def==(other)key==other.key&&job_id==other.job_idendprivateattr_reader:redis_pooldefargv[job_id,config.pttl,config.type,config.limit]end## Used for runtime locks that need automatic unlock after yielding## @param [Redis] conn a redis connection## @return [nil] when lock was not possible# @return [Object] whatever the block returns when lock was acquired## @yieldparam [String] job_id a Sidekiq JID#deflock_async(conn)returnyieldjob_idiflocked?(conn)enqueue(conn)doprimed_async(conn)dolocked_token=call_script(:lock,key.to_a,argv,conn)returnyieldiflocked_token==job_idendendensureunlock!(conn)end## Pops an enqueued token# @note Used for runtime locks to avoid problems with blocking commands# in current thread## @param [Redis] conn a redis connection## @return [nil] when lock was not possible# @return [Object] whatever the block returns when lock was acquired#defprimed_async(conn)returnyieldifConcurrent::Promises.future(conn){|red_con|pop_queued(red_con)}.value(drift(config.ttl))warn_about_timeoutend## Used for non-runtime locks (no block was given)## @param [Redis] conn a redis connection## @return [nil] when lock was not possible# @return [Object] whatever the block returns when lock was acquired## @yieldparam [String] job_id a Sidekiq JID#deflock_sync(conn)returnyieldiflocked?(conn)enqueue(conn)doprimed_sync(conn)dolocked_token=call_script(:lock,key.to_a,argv,conn)returnyieldiflocked_tokenendendend## Pops an enqueued token# @note Used for non-runtime locks## @param [Redis] conn a redis connection## @return [nil] when lock was not possible# @return [Object] whatever the block returns when lock was acquired#defprimed_sync(conn)returnyieldifpop_queued(conn)warn_about_timeoutend## Does the actual popping of the enqueued token## @param [Redis] conn a redis connection## @return [String] a previously enqueued token (now taken off the queue)#defpop_queued(conn)ifconfig.wait_for_lock?brpoplpush(conn)elserpoplpush(conn)endend## @api private#defbrpoplpush(conn)# passing timeout 0 to brpoplpush causes it to block indefinitelyconn.brpoplpush(key.queued,key.primed,timeout: config.timeout||0)end## @api private#defrpoplpush(conn)conn.rpoplpush(key.queued,key.primed)end## Prepares all the various lock data## @param [Redis] conn a redis connection## @return [nil] when redis was already prepared for this lock# @return [yield<String>] when successfully enqueued#defenqueue(conn)queued_token,elapsed=timeddocall_script(:queue,key.to_a,argv,conn)endvalidity=config.pttl-elapsed-drift(config.pttl)returnunlessqueued_token&&(validity>=0||config.pttl.zero?)write_lock_info(conn)yieldend## Writes lock information to redis.# The lock information contains information about worker, queue, limit etc.### @return [void]#defwrite_lock_info(conn)returnunlessconfig.lock_infoconn.set(key.info,lock_info)end## Used to combat redis imprecision with ttl/pttl## @param [Integer] val the value to compute drift for## @return [Integer] a computed drift value#defdrift(val)# Add 2 milliseconds to the drift to account for Redis expires# precision, which is 1 millisecond, plus 1 millisecond min drift# for small TTLs.(val.to_i*CLOCK_DRIFT_FACTOR).to_i+2end## Checks if the lock has been taken## @param [Redis] conn a redis connection## @return [true, false]#deftaken?(conn)conn.hexists(key.locked,job_id)enddefwarn_about_timeoutlog_warn("Timed out after #{config.timeout}s while waiting for primed token (digest: #{key}, job_id: #{job_id})")enddeflock_info@lock_info||=dump_json(WORKER=>item[CLASS],QUEUE=>item[QUEUE],LIMIT=>item[LOCK_LIMIT],TIMEOUT=>item[LOCK_TIMEOUT],TTL=>item[LOCK_TTL],LOCK=>config.type,LOCK_ARGS=>item[LOCK_ARGS],TIME=>now_f,)endendend