class Kafka::Client

def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,

Returns:
  • (Client) -

Parameters:
  • sasl_scram_mechanism (String, nil) -- Scram mechanism, either "sha256" or "sha512"
  • sasl_scram_password (String, nil) -- SCRAM password
  • sasl_scram_username (String, nil) -- SCRAM username
  • sasl_gssapi_keytab (String, nil) -- a KRB5 keytab filepath
  • sasl_gssapi_principal (String, nil) -- a KRB5 principal
  • ssl_client_cert_key (String, nil) -- a PEM encoded client cert key to use with an
  • ssl_client_cert (String, nil) -- a PEM encoded client cert to use with an
  • ssl_ca_cert_file_path (String, nil) -- a path on the filesystem to a PEM encoded CA cert
  • ssl_ca_cert (String, Array, nil) -- a PEM encoded CA cert, or an Array of
  • socket_timeout (Integer, nil) -- the timeout setting for socket
  • connect_timeout (Integer, nil) -- the timeout setting for connecting
  • logger (Logger) -- the logger that should be used by the client.
  • client_id (String) -- the identifier for this application.
  • seed_brokers (Array, String) -- the list of brokers used to initialize
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
               sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil,
               sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
               sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, ssl_ca_certs_from_system: false)
  @logger = logger || Logger.new(nil)
  @instrumenter = Instrumenter.new(client_id: client_id)
  @seed_brokers = normalize_seed_brokers(seed_brokers)
  ssl_context = SslContext.build(
    ca_cert_file_path: ssl_ca_cert_file_path,
    ca_cert: ssl_ca_cert,
    client_cert: ssl_client_cert,
    client_cert_key: ssl_client_cert_key,
    ca_certs_from_system: ssl_ca_certs_from_system,
  )
  sasl_authenticator = SaslAuthenticator.new(
    sasl_gssapi_principal: sasl_gssapi_principal,
    sasl_gssapi_keytab: sasl_gssapi_keytab,
    sasl_plain_authzid: sasl_plain_authzid,
    sasl_plain_username: sasl_plain_username,
    sasl_plain_password: sasl_plain_password,
    sasl_scram_username: sasl_scram_username,
    sasl_scram_password: sasl_scram_password,
    sasl_scram_mechanism: sasl_scram_mechanism,
    logger: @logger
  )
  if sasl_authenticator.enabled? && ssl_context.nil?
    raise ArgumentError, "SASL authentication requires that SSL is configured"
  end
  @connection_builder = ConnectionBuilder.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    ssl_context: ssl_context,
    logger: @logger,
    instrumenter: @instrumenter,
    sasl_authenticator: sasl_authenticator
  )
  @cluster = initialize_cluster
end