Skip to main content

Queue Sink: Azure Service Bus

The Azure Service Bus Sink Task

Connecting an Azure Service Bus namespace

A RavenDB sink task is a consumer in the Azure Service Bus architecture, reading the messages that producer applications place on the broker.

The task connects to an Azure Service Bus namespace using a connection string that you register with RavenDB. The connection string identifies the namespace and holds the credentials that the task authenticates with.

Read below about adding a connection string via the client API.

Selecting message sources

Each sink script lists one or more sources.

A source is either:

  • a queue, identified by its name, or
  • a topic subscription, identified by its topic and subscription names.

One script can read from several sources, and a sink task can run several scripts, so a single task can consume from any mix of queues and subscriptions in the same namespace.

Learn below how to specify each source via the client API.

Retrieving enqueued messages

A running sink task prevents RavenDB from unloading its host database due to inactivity, so consumption from Azure Service Bus continues uninterrupted even when the database would otherwise go idle.

A producer places a message on a queue or publishes it to a topic, where it waits to be consumed.
The sink task receives the messages available on each of its sources and hands each message to a script.

Running user-defined scripts

A sink script is a JavaScript segment that turns each consumed message into one or more RavenDB documents.

The simplest script stores the message as it arrives, using the put command:

// Set the @collection metadata to store the document in a collection;
// without it, the document is stored in the @empty collection.
this['@metadata']['@collection'] = 'Orders';

// Store the message as-is, reusing its Id property as the document ID.
put(this.Id.toString(), this);

A script can also reshape a message into a different document.
e.g., for a message like this one:

{
"Id": 13,
"FirstName": "John",
"LastName": "Doe"
}

the following script adds a FullName field and stores the document in the Users collection:

var item = {
Id: this.Id,
FirstName: this.FirstName,
LastName: this.LastName,
FullName: this.FirstName + ' ' + this.LastName,
"@metadata": {
"@collection": "Users"
}
};

// Pass the Id as a string, even when Azure Service Bus delivers it as a number.
put(this.Id.toString(), item);

Beyond put, a script can call load to read an existing RavenDB document (e.g., to enrich a message with related data), del to delete a document, and various other commands.

Storing documents in RavenDB collections

The sink task processes a batch of messages and stores the resulting documents in a single transaction, either the whole batch or none of it.

Exceptions to this rule

Some script processing errors are allowed. When such an error occurs, RavenDB skips the affected message, logs the event, and raises an alert, but continues processing the rest of the batch.

Only after the batch is stored does the task acknowledge its messages to Azure Service Bus, which then removes them from the queue or subscription.
Because the task acknowledges a message only after its document is persisted, every message is processed at least once, even if the sink fails partway through a batch.

The number of messages in a batch is configurable.

Handle duplicate messages

At-least-once processing means the same message can be delivered more than once. This happens when a producer enqueues a message more than once, or when a batch takes longer than the message's lock duration (the time Azure Service Bus reserves a message for the sink): once the lock expires, Azure Service Bus redelivers the message.

If processing each message only once matters to the consumer, it is the consumer's responsibility to handle the duplicates. Often this needs no extra work: as long as a message keeps the same Id, the script's put(id, ...) command overwrites the earlier document, so only one copy remains.

Client API

Adding an Azure Service Bus connection string

Before defining a sink task, add an Azure Service Bus connection string for the task to use.
Create a QueueConnectionString object configured for Azure Service Bus, and pass it to PutConnectionStringOperation.

An Azure Service Bus connection authenticates in one of three modes: a SAS connection string, Entra ID application credentials, or passwordless authentication.
Set exactly one of them in the connection string's AzureServiceBusConnectionSettings object:

var connectionString = new QueueConnectionString
{
Name = "AzureServiceBusConStr",
BrokerType = QueueBrokerType.AzureServiceBus,
AzureServiceBusConnectionSettings = new AzureServiceBusConnectionSettings
{
// A Service Bus SAS connection string, copied from the Azure portal
ConnectionString = "Endpoint=sb://<namespace>.servicebus.windows.net/;" +
"SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
}
};

store.Maintenance.Send(
new PutConnectionStringOperation<QueueConnectionString>(connectionString));

For the full property reference, see the Connection string and Authentication credentials classes in the Syntax section.


Adding an Azure Service Bus sink task

