
Queue ETL: Apache Kafka

Setup workflow

Creating a Kafka ETL task using the Client API

Add a Kafka connection string

Before setting up the ETL task, define a connection string that the task will use to connect to the message broker's bootstrap servers.

Example: defining the connection string

// Prepare the connection string:
var conStr = new QueueConnectionString
{
    // Provide a name for this connection string
    Name = "myKafkaConStr",

    // Set the broker type
    BrokerType = QueueBrokerType.Kafka,

    // Configure the connection details
    KafkaConnectionSettings = new KafkaConnectionSettings()
    {
        BootstrapServers = "localhost:9092"
    }
};

// Deploy (send) the connection string to the server via the PutConnectionStringOperation:
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(conStr));

Example: connecting to a secure Kafka cluster

For a secure Kafka cluster, set the authentication and TLS keys in the optional ConnectionOptions dictionary.

See Prerequisites for a secure Kafka server for the matching cluster-side ACL setup.

// Prepare a connection string for a secure Kafka cluster:
var conStr = new QueueConnectionString
{
    Name = "secureKafkaConStr",
    BrokerType = QueueBrokerType.Kafka,
    KafkaConnectionSettings = new KafkaConnectionSettings
    {
        BootstrapServers = "broker.example.com:9092",

        ConnectionOptions = new Dictionary<string, string>
        {
            // Transport protocol. SASL_SSL combines SASL auth with SSL/TLS encryption.
            // Other values: PLAINTEXT, SSL (mTLS only), SASL_PLAINTEXT.
            { "security.protocol", "SASL_SSL" },

            // SASL mechanism. PLAIN sends credentials as cleartext (safe only over TLS).
            // Other common values: SCRAM-SHA-256, SCRAM-SHA-512.
            { "sasl.mechanism", "PLAIN" },

            // Credentials issued by the Kafka cluster admin.
            { "sasl.username", "<your-username>" },
            { "sasl.password", "<your-password>" }
        }
    }
};

// Deploy the connection string via PutConnectionStringOperation:
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(conStr));

Use RavenDB Certificate

If your RavenDB cluster has been set up securely, you can authenticate to Kafka using RavenDB's cluster-wide certificate (the one defined at RavenDB setup) by setting UseRavenCertificate to true on the KafkaConnectionSettings.

KafkaConnectionSettings = new KafkaConnectionSettings
{
    BootstrapServers = "broker.example.com:9092",
    UseRavenCertificate = true
}

With this enabled, the Kafka connection runs over SSL/TLS, and RavenDB authenticates with a client certificate derived from its cluster setup. The certificate is either the cluster server certificate itself (if it carries the client-auth EKU, the X.509 extension that permits acting as a TLS client) or a separate certificate that RavenDB issues from the server certificate's key pair. Either way, this replaces the TLS keys you would otherwise add to ConnectionOptions.

To complete the setup, register RavenDB's cluster-wide certificate in Kafka's truststore on the target machine(s).

Add a Kafka ETL task

Example: basic

In this example, the Kafka ETL Task will:

  • Extract source documents from the "Orders" collection in RavenDB.
  • Process each "Order" document using a defined script that creates a new orderData object.
  • Load the orderData object to the "OrdersTopic" in a Kafka broker.

For more details about the script and the loadTo method, see The transformation script below.

// Define a transformation script for the task:
Transformation transformation = new Transformation
{
    // Define the input collections
    Collections = { "Orders" },
    ApplyToAllDocuments = false,

    // The transformation script
    Name = "scriptName",
    Script = @"// Create an orderData object
               var orderData = {
                   Id: id(this),
                   OrderLinesCount: this.Lines.length,
                   TotalCost: 0
               };

               // Update the orderData's TotalCost field
               for (var i = 0; i < this.Lines.length; i++) {
                   var line = this.Lines[i];
                   var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
                   orderData.TotalCost += cost;
               }

               // Load the object to the 'OrdersTopic' in Kafka
               loadToOrdersTopic(orderData, {
                   Id: id(this),
                   PartitionKey: id(this),
                   Type: 'com.example.promotions',
                   Source: '/promotion-campaigns/summer-sale'
               });"
};

// Define the Kafka ETL task:
var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.Kafka,

    Name = "myKafkaEtlTaskName",
    ConnectionStringName = "myKafkaConStr",

    Transforms = { transformation },

    // Set to false to allow task failover to another node if current one is down
    PinToMentorNode = false
};

// Deploy (send) the task to the server via the AddEtlOperation:
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));

Example: delete processed documents

You can delete documents from your RavenDB database once the Queue ETL task has processed them.

Set the optional Queues property in your ETL configuration with the list of Kafka topics for which processed documents should be deleted.

var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.Kafka,

    Name = "myKafkaEtlTaskName",
    ConnectionStringName = "myKafkaConStr",

    Transforms = { transformation },

    // Define whether to delete documents from RavenDB after they are sent to the target topic
    Queues = new List<EtlQueue>()
    {
        new()
        {
            // The name of the Kafka topic
            Name = "OrdersTopic",

            // When set to 'true',
            // documents that were processed by the transformation script will be deleted
            // from RavenDB after the message is loaded to the "OrdersTopic" in Kafka.
            DeleteProcessedDocuments = true
        }
    }
};

store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));

The transformation script

The basic characteristics of a Kafka ETL script are similar to those of other ETL types.
The script defines what data to extract from the source document, how to transform this data, and which Kafka topic to load it to.

The loadTo method

To specify which Kafka topic to load the data into, use either of the following methods in your script.
The two methods are equivalent, offering alternative syntax:

  • loadTo<TopicName>(obj, {attributes})

    • Here the target is specified as part of the function name.
    • The target <TopicName> in this syntax is not a variable and cannot be used as one;
      it is the literal text of the target's name.
  • loadTo('TopicName', obj, {attributes})

    • Here the target is passed as an argument to the method.
    • Separating the target name from the loadTo command makes it possible to include symbols like '-' and '.' in target names.
      This is not possible when the loadTo<TopicName> syntax is used because including special characters in the name of a JavaScript function makes it invalid.
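
| Parameter  | Type   | Description                                                  |
|------------|--------|--------------------------------------------------------------|
| TopicName  | string | The name of the Kafka topic.                                 |
| obj        | object | The object to transfer.                                      |
| attributes | object | An object with optional and required CloudEvents attributes. |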

For example, the following two calls, which load data to "OrdersTopic", are equivalent:

  • loadToOrdersTopic(obj, {attributes})
  • loadTo('OrdersTopic', obj, {attributes})
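
The equivalence of the two forms, and the reason only the second one accepts names containing '-' or '.', can be tried outside RavenDB with a small emulation. This is a sketch only: the makeEtlScope helper, the sent array, and the topic name 'orders-topic.v1' are hypothetical stand-ins invented for illustration and are not part of RavenDB's API.

```javascript
// Hypothetical stand-in for the ETL runtime: it records each load instead of
// sending it to Kafka. Not part of RavenDB's API.
function makeEtlScope(topicNames) {
  const sent = [];
  const scope = {
    sent,
    // Generic form: the topic is an ordinary string argument.
    loadTo(topic, obj, attributes) {
      sent.push({ topic, obj, attributes });
    },
  };
  // Named form: one loadTo<TopicName> function per topic, created only when
  // the topic name can legally be part of a JavaScript identifier.
  for (const name of topicNames) {
    if (/^[A-Za-z0-9_$]+$/.test(name)) {
      scope["loadTo" + name] = (obj, attributes) => scope.loadTo(name, obj, attributes);
    }
  }
  return scope;
}

const etl = makeEtlScope(["OrdersTopic", "orders-topic.v1"]);
const attrs = { Id: "orders/1-A", Type: "com.example.promotions" };

etl.loadToOrdersTopic({ Total: 10 }, attrs);         // named form
etl.loadTo("OrdersTopic", { Total: 10 }, attrs);     // generic form, same target
etl.loadTo("orders-topic.v1", { Total: 10 }, attrs); // only the generic form works here

console.log(etl.sent.map((m) => m.topic));
```

The first two calls record the same target, while the third has no named counterpart because 'orders-topic.v1' cannot appear inside a function name.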

A sample script that processes documents from the Orders collection:

// Create an orderData object
var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update the orderData's TotalCost field
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
    orderData.TotalCost += cost;
}

// Load the object to the "OrdersTopic" in Kafka
loadToOrdersTopic(orderData, {
    Id: id(this),
    PartitionKey: id(this),
    Type: 'com.example.promotions',
    Source: '/promotion-campaigns/summer-sale'
});
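
To check the cost arithmetic before deploying a script, the same computation can be run locally on a sample document. The sketch below assumes a made-up order with two lines; buildOrderData is a hypothetical helper that takes the document as a parameter instead of reading it from this, and the document ID is a placeholder.

```javascript
// Hypothetical sample document shaped like an "Orders" document.
const order = {
  Lines: [
    { Quantity: 2, PricePerUnit: 10, Discount: 0.5 }, // 2 * 10 * (1 - 0.5) = 10
    { Quantity: 1, PricePerUnit: 20, Discount: 0 },   // 1 * 20 * (1 - 0)   = 20
  ],
};

// Same arithmetic as the script, with the document passed in explicitly
// instead of being read from `this`.
function buildOrderData(doc, docId) {
  const orderData = { Id: docId, OrderLinesCount: doc.Lines.length, TotalCost: 0 };
  for (const line of doc.Lines) {
    orderData.TotalCost += line.Quantity * line.PricePerUnit * (1 - line.Discount);
  }
  return orderData;
}

const result = buildOrderData(order, "orders/1-A");
console.log(result.TotalCost); // 30
```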

Prerequisites for a secure Kafka server

What needs to be granted

When the Kafka cluster uses ACLs (Access Control Lists), two kinds of ACL grants must be set up on the cluster:

  • WRITE on each target topic.
  • WRITE and DESCRIBE on each RavenDB transformation script transactional ID.

The transactional ID is generated by RavenDB per transformation script. See Computing the transactional IDs below for the transactional ID format and how to compute it.


Computing the transactional IDs

A transformation script's transactional ID follows this format:

{task-name}/{script-name}-{database-group-id}

When RavenDB builds the actual transactional ID from this template, every / in the resulting string is replaced by _, since Kafka does not accept / in transactional ID values.

You set the task name and the script name yourself when you build the QueueEtlConfiguration.
The database-group ID is the ID of the database group that runs the task. To get it, send GetDatabaseRecordOperation to fetch the database record and read the ID from its Topology.DatabaseTopologyIdBase64 property.
With the ID in hand, you can compute every transactional ID before deploying the task.

Example: building the transactional ID

// Fetch the database-group ID from the database record
var record = store.Maintenance.Server.Send(
    new GetDatabaseRecordOperation(store.Database));
var databaseGroupId = record.Topology.DatabaseTopologyIdBase64;

// Compute the transactional ID for a transformation script
// Build the ID from the template and replace '/' with '_' to match
// what RavenDB will use when registering the producer with Kafka.
var taskName = "OrdersToKafka";
var scriptName = "Promote";
var transactionalId = $"{taskName}/{scriptName}-{databaseGroupId}".Replace("/", "_");

// For databaseGroupId = "a1b2c3d4", transactionalId is:
// "OrdersToKafka_Promote-a1b2c3d4"

Another way to learn a transactional ID is to read it from RavenDB's exception text when a grant is missing or a transactional ID does not match. The exception message names the missing resource and the operation (WRITE or DESCRIBE), and includes the actual transactional ID RavenDB built.

For example, if the WRITE and DESCRIBE grants on a transactional ID are missing, the exception message includes:

ETL process: OrdersToKafka. Failed to initialize transactions for the producer
instance. Error code: 'TransactionalIdAuthorizationFailed'. Error reason: '...'.
Add the required ACL permissions to your API Key that will make WRITE and DESCRIBE
operations possible for transactional ID 'OrdersToKafka_Promote-a1b2c3d4'.
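
If such messages are collected from logs, the transactional ID can be pulled out with a simple pattern match. This is a rough sketch: extractTransactionalId is a hypothetical helper, and it assumes the message quotes the ID in single quotes right after the words "transactional ID", as in the sample above. The exact wording may differ between RavenDB versions.

```javascript
// Hypothetical helper: returns the quoted transactional ID found in an error
// message, or null when the message does not contain one.
function extractTransactionalId(message) {
  const match = /transactional ID '([^']+)'/.exec(message);
  return match ? match[1] : null;
}

// The tail of the sample message, joined into a single line.
const message =
  "Add the required ACL permissions to your API Key that will make WRITE and DESCRIBE " +
  "operations possible for transactional ID 'OrdersToKafka_Promote-a1b2c3d4'.";

const extractedId = extractTransactionalId(message);
console.log(extractedId);
```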

Entering the grants on the Kafka cluster

Configure the grants on the Kafka cluster using its admin tools. The example below uses Apache Kafka's CLI; the same operations are available in vendor UIs like Confluent Cloud, AKHQ, or kafka-ui.

# Grant WRITE on a target topic
kafka-acls --bootstrap-server <broker> --add \
    --allow-principal User:<principal> \
    --operation Write --topic <topic-name>

# Grant WRITE and DESCRIBE on a transactional ID
kafka-acls --bootstrap-server <broker> --add \
    --allow-principal User:<principal> \
    --operation Write --operation Describe \
    --transactional-id <transactional-id>
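
When a task has several scripts or target topics, the commands can be generated rather than typed by hand. The sketch below combines the transactional ID template with the two kafka-acls invocations shown above. The aclCommands helper, the principal name, and the broker address are hypothetical placeholders, and the generated values are not shell-quoted, so treat the output as a starting point to review before running.

```javascript
// Builds the transactional ID from the documented template
// {task-name}/{script-name}-{database-group-id}, replacing every '/' with '_'.
function transactionalId(taskName, scriptName, databaseGroupId) {
  return `${taskName}/${scriptName}-${databaseGroupId}`.replace(/\//g, "_");
}

// Hypothetical helper: renders one kafka-acls command per target topic and
// one per transformation script. Values are inserted as-is (no shell quoting).
function aclCommands({ broker, principal, taskName, scriptNames, topics, databaseGroupId }) {
  const base =
    `kafka-acls --bootstrap-server ${broker} --add --allow-principal User:${principal}`;
  const topicGrants = topics.map((topic) => `${base} --operation Write --topic ${topic}`);
  const transactionalGrants = scriptNames.map(
    (script) =>
      `${base} --operation Write --operation Describe ` +
      `--transactional-id ${transactionalId(taskName, script, databaseGroupId)}`
  );
  return [...topicGrants, ...transactionalGrants];
}

const commands = aclCommands({
  broker: "broker.example.com:9092",
  principal: "ravendb-etl",    // placeholder principal name
  taskName: "OrdersToKafka",
  scriptNames: ["Promote"],
  topics: ["OrdersTopic"],
  databaseGroupId: "a1b2c3d4", // placeholder database-group ID
});
commands.forEach((command) => console.log(command));
```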

For more on the broker rule set, see Confluent's Authorization for Idempotent and Transactional APIs.


Single-node Kafka cluster

On a single-node Kafka cluster, set the following on the broker (in server.properties) so that Kafka transactions can initialize:

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

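These are not security requirements, but RavenDB's Kafka ETL uses a transactional producer, which cannot initialize on a single-node cluster without them: Kafka's defaults for the transaction state log (a replication factor of 3 and a minimum of 2 in-sync replicas) cannot be met by a single broker.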

Syntax

Connection string classes

public class QueueConnectionString : ConnectionString
{
    // Set the broker type to QueueBrokerType.Kafka for a Kafka connection string
    public QueueBrokerType BrokerType { get; set; }

    // Configure this when setting a connection string for Kafka
    public KafkaConnectionSettings KafkaConnectionSettings { get; set; }

    // Configure this when setting a connection string for RabbitMQ
    public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }

    // Configure this when setting a connection string for Azure Queue Storage
    public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }
}

ETL task classes

public class QueueEtlConfiguration
{
    // Set to QueueBrokerType.Kafka to define a Kafka ETL task
    public QueueBrokerType BrokerType { get; set; }
    // The ETL task name
    public string Name { get; set; }
    // The registered connection string name
    public string ConnectionStringName { get; set; }
    // List of transformation scripts
    public List<Transformation> Transforms { get; set; }
    // Optional configuration per queue
    public List<EtlQueue> Queues { get; set; }
    // Set to 'false' to allow task failover to another node if current one is down
    public bool PinToMentorNode { get; set; }
}

public class Transformation
{
    // The script name
    public string Name { get; set; }
    // The source RavenDB collections that serve as the input for the script
    public List<string> Collections { get; set; }
    // Set whether to apply the script on all collections
    public bool ApplyToAllDocuments { get; set; }
    // The script itself
    public string Script { get; set; }
}

public class EtlQueue
{
    // The Kafka topic name
    public string Name { get; set; }
    // Delete processed documents when set to 'true'
    public bool DeleteProcessedDocuments { get; set; }
}
