class Kafka::OffsetManager

Manages a consumer’s position in partitions: figures out where to resume processing from,
tracks which offsets have been processed, and commits them to the consumer group.

def clear_offsets

Returns:
  • (nil) -
# Forget every locally tracked offset: processed positions, resolved
# default offsets, and the memoized broker-side commits.
#
# @return [nil]
def clear_offsets
  [@processed_offsets, @resolved_offsets].each(&:clear)
  # Drop the memoized broker commits so the next lookup fetches them again.
  @committed_offsets = nil
end

def clear_offsets_excluding(excluded)

Returns:
  • (nil) -
# Drop all processed offsets except those for the partitions listed in
# `excluded`, and invalidate the memoized broker commits and resolved defaults.
#
# Topics whose partition maps end up empty are removed entirely. Otherwise a
# later commit would see a non-empty offsets hash and send (and log) a commit
# request that contains no partitions.
#
# @param excluded [#fetch] mapping of topic => collection of partition numbers
#   (anything responding to `include?`) whose processed offsets are kept.
# @return [nil]
def clear_offsets_excluding(excluded)
  # Clear all offsets that aren't in `excluded`.
  @processed_offsets.each do |topic, partitions|
    kept = excluded.fetch(topic, [])
    partitions.keep_if { |partition, _| kept.include?(partition) }
  end
  @processed_offsets.delete_if { |_topic, partitions| partitions.empty? }

  # Clear the cached commits from the brokers.
  @committed_offsets = nil
  @resolved_offsets.clear
  nil
end

def clear_resolved_offset(topic)

# Forget the cached default-offset resolution for one topic, so the next
# `resolve_offset` call for it resolves again.
#
# @param topic [String] the name of the topic.
# @return [Object, nil] the removed entry, or nil when nothing was cached.
def clear_resolved_offset(topic)
  @resolved_offsets.delete(topic) { nil }
end

def commit_offsets(recommit = false)

Returns:
  • (nil) -

Parameters:
  • recommit (Boolean) -- whether to recommit offsets that have already been committed.
# Commit the offsets selected by `offsets_to_commit` to the consumer group and
# reset the commit bookkeeping. This is a no-op when there is nothing to commit.
#
# @param recommit [Boolean] whether to also include offsets that have already
#   been committed.
# @return [nil]
def commit_offsets(recommit = false)
  pending = offsets_to_commit(recommit)
  return if pending.empty?

  @logger.info "Committing offsets#{recommit ? ' with recommit' : ''}: #{prettify_offsets(pending)}"
  @group.commit_offsets(pending)

  @last_commit = Time.now
  @last_recommit = Time.now if recommit

  # Nothing is outstanding any more, and the broker's view has just changed,
  # so the memoized commits must be fetched again on next use.
  @uncommitted_offsets = 0
  @committed_offsets = nil
end

def commit_offsets_if_necessary

Returns:
  • (nil) -
# Commit when any trigger fires: the recommit interval, the commit interval,
# or the uncommitted-offset threshold. The recommit check is always evaluated
# first, because its result also decides whether to recommit.
#
# @return [nil]
def commit_offsets_if_necessary
  recommit = recommit_timeout_reached?
  due = recommit || commit_timeout_reached? || commit_threshold_reached?
  return unless due

  commit_offsets(recommit)
end

def commit_threshold_reached?

# Whether enough offsets have been marked as processed since the last commit.
# A threshold of 0 disables this trigger.
#
# @return [Boolean]
def commit_threshold_reached?
  return false if @commit_threshold == 0

  @uncommitted_offsets >= @commit_threshold
end

def commit_timeout_reached?

# Whether the commit interval has elapsed since the last commit.
# An interval of 0 disables this trigger.
#
# @return [Boolean]
def commit_timeout_reached?
  return false if @commit_interval == 0

  seconds_since_last_commit >= @commit_interval
end

def committed_offset_for(topic, partition)

# Look up the broker-side committed offset for one partition, using the
# memoized `committed_offsets`.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
def committed_offset_for(topic, partition)
  commits = committed_offsets
  commits.offset_for(topic, partition)