To define the sink task, prepare a QueueSinkConfiguration object and pass it to AddQueueSinkOperation.
The configuration needs to name the connection string to use, set the broker type to Azure Service Bus, and hold one or more QueueSinkScript objects.

  • Each script lists its sources in the Queues property, where each entry is a single string:
    • A queue source is just the queue name.
      e.g., orders
    • A topic-subscription source is the topic name and the subscription name joined by a semicolon.
      e.g., orders-topic;ravendb-sub
      The semicolon is an unambiguous separator: Service Bus queue, topic, and subscription names can never contain a semicolon, so it cannot be mistaken for part of either name.
  • AzureServiceBusSinkSource is a helper class that builds and validates these entries, so you don't have to build the topicName;subscriptionName form yourself and can't pass an invalid name:
    • Use AzureServiceBusSinkSource.Queue(queueName) for a queue source.
    • Use AzureServiceBusSinkSource.Subscription(topicName, subscriptionName) for a topic-subscription source.

Example: Reading from a queue

// Read from a single Service Bus queue
var script = new QueueSinkScript
{
Name = "orders",
Queues = new List<string> { AzureServiceBusSinkSource.Queue("orders") },
// Store each message as an Orders document
Script = @"this['@metadata']['@collection'] = 'Orders';
put(this.Id.toString(), this)"
};

var config = new QueueSinkConfiguration
{
Name = "AzureServiceBusSink",
// The connection string added above
ConnectionStringName = "AzureServiceBusConStr",
BrokerType = QueueBrokerType.AzureServiceBus,
Scripts = { script }
};

store.Maintenance.Send(new AddQueueSinkOperation<QueueConnectionString>(config));

Example: Reading from multiple sources

A script can list any mix of queues and subscriptions in its Queues property:

Queues = new List<string>
{
AzureServiceBusSinkSource.Queue("orders"),
AzureServiceBusSinkSource.Subscription("orders-topic", "ravendb-sub")
}

For the QueueSinkConfiguration and QueueSinkScript property reference, see the Sink task classes in the Syntax section.

Configuration Options

Use these configuration options for finer control over the sink task:

Syntax

Methods

The AzureServiceBusSinkSource methods build and validate the entries stored in a script's Queues list.

Validates a queue name and returns it as a sink source entry.

public static string Queue(string queueName)

Usage:

AzureServiceBusSinkSource.Queue("orders")

Parameters:

ParameterTypeDescription
queueNamestringThe Service Bus queue to consume from

Return value:

TypeDescription
stringThe source entry to add to QueueSinkScript.Queues

Classes

Connection string

The connection string a sink task uses to reach an Azure Service Bus namespace.

class QueueConnectionString
{
string Name
QueueBrokerType BrokerType
AzureServiceBusConnectionSettings AzureServiceBusConnectionSettings
}

PropertyTypeDescription
NamestringThe connection string name
BrokerTypeQueueBrokerTypeSet to QueueBrokerType.AzureServiceBus for an Azure Service Bus connection string
AzureServiceBusConnectionSettingsAzureServiceBusConnectionSettingsThe namespace and authentication details

QueueConnectionString is shared by all queue brokers, so it also defines settings for the other broker types (KafkaConnectionSettings, RabbitMqConnectionSettings, AzureQueueStorageConnectionSettings, AmazonSqsConnectionSettings).
Set only the one matching BrokerType.

Authentication credentials

Entra ID application credentials for connecting to a namespace.

class AzureServiceBusEntraId
{
string Namespace
string TenantId
string ClientId
string ClientSecret
}

PropertyTypeDescription
NamespacestringThe fully qualified namespace, e.g. mynamespace.servicebus.windows.net
TenantIdstringThe Entra ID tenant ID
ClientIdstringThe application (client) ID
ClientSecretstringThe application's client secret

Sink task

The configuration of an Azure Service Bus sink task.

class QueueSinkConfiguration
{
string Name
string ConnectionStringName
QueueBrokerType BrokerType
List<QueueSinkScript> Scripts
bool Disabled
string MentorNode
bool PinToMentorNode
long TaskId
}

PropertyTypeDescription
NamestringThe sink task name
ConnectionStringNamestringThe name of the Azure Service Bus connection string the task uses
BrokerTypeQueueBrokerTypeSet to QueueBrokerType.AzureServiceBus (must match the connection string's broker type)
ScriptsList<QueueSinkScript>The scripts the task runs
DisabledboolWhether the task is created in a disabled state
MentorNodestringThe preferred responsible node for the task, if any
PinToMentorNodeboolWhether to pin the task to its mentor node
TaskIdlongThe task's identifier, assigned by the server

Enums

Identifies the message broker that a connection string and sink task use.

enum QueueBrokerType
{
None,
Kafka,
RabbitMq,
AzureQueueStorage,
AmazonSqs,
AzureServiceBus
}

ValueDescription
AzureServiceBusSelects Azure Service Bus. Use this value for an Azure Service Bus connection string and sink task.
None, Kafka, RabbitMq, AzureQueueStorage, AmazonSqsThe other broker types, used by the remaining Queue Sink and Queue ETL brokers.

In this article