Queue ETL: Apache Kafka

Add a Kafka connection string

Before setting up the ETL task, define a connection string that the task will use to connect to the message broker's bootstrap servers.

Example

// Prepare the connection string:
// ==============================
var conStr = new QueueConnectionString
{
    // Provide a name for this connection string
    Name = "myKafkaConStr",

    // Set the broker type
    BrokerType = QueueBrokerType.Kafka,

    // Configure the connection details
    KafkaConnectionSettings = new KafkaConnectionSettings()
    {
        BootstrapServers = "localhost:9092"
    }
};

// Deploy (send) the connection string to the server via the PutConnectionStringOperation:
// =======================================================================================
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(conStr));

Syntax

public class QueueConnectionString : ConnectionString
{
    // Set the broker type to QueueBrokerType.Kafka for a Kafka connection string
    public QueueBrokerType BrokerType { get; set; }

    // Configure this when setting a connection string for Kafka
    public KafkaConnectionSettings KafkaConnectionSettings { get; set; }

    // Configure this when setting a connection string for RabbitMQ
    public RabbitMqConnectionSettings RabbitMqConnectionSettings { get; set; }

    // Configure this when setting a connection string for Azure Queue Storage
    public AzureQueueStorageConnectionSettings AzureQueueStorageConnectionSettings { get; set; }
}

public enum QueueBrokerType
{
    None,
    Kafka,
    RabbitMq,
    AzureQueueStorage
}
public class KafkaConnectionSettings
{
    // A comma-separated list of "host:port" addresses of Kafka brokers
    public string BootstrapServers { get; set; }

    // Various configuration options, passed as key/value pairs to the Kafka client
    public Dictionary<string, string> ConnectionOptions { get; set; }

    // When set to 'true', the RavenDB server certificate is used for the connection to Kafka
    public bool UseRavenCertificate { get; set; }
}
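
When the broker requires authentication, client options can be passed through the
ConnectionOptions dictionary. The following is a minimal sketch, assuming a
hypothetical SASL/SSL-secured broker; the option keys follow the standard Kafka
client configuration names, and the address and credentials are placeholders:

// Define a secured connection string (sketch):
// ============================================
var securedConStr = new QueueConnectionString
{
    Name = "mySecuredKafkaConStr",
    BrokerType = QueueBrokerType.Kafka,

    KafkaConnectionSettings = new KafkaConnectionSettings()
    {
        // Hypothetical broker address
        BootstrapServers = "kafka1.example.com:9093",

        // Standard Kafka client option keys; the values below are placeholders
        ConnectionOptions = new Dictionary<string, string>
        {
            ["security.protocol"] = "SASL_SSL",
            ["sasl.mechanism"] = "PLAIN",
            ["sasl.username"] = "myUser",
            ["sasl.password"] = "myPassword"
        }
    }
};

store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(securedConStr));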

Add a Kafka ETL task

Example - basic:

  • In this example, the Kafka ETL task will:
    • Extract source documents from the "Orders" collection in RavenDB.
    • Process each "Order" document with a script that creates a new orderData object.
    • Load the orderData object to the "OrdersTopic" in a Kafka broker.
  • For more details about the script and the loadTo method, see the transformation script section below.
// Define a transformation script for the task:
// ============================================
Transformation transformation = new Transformation
{
    // Define the input collections
    Collections = { "Orders" },
    ApplyToAllDocuments = false,

    // The transformation script
    Name = "scriptName",
    Script = @"// Create an orderData object
               // ==========================
               var orderData = {
                   Id: id(this),
                   OrderLinesCount: this.Lines.length,
                   TotalCost: 0
               };

               // Update the orderData's TotalCost field
               // ======================================
               for (var i = 0; i < this.Lines.length; i++) {
                   var line = this.Lines[i];
                   var cost = (line.Quantity * line.PricePerUnit) * (1 - line.Discount);
                   orderData.TotalCost += cost;
               }

               // Load the object to the 'OrdersTopic' in Kafka
               // =============================================
               loadToOrdersTopic(orderData, {
                   Id: id(this),
                   PartitionKey: id(this),
                   Type: 'com.example.promotions',
                   Source: '/promotion-campaigns/summer-sale'
               });"
};

// Define the Kafka ETL task:
// ==========================
var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.Kafka,

    Name = "myKafkaEtlTaskName",
    ConnectionStringName = "myKafkaConStr",

    Transforms = { transformation },

    // Set to false to allow task failover to another node if current one is down
    PinToMentorNode = false
};

// Deploy (send) the task to the server via the AddEtlOperation:
// =============================================================
store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));
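
The AddEtlOperation returns an AddEtlOperationResult. If you later need to inspect
the task, its TaskId can be captured; the follow-up query below is an optional
sketch using GetOngoingTaskInfoOperation:

// Capture the result to keep the new task's ID (optional sketch):
// ===============================================================
var addResult = store.Maintenance.Send(
    new AddEtlOperation<QueueConnectionString>(etlTask));

// Fetch the ongoing-task details by the captured ID
var taskInfo = store.Maintenance.Send(
    new GetOngoingTaskInfoOperation(addResult.TaskId, OngoingTaskType.QueueEtl));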

Example - delete processed documents:

  • You have the option to delete documents from your RavenDB database once they have been processed by the Queue ETL task.

  • Set the optional Queues property in your ETL configuration with the list of Kafka topics for which processed documents should be deleted.

var etlTask = new QueueEtlConfiguration()
{
    BrokerType = QueueBrokerType.Kafka,

    Name = "myKafkaEtlTaskName",
    ConnectionStringName = "myKafkaConStr",

    Transforms = { transformation },

    // Define whether to delete documents from RavenDB after they are sent to the target topic
    Queues = new List<EtlQueue>()
    {
        new()
        {
            // The name of the Kafka topic
            Name = "OrdersTopic",

            // When set to 'true',
            // documents that were processed by the transformation script will be deleted
            // from RavenDB after the message is loaded to the "OrdersTopic" in Kafka.
            DeleteProcessedDocuments = true
        }
    }
};

store.Maintenance.Send(new AddEtlOperation<QueueConnectionString>(etlTask));

Syntax

public class QueueEtlConfiguration
{
    // Set to QueueBrokerType.Kafka to define a Kafka ETL task
    public QueueBrokerType BrokerType { get; set; }

    // The ETL task name
    public string Name { get; set; }

    // The registered connection string name
    public string ConnectionStringName { get; set; }

    // List of transformation scripts
    public List<Transformation> Transforms { get; set; }

    // Optional configuration per queue
    public List<EtlQueue> Queues { get; set; }

    // Set to 'false' to allow task failover to another node if the current one is down
    public bool PinToMentorNode { get; set; }
}

public class Transformation
{
    // The script name
    public string Name { get; set; }

    // The source RavenDB collections that serve as the input for the script
    public List<string> Collections { get; set; }

    // Set whether to apply the script to all collections
    public bool ApplyToAllDocuments { get; set; }

    // The script itself
    public string Script { get; set; }
}

public class EtlQueue
{
    // The Kafka topic name
    public string Name { get; set; }

    // Delete processed documents when set to 'true'
    public bool DeleteProcessedDocuments { get; set; }
}
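
For completeness, ApplyToAllDocuments lets a single script run over every document
instead of over listed collections. The sketch below is illustrative only: it reads
the source collection from the document metadata, and 'AllDocsTopic' is a
hypothetical topic name:

// A sketch of a transformation that applies to all documents:
// ===========================================================
var allDocsTransformation = new Transformation
{
    Name = "allDocsScript",

    // Apply the script to every document rather than to specific collections
    ApplyToAllDocuments = true,

    Script = @"// Read the source collection from the document metadata
               var collection = this['@metadata']['@collection'];

               // Forward only Orders documents to the hypothetical 'AllDocsTopic'
               if (collection === 'Orders') {
                   loadToAllDocsTopic({ Id: id(this) }, { Id: id(this) });
               }"
};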

The transformation script

The basic characteristics of a Kafka ETL script are similar to those of other ETL types.
The script defines what data to extract from the source document, how to transform this data,
and which Kafka Topic to load it to.

The loadTo method

To specify which Kafka topic to load the data into, use either of the following methods in your script.
The two methods are equivalent, offering alternative syntax:

  • loadTo<TopicName>(obj, {attributes})

    • Here the target is specified as part of the function name.
    • The target <TopicName> in this syntax is not a variable and cannot be used as one;
      it is simply a string literal of the target's name.

  • loadTo('TopicName', obj, {attributes})

    • Here the target is passed as an argument to the method.

    • Separating the target name from the loadTo command makes it possible to include
      symbols like '-' and '.' in target names. This is not possible with the
      loadTo<TopicName> syntax, because special characters in the name of a JavaScript
      function make it invalid (see the sketch at the end of this section).

      Parameter    Type      Description
      ----------   ------    -----------------------------------------------------------
      TopicName    string    The name of the Kafka topic
      obj          object    The object to transfer
      attributes   object    An object with optional & required CloudEvents attributes

For example, the following two calls, which load data to "OrdersTopic", are equivalent:

  • loadToOrdersTopic(obj, {attributes})
  • loadTo('OrdersTopic', obj, {attributes})

A sample script that processes documents from the Orders collection:
// Create an orderData object
// ==========================
var orderData = {
    Id: id(this),
    OrderLinesCount: this.Lines.length,
    TotalCost: 0
};

// Update the orderData's TotalCost field
// ======================================
for (var i = 0; i < this.Lines.length; i++) {
    var line = this.Lines[i];
    var cost = (line.Quantity * line.PricePerUnit) * (1 - line.Discount);
    orderData.TotalCost += cost;
}

// Load the object to the "OrdersTopic" in Kafka
// =============================================
loadToOrdersTopic(orderData, {
    Id: id(this),
    PartitionKey: id(this),
    Type: 'com.example.promotions',
    Source: '/promotion-campaigns/summer-sale'
});
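
As noted earlier, a topic name that contains characters such as '-' or '.' requires
the string-based overload, since loadTo<TopicName> would not form a valid JavaScript
function name. A short sketch, using a hypothetical topic named 'orders.summary-v2':

// A sketch of loading to a topic whose name contains special characters:
// ======================================================================
var dottedTopicTransformation = new Transformation
{
    Name = "dottedTopicScript",
    Collections = { "Orders" },

    // 'orders.summary-v2' is a hypothetical topic name;
    // the target must be passed as a string argument here.
    Script = @"loadTo('orders.summary-v2', { Id: id(this) }, { Id: id(this) });"
};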