I’m trying to configure Lago with Confluent Cloud as a Kafka broker.
Confluent Cloud requires SASL authentication using the PLAIN mechanism. However, Lago’s Kafka configuration has no option to specify sasl.mechanism=PLAIN (or an equivalent), and the current implementation appears to support only SASL/SCRAM, which is not compatible with Confluent Cloud’s PLAIN mechanism.
As a result, it is not possible to connect Lago to Confluent Cloud using the recommended/standard SASL configuration.
Expected behavior
Be able to configure Lago’s Kafka client to connect to Confluent Cloud using:
• security.protocol=SASL_SSL
• sasl.mechanism=PLAIN
• Username/Password (API key / secret from Confluent Cloud)
Ideally, Lago should expose configuration options (env vars or config file) to set:
• SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, etc.)
• Security protocol (SASL_SSL / SSL / PLAINTEXT)
• Username / password (or API key / secret)
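To make the requested options concrete, here is a minimal sketch of what such a settings type and its validation could look like. The type name KafkaAuthConfig, its fields, and the Validate method are hypothetical and only for illustration; they are not Lago's actual configuration code. The accepted values are the ones listed above.

```go
package main

import (
	"errors"
	"fmt"
)

// KafkaAuthConfig is a hypothetical settings type; Lago's real config may differ.
type KafkaAuthConfig struct {
	SecurityProtocol string // "SASL_SSL", "SSL" or "PLAINTEXT"
	SASLMechanism    string // "PLAIN", "SCRAM-SHA-256" or "SCRAM-SHA-512"
	Username         string // for Confluent Cloud: the API key
	Password         string // for Confluent Cloud: the API secret
}

// Validate rejects unknown values and SASL setups that lack credentials.
func (c KafkaAuthConfig) Validate() error {
	switch c.SecurityProtocol {
	case "SSL", "PLAINTEXT":
		// No SASL handshake, so mechanism and credentials are not checked.
		return nil
	case "SASL_SSL":
		// SASL settings are checked below.
	default:
		return fmt.Errorf("unsupported security protocol %q", c.SecurityProtocol)
	}
	switch c.SASLMechanism {
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// Known mechanism.
	default:
		return fmt.Errorf("unsupported SASL mechanism %q", c.SASLMechanism)
	}
	if c.Username == "" || c.Password == "" {
		return errors.New("SASL requires a username and password")
	}
	return nil
}

func main() {
	// Placeholder credentials; a real setup would use the Confluent Cloud API key/secret.
	cfg := KafkaAuthConfig{
		SecurityProtocol: "SASL_SSL",
		SASLMechanism:    "PLAIN",
		Username:         "example-api-key",
		Password:         "example-api-secret",
	}
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("config accepted")
}
```

Validating at startup would surface a mistyped mechanism as a clear error instead of a failed broker handshake.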
Current behavior
• There is no way (or it’s not documented) to set sasl.mechanism=PLAIN.
• The existing Kafka config in Lago only supports a subset of auth modes and does not work with Confluent Cloud’s default SASL PLAIN configuration.
Use case
Use Lago with managed Kafka (Confluent Cloud) without having to run and maintain our own Kafka cluster, and still use Lago’s event ingestion over Kafka.
The current auth mechanism only supports Scram256 and Scram512, not PLAIN.
Proposed fix
I implemented a local fix that adds support for the PLAIN SASL mechanism in the Kafka client configuration.
In summary, the changes are:
• Allow configuring the SASL mechanism via environment variable (e.g. KAFKA_SASL_MECHANISM), defaulting to the current behavior if not set.
• Pass the configured mechanism (including PLAIN) to the Kafka client.
• Ensure SASL_SSL works correctly with username/password for Confluent Cloud.
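The first point above could be sketched roughly as follows. This is not the exact code from my local fix: the helper name resolveMechanism is hypothetical, and the fallback value passed in main is only a stand-in for whatever Lago currently uses when nothing is configured. Only the KAFKA_SASL_MECHANISM variable name comes from the proposal itself.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// resolveMechanism returns the SASL mechanism named by KAFKA_SASL_MECHANISM,
// or fallback when the variable is unset or blank. The lookup function is
// injected so the logic can be exercised without touching the real environment.
func resolveMechanism(lookup func(string) (string, bool), fallback string) (string, error) {
	raw, ok := lookup("KAFKA_SASL_MECHANISM")
	if !ok || strings.TrimSpace(raw) == "" {
		return fallback, nil // keep the existing behavior
	}
	mech := strings.ToUpper(strings.TrimSpace(raw))
	switch mech {
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		return mech, nil
	default:
		return "", fmt.Errorf("unsupported KAFKA_SASL_MECHANISM %q", raw)
	}
}

func main() {
	// "SCRAM-SHA-512" is an assumed placeholder for the current default.
	mech, err := resolveMechanism(os.LookupEnv, "SCRAM-SHA-512")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("SASL mechanism:", mech)
}
```

Falling back when the variable is unset means existing deployments would keep working without any configuration change.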
switch serverConfig.ScramAlgorithm {
case Scram256:
	scramOpt = kgo.SASL(scramAuth.AsSha256Mechanism())
case Scram512:
	scramOpt = kgo.SASL(scramAuth.AsSha512Mechanism())
case ScramPlain:
	// PLAIN, as required by Confluent Cloud: build the mechanism
	// directly from the configured username and password.
	scramOpt = kgo.SASL(plain.Auth{
		User: serverConfig.UserName,
		Pass: serverConfig.Password,
	}.AsMechanism())
}