Skip to main content

Consume Subscriptions API

Create the subscription worker

A subscription worker can be created using the following GetSubscriptionWorker methods available through the Subscriptions property of the DocumentStore.

Note: Simply creating the worker is insufficient;
after creating the worker, you need to run the subscription worker to initiate document processing.

SubscriptionWorker<dynamic> GetSubscriptionWorker(
string subscriptionName, string database = null);

SubscriptionWorker<dynamic> GetSubscriptionWorker(
SubscriptionWorkerOptions options, string database = null);

SubscriptionWorker<T> GetSubscriptionWorker<T>(
string subscriptionName, string database = null) where T : class;

SubscriptionWorker<T> GetSubscriptionWorker<T>(
SubscriptionWorkerOptions options, string database = null) where T : class;
ParameterTypeDescription
subscriptionNamestringThe name of the subscription to which the worker will connect.
optionsSubscriptionWorkerOptionsOptions that affect how the worker interacts with the subscription. These options do not alter the definition of the subscription itself.
databasestringThe name of the database where the subscription task resides.
If null, the default database configured in DocumentStore will be used.
Return value
SubscriptionWorkerThe subscription worker that has been created.
Initially, it is idle and will only start processing documents when the Run function is called.

SubscriptionWorkerOptions

public class SubscriptionWorkerOptions
{
public string SubscriptionName { get; }
public int MaxDocsPerBatch { get; set; }
public int SendBufferSizeInBytes { get; set; }
public int ReceiveBufferSizeInBytes { get; set; }
public bool IgnoreSubscriberErrors { get; set; }
public bool CloseWhenNoDocsLeft { get; set; }
public TimeSpan TimeToWaitBeforeConnectionRetry { get; set; }
public TimeSpan ConnectionStreamTimeout { get; set; }
public TimeSpan MaxErroneousPeriod { get; set; }
public SubscriptionOpeningStrategy Strategy { get; set; }
}

When creating a worker with SubscriptionWorkerOptions, the only mandatory property is SubscriptionName.
All other parameters are optional and will default to their respective default values if not specified.

MemberTypeDescription
SubscriptionNamestringThe name of the subscription to which the worker will connect.
MaxDocsPerBatchintThe maximum number of documents that the server will try to retrieve and send to the client in a batch. If the server doesn't find as many documents as specified, it will send the documents it has found without waiting. Default: 4096.
SendBufferSizeInBytesintThe size in bytes of the TCP socket buffer used for sending data.
Default: 32,768 bytes (32 KiB).
ReceiveBufferSizeInBytesintThe size in bytes of the TCP socket buffer used for receiving data.
Default: 4096 (4 KiB).
IgnoreSubscriberErrorsboolDetermines if subscription processing is aborted when the worker's batch-handling code throws an unhandled exception.

true – subscription processing will continue.

false (Default) – subscription processing will be aborted.
CloseWhenNoDocsLeftboolDetermines whether the subscription connection closes when no new documents are available.

true – The subscription worker processes all available documents and stops when none remain, at which point the Run method throws a SubscriptionClosedException.
Useful for ad-hoc, one-time processing.

false (Default) – The subscription worker remains active, waiting for new documents.
TimeToWaitBeforeConnectionRetryTimeSpanThe time to wait before attempting to reconnect after a non-aborting failure during subscription processing. Default: 5 seconds.
MaxErroneousPeriodTimeSpanThe maximum amount of time a subscription connection can remain in an erroneous state before it is terminated. Default: 5 minutes.
StrategySubscriptionOpeningStrategyThis enum configures how the server handles connection attempts from workers to a specific subscription task.
Default: OpenIfFree.

Learn more about SubscriptionOpeningStrategy in worker strategies.

public enum SubscriptionOpeningStrategy
{
// Connect if no other worker is connected
OpenIfFree,

// Take over the connection
TakeOver,

// Wait for currently connected worker to disconnect
WaitForFree,

// Connect concurrently
Concurrent
}

Run the subscription worker

After creating a subscription worker, the subscription worker is still not processing any documents.
To start processing, you need to call the Run method of the SubscriptionWorker.

The Run function takes a delegate, which is your client-side code responsible for processing the received document batches.

