class Kafka::Broker
def commit_offsets(**options)
def commit_offsets(**options) request = Protocol::OffsetCommitRequest.new(**options) @connection.send_request(request) end
def disconnect
-
(nil)
-
def disconnect @connection.close end
def fetch_messages(**options)
-
(Kafka::Protocol::FetchResponse)
-
Parameters:
-
(
) --
def fetch_messages(**options) request = Protocol::FetchRequest.new(**options) @connection.send_request(request) end
def fetch_metadata(**options)
-
(Kafka::Protocol::MetadataResponse)
-
Parameters:
-
(
) --
def fetch_metadata(**options) request = Protocol::TopicMetadataRequest.new(**options) @connection.send_request(request) end
def fetch_offsets(**options)
def fetch_offsets(**options) request = Protocol::OffsetFetchRequest.new(**options) @connection.send_request(request) end
def find_group_coordinator(**options)
def find_group_coordinator(**options) request = Protocol::GroupCoordinatorRequest.new(**options) @connection.send_request(request) end
def heartbeat(**options)
def heartbeat(**options) request = Protocol::HeartbeatRequest.new(**options) @connection.send_request(request) end
def initialize(connection:, node_id: nil, logger:)
def initialize(connection:, node_id: nil, logger:) @connection = connection @node_id = node_id @logger = logger end
def join_group(**options)
def join_group(**options) request = Protocol::JoinGroupRequest.new(**options) @connection.send_request(request) end
def leave_group(**options)
def leave_group(**options) request = Protocol::LeaveGroupRequest.new(**options) @connection.send_request(request) end
def list_offsets(**options)
-
(Kafka::Protocol::ListOffsetResponse)
-
Parameters:
-
(
) --
def list_offsets(**options) request = Protocol::ListOffsetRequest.new(**options) @connection.send_request(request) end
def produce(**options)
-
(Kafka::Protocol::ProduceResponse)
-
Parameters:
-
(
) --
def produce(**options) request = Protocol::ProduceRequest.new(**options) @connection.send_request(request) end
def sasl_handshake(**options)
def sasl_handshake(**options) request = Protocol::SaslHandshakeRequest(**options) @connection.send_request(request) end
def sync_group(**options)
def sync_group(**options) request = Protocol::SyncGroupRequest.new(**options) @connection.send_request(request) end
def to_s
-
(String)
-
def to_s "#{@connection} (node_id=#{@node_id.inspect})" end