Data Subscriptions: Subscription Consumption Examples
In this page:
Worker with a specified batch size
Client with full exception handling and processing retries
Subscription that ends when no documents left
Worker that processes dynamic objects
Subscription that works with a session
Subscription that uses included documents
Subscription that works with lowest level API
Two subscription workers that are waiting for each other
Worker with a specified batch size
Here we create a worker, specifying the maximum batch size we want to receive.
var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
new SubscriptionWorkerOptions(subscriptionName)
{
MaxDocsPerBatch = 20
});
_ = workerWBatch.Run(x =>
{
// your custom logic
});
Client with full exception handling and processing retries
Here we implement a client that handles exceptions thrown by the worker and re-creates the worker when the exception is recoverable.
while (true)
{
// Create the worker:
// ==================
var options = new SubscriptionWorkerOptions(subscriptionName);
// Configure the worker:
// Allow a downtime of up to 2 hours,
// and wait 2 minutes before reconnecting
options.MaxErroneousPeriod = TimeSpan.FromHours(2);
options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);
var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);
try
{
// Subscribe to connection-retry events
// and log the exception that triggered each retry
subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
{
Logger.Error("Error during subscription processing: " + subscriptionName,
exception);
};
// Run the worker:
// ===============
await subscriptionWorker.Run(batch =>
{
foreach (var item in batch.Items)
{
// Forcefully stop subscription processing if the order's company ID is "companies/2-A"
// by throwing an exception, letting external logic handle this specific case
if (item.Result.Company == "companies/2-A")
{
// The custom exception thrown from here
// will be wrapped by `SubscriberErrorException`
throw new UnsupportedCompanyException(
"Company ID can't be 'companies/2-A', please fix");
}
// Process the order document - provide your own logic
ProcessOrder(item.Result);
}
}, cancellationToken);
// The Run method will stop if the subscription worker is disposed,
// exiting the while loop
return;
}
catch (Exception e)
{
Logger.Error("Failure in subscription: " + subscriptionName, e);
// The following exceptions are not recoverable
if (e is DatabaseDoesNotExistException ||
e is SubscriptionDoesNotExistException ||
e is SubscriptionInvalidStateException ||
e is AuthorizationException)
throw;
if (e is SubscriptionClosedException)
// Subscription probably closed explicitly by admin
return;
if (e is SubscriberErrorException se)
{
// Rethrow if the handler failed with UnsupportedCompanyException;
// for any other handler error, continue processing
if (se.InnerException is UnsupportedCompanyException)
{
throw;
}
// Call continue to skip the current while(true) iteration and try reconnecting
// in the next one, allowing the worker to process future batches.
continue;
}
// Handle this depending on the subscription opening strategy
if (e is SubscriptionInUseException)
continue;
// Call return to exit the while(true) loop,
// dispose the worker (via finally), and stop the subscription.
return;
}
finally
{
subscriptionWorker.Dispose();
}
}
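The example above throws UnsupportedCompanyException without defining it. A minimal sketch of such a custom exception follows. The class is application code, not part of the RavenDB client, and its exact shape here is an assumption.

```csharp
using System;

// Hypothetical application-level exception (not part of the RavenDB client API).
// Thrown from the batch handler to signal an order that must not be processed.
public class UnsupportedCompanyException : Exception
{
    public UnsupportedCompanyException(string message)
        : base(message)
    {
    }
}
```

Because the worker wraps handler exceptions in SubscriberErrorException, the catch block inspects InnerException to recognize this type.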
Subscription that ends when no documents left
Here we create a subscription client that runs only until there are no more new documents left to process.
This is useful for ad-hoc, one-time processing that must be guaranteed to run to completion.
// Create the subscription task on the server:
// ===========================================
var subscriptionName = store.Subscriptions.Create<Order>(
new SubscriptionCreationOptions<Order>
{
Filter = order => order.Lines.Sum(line => line.PricePerUnit * line.Quantity) > 10000,
Projection = order => new OrderAndCompany
{
OrderId = order.Id,
Company = RavenQuery.Load<Company>(order.Company)
}
});
// Create the subscription worker that will consume the documents:
// ===============================================================
var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
new SubscriptionWorkerOptions(subscriptionName)
{
// Here we set the worker to stop when there are no more documents left to send.
// It will throw SubscriptionClosedException when it finishes its job
CloseWhenNoDocsLeft = true
});
try
{
await highValueOrdersWorker.Run(batch =>
{
foreach (var item in batch.Items)
{
SendThankYouNoteToEmployee(item.Result); // call your custom method
}
});
}
catch (SubscriptionClosedException)
{
// That's expected, no more documents to process
}
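The projection above maps each Order to an OrderAndCompany object, which this page does not define. A minimal sketch of that class follows. The Company class shown here is only a stand-in for the application's own entity, reduced to two illustrative properties so that the snippet compiles on its own.

```csharp
// Stand-in for the application's Company entity (assumption: the real class
// will have more properties; only illustrative ones are shown here).
public class Company
{
    public string Id { get; set; }
    public string Name { get; set; }
}

// Shape of the projected result that the worker receives.
// Property names match the ones assigned in the Projection above.
public class OrderAndCompany
{
    public string OrderId { get; set; }
    public Company Company { get; set; }
}
```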
Worker that processes dynamic objects
Here we create a worker that processes received data as dynamic objects.
// Create the subscription task on the server:
// ===========================================
var subscriptionName = "My dynamic subscription";
store.Subscriptions.Create(new SubscriptionCreationOptions<Order>()
{
Name = subscriptionName,
Projection = order =>
new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
});
// Create the subscription worker that will consume the documents:
// ===============================================================
var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
foreach (var item in batch.Items)
{
// Access the dynamic field in the document
dynamic field = item.Result.DynamicField_1;
// Call your custom method
ProcessItem(field);
}
});
Subscription that works with a session
Here we create a worker that receives all orders without a shipping date, lets the shipment mechanism handle each order, and updates its ShippedAt field value.
// Create the subscription task on the server:
// ===========================================
var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
Query = @"from Orders as o where o.ShippedAt = null"
});
// Create the subscription worker that will consume the documents:
// ===============================================================
var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
// Open a session with 'batch.OpenSession'
using (var session = batch.OpenSession())
{
foreach (var order in batch.Items.Select(x => x.Result))
{
TransferOrderToShipmentCompany(order); // call your custom method
order.ShippedAt = DateTime.UtcNow; // update the document field
}
// Save the updated Order documents
session.SaveChanges();
}
});
Subscription that uses included documents
Here we create a subscription that uses the includes feature: it processes Order documents and includes all Product documents of each order.
When processing the subscription, we create a session using the SubscriptionBatch<T> object, and for each order line we obtain the Product document and process it alongside the Order.
// Create the subscription task on the server:
// ===========================================
var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
// Include the referenced Product documents for each Order document
Query = @"from Orders include Lines[].Product"
});
// Create the subscription worker that will consume the documents:
// ===============================================================
var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
// Open a session via 'batch.OpenSession'
// in order to access the Product documents
using (var session = batch.OpenSession())
{
foreach (var order in batch.Items.Select(x => x.Result))
{
foreach (var orderLine in order.Lines)
{
// Calling Load will not generate a request to the server,
// because orderLine.Product was included in the batch
var product = session.Load<Product>(orderLine.Product);
ProcessOrderAndProduct(order, product); // call your custom method
}
}
}
});
Subscription that works with lowest level API
Here we create a subscription that works with the blittable document representation (BlittableJsonReaderObject). This can be useful in very high-performance scenarios, but it can be dangerous because it works directly with unmanaged memory.
// Create the subscription task on the server:
// ===========================================
var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions<Order>
{
Projection = x => new
{
x.Employee
}
});
// Create the subscription worker that will consume the documents:
// ===============================================================
var subscriptionWorker =
// Specify `BlittableJsonReaderObject` as the generic type parameter
store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
foreach (var item in batch.Items)
{
// Access the Employee field within the blittable object
var employeeField = item.Result["Employee"].ToString();
ProcessItem(employeeField); // call your custom method
}
});
Two subscription workers that are waiting for each other
Here we create two workers:
- The main worker, with the TakeOver strategy, which takes over the subscription from the other worker and takes the lead
- The secondary worker, with the WaitForFree strategy, which waits for the main worker to fail (due to machine failure, etc.)
The main worker:
var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
new SubscriptionWorkerOptions(subscriptionName)
{
Strategy = SubscriptionOpeningStrategy.TakeOver
});
while (true)
{
try
{
await primaryWorker.Run(x =>
{
// your logic
});
}
catch (Exception)
{
// retry
}
}
The secondary worker:
var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
new SubscriptionWorkerOptions(subscriptionName)
{
Strategy = SubscriptionOpeningStrategy.WaitForFree
});
while (true)
{
try
{
await secondaryWorker.Run(x =>
{
// your logic
});
}
catch (Exception)
{
// retry
}
}
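Both loops above retry immediately after any failure. The sketch below shows one way to pause between attempts and to stop when cancellation is requested. RetryLoop, RunWithRetryAsync, and the runOnce delegate are hypothetical names introduced for illustration and are not part of the RavenDB client. Unlike the loops above, this helper also returns once runOnce completes without throwing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the RavenDB client API.
public static class RetryLoop
{
    // Repeatedly invokes 'runOnce' until it completes without throwing
    // or until cancellation is requested. Waits 'delay' between attempts.
    public static async Task RunWithRetryAsync(
        Func<Task> runOnce,
        TimeSpan delay,
        CancellationToken token)
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            try
            {
                await runOnce();
                return; // completed normally, stop retrying
            }
            catch (Exception) when (!token.IsCancellationRequested)
            {
                // Wait before the next attempt instead of retrying in a tight loop
                await Task.Delay(delay, token);
            }
        }
    }
}
```

A caller would pass a delegate that creates the worker, awaits its Run call, and disposes the worker afterwards, so that each attempt starts from a fresh worker.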