class Kafka::Broker

def commit_offsets(**options)

def commit_offsets(**options)
  request = Protocol::OffsetCommitRequest.new(**options)
  @connection.send_request(request)
end

def disconnect

Returns:
  • (nil) -
def disconnect
  @connection.close
end

def fetch_messages(**options)

Returns:
  • (Kafka::Protocol::FetchResponse) -

Parameters:
  • () --
def fetch_messages(**options)
  request = Protocol::FetchRequest.new(**options)
  @connection.send_request(request)
end

def fetch_metadata(**options)

Returns:
  • (Kafka::Protocol::MetadataResponse) -

Parameters:
  • () --
def fetch_metadata(**options)
  request = Protocol::TopicMetadataRequest.new(**options)
  @connection.send_request(request)
end

def fetch_offsets(**options)

def fetch_offsets(**options)
  request = Protocol::OffsetFetchRequest.new(**options)
  @connection.send_request(request)
end

def find_group_coordinator(**options)

def find_group_coordinator(**options)
  request = Protocol::GroupCoordinatorRequest.new(**options)
  @connection.send_request(request)
end

def heartbeat(**options)

def heartbeat(**options)
  request = Protocol::HeartbeatRequest.new(**options)
  @connection.send_request(request)
end

def initialize(connection:, node_id: nil, logger:)

def initialize(connection:, node_id: nil, logger:)
  @connection = connection
  @node_id = node_id
  @logger = logger
end

def join_group(**options)

def join_group(**options)
  request = Protocol::JoinGroupRequest.new(**options)
  @connection.send_request(request)
end

def leave_group(**options)

def leave_group(**options)
  request = Protocol::LeaveGroupRequest.new(**options)
  @connection.send_request(request)
end

def list_offsets(**options)

Returns:
  • (Kafka::Protocol::ListOffsetResponse) -

Parameters:
  • () --
def list_offsets(**options)
  request = Protocol::ListOffsetRequest.new(**options)
  @connection.send_request(request)
end

def produce(**options)

Returns:
  • (Kafka::Protocol::ProduceResponse) -

Parameters:
  • () --
def produce(**options)
  request = Protocol::ProduceRequest.new(**options)
  @connection.send_request(request)
end

def sasl_handshake(**options)

def sasl_handshake(**options)
  request = Protocol::SaslHandshakeRequest(**options)
  @connection.send_request(request)
end

def sync_group(**options)

def sync_group(**options)
  request = Protocol::SyncGroupRequest.new(**options)
  @connection.send_request(request)
end

def to_s

Returns:
  • (String) -
def to_s
  "#{@connection} (node_id=#{@node_id.inspect})"
end