class Kafka::Broker

def address_match?(host, port)

# Tells whether the given address is the one this broker was configured with.
#
# @param host [Object] host to compare against the broker's host
# @param port [Object] port to compare against the broker's port
# @return [Boolean] true when both host and port equal the broker's own
def address_match?(host, port)
  same_host = host == @host
  same_host && port == @port
end

def alter_configs(**options)

# Builds a Protocol::AlterConfigsRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::AlterConfigsRequest.new
# @return whatever #send_request returns for the request
def alter_configs(**options)
  send_request(Protocol::AlterConfigsRequest.new(**options))
end

def api_versions

# Sends a freshly built Protocol::ApiVersionsRequest through #send_request.
#
# @return whatever #send_request returns for the request
def api_versions
  send_request(Protocol::ApiVersionsRequest.new)
end

def commit_offsets(**options)

# Builds a Protocol::OffsetCommitRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::OffsetCommitRequest.new
# @return whatever #send_request returns for the request
def commit_offsets(**options)
  send_request(Protocol::OffsetCommitRequest.new(**options))
end

def connected?

Returns:
  • (Boolean) - true if a connection object is currently held, false otherwise
# Reports whether a connection object is currently held by this broker.
#
# @return [Boolean] false while @connection is nil, true otherwise
def connected?
  return false if @connection.nil?

  true
end

def connection

# Returns the broker's connection, building it on first use.
#
# The connection is created by asking the connection builder for one to
# @host/@port and is then cached in @connection for subsequent calls.
#
# @return the cached or newly built connection
def connection
  return @connection if @connection

  @connection = @connection_builder.build_connection(@host, @port)
end

def create_partitions(**options)

# Builds a Protocol::CreatePartitionsRequest from the given keyword options
# and hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::CreatePartitionsRequest.new
# @return whatever #send_request returns for the request
def create_partitions(**options)
  send_request(Protocol::CreatePartitionsRequest.new(**options))
end

def create_topics(**options)

# Builds a Protocol::CreateTopicsRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::CreateTopicsRequest.new
# @return whatever #send_request returns for the request
def create_topics(**options)
  send_request(Protocol::CreateTopicsRequest.new(**options))
end

def delete_topics(**options)

# Builds a Protocol::DeleteTopicsRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::DeleteTopicsRequest.new
# @return whatever #send_request returns for the request
def delete_topics(**options)
  send_request(Protocol::DeleteTopicsRequest.new(**options))
end

def describe_configs(**options)

# Builds a Protocol::DescribeConfigsRequest from the given keyword options
# and hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::DescribeConfigsRequest.new
# @return whatever #send_request returns for the request
def describe_configs(**options)
  send_request(Protocol::DescribeConfigsRequest.new(**options))
end

def disconnect

Returns:
  • (nil) - no meaningful return value; closes the connection if one is open
# Closes the broker's connection if one is currently held.
#
# Does nothing when #connected? is false. The cached connection object is
# not cleared here; only its #close method is invoked.
#
# @return [nil] when not connected; otherwise the result of closing the
#   connection
def disconnect
  return unless connected?

  connection.close
end

def fetch_messages(**options)

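Returns:
  • (Kafka::Protocol::FetchResponse) - the response to the fetch request

Parameters:
  • options (Hash) -- keyword arguments passed on to Protocol::FetchRequest.new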
# Builds a Protocol::FetchRequest from the given keyword options and hands it
# to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::FetchRequest.new
# @return [Kafka::Protocol::FetchResponse] the value returned by
#   #send_request
def fetch_messages(**options)
  send_request(Protocol::FetchRequest.new(**options))
end

def fetch_metadata(**options)

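Returns:
  • (Kafka::Protocol::MetadataResponse) - the response to the metadata request

Parameters:
  • options (Hash) -- keyword arguments passed on to Protocol::MetadataRequest.new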
# Builds a Protocol::MetadataRequest from the given keyword options and hands
# it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::MetadataRequest.new
# @return [Kafka::Protocol::MetadataResponse] the value returned by
#   #send_request
def fetch_metadata(**options)
  send_request(Protocol::MetadataRequest.new(**options))
end

def fetch_offsets(**options)

# Builds a Protocol::OffsetFetchRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::OffsetFetchRequest.new
# @return whatever #send_request returns for the request
def fetch_offsets(**options)
  send_request(Protocol::OffsetFetchRequest.new(**options))
end

def find_group_coordinator(**options)

# Builds a Protocol::GroupCoordinatorRequest from the given keyword options
# and hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::GroupCoordinatorRequest.new
# @return whatever #send_request returns for the request
def find_group_coordinator(**options)
  send_request(Protocol::GroupCoordinatorRequest.new(**options))
end

def heartbeat(**options)

# Builds a Protocol::HeartbeatRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::HeartbeatRequest.new
# @return whatever #send_request returns for the request
def heartbeat(**options)
  send_request(Protocol::HeartbeatRequest.new(**options))
end

def initialize(connection_builder:, host:, port:, node_id: nil, logger:)

# Sets up a broker handle without opening a connection.
#
# @param connection_builder [#build_connection] object later asked to build
#   the connection for this broker's host and port
# @param host [Object] broker host, stored as given
# @param port [Object] broker port, stored as given
# @param node_id [Object, nil] optional node identifier (defaults to nil)
# @param logger [#warn] logger used when reporting connection problems
def initialize(connection_builder:, host:, port:, node_id: nil, logger:)
  # Address and identity of the broker.
  @host, @port, @node_id = host, port, node_id

  # Collaborators.
  @connection_builder = connection_builder
  @logger = logger

  # No connection is held until one is first requested.
  @connection = nil
end

def join_group(**options)

# Builds a Protocol::JoinGroupRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::JoinGroupRequest.new
# @return whatever #send_request returns for the request
def join_group(**options)
  send_request(Protocol::JoinGroupRequest.new(**options))
end

def leave_group(**options)

# Builds a Protocol::LeaveGroupRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::LeaveGroupRequest.new
# @return whatever #send_request returns for the request
def leave_group(**options)
  send_request(Protocol::LeaveGroupRequest.new(**options))
end

def list_offsets(**options)

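Returns:
  • (Kafka::Protocol::ListOffsetResponse) - the response to the list-offset request

Parameters:
  • options (Hash) -- keyword arguments passed on to Protocol::ListOffsetRequest.new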
# Builds a Protocol::ListOffsetRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::ListOffsetRequest.new
# @return [Kafka::Protocol::ListOffsetResponse] the value returned by
#   #send_request
def list_offsets(**options)
  send_request(Protocol::ListOffsetRequest.new(**options))
end

def produce(**options)

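Returns:
  • (Kafka::Protocol::ProduceResponse) - the response to the produce request

Parameters:
  • options (Hash) -- keyword arguments passed on to Protocol::ProduceRequest.new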
# Builds a Protocol::ProduceRequest from the given keyword options and hands
# it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::ProduceRequest.new
# @return [Kafka::Protocol::ProduceResponse] the value returned by
#   #send_request
def produce(**options)
  send_request(Protocol::ProduceRequest.new(**options))
end

def send_request(request)

# Sends a request over the broker's connection, recovering from connection
# failures.
#
# * On IdleConnection a warning is logged, the current connection is closed
#   and dropped, and the request is retried (a new connection is built by
#   #connection). The number of retries is not limited.
# * On ConnectionError the current connection is closed and dropped and the
#   error is re-raised to the caller.
#
# Any StandardError raised while closing the old connection is ignored.
#
# @param request [Object] the request handed to the connection's
#   #send_request
# @return whatever the connection's #send_request returns
# @raise [ConnectionError] after the broken connection has been discarded
def send_request(request)
  connection.send_request(request)
rescue IdleConnection, ConnectionError => error
  # IdleConnection is checked first, matching the precedence of the clauses.
  idle = error.is_a?(IdleConnection)
  @logger.warn "Connection has been unused for too long, re-connecting..." if idle

  # Closing is best-effort; a failure here must not mask the original error.
  begin
    @connection.close
  rescue StandardError
    nil
  end
  @connection = nil

  retry if idle
  raise
end

def sync_group(**options)

# Builds a Protocol::SyncGroupRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments forwarded to
#   Protocol::SyncGroupRequest.new
# @return whatever #send_request returns for the request
def sync_group(**options)
  send_request(Protocol::SyncGroupRequest.new(**options))
end

def to_s

Returns:
  • (String) - the broker's "host:port" address followed by its node id
def to_s
  "#{@host}:#{@port} (node_id=#{@node_id.inspect})"
end