class Kafka::Client

def consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0, heartbeat_interval: 10, offset_retention_time: nil)

Returns:
  • (Consumer) -

Parameters:
  • offset_retention_time (Integer) -- the time period that committed
  • heartbeat_interval (Integer) -- the interval between heartbeats; must be less
  • offset_commit_threshold (Integer) -- the number of messages that can be
  • offset_commit_interval (Integer) -- the interval between offset commits,
  • session_timeout (Integer) -- the number of seconds after which, if a client
  • group_id (String) -- the id of the group that the consumer should join.
def consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0, heartbeat_interval: 10, offset_retention_time: nil)
  cluster = initialize_cluster
  instrumenter = DecoratingInstrumenter.new(@instrumenter, {
    group_id: group_id,
  })
  # The Kafka protocol expects the retention time to be in ms.
  retention_time = (offset_retention_time && offset_retention_time * 1_000) || -1
  group = ConsumerGroup.new(
    cluster: cluster,
    logger: @logger,
    group_id: group_id,
    session_timeout: session_timeout,
    retention_time: retention_time,
    instrumenter: instrumenter,
  )
  fetcher = Fetcher.new(
    cluster: initialize_cluster,
    logger: @logger,
    instrumenter: instrumenter,
  )
  offset_manager = OffsetManager.new(
    cluster: cluster,
    group: group,
    fetcher: fetcher,
    logger: @logger,
    commit_interval: offset_commit_interval,
    commit_threshold: offset_commit_threshold,
    offset_retention_time: offset_retention_time
  )
  heartbeat = Heartbeat.new(
    group: group,
    interval: heartbeat_interval,
  )
  Consumer.new(
    cluster: cluster,
    logger: @logger,
    instrumenter: instrumenter,
    group: group,
    offset_manager: offset_manager,
    fetcher: fetcher,
    session_timeout: session_timeout,
    heartbeat: heartbeat,
  )
end