Skip to main content

Data Subscriptions: Consumption API Overview

Subscription worker generation

Subscription worker generation is accessible through the DocumentStore's Subscriptions Property, of type DocumentSubscriptions:

SubscriptionWorker<dynamic> GetSubscriptionWorker(
string subscriptionName, string database = null);

SubscriptionWorker<dynamic> GetSubscriptionWorker(
SubscriptionWorkerOptions options, string database = null);

SubscriptionWorker<T> GetSubscriptionWorker<T>(
string subscriptionName, string database = null) where T : class;

SubscriptionWorker<T> GetSubscriptionWorker<T>(
SubscriptionWorkerOptions options, string database = null) where T : class;
Parameters
subscriptionNamestringThe subscription's name. This parameter appears in more simple overloads allowing to start processing without creating a SubscriptionCreationOptions instance, relying on the default values
optionsSubscriptionWorkerOptionsContains subscription worker, affecting the interaction of the specific worker with the subscription, but does not affect the subscription's definition
databasestringName of the database to look for the data subscription. If null, the default database configured in DocumentStore will be used.
Return value
SubscriptionWorkerA created data subscription worker. When returned, the worker is Idle and it will start working only when the Run function is called.

SubscriptionWorkerOptions

The only mandatory parameter for SubscriptionWorkerOptions creation is the subscription's name.

MemberTypeDescription
SubscriptionNamestringReturns the subscription name passed to the constructor. This name will be used by the server side to identify the subscription in question.
TimeToWaitBeforeConnectionRetryTimeSpanTime to wait before reconnecting, in the case of non-aborting failure during the subscription processing. Default: 5 seconds.
IgnoreSubscriberErrorsboolIf true, will not abort subscription processing if client code, passed to the Run function, throws an unhandled exception. Default: false.
StrategySubscriptionOpeningStrategy(enum)Sets the way the server will treat current and/or other clients when they will try to connect. See Workers interplay. Default: OpenIfFree.
MaxDocsPerBatchintMaximum amount of documents that the server will try sending in a batch. If the server will not find "enough" documents, it won't wait and send the amount it found. Default: 4096.
CloseWhenNoDocsLeftboolIf true, it performs an "ad-hoc" operation that processes all possible documents, until the server can't find any new documents to send. At that moment, the task returned by the Run function will fail and throw a SubscriptionClosedException exception. Default: false.

Running subscription worker

After receiving a subscription worker, the subscription worker is still not processing any documents. SubscriptionWorker's Run function allows you to start processing worker operations.
The Run function receives the client-side code as a delegate that will process the received batches:

Task Run(Action<SubscriptionBatch<T>> processDocuments,
CancellationToken ct = default(CancellationToken));

Task Run(Func<SubscriptionBatch<T>, Task> processDocuments,
CancellationToken ct = default(CancellationToken));
Parameters
processDocumentsAction<SubscriptionBatch<T>>Delegate for sync batches processing
processDocumentsFunc<SubscriptionBatch<T>, Task>Delegate for async batches processing
ctCancellationTokenCancellation token used in order to halt the worker operation
Return value
TaskTask that is alive as long as the subscription worker is processing or tries processing. If the processing is aborted, the task exits with an exception

SubscriptionBatch<T>

MemberTypeDescription
ItemsList<SubscriptionBatch<T>.Item>Batch's items list.
NumberOfItemsInBatchintAmount of items in the batch.
Method SignatureReturn valueDescription
OpenSession()IDocumentSessionNew document session, that tracks all items and included items of the current batch.
OpenAsyncSession()IDocumentSessionNew asynchronous document session, that tracks all items and included items of the current batch.

As long as there is no exception, the worker will continue addressing the same server that the first batch was received from.
If the worker fails to reach that node, it will try to failover to another node from the session's topology list.
The node that the worker succeeded connecting to, will inform the worker which node is currently responsible for data subscriptions.

if T is BlittableJsonReaderObject, no deserialization will take place

MemberTypeDescription
ResultTCurrent batch item.
ExceptionMessagestringMessage of the exception thrown during current document processing in the server side.
IdstringCurrent batch item's underlying document ID.
ChangeVectorstringCurrent batch item's underlying document change vector of the current document.
RawResultBlittableJsonReaderObjectCurrent batch item before serialization to T.
RawMetadataBlittableJsonReaderObjectCurrent batch item's underlying document metadata.
MetadataIMetadataDictionaryCurrent batch item's underlying metadata values.

Usage of RawResult, RawMetadata, and Metadata values outside of the document processing delegate are not supported

SubscriptionWorker<T>

Method SignatureReturn TypeDescription
Dispose()voidAborts subscription worker operation ungracefully by waiting for the task returned by the Run function to finish running.
DisposeAsync()TaskAsync version of Dispose().
Dispose(bool waitForSubscriptionTask)voidAborts the subscription worker, but allows deciding whether to wait for the Run function task or not.
DisposeAsync(bool waitForSubscriptionTask)voidAsync version of DisposeAsync(bool waitForSubscriptionTask).
Run (multiple overloads)TaskStarts the subscription worker work of processing batches, receiving the batch processing delegates (see above).
EventType\Return typeDescription
AfterAcknowledgmentAfterAcknowledgmentAction (event)Event that is risen after each the server acknowledges batch processing progress.
OnSubscriptionConnectionRetryAction<Exception> (event)Event that is fired when the subscription worker tries to reconnect to the server after a failure. The event receives as a parameter the exception that interrupted the processing.
OnDisposedAction<SubscriptionWorker<T>> (event)Event that is fired after the subscription worker was disposed.
Parameters
batchSubscriptionBatch<T>The batch process which was acknowledged
Return value
TaskTask for which the worker will wait for the event processing to be finished (for async functions etc.)
MemberType\Return typeDescription
CurrentNodeTagstringReturns current processing RavenDB server's node tag.
SubscriptionNamestringReturns processed subscription's name.