class Kafka::AsyncProducer::Worker

def deliver_messages

# Asks the wrapped producer to deliver its messages, treating delivery and
# connection failures as non-fatal.
#
# @return whatever `@producer.deliver_messages` returns, or nil when a
#   DeliveryFailed / ConnectionError was swallowed
def deliver_messages
  begin
    @producer.deliver_messages
  rescue DeliveryFailed, ConnectionError
    # Best effort only: the failure is dropped here on purpose so that a
    # later call can try the delivery again.
    nil
  end
end

def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:)

# Stores the collaborators the worker operates on.
#
# @param queue object the run loop pops `[operation, payload]` pairs from;
#   its `size` is also read when reporting dropped messages
# @param producer object that receives `produce`, `deliver_messages`,
#   `buffer_size` and `shutdown`
# @param delivery_threshold buffer size at which a delivery is triggered
#   after a produce; values that are not greater than zero never trigger one
# @param instrumenter object that receives `instrument(name, payload)`
# @param logger object that receives `info` and `error`
def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:)
  @producer, @queue = producer, queue
  @delivery_threshold = delivery_threshold
  @instrumenter = instrumenter
  @logger = logger
end

def produce(*args)

# Forwards the arguments to the wrapped producer's `produce`.
#
# When the producer raises BufferOverflow, the buffer is flushed through
# #deliver_messages and the same call is attempted again. There is no limit
# on the number of attempts: the method only returns once the producer has
# accepted the message.
#
# @param args arguments passed through unchanged to `@producer.produce`
# @return whatever `@producer.produce` returns
def produce(*args)
  begin
    @producer.produce(*args)
  rescue BufferOverflow
    # Make room in the buffer, then re-run the begin block from the top.
    deliver_messages
    retry
  end
end

def run

# Runs the worker loop: pops `[operation, payload]` pairs off the queue and
# dispatches on the operation until a `:shutdown` arrives.
#
# Operations:
# * `:produce` - passes the payload to #produce, then delivers if
#   #threshold_reached? is true.
# * `:deliver_messages` - delivers immediately via #deliver_messages.
# * `:shutdown` - makes one final delivery attempt directly on the producer;
#   if that raises Kafka::Error the failure is logged and a
#   "drop_messages.async_producer" event is instrumented with the number of
#   messages still buffered plus those still queued. The loop then ends.
# * anything else raises a RuntimeError ("Unknown operation ...").
#
# A Kafka::Error escaping the loop is logged and the whole method body is
# retried after a 10 second pause. Any other exception is logged and
# swallowed, ending the method. The producer is shut down when the method
# is left.
def run
  @logger.info "Starting async producer in the background..."
  loop do
    operation, payload = @queue.pop
    case operation
    when :produce
      produce(*payload)
      deliver_messages if threshold_reached?
    when :deliver_messages
      deliver_messages
    when :shutdown
      begin
        # Deliver any pending messages first. This calls the producer directly
        # (not #deliver_messages) so that a failure surfaces here and can be
        # reported as dropped messages.
        @producer.deliver_messages
      rescue Kafka::Error => e
        # Fully qualified, matching the outer handler below, so the constant
        # resolves regardless of how the enclosing class is nested.
        @logger.error("Failed to deliver messages during shutdown: #{e.message}")
        @instrumenter.instrument("drop_messages.async_producer", {
          message_count: @producer.buffer_size + @queue.size,
        })
      end
      # Stop the run loop.
      break
    else
      raise "Unknown operation #{operation.inspect}"
    end
  end
rescue Kafka::Error => e
  @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
  @logger.info "Restarting in 10 seconds..."
  sleep 10
  retry
rescue Exception => e
  # Deliberate catch-all: this is the last place a crash of the worker can be
  # logged. Kafka errors were handled above, so whatever arrives here is some
  # other kind of failure and is labelled accordingly.
  @logger.error "Unexpected error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
  @logger.error "Async producer crashed!"
ensure
  # Always shut the producer down on the way out.
  @producer.shutdown
end

def threshold_reached?

# Tells whether the producer's buffer has grown to the configured delivery
# threshold.
#
# @return [Boolean] false whenever the threshold is not greater than zero
#   (the buffer size is not consulted in that case); otherwise whether
#   `@producer.buffer_size` is at least the threshold
def threshold_reached?
  limit = @delivery_threshold
  return false unless limit > 0

  @producer.buffer_size >= limit
end