module Resque
  # Worker mixin that replaces +queues+ and +reserve+ (see +self.included+)
  # with versions that expand wildcard/negated/dynamic queue names and pick
  # jobs from queues grouped by their numeric "<instance id>_" name prefix.
  #
  # The including class is expected to provide +@queues+, +queues+, +reserve+,
  # +hostname+, +log!+ and +log_with_severity+; none of them are defined here.
  module DynamicQueues
    # Maximum number of busy workers per queue; 0 means no limiting.
    DEFAULT_QUEUE_DEPTH = 0

    # BLPOP timeout in seconds used when ENV["BLPOP_TIMEOUT"] is unset or blank.
    DEFAULT_BLPOP_TIMEOUT = 30

    # Sort rank for the priority token (second "_"-separated segment of a queue name).
    PRIORITY_RANKS = { "Synchronous" => 0, "High" => 1, "Medium" => 2, "Low" => 3 }.freeze

    # Priority assumed when the queue name carries no recognised priority token.
    DEFAULT_PRIORITY = "Medium"

    # Captures the run of digits that precedes an underscore in a queue name.
    INSTANCE_GROUP_PATTERN = /(\d{1,20})_.*/.freeze

    # Removes from +qs+ every queue that a working worker currently has a job on.
    #
    # @param qs [Array<String>, String, nil] candidate queue name(s)
    # @return [Array<String>] candidates (nils dropped) that are not busy
    def filter_busy_queues(qs)
      Array(qs.dup).compact - busy_queue_names
    end

    # Returns the worker's queues rotated one position further on each call.
    # The rotation offset is kept in +@n+, which is shared with the other
    # reservation methods of this module.
    #
    # @return [Array<String>]
    def rotated_queues
      @n ||= 0
      @n += 1
      # since we rely on the resque-dynamic-queues plugin, this is all the queues, expanded out
      rot_queues = queues
      return rot_queues unless rot_queues.size > 0

      @n = @n % rot_queues.size
      rot_queues.rotate(@n)
    end

    # Counts the working workers whose current job came from +queuename+.
    #
    # @param queuename [String]
    # @return [Integer]
    def queue_depth(queuename)
      busy_queue_names.count(queuename)
    end

    # Buckets queues by the priority token in their name and orders the
    # buckets Synchronous, High, Medium, Low. Names without a recognised
    # token are treated as Medium. Duplicates are dropped first.
    #
    # @param queue_list [Array<String>]
    # @return [Array<Array(Integer, Array<String>)>] [rank, queues] pairs sorted by rank
    def get_categorized_queues(queue_list)
      fallback_rank = PRIORITY_RANKS[DEFAULT_PRIORITY]

      queue_list.uniq
                .group_by { |queue| PRIORITY_RANKS.fetch(queue.split("_")[1], fallback_rank) }
                .sort_by { |rank, _queues| rank }
    end

    # Decides whether this worker may take a job from +queuename+ given the
    # per-queue depth limit in ENV["RESQUE_QUEUE_DEPTH"].
    #
    # @param queuename [String]
    # @return [Boolean]
    def should_work_on_queue?(queuename)
      # workers with QUEUES=* are special and are not subject to queue depth setting
      return true if @queues.include?('*')

      configured = ENV["RESQUE_QUEUE_DEPTH"]
      max = configured.nil? || configured == "" ? DEFAULT_QUEUE_DEPTH : configured.to_i
      return true if max == 0 # 0 means no limiting

      cur_depth = queue_depth(queuename)
      log! "queue #{queuename} depth = #{cur_depth} max = #{max}"
      cur_depth < max
    end

    # Groups the sorted queue names by their numeric prefix.
    #
    # @return [Hash{String, nil => Array<String>}] prefix digits => queue names;
    #   names that do not match are collected under the +nil+ key
    def get_grouped_queues
      queues.sort.group_by do |name|
        match = INSTANCE_GROUP_PATTERN.match(name)
        match && match[1]
      end
    end

    # Replacement for the worker's +reserve+.
    #
    # When every queue has a numeric prefix, jobs are taken either through
    # BLPOP (if +ZuoraConnect.configuration.blpop_queue+ is truthy) or by
    # polling the groups in rotation, highest priority first inside a group.
    # Otherwise it defers to the original +reserve+.
    #
    # @return [Resque::Job, nil] the reserved job, also stored in +@job_in_progress+
    # @raise re-raises whatever the reservation raised, after logging it
    def reserve_with_round_robin
      grouped_queues = get_grouped_queues

      # Instance queue grouping only applies when every queue has a numeric prefix.
      return reserve_without_round_robin if grouped_queues.empty? || grouped_queues.key?(nil)

      if ZuoraConnect.configuration.blpop_queue
        @job_in_progress = get_restricted_job
        return @job_in_progress if @job_in_progress.present?

        return @job_in_progress = get_queued_job(grouped_queues)
      end

      @n ||= 0
      @n += 1
      @n = @n % grouped_queues.keys.size

      grouped_queues.keys.rotate(@n).each do |group_key|
        get_categorized_queues(grouped_queues[group_key]).each do |_rank, group_queues|
          group_queues.each do |queue|
            log! "Checking #{queue}"
            if should_work_on_queue?(queue) && (@job_in_progress = Resque::Job.reserve(queue))
              log! "Found job on #{queue}"
              return @job_in_progress
            end
          end
        end
        @n += 1 # Start the next search at the queue after the one from which we pick a job.
      end

      nil
    rescue Exception => ex # rubocop:disable Lint/RescueException -- logged and re-raised below
      if defined?(Ougai::Logger) && Resque.logger.is_a?(Ougai::Logger)
        log_with_severity :error, "Error reserving job", ex
      else
        log_with_severity :error, "Error reserving job: #{ex.inspect}"
        log_with_severity :error, Array(ex.backtrace).join("\n")
      end
      # Bare raise re-raises the rescued exception unchanged (the previous
      # `raise e` referenced an undefined name and masked it with a NameError).
      raise
    end

    # Wraps a decoded payload in a Resque::Job.
    #
    # @param queue [String]
    # @param payload [Hash, nil]
    # @return [Resque::Job, nil] nil when there is no payload
    def create_job(queue, payload)
      return unless payload

      Resque::Job.new(queue, payload)
    end

    # Pops the next job via BLPOP over all workable queues. Groups whose
    # instance id is greater than the last served one (+@n+) are listed first
    # so that instances take turns; inside a group queues are ordered by priority.
    #
    # @param grouped_queues [Hash{String => Array<String>}] see #get_grouped_queues
    # @return [Resque::Job, nil]
    def get_next_job(grouped_queues)
      @n ||= 0

      ordered_groups = grouped_queues.sort_by { |app_instance_id, _| app_instance_id.to_i }
      # Categorize queues as low priority if their app instance id is not above the last served one
      low_priority, high_priority = ordered_groups.partition { |app_instance_id, _| app_instance_id.to_i <= @n }

      # Reorder queue groups by putting high priority queues in the front
      redis_keys = (high_priority + low_priority)
                   .flat_map { |_, names| get_categorized_queues(names).flat_map { |_rank, bucket| bucket } }
                   .select { |queue| should_work_on_queue?(queue) }
                   .map { |queue| "queue:#{queue}" }

      queue, payload = get_job_from_queues(redis_keys)
      return nil if queue.blank?

      # Strip only the leading key prefix; the queue name itself may contain "queue:".
      queue = queue.delete_prefix("queue:")
      # track last app instance id
      @n = queue.split('_').first.to_i
      create_job(queue, Resque.decode(payload))
    end

    # Blocking pop across the given Redis list keys.
    #
    # The timeout comes from ENV["BLPOP_TIMEOUT"] (seconds) and falls back to
    # DEFAULT_BLPOP_TIMEOUT when that variable is unset or blank. An explicit
    # "0" is passed through as 0.
    #
    # @param grouped_queues [Array<String>] Redis keys ("queue:<name>")
    # @return [Array(String, String), nil] [key, raw payload], or nil when
    #   there are no keys to wait on or nothing arrived in time
    def get_job_from_queues(grouped_queues)
      # BLPOP needs at least one key; every queue may have been filtered out upstream.
      return nil if grouped_queues.nil? || grouped_queues.empty?

      configured = ENV["BLPOP_TIMEOUT"].to_s.strip
      timeout = configured.empty? ? DEFAULT_BLPOP_TIMEOUT : configured.to_i
      Resque.redis.blpop(grouped_queues, timeout: timeout)
    end

    # Asks the concurrent-restriction plugin for a runnable restricted job.
    #
    # @return [Object, nil] whatever +next_runnable_job_random+ returns
    def get_restricted_job
      Resque::Plugins::ConcurrentRestrictionJob.next_runnable_job_random
    end

    # Fetches the next job from the regular queues. When the
    # concurrent-restriction plugin is loaded, restricted jobs that get
    # stashed are skipped, up to +reserve_queued_job_attempts+ tries.
    #
    # @param grouped_queues [Hash{String => Array<String>}]
    # @return [Resque::Job, nil]
    def get_queued_job(grouped_queues)
      return get_next_job(grouped_queues) unless defined?(Resque::Plugins::ConcurrentRestriction)

      # Bounded retry
      1.upto(Resque::Plugins::ConcurrentRestriction.reserve_queued_job_attempts) do
        resque_job = get_next_job(grouped_queues)

        # Short-circuit if a job was not found
        return if resque_job.nil?

        # If there is a job on regular queues, then only run it if it is not restricted
        job_class = resque_job.payload_class

        # Return to work on job if not a restricted job
        return resque_job unless job_class.is_a?(Resque::Plugins::ConcurrentRestriction)

        # Keep trying if job is restricted. If job is runnable, we keep the lock until
        # done_working
        return resque_job unless job_class.stash_if_restricted(resque_job)
      end

      # Safety net, here in case we hit the upper bound and there are still queued items
      nil
    end

    # Returns a list of queues to use when searching for a job.
    #
    # A splat ("*") means you want every queue (in alpha order) - this
    # can be useful for dynamically adding new queues.
    #
    # The splat can also be used as a wildcard within a queue name,
    # e.g. "*high*", and negation can be indicated with a prefix of "!"
    #
    # An @key can be used to dynamically look up the queue list for key from redis.
    # If no key is supplied, it defaults to the worker's hostname, and wildcards
    # and negations can be used inside this dynamic queue list. Set the queue
    # list for a key with Resque.set_dynamic_queue(key, ["q1", "q2"])
    #
    # Before patterns are expanded, queues whose instance id is listed in the
    # "APILimits" or "PauseQueue" sorted sets are removed from the candidates;
    # members of those sets scored before the current time are purged first.
    #
    # @return [Array<String>]
    def queues_with_dynamic
      queue_names = @queues.dup
      return queues_without_dynamic if queue_names.grep(/(^!)|(^@)|(\*)/).size == 0

      real_queues = Resque.queues
      matched_queues = []
      # Numeric prefix of a queue name as an Integer; names without one map to 0.
      instance_id = ->(name) { (name.match(/^(\d*)_.*/) || [])[1].to_i }

      # Remove queues under API limits
      Redis.current.zremrangebyscore("APILimits", "0", "(#{Time.now.to_i}")
      api_limit_instances = Redis.current.zrange("APILimits", 0, -1)
                                 .map { |key| key.to_i if key.match(/^\d*$/) }
                                 .compact
      real_queues = real_queues.reject { |name| api_limit_instances.include?(instance_id.call(name)) }

      # Queue pausing
      Resque.redis.zremrangebyscore("PauseQueue", "0", "(#{Time.now.to_i}")
      paused_instances = Resque.redis.zrange("PauseQueue", 0, -1)
                               .map { |key| key.split("__")[0].to_i if key.match(/^\d*__.*/) }
                               .compact
      real_queues = real_queues.reject { |name| paused_instances.include?(instance_id.call(name)) }

      while (q = queue_names.shift)
        q = q.to_s

        if (dynamic = q.match(/^(!)?@(.*)/))
          key = dynamic[2].strip
          key = hostname if key.empty?

          add_queues = Resque.get_dynamic_queue(key)
          # "!@key" flips the negation of every entry in the looked-up list.
          if dynamic[1]
            add_queues = add_queues.map { |name| name.start_with?("!") ? name[1..-1] : "!#{name}" }
          end

          queue_names.concat(add_queues)
          next
        end

        # Decide negation per entry; carrying the flag over from an earlier
        # "!pattern" would turn every later entry into a removal.
        negated = q.start_with?("!")
        q = q[1..-1] if negated

        patstr = q.gsub(/\*/, '.*')
        pattern = /^#{patstr}$/

        if negated
          matched_queues -= matched_queues.grep(pattern)
        else
          matches = real_queues.grep(pattern)
          # A literal name that matches nothing is kept as-is.
          # NOTE(review): this also re-adds a literal name that was filtered out
          # above as paused/API-limited - confirm that is intended.
          matches = [q] if matches.empty? && q == patstr
          matched_queues.concat(matches.sort)
        end
      end

      matched_queues.uniq
    end

    # Hook run on +include+: keeps the receiver's original +queues+ and
    # +reserve+ under *_without_* names and points the public names at the
    # implementations from this module.
    #
    # @param receiver [Class] class including this module
    def self.included(receiver)
      receiver.class_eval do
        alias queues_without_dynamic queues
        alias queues queues_with_dynamic

        alias reserve_without_round_robin reserve
        alias reserve reserve_with_round_robin
      end
    end

    private

    # Queue names of the jobs currently being processed by working workers.
    def busy_queue_names
      Resque::Worker.working.map { |worker| worker.job["queue"] }.compact
    end
  end
end