
Queue Sink: Apache Kafka

The Queue Sink Task

Connecting a Kafka broker

Users of RavenDB 6.0 and later can create an ongoing sink task that connects to a Kafka broker, retrieves enqueued messages from selected Kafka topics, runs a user-defined script to manipulate the data and construct documents, and potentially stores the created documents in RavenDB collections.

In the message broker architecture, RavenDB sinks take the role of data consumers.
A sink connects to a Kafka broker using a connection string and retrieves messages from the broker's topics.

Read below about adding a connection string via API.
Read here about adding a connection string using Studio.

Like all ongoing tasks, a sink task is operated by a responsible node.
When responsibility for the task moves from one node to another (e.g., from node A to node B because node A is down):

  • The sink task keeps the consumer group ID it had on the original node.
  • Kafka brokers may stop serving the sink task for a while as the Kafka consumer group rebalances (adapting to one node leaving the group and another joining it, among other changes).
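A small model helps explain why keeping the group ID matters. In Kafka, the offsets a consumer commits are stored per consumer group. A consumer that joins with the same group ID therefore resumes from the group's last committed offset.

The sketch below is a simplified, single-partition JavaScript model of that rule only. It is not RavenDB or Kafka client code. The helper consume() and the group IDs raven-sink-group and some-other-group are hypothetical.

```javascript
// Simplified, single-partition model of Kafka's per-group committed offsets.
// NOT RavenDB or Kafka client code; consume() and the group IDs are hypothetical.

// The partition: an append-only log in which a message's index is its offset.
const partition = ['m0', 'm1', 'm2', 'm3', 'm4'];

// Committed offset per consumer group ID (kept by the Kafka cluster in reality).
const committedOffsets = new Map();

// Reads up to `max` messages for `groupId`, starting at the group's committed
// offset, then commits the offset that follows the last message read.
// A group with no committed offset starts at 0 here, which corresponds to
// auto.offset.reset=earliest (Kafka's default is latest).
function consume(groupId, max) {
  const start = committedOffsets.has(groupId) ? committedOffsets.get(groupId) : 0;
  const batch = partition.slice(start, start + max);
  committedOffsets.set(groupId, start + batch.length);
  return batch;
}

// Node A reads two messages, commits, and then goes down.
const fromNodeA = consume('raven-sink-group', 2); // ['m0', 'm1']

// Node B takes over with the SAME group ID and resumes where node A stopped.
const fromNodeB = consume('raven-sink-group', 10); // ['m2', 'm3', 'm4']

// A consumer with a DIFFERENT group ID has no committed offset of its own,
// so in this model it reads all five messages from the start.
const fromOtherGroup = consume('some-other-group', 10);
```

The model leaves out the rebalance pause described above. It only shows where a takeover node resumes reading.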

Retrieving enqueued messages from selected Kafka topics

While a queue sink task is running, it prevents the host database from being unloaded due to inactivity.
This keeps message consumption from Kafka topics uninterrupted, even when the database would otherwise be considered idle.

When a producer sends a message to a Kafka broker, the message is appended to the end of a topic (more precisely, to the end of one of the topic's partitions). Messages are read in the order they were appended, so RavenDB's sink consumes a message only after the messages that preceded it in its partition.

Running user-defined scripts

A sink task's script is a JavaScript segment. Its basic role is to retrieve selected Kafka messages or message properties and construct documents that are then stored in RavenDB.

The script can store the whole message as a document, as in this segment:

// Add a `@collection` metadata property to store the document in this
// collection (if it is not set, the document is stored in @empty).
this['@metadata']['@collection'] = 'Orders';
// Store the message as is, using its Id property as its RavenDB ID as well.
put(this.Id.toString(), this);
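To see what the script above produces, it can be run locally against a sample message. This is a minimal sketch under stated assumptions:

  • put() and the Map-based store are hypothetical stand-ins for the functions RavenDB exposes to sink scripts.
  • The message contents are made up.
  • The message is assumed to already carry an @metadata object, as the script expects.

```javascript
// Hypothetical stand-ins for RavenDB's script environment (NOT its API).
const store = new Map(); // document ID -> stored document

// Stores a copy of the document under the given ID.
function put(id, doc) {
  store.set(id, JSON.parse(JSON.stringify(doc)));
  return id;
}

// The script above, wrapped in a function so that `this` is the message.
function sinkScript() {
  this['@metadata']['@collection'] = 'Orders';
  put(this.Id.toString(), this);
}

// A sample message; assumed to arrive with an (empty) '@metadata' object.
const message = { Id: 7, Product: 'Chai', Quantity: 3, '@metadata': {} };
sinkScript.call(message);

// The message is stored as is, under the string ID '7',
// with '@collection' set to 'Orders' in its metadata.
const stored = store.get('7');
```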

The script can also retrieve selected information from the message and construct a new document that does not resemble the original message.
Such scripts often consist of two sections: one that creates a JSON object defining the document's structure and contents, and one that stores the document.

E.g., for Kafka messages of this format:

{
    "Id" : 13,
    "FirstName" : "John",
    "LastName" : "Doe"
}

We can create this script:

var item = {
    Id : this.Id,
    FirstName : this.FirstName,
    LastName : this.LastName,
    FullName : this.FirstName + ' ' + this.LastName,
    "@metadata" : {
        "@collection" : "Users"
    }
};

// Use .toString() to pass the Id as a string even if Kafka provides it as a number
put(this.Id.toString(), item);
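Running the script above locally on the sample message shows the resulting document and the effect of .toString(). This is a minimal sketch: put() and the Map-based store are hypothetical stand-ins for RavenDB's script environment, not its API.

```javascript
// Hypothetical stand-ins for RavenDB's script environment (NOT its API).
const store = new Map(); // document ID -> stored document

// Stores a copy of the document under the given ID.
function put(id, doc) {
  store.set(id, JSON.parse(JSON.stringify(doc)));
  return id;
}

// The script above, wrapped in a function so that `this` is the message.
function sinkScript() {
  var item = {
    Id: this.Id,
    FirstName: this.FirstName,
    LastName: this.LastName,
    FullName: this.FirstName + ' ' + this.LastName,
    '@metadata': { '@collection': 'Users' }
  };
  put(this.Id.toString(), item);
}

// The sample Kafka message from above.
sinkScript.call({ Id: 13, FirstName: 'John', LastName: 'Doe' });

const user = store.get('13'); // the numeric Id 13 became the string ID '13'
console.log(user.FullName);   // John Doe
console.log(store.has(13));   // false: nothing was stored under the number 13
```

The document keeps the numeric Id property (13). Only its document ID is the string '13'.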

The script can also use other JavaScript commands, among many others:

  • load, which loads an existing RavenDB document (e.g., to construct a document that combines data from the retrieved message with complementary data from existing RavenDB documents).
  • del, which removes existing RavenDB documents.
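A script that uses load to enrich a message, and del to remove a document, might look like the sketch below. The load(), put(), and del() stubs and the Map-based store are hypothetical stand-ins that let the script run outside RavenDB. The customers/... and orders/... IDs and the CustomerId, Total, and Cancelled message fields are invented for illustration.

```javascript
// Hypothetical stand-ins for RavenDB's script environment (NOT its API).
// The store is pre-seeded with an existing customer and an existing order.
const store = new Map([
  ['customers/1', { Name: 'Alfreds Futterkiste', City: 'Berlin' }],
  ['orders/7', { Total: 5, CustomerName: 'Alfreds Futterkiste' }]
]);

// Returns a copy of the document, or null if it does not exist.
function load(id) {
  return store.has(id) ? JSON.parse(JSON.stringify(store.get(id))) : null;
}

// Stores a copy of the document under the given ID, replacing any existing one.
function put(id, doc) {
  store.set(id, JSON.parse(JSON.stringify(doc)));
  return id;
}

// Removes the document with the given ID, if present.
function del(id) {
  store.delete(id);
}

// The sink script: `this` is the consumed Kafka message.
function sinkScript() {
  var orderId = 'orders/' + this.Id;

  // A cancellation message removes the existing order document.
  if (this.Cancelled) {
    del(orderId);
    return;
  }

  // Complement the message with data from an existing customer document.
  var customer = load('customers/' + this.CustomerId);
  put(orderId, {
    Total: this.Total,
    CustomerName: customer ? customer.Name : null,
    CustomerCity: customer ? customer.City : null,
    '@metadata': { '@collection': 'Orders' }
  });
}

sinkScript.call({ Id: 8, CustomerId: 1, Total: 42.5 }); // enriched from customers/1
sinkScript.call({ Id: 9, CustomerId: 99, Total: 10 });  // no such customer: null fields
sinkScript.call({ Id: 7, Cancelled: true });            // deletes orders/7
```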

Storing documents in RavenDB collections

The sink task consumes batches of queued messages and stores them in RavenDB in a transactional manner, processing either the entire batch or none of it.

Exceptions to this rule

Some script processing errors are tolerated. When such an error occurs, RavenDB skips the affected message, records the event in the logs, and alerts the user in Studio, but continues processing the batch.

Once a batch is consumed, the task confirms it by calling kafkaConsumer.Commit().

Note that the number of documents included in a batch is configurable.
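The all-or-nothing rule and its exception can be modeled in a few lines. This is a simplified model, not RavenDB's implementation:

  • SkippableScriptError is a hypothetical marker for the kind of script error RavenDB tolerates. The exact set of tolerated errors is not listed here.
  • commit() stands in for kafkaConsumer.Commit().
  • A Map plays the database.

```javascript
// Simplified model of the batch rules above; NOT RavenDB's implementation.
// Hypothetical names: SkippableScriptError, processBatch(), commit(), db.

// Marks a tolerated script error: the message is skipped, the batch goes on.
class SkippableScriptError extends Error {}

const db = new Map(); // plays the RavenDB database
const log = [];       // plays the server log / Studio alert
let commits = 0;      // how many batches were committed to Kafka

// Stands in for kafkaConsumer.Commit().
function commit() {
  commits += 1;
}

// Runs `script` on every message and writes either the whole batch or nothing.
function processBatch(messages, script) {
  const staged = new Map(); // writes are staged here until the batch succeeds
  const put = (id, doc) => staged.set(id, JSON.parse(JSON.stringify(doc)));

  for (const message of messages) {
    try {
      script.call(message, put);
    } catch (err) {
      if (err instanceof SkippableScriptError) {
        log.push('skipped message: ' + err.message); // skip it and keep going
        continue;
      }
      throw err; // any other error: nothing is written and nothing is committed
    }
  }

  // Every message was processed or skipped: apply all writes, then commit.
  for (const [id, doc] of staged) db.set(id, doc);
  commit();
}

// Sample script (hypothetical rules): a missing Id is tolerated, Poison is fatal.
function script(put) {
  if (this.Id === undefined) throw new SkippableScriptError('missing Id');
  if (this.Poison) throw new Error('unrecoverable script error');
  put(String(this.Id), { Name: this.Name });
}

// Batch 1: the message without an Id is skipped; the other two are stored.
processBatch([{ Id: 1, Name: 'a' }, { Name: 'no Id' }, { Id: 2, Name: 'b' }], script);

// Batch 2: a fatal error rejects the whole batch, so message 3 is not stored either.
try {
  processBatch([{ Id: 3, Name: 'c' }, { Id: 4, Poison: true }], script);
} catch (err) {
  log.push('batch rejected: ' + err.message);
}
```

Staging writes and applying them only after the loop is what makes the batch all-or-nothing in this model. The commit happens only on that success path.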

Take care of duplicates

Producers may enqueue multiple instances of the same document.
If processing each message only once is important to the consumer, it is the consumer's responsibility to verify the uniqueness of each consumed message.

Note that as long as the Id property of Kafka messages is preserved (so duplicate messages share an Id), the script's put(ID, { ... }) command overwrites any previous document with the same Id, so only one copy remains.
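Here is a minimal sketch of that overwrite behavior. put() and the Map-based store are hypothetical stand-ins for RavenDB's put() and database, and the messages are sample data.

```javascript
// Hypothetical stand-ins for RavenDB's put() and database (NOT its API).
const store = new Map(); // document ID -> stored document

// Storing under an existing ID replaces the previous document.
function put(id, doc) {
  store.set(id, JSON.parse(JSON.stringify(doc)));
  return id;
}

// A sink script that keeps the message's Id as the document ID.
function sinkScript() {
  put(this.Id.toString(), {
    Id: this.Id,
    Status: this.Status,
    '@metadata': { '@collection': 'Orders' }
  });
}

// The producer enqueued order 13 twice (for example, after a retry).
const messages = [
  { Id: 13, Status: 'Placed' },
  { Id: 14, Status: 'Placed' },
  { Id: 13, Status: 'Placed' }
];
messages.forEach((message) => sinkScript.call(message));

console.log(store.size); // 2: the second copy of order 13 overwrote the first
```

If the producer does not preserve Ids, the two copies would get different document IDs and both would be stored. That is the case in which the consumer has to verify uniqueness itself.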

Client API

Adding a Kafka connection string

Before defining a sink task, add a Kafka connection string for the task to use.
Create a QueueConnectionString object configured for Kafka, and pass it to PutConnectionStringOperation.

// Add a Kafka connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "KafkaConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings()
                { BootstrapServers = "localhost:9092" }
        }));

For the full property reference, see the Connection string class in the Syntax section.


Adding a Kafka sink task

To define the sink task, prepare a QueueSinkConfiguration object and pass it to AddQueueSinkOperation.
The configuration needs to name the connection string to use, set the broker type to Kafka, and hold one or more QueueSinkScript objects.

  • Each script lists the Kafka topics to consume from in its Queues property, where each entry is a topic name.
  • Each script's Script property holds the JavaScript that turns the consumed messages into RavenDB documents.

Example: Adding a Kafka sink task

// Add Kafka connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "KafkaConStr",
            BrokerType = QueueBrokerType.Kafka,
            KafkaConnectionSettings = new KafkaConnectionSettings()
                { BootstrapServers = "localhost:9092" }
        }));

// Define a Sink script
QueueSinkScript queueSinkScript = new QueueSinkScript
{
    // Script name
    Name = "orders",
    // A list of Kafka topics to consume from
    Queues = new List<string>() { "orders" },
    // Apply this script
    Script = @"this['@metadata']['@collection'] = 'Orders';
               put(this.Id.toString(), this);"
};

// Define a Kafka sink task configuration
var config = new QueueSinkConfiguration()
{
    // Sink name
    Name = "KafkaSinkTaskName",
    // The connection string to connect the broker with
    ConnectionStringName = "KafkaConStr",
    // What queue broker is this task using
    BrokerType = QueueBrokerType.Kafka,
    // The list of scripts to run
    Scripts = { queueSinkScript }
};

AddQueueSinkOperationResult addQueueSinkOperationResult =
    store.Maintenance.Send(new AddQueueSinkOperation<QueueConnectionString>(config));

For the QueueSinkConfiguration and QueueSinkScript property reference, see the Sink task classes in the Syntax section.

Configuration Options

Use these configuration options to gain more control over queue sink tasks.

Syntax

Classes

Connection string

The connection string a sink task uses to reach a Kafka broker.

class QueueConnectionString
{
    string Name
    QueueBrokerType BrokerType
    KafkaConnectionSettings KafkaConnectionSettings
}

Property | Type | Description
Name | string | The connection string name
BrokerType | QueueBrokerType | Set to QueueBrokerType.Kafka for a Kafka connection string
KafkaConnectionSettings | KafkaConnectionSettings | The broker's bootstrap servers and connection options

QueueConnectionString is shared by all queue brokers, so it also defines settings for the other broker types (RabbitMqConnectionSettings, AzureQueueStorageConnectionSettings, AmazonSqsConnectionSettings, AzureServiceBusConnectionSettings).
Set only the one matching BrokerType.

Sink task

The configuration of a Kafka sink task.

class QueueSinkConfiguration
{
    string Name
    string ConnectionStringName
    QueueBrokerType BrokerType
    List<QueueSinkScript> Scripts
    bool Disabled
    string MentorNode
    bool PinToMentorNode
    long TaskId
}

Property | Type | Description
Name | string | The sink task name
ConnectionStringName | string | The name of the Kafka connection string the task uses
BrokerType | QueueBrokerType | Set to QueueBrokerType.Kafka (must match the connection string's broker type)
Scripts | List<QueueSinkScript> | The scripts the task runs
Disabled | bool | Whether the task is created in a disabled state
MentorNode | string | The preferred responsible node for the task, if any
PinToMentorNode | bool | Whether to pin the task to its mentor node
TaskId | long | The task's identifier, assigned by the server

Enums

Identifies the message broker that a connection string and sink task use.

enum QueueBrokerType
{
    None,
    Kafka,
    RabbitMq,
    AzureQueueStorage,
    AmazonSqs,
    AzureServiceBus
}

Value | Description
Kafka | Selects Apache Kafka. Use this value for a Kafka connection string and sink task.
None | No broker type is selected.
RabbitMq, AzureQueueStorage, AmazonSqs, AzureServiceBus | The other broker types, used with the remaining Queue Sink and Queue ETL brokers.
