class Kafka::Consumer
# Builds a consumer by storing its collaborators and setting up empty
# per-topic/partition bookkeeping. Nothing is called on the collaborators
# here; they are only assigned to instance variables.
#
# @param cluster stored as @cluster
# @param logger stored as @logger
# @param instrumenter stored as @instrumenter
# @param group stored as @group
# @param fetcher stored as @fetcher
# @param offset_manager stored as @offset_manager
# @param session_timeout stored as @session_timeout
# @param heartbeat stored as @heartbeat
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @fetcher = fetcher
  @heartbeat = heartbeat

  # Two-level lookup that lazily creates (and memoizes) one Pause per
  # outer key / inner key pair on first access.
  # NOTE(review): presumably keyed as topic => partition => Pause — confirm
  # against the call sites.
  @pauses = Hash.new do |topics, topic|
    topics[topic] = Hash.new { |partitions, partition| partitions[partition] = Pause.new }
  end

  # Whether or not the consumer is currently consuming messages.
  @running = false

  # Hash containing offsets for each topic and partition that has the
  # automatically_mark_as_processed feature disabled. Offset manager is only
  # active when everything is supposed to happen automatically. Otherwise we
  # need to keep track of the offset manually in memory for all the time.
  # The key structure for this equals an array with topic and partition
  # [topic, partition]. The value is equal to the offset of the last message
  # we've received.
  # NOTE(review): the default block below creates an empty Hash for every
  # missing key, which suggests a nested topic => { partition => offset }
  # layout rather than [topic, partition] array keys — confirm against the
  # call sites and correct whichever is wrong.
  # @note It won't be updated in case user marks message as processed,
  #   because for the case when user commits message other than last in a
  #   batch, this would make ruby-kafka refetch some already consumed messages.
  @current_offsets = Hash.new { |offsets, key| offsets[key] = {} }
end