Queue Sink: RabbitMQ
- RabbitMQ brokers are designed to disperse data to multiple queues, making for a flexible data channeling system that can handle complex message streaming scenarios.
- RavenDB can harness the advantages presented by RabbitMQ brokers both as a producer (by running ETL tasks) and as a consumer (using a sink task to consume enqueued messages).
- To use RavenDB as a consumer, define an ongoing Sink task that will read batches of JSON formatted messages from RabbitMQ queues, construct documents using user-defined scripts, and store the documents in RavenDB collections.
- In this article:
The RabbitMQ Sink Task
Connecting a RabbitMQ broker
Users of RavenDB 6.0 and later can create an ongoing Sink task that connects to a RabbitMQ broker, retrieves messages from selected queues, runs a user-defined script to manipulate data and construct documents, and potentially stores the created documents in RavenDB collections.
In the message broker architecture, RavenDB sinks take the role of data consumers.
A sink connects to a RabbitMQ broker using a connection string, and retrieves messages from the broker's queues.
Read below about adding a connection string via API.
Read here about adding a connection string using Studio.
Retrieving messages from RabbitMQ queues
While a queue sink task is running, it prevents the host database from being unloaded due to idle operations.
This ensures that message consumption from RabbitMQ queues continues uninterrupted, even if the database would otherwise go idle.
When a message is sent to a RabbitMQ broker by a producer, it is pushed to the tail of a queue. As preceding messages are pulled, the message advances up the queue until it reaches its head and can be consumed by RavenDB's sink.
Running user-defined scripts
A sink task's script is a JavaScript segment. Its basic role is to retrieve selected RabbitMQ messages or message properties, and construct documents that will then be stored in RavenDB.
The script can store the whole message as a document, as in this segment:
// Set the document's `@collection` metadata property to keep it in
// this collection (or leave it unset to store the document in @empty).
this['@metadata']['@collection'] = 'Orders';
// Store the message as is, using its Id property as its RavenDB Id as well.
put(this.Id.toString(), this)
The script can also retrieve selected information from the read message and
construct a new document that does not resemble the original message.
Scripts often have two sections: one that creates a JSON object defining
the document's structure and contents, and another that stores the document.
E.g., for RabbitMQ messages of this format:
{
"Id" : 13,
"FirstName" : "John",
"LastName" : "Doe"
}
We can create this script:
var item = {
    Id : this.Id,
    FirstName : this.FirstName,
    LastName : this.LastName,
    FullName : this.FirstName + ' ' + this.LastName,
    "@metadata" : {
        "@collection" : "Users"
    }
};
// Use .toString() to pass the Id as a string even if RabbitMQ provides it as a number
put(this.Id.toString(), item)
The script can also apply various other JavaScript commands, including
load to load a RavenDB document (e.g. to construct a document that
includes data from the retrieved message and complementing data from
existing RavenDB documents), del to remove existing RavenDB documents,
and many others.
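As an illustration of combining these commands, here is a minimal sketch of a script that complements a message with data from an existing document and removes a document the message supersedes. It is written so it can be tried outside RavenDB: `put`, `load`, and `del` are replaced by hypothetical in-memory stand-ins, the script body is wrapped in a function so that `this` can be bound to a message, and the `CompanyId` and `ReplacesId` message fields are assumptions made for this example only.

```javascript
// Hypothetical in-memory stand-ins for the script functions; inside a real
// sink task RavenDB supplies put, load and del itself.
const docs = new Map([
  ['companies/1', { Name: 'Acme' }],
  ['users/stale', { FirstName: 'Old' }],
]);
const put = (id, doc) => { docs.set(id, doc); };
const load = (id) => docs.get(id) ?? null;
const del = (id) => { docs.delete(id); };

// The sink script body, wrapped in a function so that `this` can be bound
// to the consumed message when experimenting outside RavenDB.
function sinkScript() {
  // Complement the message with data from an existing document
  const company = load(this.CompanyId);

  const item = {
    Id: this.Id,
    FullName: this.FirstName + ' ' + this.LastName,
    CompanyName: company ? company.Name : null,
    '@metadata': { '@collection': 'Users' },
  };

  // Remove a document that this message supersedes, if one is named
  if (this.ReplacesId) {
    del(this.ReplacesId);
  }

  put(this.Id.toString(), item);
}

// Example message (CompanyId and ReplacesId are assumed fields)
sinkScript.call({
  Id: 13, FirstName: 'John', LastName: 'Doe',
  CompanyId: 'companies/1', ReplacesId: 'users/stale',
});
```

In an actual sink task only the body of `sinkScript` would be placed in the script, since RavenDB binds `this` to each consumed message and provides the three functions.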
Storing documents in RavenDB collections
The sink task consumes batches of queued messages and stores them in RavenDB in a transactional manner, processing either the entire batch or none of it.
Some script processing errors are allowed; when such an error occurs RavenDB will skip the affected message, record the event in the logs, and alert the user in Studio, but continue processing the batch.
Once a batch is consumed, the task confirms it by sending _channel.BasicAck.
Note that the maximum number of messages consumed in a batch is configurable (see QueueSink.MaxBatchSize under Configuration Options).
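The batch behavior described above can be pictured with a small conceptual model. This is only a sketch of the described semantics, not RavenDB's implementation: `consumeBatch`, `ack`, `log`, and the way `put` is handed to the script are all hypothetical names chosen for the illustration.

```javascript
// Conceptual model only. All names here (consumeBatch, ack, log) are
// hypothetical and do not mirror RavenDB's internals.
function consumeBatch(messages, script, store, ack, log) {
  const staged = new Map(); // writes held back until the whole batch was processed

  for (const message of messages) {
    const writes = new Map(); // writes made while processing this one message
    try {
      // Run the user script with `this` bound to the message
      script.call(message, (id, doc) => writes.set(id, doc));
      for (const [id, doc] of writes) staged.set(id, doc);
    } catch (err) {
      // Tolerated script error: skip this message, record it, keep going
      log(`skipped message: ${err.message}`);
    }
  }

  // Store the batch's documents together, then confirm the batch
  for (const [id, doc] of staged) store.set(id, doc);
  ack();
}

// Usage: the second message has no Id, so the script throws for it
const store = new Map();
const logLines = [];
let acked = false;
consumeBatch(
  [{ Id: 1 }, { /* no Id: the script throws */ }, { Id: 3 }],
  function (put) { put(this.Id.toString(), this); },
  store,
  () => { acked = true; },
  (line) => logLines.push(line),
);
```

In this model the faulty message is skipped and logged, the two valid messages are stored together, and the acknowledgment is sent only after the batch has been stored.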
Producers may enqueue multiple instances of the same document.
If processing each message only once is important to the consumer,
it is the consumer's responsibility to verify the uniqueness of
each consumed message.
Note that as long as the Id property of RabbitMQ messages is preserved
(so duplicate messages share an Id), the script's put(ID, { ... }) command
will overwrite a previous document with the same Id and only one copy of
it will remain.
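The overwrite behavior can be sketched as follows. The `docs` map and the `put` function are hypothetical in-memory stand-ins for RavenDB's document store and script function, and the `Status` field is an assumed message property used only for this example.

```javascript
// Hypothetical in-memory stand-in for RavenDB's put, keyed by document Id.
const docs = new Map();
const put = (id, doc) => { docs.set(id, doc); };

// Script body wrapped in a function so `this` can be bound to a message
function sinkScript() {
  put(this.Id.toString(), { Id: this.Id, Status: this.Status });
}

// The same order is delivered twice, then once more with a newer state
[
  { Id: 7, Status: 'New' },
  { Id: 7, Status: 'New' },
  { Id: 7, Status: 'Shipped' },
].forEach((message) => sinkScript.call(message));
```

After the three messages are processed, a single document with Id `7` remains and it holds the content of the last message. Relying on overwrite is therefore a fit when duplicates carry identical content or when the most recently consumed copy should win; other cases need an explicit uniqueness check in the script.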
Client API
Adding a RabbitMQ connection string
Before defining a sink task, add a RabbitMQ connection string for the task to use.
Create a QueueConnectionString object configured for RabbitMQ, and pass it to
PutConnectionStringOperation.
// Add a RabbitMQ connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "RabbitMqConStr",
            BrokerType = QueueBrokerType.RabbitMq,
            RabbitMqConnectionSettings = new RabbitMqConnectionSettings()
            { ConnectionString = "amqp://guest:guest@localhost:5672/" }
        }));
For the full property reference, see the Connection string class in the Syntax section.
Adding a RabbitMQ sink task
To define the sink task, prepare a QueueSinkConfiguration object and pass it to
AddQueueSinkOperation.
The configuration needs to name the connection string to use, set the broker type to RabbitMQ,
and hold one or more QueueSinkScript objects.
- Each script lists the RabbitMQ queues to consume from in its Queues property, where each entry is a queue name.
- Each script's Script property holds the JavaScript that turns the consumed messages into RavenDB documents.
Example: Adding a RabbitMQ sink task
// Add RabbitMQ connection string
var res = store.Maintenance.Send(
    new PutConnectionStringOperation<QueueConnectionString>(
        new QueueConnectionString
        {
            Name = "RabbitMqConStr",
            BrokerType = QueueBrokerType.RabbitMq,
            RabbitMqConnectionSettings = new RabbitMqConnectionSettings()
            { ConnectionString = "amqp://guest:guest@localhost:5672/" }
        }));

// Define a Sink script
QueueSinkScript queueSinkScript = new QueueSinkScript
{
    // Script name
    Name = "orders",
    // A list of RabbitMQ queues to consume from
    Queues = new List<string>() { "orders" },
    // Apply this script
    Script = @"this['@metadata']['@collection'] = 'Orders';
               put(this.Id.toString(), this)"
};

// Define a RabbitMQ sink task configuration
var config = new QueueSinkConfiguration()
{
    // Sink name
    Name = "RabbitMqSinkTaskName",
    // The connection string to connect the broker with
    ConnectionStringName = "RabbitMqConStr",
    // What queue broker is this task using
    BrokerType = QueueBrokerType.RabbitMq,
    // The list of scripts to run
    Scripts = { queueSinkScript }
};

AddQueueSinkOperationResult addQueueSinkOperationResult =
    store.Maintenance.Send(new AddQueueSinkOperation<QueueConnectionString>(config));
For the QueueSinkConfiguration and QueueSinkScript property reference, see the Sink task classes in the Syntax section.
Configuration Options
Use these configuration options to gain more control over queue sink tasks.
- QueueSink.MaxBatchSize
The maximum number of pulled messages consumed in a single batch.
- QueueSink.MaxFallbackTimeInSec
The maximum number of seconds the Queue Sink process will be in a fallback mode (i.e. suspending the process) after a connection failure.
Syntax
Classes
Connection string
- QueueConnectionString
- RabbitMqConnectionSettings
The connection string a sink task uses to reach a RabbitMQ broker.
class QueueConnectionString
{
string Name
QueueBrokerType BrokerType
RabbitMqConnectionSettings RabbitMqConnectionSettings
}
| Property | Type | Description |
|---|---|---|
| Name | string | The connection string name |
| BrokerType | QueueBrokerType | Set to QueueBrokerType.RabbitMq for a RabbitMQ connection string |
| RabbitMqConnectionSettings | RabbitMqConnectionSettings | The broker's connection details |
QueueConnectionString is shared by all queue brokers, so it also defines settings for the other
broker types (KafkaConnectionSettings, AzureQueueStorageConnectionSettings,
AmazonSqsConnectionSettings, AzureServiceBusConnectionSettings).
Set only the one matching BrokerType.
The connection details for a RabbitMQ broker.
class RabbitMqConnectionSettings
{
string ConnectionString
}
| Property | Type | Description |
|---|---|---|
| ConnectionString | string | An AMQP connection string for the RabbitMQ broker, e.g. amqp://guest:guest@localhost:5672/ |
Sink task
- QueueSinkConfiguration
- QueueSinkScript
The configuration of a RabbitMQ sink task.
class QueueSinkConfiguration
{
string Name
string ConnectionStringName
QueueBrokerType BrokerType
List<QueueSinkScript> Scripts
bool Disabled
string MentorNode
bool PinToMentorNode
long TaskId
}
| Property | Type | Description |
|---|---|---|
| Name | string | The sink task name |
| ConnectionStringName | string | The name of the RabbitMQ connection string the task uses |
| BrokerType | QueueBrokerType | Set to QueueBrokerType.RabbitMq (must match the connection string's broker type) |
| Scripts | List<QueueSinkScript> | The scripts the task runs |
| Disabled | bool | Whether the task is created in a disabled state |
| MentorNode | string | The preferred responsible node for the task, if any |
| PinToMentorNode | bool | Whether to pin the task to its mentor node |
| TaskId | long | The task's identifier, assigned by the server |
A script that turns the messages from its queues into RavenDB documents.
class QueueSinkScript
{
string Name
List<string> Queues
string Script
bool Disabled
}
| Property | Type | Description |
|---|---|---|
| Name | string | The script name |
| Queues | List<string> | The RabbitMQ queues to consume from |
| Script | string | The JavaScript that processes each message |
| Disabled | bool | Whether the script is disabled |
Enums
- QueueBrokerType
Identifies the message broker that a connection string and sink task use.
enum QueueBrokerType
{
None,
Kafka,
RabbitMq,
AzureQueueStorage,
AmazonSqs,
AzureServiceBus
}
| Value | Description |
|---|---|
| RabbitMq | Selects RabbitMQ. Use this value for a RabbitMQ connection string and sink task. |
| None, Kafka, AzureQueueStorage, AmazonSqs, AzureServiceBus | The other broker types, used by the remaining Queue Sink and Queue ETL brokers. |