end

def committed_offsets

# The group's committed offsets. They are fetched from the group on first use
# and memoized until some other method resets `@committed_offsets` to nil.
def committed_offsets
  return @committed_offsets if @committed_offsets

  @committed_offsets = @group.fetch_offsets
end

def fetch_resolved_offsets(topic)

# Ask the cluster to resolve the topic's default offset for every partition
# assigned to this consumer.
#
# @param topic [String] the name of the topic.
# @raise [KeyError] if no default offset has been set for the topic, or the
#   topic has no assigned partitions.
def fetch_resolved_offsets(topic)
  # The default offset is looked up first, so a topic missing from both maps
  # fails on the default-offset lookup.
  fallback = @default_offsets.fetch(topic)
  @cluster.resolve_offsets(topic, @group.assigned_partitions.fetch(topic), fallback)
end

def initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:)

# @param cluster [Object] used to resolve default offsets for a topic.
# @param group [Object] the consumer group; commits and fetches offsets.
# @param fetcher [Object] told to seek whenever the position is moved.
# @param logger [Object] receives info/debug messages.
# @param commit_interval [Numeric] seconds between commits (0 disables).
# @param commit_threshold [Integer] processed offsets per commit (0 disables).
# @param offset_retention_time [Numeric, nil] falls back to
#   DEFAULT_RETENTION_TIME when nil; recommits happen at half of it.
def initialize(cluster:, group:, fetcher:, logger:, commit_interval:, commit_threshold:, offset_retention_time:)
  # Collaborators.
  @cluster = cluster
  @group = group
  @fetcher = fetcher
  @logger = logger

  # Commit triggers.
  @commit_interval = commit_interval
  @commit_threshold = commit_threshold

  # Offset bookkeeping; everything starts empty.
  @uncommitted_offsets = 0
  @processed_offsets = {}
  @default_offsets = {}
  @committed_offsets = nil # fetched lazily from the group
  @resolved_offsets = {}

  @last_commit = Time.now
  @last_recommit = nil # nil makes the first recommit check fire

  retention = offset_retention_time || DEFAULT_RETENTION_TIME
  @recommit_interval = retention / 2
end

def mark_as_processed(topic, partition, offset)

Returns:
  • (nil) -

Parameters:
  • offset (Integer) -- the offset of the message that should be marked as processed.
  • partition (Integer) -- the partition number.
  • topic (String) -- the name of the topic.
# Record that the message at `offset` has been processed, and count it
# towards the commit threshold.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
# @param offset [Integer] the offset of the processed message.
def mark_as_processed(topic, partition, offset)
  @uncommitted_offsets += 1
  partitions = (@processed_offsets[topic] ||= {})
  # Store the offset of the *next* message to read, because that is what a
  # committed offset must point at. Hence the +1.
  partitions[partition] = offset + 1
  @logger.debug "Marking #{topic}/#{partition}:#{offset} as processed"
end

def next_offset_for(topic, partition)

Returns:
  • (Integer) - the next offset that should be fetched.

Parameters:
  • partition (Integer) -- the partition number.
  • topic (String) -- the name of the topic.
# The offset to fetch next for a partition.
#
# Local progress (processed or sought offsets) takes precedence. Otherwise the
# broker-side committed offset is used. If that offset is negative, the
# topic's default offset is resolved instead.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
# @return [Integer] the next offset that should be fetched.
def next_offset_for(topic, partition)
  processed = @processed_offsets.fetch(topic, {})
  offset = processed.fetch(partition) { committed_offset_for(topic, partition) }

  # Negative means nothing has been committed yet.
  return resolve_offset(topic, partition) if offset < 0

  # The stored value already points at the next message to read.
  offset
end

def offsets_to_commit(recommit = false)

# Offsets to send in the next commit.
#
# Without `recommit`, this is the processed-offsets map itself (not a copy).
# With `recommit`, it is every currently committed offset overlaid with local
# progress. Processed values win per partition.
#
# @param recommit [Boolean]
# @return [Hash]
def offsets_to_commit(recommit = false)
  return @processed_offsets unless recommit

  combined = offsets_to_recommit
  combined.merge!(@processed_offsets) do |_topic, already_committed, progress|
    already_committed.merge!(progress)
  end
