lib/sidekiq/client.rb

# frozen_string_literal: true

require "securerandom"
require "sidekiq/middleware/chain"
require "sidekiq/job_util"

module Sidekiq
  class Client
    include Sidekiq::JobUtil

    ##
    # Define client-side middleware:
    #
    #   client = Sidekiq::Client.new
    #   client.middleware do |chain|
    #     chain.use MyClientMiddleware
    #   end
    #   client.push('class' => 'SomeJob', 'args' => [1,2,3])
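    #
    # A client middleware is any class responding to #call(worker_class, job, queue, redis_pool)
    # which yields to continue the push (a minimal sketch; MyClientMiddleware is illustrative):
    #
    #   class MyClientMiddleware
    #     def call(worker_class, job, queue, redis_pool)
    #       # mutate the job hash here, or return without yielding to stop the push
    #       yield
    #     end
    #   end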
    #
    # All client instances default to the globally-defined
    # Sidekiq.client_middleware but you can change the chain as necessary.
    #
    def middleware(&block)
      if block
        @chain = @chain.dup
        yield @chain
      end
      @chain
    end

    attr_accessor :redis_pool

    # Sidekiq::Client is responsible for pushing job payloads to Redis.
    # Requires the :pool or :config keyword argument.
    #
    #   Sidekiq::Client.new(pool: Sidekiq::RedisConnection.create)
    #
    # Inside the Sidekiq process, you can reuse the configured resources:
    #
    #   Sidekiq::Client.new(config: config)
    #
    # @param pool [ConnectionPool] explicit Redis pool to use
    # @param config [Sidekiq::Config] use the pool and middleware from the given Sidekiq container
    # @param chain [Sidekiq::Middleware::Chain] use the given middleware chain
    def initialize(*args, **kwargs)
      if args.size == 1 && kwargs.size == 0
        warn "Sidekiq::Client.new(pool) is deprecated, please use Sidekiq::Client.new(pool: pool), #{caller(0..3)}"
        # old calling method, accept 1 pool argument
        @redis_pool = args[0]
        @chain = Sidekiq.default_configuration.client_middleware
        @config = Sidekiq.default_configuration
      else
        # new calling method: keyword arguments
        @config = kwargs[:config] || Sidekiq.default_configuration
        @redis_pool = kwargs[:pool] || Thread.current[:sidekiq_redis_pool] || @config&.redis_pool
        @chain = kwargs[:chain] || @config&.client_middleware
        raise ArgumentError, "No Redis pool available for Sidekiq::Client" unless @redis_pool
      end
    end

    # Cancel the IterableJob with the given JID.
    # **NB: Cancellation is asynchronous.** Iteration checks every
    # five seconds so this will not immediately stop the given job.
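    #
    # Example (jid as returned by a prior perform_async call on an iterable job):
    #
    #   Sidekiq::Client.new.cancel!(jid)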
    def cancel!(jid)
      key = "it-#{jid}"
      _, result, _ = Sidekiq.redis do |c|
        c.pipelined do |p|
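          # record the cancellation time only if not already set, then read back
          # whichever timestamp won so repeated calls return the same value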
          p.hsetnx(key, "cancelled", Time.now.to_i)
          p.hget(key, "cancelled")
          p.expire(key, Sidekiq::Job::Iterable::STATE_TTL, "nx")
        end
      end
      result.to_i
    end

    ##
    # The main method used to push a job to Redis.  Accepts a number of options:
    #
    #   queue - the named queue to use, default 'default'
    #   class - the job class to call, required
    #   args - an array of simple arguments to the perform method, must be JSON-serializable
    #   at - timestamp to schedule the job (optional), must be Numeric (e.g. Time.now.to_f)
    #   retry - whether to retry this job if it fails, default true or an integer number of retries
    #   retry_for - relative amount of time to retry this job if it fails, default nil
    #   backtrace - whether to save any error backtrace, default false
    #
    # If 'class' is passed as a String class name, the job's options will be based on
    # Sidekiq's default job options. If passed as a Class, they will be based on that
    # job class's sidekiq_options.
    #
    # Any options valid for a job class's sidekiq_options are also available here.
    #
    # All keys must be strings, not symbols.  NB: because we are serializing to JSON, all
    # symbols in 'args' will be converted to strings.  Note that +backtrace: true+ can take quite a bit of
    # space in Redis; a large volume of failing jobs can start Redis swapping if you aren't careful.
    #
    # Returns a unique Job ID.  If middleware stops the job, nil will be returned instead.
    #
    # Example:
    #   push('queue' => 'my_queue', 'class' => MyJob, 'args' => ['foo', 1, :bat => 'bar'])
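    #
    # Example (scheduling a job to run in roughly five minutes; MyJob is a placeholder):
    #   push('class' => MyJob, 'args' => [123], 'at' => Time.now.to_f + 300)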
    #
    def push(item)
      normed = normalize_item(item)
      payload = middleware.invoke(item["class"], normed, normed["queue"], @redis_pool) do
        normed
      end
      if payload
        verify_json(payload)
        raw_push([payload])
        payload["jid"]
      end
    end

    ##
    # Push a large number of jobs to Redis. This method cuts out the Redis
    # network round trip latency. It pushes jobs in batches if more than
    # `:batch_size` (1000 by default) jobs are passed. I wouldn't recommend making `:batch_size`
    # larger than 1000 but YMMV based on network quality, size of job args, etc.
    # A very large batch can add noticeable Redis command processing latency.
    #
    # Takes the same arguments as #push except that args is expected to be
    # an Array of Arrays.  All other keys are duplicated for each job.  Each job
    # is run through the client middleware pipeline and each job gets its own Job ID
    # as normal.
    #
    # Returns an array of the pushed jobs' jids. The array may contain nils if any
    # client middleware prevented a job push.
    #
    # Example (pushing 100,000 jobs in batches of 1,000; each job takes a single argument):
    #   push_bulk('class' => MyJob, 'args' => (1..100_000).map { |n| [n] }, batch_size: 1_000)
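    #
    # Example (per-job schedule times via an 'at' Array of Numerics, a sketch):
    #   args = (1..100).map { |n| [n] }
    #   ats = args.each_index.map { |i| Time.now.to_f + 60 * (i + 1) }
    #   push_bulk('class' => MyJob, 'args' => args, 'at' => ats)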
    #
    def push_bulk(items)
      batch_size = items.delete(:batch_size) || items.delete("batch_size") || 1_000
      args = items["args"]
      at = items.delete("at")
      raise ArgumentError, "Job 'at' must be a Numeric or an Array of Numeric timestamps" if at && (Array(at).empty? || !Array(at).all? { |entry| entry.is_a?(Numeric) })
      raise ArgumentError, "Job 'at' Array must have same size as 'args' Array" if at.is_a?(Array) && at.size != args.size

      jid = items.delete("jid")
      raise ArgumentError, "Explicitly passing 'jid' when pushing more than one job is not supported" if jid && args.size > 1

      normed = normalize_item(items)
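      # slice_index tracks the absolute offset into args so that per-job 'at'
      # values stay aligned across batches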
      slice_index = 0
      result = args.each_slice(batch_size).flat_map do |slice|
        raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" unless slice.is_a?(Array) && slice.all?(Array)
        break [] if slice.empty? # no jobs to push

        payloads = slice.map.with_index { |job_args, index|
          copy = normed.merge("args" => job_args, "jid" => SecureRandom.hex(12))
          copy["at"] = (at.is_a?(Array) ? at[slice_index + index] : at) if at
          result = middleware.invoke(items["class"], copy, copy["queue"], @redis_pool) do
            verify_json(copy)
            copy
          end
          result || nil
        }
        slice_index += batch_size

        to_push = payloads.compact
        raw_push(to_push) unless to_push.empty?
        payloads.map { |payload| payload&.[]("jid") }
      end

      result.is_a?(Enumerator::Lazy) ? result.force : result
    end

    # Allows sharding of jobs across any number of Redis instances.  All jobs
    # defined within the block will use the given Redis connection pool.
    #
    #   pool = ConnectionPool.new { Redis.new }
    #   Sidekiq::Client.via(pool) do
    #     SomeJob.perform_async(1,2,3)
    #     SomeOtherJob.perform_async(1,2,3)
    #   end
    #
    # Generally this is only needed for very large Sidekiq installs processing
    # thousands of jobs per second.  I do not recommend sharding unless
    # you cannot scale any other way (e.g. splitting your app into smaller apps).
    def self.via(pool)
      raise ArgumentError, "No pool given" if pool.nil?
      current_sidekiq_pool = Thread.current[:sidekiq_redis_pool]
      Thread.current[:sidekiq_redis_pool] = pool
      yield
    ensure
      Thread.current[:sidekiq_redis_pool] = current_sidekiq_pool
    end

    class << self
      def push(item)
        new.push(item)
      end

      def push_bulk(...)
        new.push_bulk(...)
      end

      # Resque compatibility helpers.  Note all helpers
      # should go through Sidekiq::Job#client_push.
      #
      # Example usage:
      #   Sidekiq::Client.enqueue(MyJob, 'foo', 1, :bat => 'bar')
      #
      # Messages are enqueued to the 'default' queue.
      #
      def enqueue(klass, *args)
        klass.client_push("class" => klass, "args" => args)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_to(:queue_name, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_to(queue, klass, *args)
        klass.client_push("queue" => queue, "class" => klass, "args" => args)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_to_in(:queue_name, 3.minutes, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_to_in(queue, interval, klass, *args)
        int = interval.to_f
        now = Time.now.to_f
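        # heuristic: values under one billion are treated as relative intervals
        # in seconds; anything larger is assumed to be an epoch timestamp already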
        ts = ((int < 1_000_000_000) ? now + int : int)

        item = {"class" => klass, "args" => args, "at" => ts, "queue" => queue}
        item.delete("at") if ts <= now

        klass.client_push(item)
      end

      # Example usage:
      #   Sidekiq::Client.enqueue_in(3.minutes, MyJob, 'foo', 1, :bat => 'bar')
      #
      def enqueue_in(interval, klass, *args)
        klass.perform_in(interval, *args)
      end
    end

    private

    def raw_push(payloads)
      @redis_pool.with do |conn|
        retryable = true
        begin
          conn.pipelined do |pipeline|
            atomic_push(pipeline, payloads)
          end
        rescue RedisClient::Error => ex
          # 2550 Failover can cause the server to become a replica, need
          # to disconnect and reopen the socket to get back to the primary.
          # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
          # 4985 Use the same logic when a blocking command is force-unblocked
          # The retry logic is copied from sidekiq.rb
          if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
            conn.close
            retryable = false
            retry
          end
          raise
        end
      end
      true
    end

    def atomic_push(conn, payloads)
      if payloads.first.key?("at")
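        # zadd takes a flat [score, member, ...] list; the score is the epoch
        # time at which the job becomes eligible to run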
        conn.zadd("schedule", payloads.flat_map { |hash|
          at = hash["at"].to_s
          # ActiveJob sets enqueued_at but the job has not been enqueued yet
          hash = hash.except("enqueued_at", "at")
          [at, Sidekiq.dump_json(hash)]
        })
      else
        now = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond) # milliseconds since the epoch
        grouped_queues = payloads.group_by { |job| job["queue"] }
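        # register each queue name in the global "queues" set so the rest of
        # Sidekiq (API, Web UI) can discover it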
        conn.sadd("queues", grouped_queues.keys)
        grouped_queues.each do |queue, grouped_payloads|
          to_push = grouped_payloads.map { |entry|
            entry["enqueued_at"] = now
            Sidekiq.dump_json(entry)
          }
          conn.lpush("queue:#{queue}", to_push)
        end
      end
    end
  end
end