class Kafka::Client
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
-
(Client)
-
Parameters:
-
sasl_scram_mechanism
(String, nil
) -- Scram mechanism, either "sha256" or "sha512" -
sasl_scram_password
(String, nil
) -- SCRAM password -
sasl_scram_username
(String, nil
) -- SCRAM username -
sasl_gssapi_keytab
(String, nil
) -- a KRB5 keytab filepath -
sasl_gssapi_principal
(String, nil
) -- a KRB5 principal -
ssl_client_cert_key
(String, nil
) -- a PEM encoded client cert key to use with an -
ssl_client_cert
(String, nil
) -- a PEM encoded client cert to use with an -
ssl_ca_cert_file_path
(String, nil
) -- a path on the filesystem to a PEM encoded CA cert -
ssl_ca_cert
(String, Array
) -- a PEM encoded CA cert, or an Array of, nil -
socket_timeout
(Integer, nil
) -- the timeout setting for socket -
connect_timeout
(Integer, nil
) -- the timeout setting for connecting -
logger
(Logger
) -- the logger that should be used by the client. -
client_id
(String
) -- the identifier for this application. -
seed_brokers
(Array
) -- the list of brokers used to initialize, String
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, ssl_ca_certs_from_system: false) @logger = logger || Logger.new(nil) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) ssl_context = SslContext.build( ca_cert_file_path: ssl_ca_cert_file_path, ca_cert: ssl_ca_cert, client_cert: ssl_client_cert, client_cert_key: ssl_client_cert_key, ca_certs_from_system: ssl_ca_certs_from_system, ) sasl_authenticator = SaslAuthenticator.new( sasl_gssapi_principal: sasl_gssapi_principal, sasl_gssapi_keytab: sasl_gssapi_keytab, sasl_plain_authzid: sasl_plain_authzid, sasl_plain_username: sasl_plain_username, sasl_plain_password: sasl_plain_password, sasl_scram_username: sasl_scram_username, sasl_scram_password: sasl_scram_password, sasl_scram_mechanism: sasl_scram_mechanism, logger: @logger ) if sasl_authenticator.enabled? && ssl_context.nil? raise ArgumentError, "SASL authentication requires that SSL is configured" end @connection_builder = ConnectionBuilder.new( client_id: client_id, connect_timeout: connect_timeout, socket_timeout: socket_timeout, ssl_context: ssl_context, logger: @logger, instrumenter: @instrumenter, sasl_authenticator: sasl_authenticator ) @cluster = initialize_cluster end