Queue ETL: Apache Kafka
Apache Kafka is a distributed, high-performance, transactional messaging platform that stays performant as message volume grows and event streams reach big-data scale.
Create a Kafka ETL Task to:
- Extract data from a RavenDB database
- Transform the data using one or more custom scripts
- Load the resulting JSON object to a Kafka destination as a CloudEvents message
This task lets RavenDB act as an event producer in a Kafka architecture.
Read more about Kafka clusters, brokers, topics, partitions, and other related subjects in the platform's official documentation.
This article focuses on how to create a Kafka ETL task using the Client API.
To define a Kafka ETL task from Studio, see Studio: Kafka ETL Task.
For an overview of Queue ETL tasks, see Queue ETL tasks overview.
Setup workflow
- If your Kafka cluster does not use ACLs (Access Control Lists):
  Deploy the task as described in Creating a Kafka ETL task using the Client API below.
- If your Kafka cluster does use ACLs:
  - Prepare the task configuration as described in Creating a Kafka ETL task using the Client API below, but do not deploy the task yet.
  - Compute the transactional IDs and set up the ACL grants as described in Prerequisites for a secure Kafka server below.
  - Deploy the task as described in Creating a Kafka ETL task using the Client API.
Creating a Kafka ETL task using the Client API
Add a Kafka connection string
Before setting up the ETL task, define a connection string that the task will use to connect to the message broker's bootstrap servers.
Example: defining the connection string
// Prepare the connection string:
var conStr = new QueueConnectionString
{
// Provide a name for this connection string
Name = "myKafkaConStr",
// Set the broker type
BrokerType = QueueBrokerType.Kafka,
// Configure the connection details
KafkaConnectionSettings = new KafkaConnectionSettings()
{ BootstrapServers = "localhost:9092" }
};
// Deploy (send) the connection string to the server via the PutConnectionStringOperation:
var res = store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(conStr));
Example: connecting to a secure Kafka cluster
For a secure Kafka cluster, set the authentication and TLS keys on the optional
ConnectionOptions dictionary.
See Prerequisites for a secure Kafka server for the matching cluster-side ACL setup.
// Prepare a connection string for a secure Kafka cluster:
var conStr = new QueueConnectionString
{
Name = "secureKafkaConStr",
BrokerType = QueueBrokerType.Kafka,
KafkaConnectionSettings = new KafkaConnectionSettings
{
BootstrapServers = "broker.example.com:9092",
ConnectionOptions = new Dictionary<string, string>
{
// Transport protocol. SASL_SSL combines SASL auth with SSL/TLS encryption.
// Other values: PLAINTEXT, SSL (mTLS only), SASL_PLAINTEXT.
{ "security.protocol", "SASL_SSL" },
// SASL mechanism. PLAIN sends credentials as cleartext (safe only over TLS).
// Other common values: SCRAM-SHA-256, SCRAM-SHA-512.
{ "sasl.mechanism", "PLAIN" },
// Credentials issued by the Kafka cluster admin.
{ "sasl.username", "<your-username>" },
{ "sasl.password", "<your-password>" }
}
}
};
// Deploy the connection string via PutConnectionStringOperation:
var res = store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(conStr));
Use RavenDB Certificate
If your RavenDB cluster has been set up securely, you can authenticate to Kafka
using RavenDB's cluster-wide certificate (the one defined at RavenDB setup) by
setting UseRavenCertificate to true on the
KafkaConnectionSettings.
KafkaConnectionSettings = new KafkaConnectionSettings
{
BootstrapServers = "broker.example.com:9092",
UseRavenCertificate = true
}
With this enabled, the Kafka connection runs over SSL/TLS, and RavenDB
authenticates with a client certificate derived from its cluster setup. The
certificate is either the cluster server certificate itself (if it carries the
client-auth EKU, the X.509 extension that permits acting as a TLS client) or
a separate certificate that RavenDB issues from the server certificate's key
pair. Either way, this replaces the TLS keys you would otherwise add to
ConnectionOptions.
To complete the setup, register RavenDB's cluster-wide certificate in Kafka's truststore on the target machine(s).
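The fragment above shows only the settings object. As a minimal sketch, the complete connection string could be defined and deployed as follows. The connection string name `certKafkaConStr` and the wrapping class and method are hypothetical, and the `store` is assumed to be an initialized document store connected to a secure RavenDB cluster.

```csharp
using Raven.Client.Documents;
using Raven.Client.Documents.Operations.ConnectionStrings;
using Raven.Client.Documents.Operations.ETL.Queue;

public static class RavenCertificateConnectionExample
{
    // Defines a Kafka connection string that authenticates with
    // RavenDB's cluster-wide certificate, then sends it to the server.
    public static void Deploy(IDocumentStore store)
    {
        var conStr = new QueueConnectionString
        {
            // Hypothetical name, chosen for this sketch
            Name = "certKafkaConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings
            {
                BootstrapServers = "broker.example.com:9092",
                // Replaces the TLS keys otherwise set in ConnectionOptions
                UseRavenCertificate = true
            }
        };

        store.Maintenance.Send(
            new PutConnectionStringOperation<QueueConnectionString>(conStr));
    }
}
```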
Add a Kafka ETL task
Example: basic
In this example, the Kafka ETL Task will:
- Extract source documents from the "Orders" collection in RavenDB.
- Process each "Order" document using a defined script that creates a new orderData object.
- Load the orderData object to the "OrdersTopic" in a Kafka broker.
For more details about the script and the loadTo method, see The transformation script below.
// Define a transformation script for the task:
Transformation transformation = new Transformation
{
// Define the input collections
Collections = { "Orders" },
ApplyToAllDocuments = false,
// The script name
Name = "scriptName",
// The transformation script
Script = @"// Create an orderData object
var orderData = {
Id: id(this),
OrderLinesCount: this.Lines.length,
TotalCost: 0
};
// Update the orderData's TotalCost field
for (var i = 0; i < this.Lines.length; i++) {
var line = this.Lines[i];
var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
orderData.TotalCost += cost;
}
// Load the object to the 'OrdersTopic' in Kafka
loadToOrdersTopic(orderData, {
Id: id(this),
PartitionKey: id(this),
Type: 'com.example.promotions',
Source: '/promotion-campaigns/summer-sale'
});"
};
// Define the Kafka ETL task:
var etlTask = new QueueEtlConfiguration()
{
BrokerType = QueueBrokerType.Kafka,
Name = "myKafkaEtlTaskName",
ConnectionStringName = "myKafkaConStr",
Transforms = { transformation },
// Set to false to allow task failover to another node if current one is down
PinToMentorNode = false
};
// Deploy (send) the task to the server via the AddEtlOperation:
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
Example: delete processed documents
You have the option to delete documents from your RavenDB database once they have been processed by the Queue ETL task.
Set the optional Queues property in your ETL configuration with the list of Kafka topics for which processed documents should be deleted.
var etlTask = new QueueEtlConfiguration()
{
BrokerType = QueueBrokerType.Kafka,
Name = "myKafkaEtlTaskName",
ConnectionStringName = "myKafkaConStr",
Transforms = { transformation },
// Define whether to delete documents from RavenDB after they are sent to the target topic
Queues = new List<EtlQueue>()
{
new()
{
// The name of the Kafka topic
Name = "OrdersTopic",
// When set to 'true',
// documents that were processed by the transformation script will be deleted
// from RavenDB after the message is loaded to the "OrdersTopic" in Kafka.
DeleteProcessedDocuments = true
}
}
};
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
The transformation script
The basic characteristics of a Kafka ETL script are similar to those of other ETL types.
The script defines what data to extract from the source document, how to transform this data, and which Kafka topic to load it to.
The loadTo method
To specify which Kafka topic to load the data into, use either of the following methods in your script.
The two methods are equivalent, offering alternative syntax:
- loadTo<TopicName>(obj, {attributes})
  - Here the target is specified as part of the function name.
  - The target <TopicName> in this syntax is not a variable and cannot be used as one; it is a string literal of the target's name.
- loadTo('TopicName', obj, {attributes})
  - Here the target is passed as an argument to the method.
  - Separating the target name from the loadTo command makes it possible to include symbols like '-' and '.' in target names.
    This is not possible when the loadTo<TopicName> syntax is used because including special characters in the name of a JavaScript function makes it invalid.
| Parameter | Type | Description |
|---|---|---|
| TopicName | string | The name of the Kafka topic. |
| obj | object | The object to transfer. |
| attributes | object | An object with optional and required CloudEvents attributes. |
For example, the following two calls, which load data to "OrdersTopic", are equivalent:
- loadToOrdersTopic(obj, {attributes})
- loadTo('OrdersTopic', obj, {attributes})
A sample script that processes documents from the Orders collection:
// Create an orderData object
var orderData = {
Id: id(this),
OrderLinesCount: this.Lines.length,
TotalCost: 0
};
// Update the orderData's TotalCost field
for (var i = 0; i < this.Lines.length; i++) {
var line = this.Lines[i];
var cost = (line.Quantity * line.PricePerUnit) * ( 1 - line.Discount);
orderData.TotalCost += cost;
}
// Load the object to the "OrdersTopic" in Kafka
loadToOrdersTopic(orderData, {
Id: id(this),
PartitionKey: id(this),
Type: 'com.example.promotions',
Source: '/promotion-campaigns/summer-sale'
})
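The argument form of loadTo is the one to use when a topic name contains characters that cannot appear in a function name. The sketch below embeds such a script in a Transformation, in the same way as the task example earlier in this article. The topic name `orders-topic.v1`, the script name, and the CloudEvents Type and Source values are hypothetical and serve only as an illustration.

```csharp
using Raven.Client.Documents.Operations.ETL;

public static class DashedTopicExample
{
    // Builds a transformation whose script loads to a topic
    // with '-' and '.' in its name (hypothetical topic name).
    public static Transformation Build()
    {
        return new Transformation
        {
            Name = "dashedTopicScript",
            Collections = { "Orders" },
            Script = @"
                var orderData = {
                    Id: id(this),
                    OrderLinesCount: this.Lines.length
                };

                // The topic name is passed as the first argument,
                // so it may contain characters such as '-' and '.'
                loadTo('orders-topic.v1', orderData, {
                    Id: id(this),
                    PartitionKey: id(this),
                    Type: 'com.example.orders',
                    Source: '/orders'
                });"
        };
    }
}
```

Writing the same call as loadToorders-topic.v1(...) would not parse as JavaScript, which is why only the argument form works here.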
Prerequisites for a secure Kafka server
What needs to be granted
When the Kafka cluster uses ACLs (Access Control Lists), two kinds of ACL grants must be set up on the cluster:
- WRITE on each target topic.
- WRITE and DESCRIBE on each RavenDB transformation script transactional ID.
RavenDB generates one transactional ID per transformation script. See Computing the transactional IDs below for the ID format and how to compute it.
Computing the transactional IDs
A transformation script's transactional ID follows this format:
{task-name}/{script-name}-{database-group-id}
When RavenDB builds the actual transactional ID from this template, every / in
the resulting string is replaced by _, since Kafka does not accept / in
transactional ID values.
You set the task name and the script name yourself when you build the
QueueEtlConfiguration.
The database-group ID is the ID of the database group that runs the task (send
GetDatabaseRecordOperation to fetch the database record and read the ID from
its Topology.DatabaseTopologyIdBase64 property).
With the ID in hand, you can compute every transactional ID before deploying the task.
Example: building the transactional ID
// Fetch the database-group ID from the database record
var record = store.Maintenance.Server.Send(
new GetDatabaseRecordOperation(store.Database));
var databaseGroupId = record.Topology.DatabaseTopologyIdBase64;
// Compute the transactional ID for a transformation script
// Build the ID from the template and replace '/' with '_' to match
// what RavenDB will use when registering the producer with Kafka.
var taskName = "OrdersToKafka";
var scriptName = "Promote";
var transactionalId = $"{taskName}/{scriptName}-{databaseGroupId}".Replace("/", "_");
// For databaseGroupId = "a1b2c3d4", transactionalId is:
// "OrdersToKafka_Promote-a1b2c3d4"
Another way to learn a transactional ID is to read it from RavenDB's exception text when a grant is missing or a transactional ID does not match. The exception message names the missing resource and the operation (WRITE or DESCRIBE), and includes the actual transactional ID RavenDB built.
For example, if the WRITE and DESCRIBE grants on a transactional ID are missing, the exception message includes:
ETL process: myKafkaEtlTaskName. Failed to initialize transactions for the producer
instance. Error code: 'TransactionalIdAuthorizationFailed'. Error reason: '...'.
Add the required ACL permissions to your API Key that will make WRITE and DESCRIBE
operations possible for transactional ID 'OrdersToKafka_Promote-a1b2c3d4'.
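A task with several transformation scripts needs one grant per script, so it can help to compute all the IDs in one pass. The helper below is a minimal sketch that applies the template and the '/' replacement described above. The class and method names are hypothetical, and the helper takes plain strings so that it does not depend on the RavenDB client.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class KafkaTransactionalIds
{
    // Applies the template {task-name}/{script-name}-{database-group-id}
    // and replaces every '/' in the result with '_'.
    public static string Compute(string taskName, string scriptName, string databaseGroupId)
    {
        return $"{taskName}/{scriptName}-{databaseGroupId}".Replace("/", "_");
    }

    // Returns one transactional ID per script name, in the same order.
    public static List<string> ComputeAll(
        string taskName, IEnumerable<string> scriptNames, string databaseGroupId)
    {
        return scriptNames
            .Select(scriptName => Compute(taskName, scriptName, databaseGroupId))
            .ToList();
    }
}
```

For example, calling ComputeAll with the task name "OrdersToKafka", the script names "Promote" and "Archive" (the second name is made up for this illustration), and the group ID "a1b2c3d4" yields "OrdersToKafka_Promote-a1b2c3d4" and "OrdersToKafka_Archive-a1b2c3d4". Because the replacement runs over the whole string, a '/' inside the task name, script name, or group ID is replaced as well.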
Entering the grants on the Kafka cluster
Configure the grants on the Kafka cluster using its admin tools. The example below uses Apache Kafka's CLI; the same operations are available in vendor UIs like Confluent Cloud, AKHQ, or kafka-ui.
# Grant WRITE on a target topic
kafka-acls --bootstrap-server <broker> --add \
--allow-principal User:<principal> \
--operation Write --topic <topic-name>
# Grant WRITE and DESCRIBE on a transactional ID
kafka-acls --bootstrap-server <broker> --add \
--allow-principal User:<principal> \
--operation Write --operation Describe \
--transactional-id <transactional-id>
For more on the broker rule set, see Confluent's Authorization for Idempotent and Transactional APIs.
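When there are many topics and transactional IDs, the command lines can be produced from the computed values rather than typed by hand. The following is a rough sketch that only formats the two kafka-acls invocations shown above as single-line strings. The class and method names are hypothetical, the values are inserted as-is without shell quoting, and nothing is executed.

```csharp
public static class KafkaAclCommands
{
    // Formats the command that grants WRITE on a target topic.
    public static string TopicWrite(string broker, string principal, string topic)
    {
        return $"kafka-acls --bootstrap-server {broker} --add " +
               $"--allow-principal User:{principal} " +
               $"--operation Write --topic {topic}";
    }

    // Formats the command that grants WRITE and DESCRIBE on a transactional ID.
    public static string TransactionalIdWriteDescribe(
        string broker, string principal, string transactionalId)
    {
        return $"kafka-acls --bootstrap-server {broker} --add " +
               $"--allow-principal User:{principal} " +
               "--operation Write --operation Describe " +
               $"--transactional-id {transactionalId}";
    }
}
```

Review the generated lines before running them, since a principal or topic name that contains spaces or shell metacharacters would need quoting.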
Single-node Kafka cluster
On a single-node Kafka cluster, set the following on the broker
(in server.properties), so that Kafka transactions can initialize:
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
These are not security requirements, but RavenDB's Kafka ETL uses a transactional producer, which cannot initialize on a single-node cluster without them.
SSL/TLS required for encrypted RavenDB databases
When the source RavenDB database is encrypted, the Kafka connection must use SSL/TLS as well. RavenDB refuses to run the task otherwise, at validation time.
To allow the task to run over an unencrypted Kafka connection, set
AllowEtlOnNonEncryptedChannel to true on the
QueueEtlConfiguration. This is a RavenDB-side validation, separate from the ACL
grants discussed above.
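As an illustration, the flag is set on the task configuration alongside the other properties. This is a sketch only: the wrapping class and method are hypothetical, the task and connection string names are reused from the earlier examples, and `store` and `transformation` are assumed to be prepared by the caller.

```csharp
using Raven.Client.Documents;
using Raven.Client.Documents.Operations.ETL;
using Raven.Client.Documents.Operations.ETL.Queue;

public static class NonEncryptedChannelExample
{
    // Deploys a Kafka ETL task that is allowed to run from an encrypted
    // database over a Kafka connection that does not use SSL/TLS.
    public static void Deploy(IDocumentStore store, Transformation transformation)
    {
        var etlTask = new QueueEtlConfiguration
        {
            BrokerType = QueueBrokerType.Kafka,
            Name = "myKafkaEtlTaskName",
            ConnectionStringName = "myKafkaConStr",
            Transforms = { transformation },
            // Opt out of the encrypted-channel validation
            AllowEtlOnNonEncryptedChannel = true
        };

        store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
    }
}
```

With this flag set, data from the encrypted database travels to Kafka over an unencrypted connection, so it is worth enabling only when that trade-off is acceptable.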
Syntax
Connection string classes
- QueueConnectionString
- QueueBrokerType
- KafkaConnectionSettings
public class QueueConnectionString : ConnectionString
{
// Set the broker type to QueueBrokerType.Kafka for a Kafka connection string
public QueueBrokerType BrokerType { get; set; }
// Configure this when setting a connection string for Kafka
public KafkaConnectionSettings KafkaConnectionSettings { get; set; }
// Configure this when setting a connection string for RabbitMQ
public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }
// Configure this when setting a connection string for Azure Queue Storage
public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }
// Configure this when setting a connection string for Amazon SQS
public AmazonSqsConnectionSettings AmazonSqsConnectionSettings { get; set; }
}
public enum QueueBrokerType
{
None,
Kafka,
RabbitMq,
AzureQueueStorage,
AmazonSqs
}
public class KafkaConnectionSettings
{
// A comma-separated list of "host:port" addresses of Kafka brokers
public string BootstrapServers { get; set; }
// librdkafka client config keys, passed through to the Kafka producer
public Dictionary<string, string> ConnectionOptions { get; set; }
// When 'true', authenticate to Kafka using RavenDB's cluster-wide certificate
public bool UseRavenCertificate { get; set; }
}
ETL task classes
- QueueEtlConfiguration
- Transformation
- EtlQueue
public class QueueEtlConfiguration
{
// Set to QueueBrokerType.Kafka to define a Kafka ETL task
public QueueBrokerType BrokerType { get; set; }
// The ETL task name
public string Name { get; set; }
// The registered connection string name
public string ConnectionStringName { get; set; }
// List of transformation scripts
public List<Transformation> Transforms { get; set; }
// Optional configuration per queue
public List<EtlQueue> Queues { get; set; }
// When 'true', RavenDB will not auto-create the target topic/queue;
// the broker must already have it provisioned
public bool SkipAutomaticQueueDeclaration { get; set; }
// Set to 'false' to allow task failover to another node if current one is down
public bool PinToMentorNode { get; set; }
// When 'true', allow the task to run even when the source database is
// encrypted and the connection to the broker is not (no SSL/TLS).
// Otherwise the task is refused at validation time.
public bool AllowEtlOnNonEncryptedChannel { get; set; }
}
public class Transformation
{
// The script name
public string Name { get; set; }
// The source RavenDB collections that serve as the input for the script
public List<string> Collections { get; set; }
// Set whether to apply the script on all collections
public bool ApplyToAllDocuments { get; set; }
// The script itself
public string Script { get; set; }
}
public class EtlQueue
{
// The Kafka topic name
public string Name { get; set; }
// Delete processed documents when set to 'true'
public bool DeleteProcessedDocuments { get; set; }
}
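SkipAutomaticQueueDeclaration appears in the class listing above but not in the earlier examples. The sketch below shows how it might be set, assuming the "OrdersTopic" topic has already been created on the Kafka cluster by an administrator. The wrapping class and method are hypothetical, and `store` and `transformation` are assumed to be prepared by the caller.

```csharp
using Raven.Client.Documents;
using Raven.Client.Documents.Operations.ETL;
using Raven.Client.Documents.Operations.ETL.Queue;

public static class PreProvisionedTopicExample
{
    // Deploys a Kafka ETL task that relies on topics that already exist
    // on the broker instead of having RavenDB create them.
    public static void Deploy(IDocumentStore store, Transformation transformation)
    {
        var etlTask = new QueueEtlConfiguration
        {
            BrokerType = QueueBrokerType.Kafka,
            Name = "myKafkaEtlTaskName",
            ConnectionStringName = "myKafkaConStr",
            Transforms = { transformation },
            // RavenDB will not auto-create the target topic;
            // it must already be provisioned on the broker
            SkipAutomaticQueueDeclaration = true
        };

        store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
    }
}
```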