Task Run(Action<SubscriptionBatch<T>> processDocuments,
CancellationToken ct = default(CancellationToken));

Task Run(Func<SubscriptionBatch<T>, Task> processDocuments,
CancellationToken ct = default(CancellationToken));
ParameterTypeDescription
processDocumentsAction<SubscriptionBatch<T>>Delegate for sync batches processing.
processDocumentsFunc<SubscriptionBatch<T>, Task>Delegate for async batches processing.
ctCancellationTokenCancellation token used in order to halt the worker operation.
Return value
TaskTask that is alive as long as the subscription worker is processing or tries processing.
If the processing is aborted, the task exits with an exception.

SubscriptionBatch<T>

MemberTypeDescription
ItemsList<SubscriptionBatch<T>.Item>List of items in the batch.
See SubscriptionBatch<T>.Item below.
NumberOfItemsInBatchintNumber of items in the batch.
Method SignatureReturn valueDescription
OpenSession()IDocumentSessionOpen a new document session that tracks all items and their included items within the current batch.
OpenAsyncSession()IAsyncDocumentSessionOpen a new asynchronous document session that tracks all items and their included items within the current batch.
Subscription worker connectivity

As long as there is no exception, the worker will continue addressing the same server that the first batch was received from.
If the worker fails to reach that node, it will try to failover to another node from the session's topology list.
The node that the worker succeeded connecting to, will inform the worker which node is currently responsible for data subscriptions.

SubscriptionBatch<T>.Item

This class represents a single item in a subscription batch results.

public struct Item
{
public T Result { get; internal set; }
public string ExceptionMessage { get; internal set; }
public string Id { get; internal set; }
public string ChangeVector { get; internal set; }
public bool Projection { get; internal set; }
public bool Revision { get; internal set; }
public BlittableJsonReaderObject RawResult { get; internal set; }
public BlittableJsonReaderObject RawMetadata { get; internal set; }
public IMetadataDictionary Metadata { get; internal set; }
}
MemberTypeDescription
ResultTThe current batch item.
If T is BlittableJsonReaderObject, no deserialization will take place.
ExceptionMessagestringThe exception message thrown during current document processing in the server side.
IdstringThe document ID of the underlying document for the current batch item.
ChangeVectorstringThe change vector of the underlying document for the current batch item.
RawResultBlittableJsonReaderObjectCurrent batch item before serialization to T.
RawMetadataBlittableJsonReaderObjectCurrent batch item's underlying document metadata.
MetadataIMetadataDictionaryCurrent batch item's underlying metadata values.

This class should only be used within the subscription's Run delegate.
Using it outside this scope may cause unexpected behavior.

SubscriptionWorker<T>

Methods
Method SignatureReturn TypeDescription
Dispose()voidAborts subscription worker operation ungracefully by waiting for the task returned by the Run function to finish running.
DisposeAsync()TaskAsync version of Dispose().
Dispose(bool waitForSubscriptionTask)voidAborts the subscription worker, but allows deciding whether to wait for the Run function task or not.
DisposeAsync(bool waitForSubscriptionTask)TaskAsync version of DisposeAsync(bool waitForSubscriptionTask).
Run (multiple overloads)TaskCall Run to begin the worker's batch processing.
Pass the batch processing delegates to this method
(see above).
Events
EventEvent typeDescription
AfterAcknowledgmentAfterAcknowledgmentActionTriggered after each time the server acknowledges the progress of batch processing.
OnSubscriptionConnectionRetryAction<Exception>Triggered when the subscription worker attempts to reconnect to the server after a failure.
The event receives as a parameter the exception that interrupted the processing.
OnDisposedAction<SubscriptionWorker<T>>Triggered after the subscription worker is disposed.
AfterAcknowledgmentAction
Parameter
batchSubscriptionBatch<T>The batch process which was acknowledged
Return value
TaskTask for which the worker will wait for the event processing to be finished (for async functions, etc.)
Properties
MemberTypeDescription
CurrentNodeTagstringThe node tag of the current RavenDB server handling the subscription.
SubscriptionNamestringThe name of the currently processed subscription.
WorkerIdstringThe worker ID.