class Kafka::SaslAuthenticator
# Builds one handler object per supported SASL mechanism from the supplied
# settings and remembers the first one that reports itself as configured.
#
# All arguments are required keywords. Each is forwarded unchanged to the
# corresponding handler constructor; this method does no validation itself.
#
# @param logger                 stored in @logger and passed to every handler
# @param sasl_gssapi_principal  passed to Sasl::Gssapi.new as `principal:`
# @param sasl_gssapi_keytab     passed to Sasl::Gssapi.new as `keytab:`
# @param sasl_plain_authzid     passed to Sasl::Plain.new as `authzid:`
# @param sasl_plain_username    passed to Sasl::Plain.new as `username:`
# @param sasl_plain_password    passed to Sasl::Plain.new as `password:`
# @param sasl_scram_username    passed to Sasl::Scram.new as `username:`
# @param sasl_scram_password    passed to Sasl::Scram.new as `password:`
# @param sasl_scram_mechanism   passed to Sasl::Scram.new as `mechanism:`
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
               sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
               sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:)
  @logger = logger

  @plain = Sasl::Plain.new(
    authzid: sasl_plain_authzid,
    username: sasl_plain_username,
    password: sasl_plain_password,
    logger: @logger,
  )

  @gssapi = Sasl::Gssapi.new(
    principal: sasl_gssapi_principal,
    keytab: sasl_gssapi_keytab,
    logger: @logger,
  )

  @scram = Sasl::Scram.new(
    username: sasl_scram_username,
    password: sasl_scram_password,
    mechanism: sasl_scram_mechanism,
    logger: @logger,
  )

  # Precedence when several mechanisms are configured: GSSAPI, then PLAIN,
  # then SCRAM. `find` returns nil when none is configured, so @mechanism
  # may be nil after construction.
  @mechanism = [@gssapi, @plain, @scram].find(&:configured?)
end