Kafka triggers

Windmill can connect to Kafka brokers and trigger runnables (scripts, flows) when a message is received. Listening is done from the servers, so it doesn't take up any workers. Kafka triggers are a self-hosted Enterprise feature.

Kafka resource configuration

Before creating a Kafka trigger, you need to set up a Kafka resource. Head to the Resources page, click "Add resource" and select kafka.

The resource requires:

  • Brokers: List of broker hostnames in the format hostname:port
  • Security: Authentication and encryption settings

Security options

| Security mode | Description |
| --- | --- |
| PLAINTEXT | No authentication or encryption. Use only for development. |
| SASL_PLAINTEXT | Username/password authentication without encryption. Supports PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 mechanisms. |
| SSL | TLS encryption with optional client certificate authentication. |
| SASL_SSL | Username/password authentication with TLS encryption. |
| SASL_GSSAPI | Kerberos (GSSAPI) authentication without encryption. |
| SASL_SSL_GSSAPI | Kerberos (GSSAPI) authentication with TLS encryption. |

Kerberos (GSSAPI) authentication

For enterprise environments using Kerberos, select SASL_GSSAPI or SASL_SSL_GSSAPI security mode.

| Property | Description | Required |
| --- | --- | --- |
| kerberos_service_name | Kerberos principal name of the Kafka broker service (default: kafka) | No |
| kerberos_principal | Client's Kerberos principal (e.g., user@REALM.COM) | Yes |
| keytab_path | Path to keytab file mounted on the server | No* |
| keytab_base64 | Base64-encoded keytab content | No* |

*Either keytab_path or keytab_base64 must be provided.

Using keytab_base64: Encode your keytab file with base64 -w0 /path/to/keytab and paste the result. This is stored securely and written to a temporary file at runtime.

Using keytab_path: Mount the keytab file on the server container and provide the path. This is useful when deploying with Kubernetes secrets or Docker volumes.
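To illustrate what the keytab_base64 field expects, here is a minimal sketch of the encode/decode round trip. The file path is illustrative; substitute your real keytab:

```shell
# Sketch: round-trip a keytab through base64 as the keytab_base64 field expects.
# /tmp/example.keytab is a stand-in; use your real keytab file.
printf 'fake-keytab-bytes' > /tmp/example.keytab

# -w0 disables line wrapping so the output is a single line
encoded=$(base64 -w0 /tmp/example.keytab)
echo "$encoded"

# Windmill decodes the value to a temporary file at runtime; verify locally:
echo "$encoded" | base64 -d > /tmp/decoded.keytab
cmp /tmp/example.keytab /tmp/decoded.keytab && echo "round-trip ok"
```

Note that `-w0` matters: some base64 implementations wrap output at 76 columns by default, and a wrapped value may not paste cleanly into a single-line field.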

Non-root server and keytab permissions

When running the Windmill server as a non-root user (e.g., uid 1000 or 1001), the keytab file must be readable by that user. Keytab files are typically created with restrictive permissions (600, owner-only). If you see kinit: Permission denied errors, either:

  • Use keytab_base64 instead of keytab_path - Windmill writes the keytab with the server's ownership
  • In Kubernetes, set defaultMode: 0644 on your Secret volume mount:
    volumes:
      - name: keytab
        secret:
          secretName: kafka-keytab
          defaultMode: 0644
  • In Docker Compose, ensure the keytab file on the host has readable permissions before mounting
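The permissions fix described above can be sketched with standard tools; the keytab path here is a stand-in for your real mounted file:

```shell
# Sketch: adjust keytab permissions for a non-root Windmill server (uid 1000/1001).
# /tmp/example.keytab stands in for your real mounted keytab path.
printf 'fake-keytab' > /tmp/example.keytab
chmod 600 /tmp/example.keytab     # typical restrictive default for keytabs
stat -c '%a' /tmp/example.keytab  # prints 600

# Make it world-readable so the non-root server process can read it
chmod 644 /tmp/example.keytab
stat -c '%a' /tmp/example.keytab  # prints 644
```

Keep in mind that 644 makes the keytab readable to every user on the host; prefer keytab_base64 or a group-readable mount (640 with a shared group) if that is a concern.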

Kerberos configuration

Kafka triggers run on the server (windmill-app pod), not on workers. When using keytab_path, the keytab file and /etc/krb5.conf must be mounted on the server pod, not worker pods. When using SASL_SSL_GSSAPI, you can also provide CA certificates for TLS verification.

Example krb5.conf

[libdefaults]
default_realm = EXAMPLE.COM
dns_lookup_realm = false
dns_lookup_kdc = false
# Recommended for containerized environments - see troubleshooting below
dns_canonicalize_hostname = false

[realms]
EXAMPLE.COM = {
  kdc = kdc.example.com
  admin_server = kdc.example.com
}

[domain_realm]
.example.com = EXAMPLE.COM
example.com = EXAMPLE.COM

Docker Compose example

services:
  windmill_server:
    image: ghcr.io/windmill-labs/windmill-ee:latest
    volumes:
      - ./krb5.conf:/etc/krb5.conf:ro

Helm/Kubernetes example

Create a ConfigMap for krb5.conf:

apiVersion: v1
kind: ConfigMap
metadata:
  name: krb5-config
data:
  krb5.conf: |
    [libdefaults]
    default_realm = EXAMPLE.COM
    dns_canonicalize_hostname = false
    [realms]
    EXAMPLE.COM = {
      kdc = kdc.example.com
    }

Then in your Helm values:

windmill:
  app:
    volumes:
      - name: krb5-config
        configMap:
          name: krb5-config
    volumeMounts:
      - name: krb5-config
        mountPath: /etc/krb5.conf
        subPath: krb5.conf

Troubleshooting "Server not found in Kerberos database"

If you encounter this error, GSSAPI may be constructing the wrong service principal due to DNS canonicalization. GSSAPI performs reverse DNS lookups on the broker IP to determine the hostname for the service principal.

Verify that reverse DNS for your broker IP returns the expected hostname matching your Kerberos SPN. For example, if your SPN is kafka/kafka.example.com@REALM, then host <broker-ip> should return kafka.example.com.
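A quick way to run this check from inside the server container, assuming the `host` utility may not be installed, is `getent hosts`, which queries the same NSS resolver path most applications use. The IP below is illustrative; substitute your broker's IP:

```shell
# Sketch: check what hostname the resolver returns for a broker IP.
# 127.0.0.1 is illustrative; substitute your broker's IP address.
getent hosts 127.0.0.1
# The printed hostname should match the host part of your Kerberos SPN,
# e.g. kafka.example.com for kafka/kafka.example.com@REALM.
```

If the returned name is an internal container or pod name instead of the SPN hostname, the `dns_canonicalize_hostname = false` setting described below is the usual fix.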

In containerized environments (Docker, Kubernetes) where reverse DNS may return internal container/pod names instead of the expected hostname, add to your krb5.conf:

[libdefaults]
dns_canonicalize_hostname = false

This tells GSSAPI to use the hostname as configured in your broker list without reverse DNS canonicalization.

How to use

Create a new trigger on the Kafka triggers page. Add a Kafka resource with the broker hostnames (hostname:port) and the security settings. Specify the topics the trigger should listen to. The group ID is automatically filled in from the current workspace and the trigger path, but you can change it if necessary. It identifies the consumer group the trigger belongs to, which guarantees that even if the trigger stops listening for a while, it will receive the messages it missed once it starts listening again.

Once the Kafka resource and settings are set, select the runnable that should be triggered by this trigger. The received message payload, base64 encoded, will be passed to the runnable as a string argument called payload.

Here's an example script:

export async function main(payload: string) {
  // payload is base64 encoded; decode it first
  const message = atob(payload);
  // do something with the message
}

And if you use a preprocessor, the script could look like this:

export async function preprocessor(
  event: {
    kind: "kafka";
    payload: string; // base64 encoded payload
    brokers: string[];
    topic: string; // the specific topic the message was received from
    group_id: string;
  }
) {
  if (event.kind !== "kafka") {
    throw new Error(`Expected a kafka event`);
  }

  // assuming the message is a JSON object
  const msg = JSON.parse(atob(event.payload));

  // define args for the main function
  // let's assume we want to use the message content and the topic
  return {
    message_content: msg.content,
    topic: event.topic
  };
}

export async function main(message_content: string, topic: string) {
  // do something with the message content and topic
}

Error handling

Kafka triggers support local error handlers that override workspace error handlers for specific triggers. See the error handling documentation for configuration details and examples.