class Kafka::Broker
# Tells whether the given address is the one this broker was created with.
#
# @param host the host to compare against @host using ==
# @param port the port to compare against @port using ==
# @return [Boolean] true only when both host and port are equal to the
#   stored values
def address_match?(host, port)
  host == @host && port == @port
end
# Builds a Protocol::AlterConfigsRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::AlterConfigsRequest.new
# @return [Object] whatever #send_request returns for the request
def alter_configs(**options)
  request = Protocol::AlterConfigsRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::ApiVersionsRequest (it takes no arguments) and hands it
# to #send_request.
#
# @return [Object] whatever #send_request returns for the request
def api_versions
  request = Protocol::ApiVersionsRequest.new
  send_request(request)
end
# Builds a Protocol::OffsetCommitRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::OffsetCommitRequest.new
# @return [Object] whatever #send_request returns for the request
def commit_offsets(**options)
  request = Protocol::OffsetCommitRequest.new(**options)
  send_request(request)
end
# Tells whether a connection object is currently held in @connection.
# Only the presence of the object is checked; nothing here inspects the
# state of the connection itself.
#
# @return [Boolean]
def connected?
  !@connection.nil?
end
# Returns the connection held in @connection, building it on first use.
#
# When @connection is nil (or false) the connection builder is asked for a
# connection to @host and @port, and the result is memoized.
#
# @return [Object] the value returned by @connection_builder.build_connection
def connection
  @connection ||= @connection_builder.build_connection(@host, @port)
end
# Builds a Protocol::CreatePartitionsRequest from the given keyword options
# and hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::CreatePartitionsRequest.new
# @return [Object] whatever #send_request returns for the request
def create_partitions(**options)
  request = Protocol::CreatePartitionsRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::CreateTopicsRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::CreateTopicsRequest.new
# @return [Object] whatever #send_request returns for the request
def create_topics(**options)
  request = Protocol::CreateTopicsRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::DeleteTopicsRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::DeleteTopicsRequest.new
# @return [Object] whatever #send_request returns for the request
def delete_topics(**options)
  request = Protocol::DeleteTopicsRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::DescribeConfigsRequest from the given keyword options
# and hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::DescribeConfigsRequest.new
# @return [Object] whatever #send_request returns for the request
def describe_configs(**options)
  request = Protocol::DescribeConfigsRequest.new(**options)
  send_request(request)
end
# Closes the current connection if one is held; does nothing otherwise.
#
# The connection object is kept in @connection after being closed, so
# #connected? still reports true afterwards.
#
# @return [nil] when not connected; otherwise the result of calling close on
#   the connection (documented as nil)
def disconnect
  connection.close if connected?
end
# Builds a Protocol::FetchRequest from the given keyword options and hands it
# to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::FetchRequest.new
# @return [Kafka::Protocol::FetchResponse]
def fetch_messages(**options)
  request = Protocol::FetchRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::MetadataRequest from the given keyword options and hands
# it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::MetadataRequest.new
# @return [Kafka::Protocol::MetadataResponse]
def fetch_metadata(**options)
  request = Protocol::MetadataRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::OffsetFetchRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::OffsetFetchRequest.new
# @return [Object] whatever #send_request returns for the request
def fetch_offsets(**options)
  request = Protocol::OffsetFetchRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::GroupCoordinatorRequest from the given keyword options
# and hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::GroupCoordinatorRequest.new
# @return [Object] whatever #send_request returns for the request
def find_group_coordinator(**options)
  request = Protocol::GroupCoordinatorRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::HeartbeatRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::HeartbeatRequest.new
# @return [Object] whatever #send_request returns for the request
def heartbeat(**options)
  request = Protocol::HeartbeatRequest.new(**options)
  send_request(request)
end
# Stores the broker's address and collaborators. No connection is built
# here: @connection starts out nil and is created on demand by #connection.
#
# @param connection_builder [#build_connection] used by #connection to build
#   the connection for host and port
# @param host the broker host; compared in #address_match? and shown in #to_s
# @param port the broker port; compared in #address_match? and shown in #to_s
# @param node_id the broker's node id, shown in #to_s; defaults to nil
# @param logger [#warn] logger; #send_request calls warn on it before
#   re-connecting
def initialize(connection_builder:, host:, port:, node_id: nil, logger:)
  @connection_builder = connection_builder
  @connection = nil
  @host = host
  @port = port
  @node_id = node_id
  @logger = logger
end
# Builds a Protocol::JoinGroupRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::JoinGroupRequest.new
# @return [Object] whatever #send_request returns for the request
def join_group(**options)
  request = Protocol::JoinGroupRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::LeaveGroupRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::LeaveGroupRequest.new
# @return [Object] whatever #send_request returns for the request
def leave_group(**options)
  request = Protocol::LeaveGroupRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::ListOffsetRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::ListOffsetRequest.new
# @return [Kafka::Protocol::ListOffsetResponse]
def list_offsets(**options)
  request = Protocol::ListOffsetRequest.new(**options)
  send_request(request)
end
# Builds a Protocol::ProduceRequest from the given keyword options and hands
# it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::ProduceRequest.new
# @return [Kafka::Protocol::ProduceResponse]
def produce(**options)
  request = Protocol::ProduceRequest.new(**options)
  send_request(request)
end
# Sends a request over this broker's connection, discarding the connection
# when it fails.
#
# On IdleConnection a warning is logged, the connection is dropped and the
# request is retried. On ConnectionError the connection is dropped and the
# error is re-raised to the caller.
#
# @param request [Object] the request passed to the connection's send_request
# @return [Object] whatever the connection's send_request returns
# @raise [ConnectionError] re-raised after the connection has been discarded
def send_request(request)
  connection.send_request(request)
rescue IdleConnection
  @logger.warn "Connection has been unused for too long, re-connecting..."
  # Best-effort close: a failure while closing must not mask the retry.
  @connection.close rescue nil
  # Clearing the ivar makes #connection build a fresh connection on retry.
  @connection = nil
  retry
rescue ConnectionError
  # Best-effort close before surfacing the original error.
  @connection.close rescue nil
  @connection = nil
  raise
end
# Builds a Protocol::SyncGroupRequest from the given keyword options and
# hands it to #send_request.
#
# @param options [Hash] keyword arguments passed unchanged to
#   Protocol::SyncGroupRequest.new
# @return [Object] whatever #send_request returns for the request
def sync_group(**options)
  request = Protocol::SyncGroupRequest.new(**options)
  send_request(request)
end
def to_s
-
(String)
-
def to_s "#{@host}:#{@port} (node_id=#{@node_id.inspect})" end