class Kafka::AsyncProducer::Worker
def deliver_messages
def deliver_messages @producer.deliver_messages rescue DeliveryFailed, ConnectionError # Failed to deliver messages -- nothing to do but try again later. end
def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:)
def initialize(queue:, producer:, delivery_threshold:, instrumenter:, logger:) @queue = queue @producer = producer @delivery_threshold = delivery_threshold @instrumenter = instrumenter @logger = logger end
def produce(*args)
def produce(*args) @producer.produce(*args) rescue BufferOverflow deliver_messages retry end
def run
def run @logger.info "Starting async producer in the background..." loop do operation, payload = @queue.pop case operation when :produce produce(*payload) deliver_messages if threshold_reached? when :deliver_messages deliver_messages when :shutdown begin # Deliver any pending messages first. @producer.deliver_messages rescue Error => e @logger.error("Failed to deliver messages during shutdown: #{e.message}") @instrumenter.instrument("drop_messages.async_producer", { message_count: @producer.buffer_size + @queue.size, }) end # Stop the run loop. break else raise "Unknown operation #{operation.inspect}" end end rescue Kafka::Error => e @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @logger.info "Restarting in 10 seconds..." sleep 10 retry rescue Exception => e @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}" @logger.error "Async producer crashed!" ensure @producer.shutdown end
def threshold_reached?
def threshold_reached? @delivery_threshold > 0 && @producer.buffer_size >= @delivery_threshold end