Concurrent Subscriptions
- With Concurrent Subscriptions, multiple data subscription workers can connect to the same subscription task simultaneously.
- Each worker is assigned a different batch of documents to process.
- By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents.
- Documents assigned to workers whose connection ended unexpectedly can be reassigned by the server to available workers. See Connection failure below.
- In this page:
  - Defining concurrent workers
  - Dropping a connection
  - Connection failure

Defining concurrent workers
Concurrent workers are defined similarly to other workers, except their strategy is set to SubscriptionOpeningStrategy.Concurrent.
- To define a concurrent worker:
  - Create the worker using GetSubscriptionWorker.
  - Pass it a SubscriptionWorkerOptions instance.
  - Set the strategy to SubscriptionOpeningStrategy.Concurrent.
- Usage:
- Define two concurrent workers
// Define concurrent subscription workers
var subscriptionWorker1 = store.Subscriptions.GetSubscriptionWorker<Order>(
    // Set the worker to connect to the "All Orders" subscription task
    new SubscriptionWorkerOptions("All Orders")
    {
        // Set Concurrent strategy
        Strategy = SubscriptionOpeningStrategy.Concurrent,
        MaxDocsPerBatch = 20
    });

var subscriptionWorker2 = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions("All Orders")
    {
        Strategy = SubscriptionOpeningStrategy.Concurrent,
        MaxDocsPerBatch = 20
    });
- Run both workers
// Start the concurrent workers.
// Both workers will connect concurrently to the "All Orders" subscription task.
var subscriptionRuntimeTask1 = subscriptionWorker1.Run(batch =>
{
    // Process the batch assigned to worker 1
    foreach (var item in batch.Items)
    {
        // Process item
    }
});

var subscriptionRuntimeTask2 = subscriptionWorker2.Run(batch =>
{
    // Process the batch assigned to worker 2
    foreach (var item in batch.Items)
    {
        // Process item
    }
});
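Each call to Run returns a task that represents the worker's processing loop, so an application will usually want to observe both tasks. The following is a minimal sketch of one way to do that in plain .NET. The WorkerTasks class and its WaitForWorkersAsync method are hypothetical helpers introduced here for illustration; they are not part of the RavenDB client API.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of the RavenDB client):
// waits for a set of worker tasks and reports how many ended abnormally.
public static class WorkerTasks
{
    public static async Task<int> WaitForWorkersAsync(params Task[] workerTasks)
    {
        try
        {
            // Completes only after every worker task has finished.
            await Task.WhenAll(workerTasks);
        }
        catch
        {
            // Awaiting WhenAll rethrows only the first failure,
            // so each task is inspected individually below.
        }

        int failed = 0;
        foreach (var task in workerTasks)
        {
            if (task.IsFaulted || task.IsCanceled)
            {
                failed++;
                string reason = task.IsFaulted
                    ? task.Exception.GetBaseException().Message
                    : "canceled";
                Console.WriteLine("Worker stopped: " + reason);
            }
        }

        return failed;
    }
}
```

Assuming the two tasks from the example above, the helper could be called as `await WorkerTasks.WaitForWorkersAsync(subscriptionRuntimeTask1, subscriptionRuntimeTask2);`.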
Dropping a connection
- Use Subscriptions.DropSubscriptionWorker to forcefully disconnect the specified worker from the subscription it is connected to.
public void DropSubscriptionWorker<T>(SubscriptionWorker<T> worker, string database = null)
- Usage:
// Drop a concurrent subscription worker
store.Subscriptions.DropSubscriptionWorker(subscriptionWorker2);
Connection failure
- When a concurrent worker's connection ends unexpectedly, the server may reassign the documents this worker has been processing to any other concurrent worker that is available.
- A worker that reconnects after a connection failure will be assigned a new batch of documents.
  It is not guaranteed that the new batch will contain the same documents this worker was processing before the disconnection.
- As a result, documents may be processed more than once:
  - first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents,
  - and later by other workers the documents are reassigned to.
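Because a document may be delivered more than once, batch handlers are safer when they are idempotent. The following is a minimal sketch of one possible approach: an in-memory tracker that runs the processing action only the first time a given key is seen. The ProcessedTracker class is a hypothetical helper introduced here for illustration; it is not part of the RavenDB client API, and it only protects workers that share the same process. Workers running in separate processes would need a shared, durable record instead.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of the RavenDB client):
// remembers which keys were already handled so that a
// redelivered document is skipped instead of processed twice.
public sealed class ProcessedTracker
{
    private readonly ConcurrentDictionary<string, byte> _processed =
        new ConcurrentDictionary<string, byte>();

    // Runs 'process' only the first time 'key' is seen.
    // Returns true if the action ran, false if the key was skipped.
    public bool ProcessOnce(string key, Action process)
    {
        // TryAdd is atomic, so only one caller can claim a given key.
        if (!_processed.TryAdd(key, 0))
            return false;

        try
        {
            process();
            return true;
        }
        catch
        {
            // Processing failed: release the key so a later delivery can retry.
            _processed.TryRemove(key, out _);
            throw;
        }
    }
}
```

Inside a batch handler, a single tracker instance shared by both workers could wrap the per-item work, for example `tracker.ProcessOnce(item.Id + "|" + item.ChangeVector, () => { /* process item */ });`. Combining the document ID with its change vector is an assumption made here so that a later modification of the same document is still processed.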