class Kafka::SaslAuthenticator

# Builds one handler per supported SASL mechanism and remembers the first
# one that reports itself as configured.
#
# All three handlers (PLAIN, GSSAPI, SCRAM) are always constructed, each
# receiving the shared logger. They are then probed with `configured?` in
# the fixed order GSSAPI, PLAIN, SCRAM, so when several sets of credentials
# are supplied the earliest in that order wins. If none is configured,
# `@mechanism` is left as nil.
#
# Every keyword is required; callers presumably pass nil for the settings
# of mechanisms they do not use (confirm against call sites).
#
# @param logger logger handed to every mechanism handler
# @param sasl_gssapi_principal forwarded to Sasl::Gssapi as `principal:`
# @param sasl_gssapi_keytab forwarded to Sasl::Gssapi as `keytab:`
# @param sasl_plain_authzid forwarded to Sasl::Plain as `authzid:`
# @param sasl_plain_username forwarded to Sasl::Plain as `username:`
# @param sasl_plain_password forwarded to Sasl::Plain as `password:`
# @param sasl_scram_username forwarded to Sasl::Scram as `username:`
# @param sasl_scram_password forwarded to Sasl::Scram as `password:`
# @param sasl_scram_mechanism forwarded to Sasl::Scram as `mechanism:`
def initialize(logger:, sasl_gssapi_principal:, sasl_gssapi_keytab:,
               sasl_plain_authzid:, sasl_plain_username:, sasl_plain_password:,
               sasl_scram_username:, sasl_scram_password:, sasl_scram_mechanism:)
  @logger = logger

  @plain = Sasl::Plain.new(
    authzid: sasl_plain_authzid,
    username: sasl_plain_username,
    password: sasl_plain_password,
    logger: @logger,
  )

  @gssapi = Sasl::Gssapi.new(
    principal: sasl_gssapi_principal,
    keytab: sasl_gssapi_keytab,
    logger: @logger,
  )

  @scram = Sasl::Scram.new(
    username: sasl_scram_username,
    password: sasl_scram_password,
    mechanism: sasl_scram_mechanism,
    logger: @logger,
  )

  # Array order is the selection priority: the first configured handler wins.
  @mechanism = [@gssapi, @plain, @scram].find(&:configured?)
end