end

def offsets_to_recommit

# Build a topic => { partition => offset } map of every offset the consumer
# group currently has committed, so they can be committed again.
#
# Negative offsets mean "nothing committed", the same convention
# `next_offset_for` uses, so they are skipped. Topics left with no partitions
# are skipped as well.
#
# @return [Hash]
def offsets_to_recommit
  committed = committed_offsets
  committed.topics.each_with_object({}) do |(topic, partition_info), offsets|
    topic_offsets = partition_info.keys.each_with_object({}) do |partition, partition_map|
      offset = committed.offset_for(topic, partition)
      partition_map[partition] = offset unless offset < 0
    end
    offsets[topic] = topic_offsets unless topic_offsets.empty?
  end
end

def prettify_offsets(offsets)

# Render offsets as "topic/partition:offset" entries separated by ", ",
# for log messages.
#
# @param offsets [Hash] topic => { partition => offset }.
# @return [String]
def prettify_offsets(offsets)
  entries = offsets.each_with_object([]) do |(topic, partitions), acc|
    partitions.each { |partition, offset| acc << "#{topic}/#{partition}:#{offset}" }
  end
  entries.join(', ')
end

def recommit_timeout_reached?

# Whether it is time to recommit already-committed offsets. Returns true if no
# recommit has happened yet, or if the recommit interval has elapsed since the
# last one.
#
# @return [Boolean]
def recommit_timeout_reached?
  return true if @last_recommit.nil?

  seconds_since(@last_recommit) >= @recommit_interval
end

def resolve_offset(topic, partition)

# Default offset for a partition. The whole topic is resolved once and the
# result is cached per topic.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
# @raise [KeyError] if the resolution has no entry for the partition.
def resolve_offset(topic, partition)
  resolved = (@resolved_offsets[topic] ||= fetch_resolved_offsets(topic))
  resolved.fetch(partition)
end

def seconds_since(time)

# Wall-clock seconds elapsed since `time`.
#
# @param time [Time]
# @return [Float]
def seconds_since(time)
  now = Time.now
  now - time
end

def seconds_since_last_commit

# Seconds elapsed since the last successful commit (or since construction,
# if nothing has been committed yet).
def seconds_since_last_commit
  last = @last_commit
  seconds_since(last)
end

def seek_to(topic, partition, offset)

Returns:
  • (nil) -

Parameters:
  • offset (Integer) -- the offset that the consumer position should be moved to.
  • partition (Integer) -- the partition number.
  • topic (String) -- the name of the topic.
# Move the consumer position for a partition. The new offset is recorded
# locally (so it is used by `next_offset_for` and included in commits), and
# the fetcher is told to seek.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
# @param offset [Integer] the offset to move the position to.
def seek_to(topic, partition, offset)
  partitions = (@processed_offsets[topic] ||= {})
  partitions[partition] = offset
  @fetcher.seek(topic, partition, offset)
end

def seek_to_default(topic, partition)

Returns:
  • (nil) -

Parameters:
  • partition (Integer) -- the partition number.
  • topic (String) -- the name of the topic.
# Move a partition's position to the topic's default offset, re-resolving it
# instead of trusting the cache.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
def seek_to_default(topic, partition)
  # Drop the cached resolution first; things may have changed broker-side.
  clear_resolved_offset(topic)
  seek_to(topic, partition, resolve_offset(topic, partition))
end

def set_default_offset(topic, default_offset)

Returns:
  • (nil) -

Parameters:
  • default_offset (Symbol) -- either `:earliest` or `:latest`.
  • topic (String) -- the name of the topic.
# Record which offset to fall back to for a topic when a partition has no
# committed offset.
#
# @param topic [String] the name of the topic.
# @param default_offset [Symbol] either `:earliest` or `:latest`.
def set_default_offset(topic, default_offset)
  @default_offsets.store(topic, default_offset)
end