module CanvasSync
  module JobBatches
    # A Redis-backed pool of queued job descriptions.
    #
    # Job descriptions are stored in Redis under "<redis_key>-jobs" and are
    # released for execution by #refill_allotment, which compares the pool's
    # "active_count" hash field against the configured +concurrency+. Each job
    # is wrapped in its own Batch whose callback (Pool.job_checked_in) frees
    # the slot and triggers the next refill.
    class Pool
      include RedisModel

      # Lua script loaded from hincr_max.lua in this file's directory.
      # NOTE(review): from its use in #refill_allotment it presumably increments
      # a hash field only while it is below the supplied maximum and returns the
      # value seen before the increment - confirm against the script itself.
      HINCR_MAX = RedisScript.new(Pathname.new(__FILE__) + "../hincr_max.lua")

      attr_reader :pid
      redis_attr :description
      redis_attr :created_at
      redis_attr :concurrency, :int
      redis_attr :order
      redis_attr :on_failed_job, :symbol
      redis_attr :clean_when_empty, :bool

      # Wraps an existing pool when +pooolid+ is given; otherwise generates a new
      # random pid and persists the supplied settings via #initialize_new.
      #
      # @param pooolid [String, nil] id of an existing pool
      # @param kwargs [Hash] settings for a new pool (see #initialize_new)
      def initialize(pooolid = nil, **kwargs)
        if pooolid
          @existing = true
          @pid = pooolid
        else
          @pid = SecureRandom.urlsafe_base64(10)
          initialize_new(**kwargs)
        end
      end

      # Builds a Pool handle for an existing pid.
      #
      # @param pid [String]
      # @raise [RuntimeError] when +pid+ is blank
      # @return [Pool]
      def self.from_pid(pid)
        raise "PID must be given" unless pid.present?

        new(pid)
      end

      # Shorthand for #add_job.
      def <<(job_desc)
        add_job(job_desc)
      end

      # Queues a single job description (see #add_jobs).
      def add_job(job_desc)
        add_jobs([job_desc])
      end

      # Queues every job description and then tries to start as many as the
      # concurrency limit allows.
      #
      # @param job_descs [Array<Hash>] each hash must carry a :job entry
      # @return [Integer] number of jobs started by the trailing refill
      def add_jobs(job_descs)
        job_descs.each do |job_desc|
          # Every pooled job gets its own wrapper batch so the pool is notified
          # when that job is done.
          wrapper = Batch.new
          wrapper.description = "Pool Job Wrapper (PID: #{pid})"
          # With on_failed_job == :wait the slot is released only on :success;
          # otherwise it is released on :complete.
          checkin_event = (on_failed_job == :wait) ? :success : :complete
          wrapper.on(checkin_event, "#{self.class.to_s}.job_checked_in", pool_id: pid)
          wrapper.jobs {}

          job_desc = job_desc.symbolize_keys
          job_desc = job_desc.merge!(
            job: job_desc[:job].to_s,
            pool_wrapper_batch: wrapper.bid,
          )

          push_job_to_pool(job_desc)
        end
        refill_allotment
      end

      # Sets the pool's 'keep_open' flag. With a block, the flag is set for the
      # duration of the block and #let_close! is always called afterwards.
      def keep_open!
        if block_given?
          begin
            keep_open!
            yield
          ensure
            let_close!
          end
        else
          # Written as a String: #job_checked_in compares against 'true', and
          # redis-rb 5 no longer accepts boolean command arguments.
          redis.hset(redis_key, 'keep_open', 'true')
        end
      end

      # Clears the 'keep_open' flag and, if nothing is active or pending and
      # clean_when_empty is set, removes the pool's Redis keys.
      def let_close!
        _, active_count = redis.multi do |r|
          r.hset(redis_key, 'keep_open', 'false')
          # HINCRBY by 0 reads the counter as an Integer inside the transaction.
          r.hincrby(redis_key, "active_count", 0)
        end

        if active_count == 0 && pending_count == 0
          cleanup_redis if clean_when_empty
        end
      end

      # Removes this pool from the "pools" sorted set and unlinks its keys.
      def cleanup_redis
        Batch.logger.debug { "Cleaning redis of pool #{pid}" }
        redis do |r|
          r.zrem("pools", pid)
          r.unlink(
            "#{redis_key}",
            "#{redis_key}-jobs",
          )
        end
      end

      # @return [Integer] current value of the "active_count" hash field
      def active_count
        redis.hincrby(redis_key, "active_count", 0)
      end

      # Number of queued job descriptions, using the length command that matches
      # the structure selected by +order+ (defaults to 'fifo' when unset).
      #
      # @param r [Object] Redis connection (or transaction) to issue the command on
      def pending_count(r = redis)
        jobs_key = "#{redis_key}-jobs"
        order = self.order || 'fifo'
        case order.to_sym
        when :fifo, :lifo
          r.llen(jobs_key)
        when :random
          r.scard(jobs_key)
        when :priority
          r.zcard(jobs_key)
        end
      end

      # Callback for a finished pooled job: releases one slot, refills the pool,
      # and cleans up Redis when the pool is idle, empty and not kept open.
      #
      # @param status [Object] callback status (unused here)
      # @param options [Hash] callback options (unused here)
      def job_checked_in(status, options)
        active_count, pending_count = redis do |r|
          # Nothing to do if the pool's hash no longer exists.
          return unless r.exists?(redis_key)

          # Make sure this is loaded outside of the pipeline
          self.order

          redis.multi do |tx|
            tx.hincrby(redis_key, "active_count", -1)
            self.pending_count(tx)
          end
        end

        added_count = refill_allotment
        if active_count == 0 && added_count == 0 && pending_count == 0
          if clean_when_empty && redis.hget(redis_key, 'keep_open') != 'true'
            cleanup_redis
          end
        end
      end

      # Batch callback entry point; resolves the pool from options['pool_id'].
      def self.job_checked_in(status, options)
        pid = options['pool_id']
        from_pid(pid).job_checked_in(status, options)
      end

      protected

      # Redis key of the hash holding this pool's attributes and counters.
      def redis_key
        "POOLID-#{pid}"
      end

      # Starts queued jobs until HINCR_MAX reports the limit is reached or the
      # queue is empty, then refreshes the TTL on both pool keys.
      #
      # @return [Integer] number of jobs enqueued by this call
      def refill_allotment
        jobs_added = 0
        limit = concurrency.to_i
        redis do |r|
          while true
            current_count = HINCR_MAX.call(r, [redis_key], ["active_count", limit]).to_i
            if current_count < limit
              job_desc = pop_job_from_pool
              if job_desc.present?
                Batch.new(job_desc[:pool_wrapper_batch]).jobs do
                  ChainBuilder.enqueue_job(job_desc)
                end
                jobs_added += 1
              else
                # A slot was claimed but there was nothing to run; give it back.
                r.hincrby(redis_key, "active_count", -1)
                break
              end
            else
              break
            end
          end
          r.expire(redis_key, Batch::BID_EXPIRE_TTL)
          r.expire("#{redis_key}-jobs", Batch::BID_EXPIRE_TTL)
        end
        jobs_added
      end

      # Serializes +job_desc+ and stores it in the structure matching +order+
      # (list for fifo/lifo, set for random, sorted set for priority).
      def push_job_to_pool(job_desc)
        jobs_key = "#{redis_key}-jobs"
        # This allows duplicate jobs when a Redis Set is used
        job_desc[:_pool_random_key_] = SecureRandom.urlsafe_base64(10)
        job_json = JSON.unparse(::ActiveJob::Arguments.serialize([job_desc]))
        order = self.order

        redis.multi do |r|
          case order.to_sym
          when :fifo, :lifo
            r.rpush(jobs_key, job_json)
          when :random
            r.sadd(jobs_key, job_json)
          when :priority
            r.zadd(jobs_key, job_desc[:priority] || 0, job_json)
          end
          r.expire(jobs_key, Batch::BID_EXPIRE_TTL)
        end
      end

      # Removes and returns the next job description according to +order+.
      #
      # @return [Hash, nil] symbol-keyed job description, or nil when empty
      def pop_job_from_pool
        jobs_key = "#{redis_key}-jobs"
        order = self.order

        job_json = case order.to_sym
        when :fifo
          redis.lpop(jobs_key)
        when :lifo
          redis.rpop(jobs_key)
        when :random
          redis.spop(jobs_key)
        when :priority
          # ZPOPMAX yields a [member, score] pair (nil when the set is empty);
          # only the member is the serialized job.
          Array(redis.zpopmax(jobs_key)).first
        end

        return nil unless job_json.present?

        ::ActiveJob::Arguments.deserialize(JSON.parse(job_json))[0]&.symbolize_keys
      end

      # Redis access is shared with Batch.
      def self.redis(&blk)
        Batch.redis &blk
      end
      delegate :redis, to: :class

      # Persists pending attributes and registers the pool in the "pools"
      # sorted set, scored by creation time.
      def flush_pending_attrs
        super
        redis.zadd("pools", created_at, pid)
      end

      private

      # Stores the settings of a newly created pool.
      #
      # @param concurrency [Integer, nil] maximum number of simultaneously active jobs
      # @param order [Symbol] :fifo, :lifo, :random or :priority
      # @param clean_when_empty [Boolean] remove Redis keys once the pool drains
      # @param on_failed_job [Symbol] :wait releases a slot only on success
      # @param description [String, nil]
      def initialize_new(concurrency: nil, order: :fifo, clean_when_empty: true, on_failed_job: :wait, description: nil)
        self.created_at = Time.now.utc.to_f
        self.description = description
        self.order = order
        self.concurrency = concurrency
        self.clean_when_empty = clean_when_empty
        self.on_failed_job = on_failed_job
        flush_pending_attrs
      end
    end
  end
end