Data Subscriptions: Subscription Consumption Examples

Worker with a specified batch size

Here we create a worker, specifying the maximum batch size we want to receive.

var workerWBatch = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        MaxDocsPerBatch = 20
    });

_ = workerWBatch.Run(x =>
{
    // your custom logic
});

Client with full exception handling and processing retries

Here we implement a client that handles exceptions thrown by the worker and recreates the worker when the exception is recoverable.

while (true)
{
    // Create the worker:
    // ==================
    var options = new SubscriptionWorkerOptions(subscriptionName);

    // Configure the worker:
    // Allow a downtime of up to 2 hours,
    // and wait 2 minutes before reconnecting
    options.MaxErroneousPeriod = TimeSpan.FromHours(2);
    options.TimeToWaitBeforeConnectionRetry = TimeSpan.FromMinutes(2);

    subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(options);

    try
    {
        // Subscribe to connection retry events
        // and log any exceptions that occur during processing
        subscriptionWorker.OnSubscriptionConnectionRetry += exception =>
        {
            Logger.Error("Error during subscription processing: " + subscriptionName,
                exception);
        };

        // Run the worker:
        // ===============
        await subscriptionWorker.Run(batch =>
        {
            foreach (var item in batch.Items)
            {
                // Forcefully stop subscription processing if the company ID is "companies/2-A"
                // and throw an exception to let external logic handle the specific case
                if (item.Result.Company == "companies/2-A")
                {
                    // The custom exception thrown from here
                    // will be wrapped by `SubscriberErrorException`
                    throw new UnsupportedCompanyException(
                        "Company ID can't be 'companies/2-A', please fix");
                }

                // Process the order document - provide your own logic
                ProcessOrder(item.Result);
            }
        }, cancellationToken);

        // The Run method will stop if the subscription worker is disposed,
        // exiting the while loop
        return;
    }
    catch (Exception e)
    {
        Logger.Error("Failure in subscription: " + subscriptionName, e);

        // The following exceptions are not recoverable
        if (e is DatabaseDoesNotExistException ||
            e is SubscriptionDoesNotExistException ||
            e is SubscriptionInvalidStateException ||
            e is AuthorizationException)
            throw;

        if (e is SubscriptionClosedException)
            // Subscription probably closed explicitly by admin
            return;

        if (e is SubscriberErrorException se)
        {
            // For UnsupportedCompanyException we want to rethrow,
            // otherwise, continue processing
            if (se.InnerException is UnsupportedCompanyException)
            {
                throw;
            }

            // Call continue to skip the current while(true) iteration and try reconnecting
            // in the next one, allowing the worker to process future batches.
            continue;
        }

        // Handle this depending on the subscription opening strategy
        if (e is SubscriptionInUseException)
            continue;

        // Call return to exit the while(true) loop,
        // dispose the worker (via finally), and stop the subscription.
        return;
    }
    finally
    {
        subscriptionWorker.Dispose();
    }
}
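
The decisions made in the catch block above (rethrow, stop, or reconnect) can also be written as an ordered list of rules, which makes them easier to test in isolation. The following is a minimal sketch and is not part of the RavenDB client: `FailureAction` and `FailurePolicy` are hypothetical names introduced here for illustration, and the sketch assumes that the first matching rule should win.

```csharp
using System;
using System.Collections.Generic;

// What the retry loop should do after Run(...) throws.
public enum FailureAction
{
    Rethrow,   // not recoverable: let the caller see the exception
    Stop,      // leave the loop without rethrowing
    Reconnect  // create a new worker and try again
}

// Hypothetical helper (not a RavenDB type): an ordered list of rules
// that maps an exception to a FailureAction. The first match wins.
public sealed class FailurePolicy
{
    private readonly List<(Func<Exception, bool> Matches, FailureAction Action)> _rules = new();
    private readonly FailureAction _fallback;

    // The fallback is returned when no rule matches.
    public FailurePolicy(FailureAction fallback) => _fallback = fallback;

    // Registers a rule for every exception of type TException (or a subtype).
    public FailurePolicy On<TException>(FailureAction action)
        where TException : Exception
        => On<TException>(action, _ => true);

    // Registers a rule that also requires the predicate to hold,
    // for example a check on InnerException.
    public FailurePolicy On<TException>(FailureAction action, Func<TException, bool> when)
        where TException : Exception
    {
        Func<Exception, bool> matches = e => e is TException typed && when(typed);
        _rules.Add((matches, action));
        return this;
    }

    // Returns the action of the first matching rule, or the fallback.
    public FailureAction Decide(Exception e)
    {
        foreach (var (matches, action) in _rules)
        {
            if (matches(e))
                return action;
        }
        return _fallback;
    }
}
```

With the exception types from the example above, such a policy might start with `new FailurePolicy(FailureAction.Stop).On<DatabaseDoesNotExistException>(FailureAction.Rethrow)`. The narrower rule `On<SubscriberErrorException>(FailureAction.Rethrow, e => e.InnerException is UnsupportedCompanyException)` would have to be registered before the broader `On<SubscriberErrorException>(FailureAction.Reconnect)`, because rules are evaluated in registration order.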

Subscription that ends when no documents are left

Here we create a subscription client that runs only until there are no more new documents left to process.

This is useful for ad-hoc, one-time processing that the user wants to be sure runs to completion.

// Create the subscription task on the server:
// ===========================================
var subscriptionName = store.Subscriptions.Create<Order>(
    new SubscriptionCreationOptions<Order>
    {
        Filter = order => order.Lines.Sum(line => line.PricePerUnit * line.Quantity) > 10000,
        Projection = order => new OrderAndCompany
        {
            OrderId = order.Id,
            Company = RavenQuery.Load<Company>(order.Company)
        }
    });

// Create the subscription worker that will consume the documents:
// ===============================================================
var highValueOrdersWorker = store.Subscriptions.GetSubscriptionWorker<OrderAndCompany>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        // Here we set the worker to stop when there are no more documents left to send
        // Will throw SubscriptionClosedException when it finishes its job
        CloseWhenNoDocsLeft = true
    });

try
{
    await highValueOrdersWorker.Run(batch =>
    {
        foreach (var item in batch.Items)
        {
            SendThankYouNoteToEmployee(item.Result); // call your custom method
        }
    });
}
catch (SubscriptionClosedException)
{
    // That's expected, no more documents to process
}
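
The example above projects each order into an `OrderAndCompany` object without showing that class. The sketch below gives one plausible shape for it, together with an in-memory version of the filter predicate so the threshold can be checked without a server. The `Order`, `OrderLine`, and `Company` classes here are minimal stand-ins that contain only the members the example uses, and `HighValueOrders.IsHighValue` is a hypothetical helper name; the real model classes may contain more fields.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Minimal stand-in model classes (assumed shapes, for illustration only).
public class OrderLine
{
    public decimal PricePerUnit { get; set; }
    public int Quantity { get; set; }
}

public class Order
{
    public string Id { get; set; }
    public string Company { get; set; }   // ID of the related Company document
    public List<OrderLine> Lines { get; set; } = new List<OrderLine>();
}

public class Company
{
    public string Id { get; set; }
    public string Name { get; set; }
}

// Assumed shape of the projected result that the worker receives.
public class OrderAndCompany
{
    public string OrderId { get; set; }
    public Company Company { get; set; }
}

public static class HighValueOrders
{
    // Same expression as the subscription's Filter, evaluated in memory:
    // the order total must be strictly greater than 10000.
    public static bool IsHighValue(Order order) =>
        order.Lines.Sum(line => line.PricePerUnit * line.Quantity) > 10000;
}
```

Because the comparison is strict, an order whose lines total exactly 10000 does not pass this predicate.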

Worker that processes dynamic objects

Here we create a worker that processes received data as dynamic objects.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = "My dynamic subscription";
store.Subscriptions.Create(new SubscriptionCreationOptions<Order>()
{
    Name = subscriptionName,
    Projection = order =>
        new { DynamicField_1 = "Company: " + order.Company + " Employee: " + order.Employee }
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    foreach (var item in batch.Items)
    {
        // Access the dynamic field in the document
        dynamic field = item.Result.DynamicField_1;

        // Call your custom method
        ProcessItem(field);
    }
});

Subscription that works with a session

Here we create a worker that receives all orders without a shipping date, passes each order to the shipment mechanism, and updates its ShippedAt field.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    Query = @"from Orders as o where o.ShippedAt = null"
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    // Open a session with 'batch.OpenSession'
    using (var session = batch.OpenSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            TransferOrderToShipmentCompany(order); // call your custom method
            order.ShippedAt = DateTime.UtcNow; // update the document field
        }

        // Save the updated Order documents
        session.SaveChanges();
    }
});

Subscription that uses included documents

Here we create a subscription that uses the includes feature: it processes Order documents and includes all the Products of each order.
When processing the subscription, we open a session from the SubscriptionBatch<T> object, and for each order line we load the Product document and process it alongside the Order.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions()
{
    // Include the referenced Product documents for each Order document
    Query = @"from Orders include Lines[].Product"
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker = store.Subscriptions.GetSubscriptionWorker<Order>(subscriptionName);
_ = subscriptionWorker.Run(batch =>
{
    // Open a session via 'batch.OpenSession'
    // in order to access the Product documents
    using (var session = batch.OpenSession())
    {
        foreach (var order in batch.Items.Select(x => x.Result))
        {
            foreach (var orderLine in order.Lines)
            {
                // Calling Load will not generate a request to the server,
                // because orderLine.Product was included in the batch
                var product = session.Load<Product>(orderLine.Product);

                ProcessOrderAndProduct(order, product); // call your custom method
            }
        }
    }
});

Subscription that works with the lowest-level API

Here we create a subscription that works with the blittable document representation. This can be useful in very high-performance scenarios, but it may be dangerous because it uses unmanaged memory directly.

// Create the subscription task on the server:
// ===========================================

var subscriptionName = store.Subscriptions.Create(new SubscriptionCreationOptions<Order>
{
    Projection = x => new
    {
        x.Employee
    }
});

// Create the subscription worker that will consume the documents:
// ===============================================================

var subscriptionWorker =
    // Specify `BlittableJsonReaderObject` as the generic type parameter
    store.Subscriptions.GetSubscriptionWorker<BlittableJsonReaderObject>(subscriptionName);

_ = subscriptionWorker.Run(batch =>
{
    foreach (var item in batch.Items)
    {
        // Access the Employee field within the blittable object
        var employeeField = item.Result["Employee"].ToString();

        ProcessItem(employeeField); // call your custom method
    }
});

Two subscription workers that are waiting for each other

Here we create two workers:

  • The main worker, which uses the TakeOver strategy to take over the subscription from the other worker and take the lead
  • The secondary worker, which uses the WaitForFree strategy and waits for the main worker to fail (due to machine failure, etc.)

The main worker:

var primaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.TakeOver
    });

while (true)
{
    try
    {
        await primaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
}

The secondary worker:

var secondaryWorker = store.Subscriptions.GetSubscriptionWorker<Order>(
    new SubscriptionWorkerOptions(subscriptionName)
    {
        Strategy = SubscriptionOpeningStrategy.WaitForFree
    });

while (true)
{
    try
    {
        await secondaryWorker.Run(x =>
        {
            // your logic
        });
    }
    catch (Exception)
    {
        // retry
    }
}
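
Both loops above retry immediately after any exception, so a failure that persists would make them spin without pause. One way to soften that is to wait between attempts and to honor a cancellation token. The helper below is a general-purpose sketch of that pattern, not a RavenDB API: `RetryLoop` and `RunUntilCompletedAsync` are hypothetical names, and unlike the loops above, this version returns once an attempt completes without throwing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryLoop
{
    // Hypothetical helper: calls runOnce until it completes without throwing,
    // waiting retryDelay between attempts. Returns the number of attempts made.
    // Cancelling the token ends the loop with an OperationCanceledException.
    public static async Task<int> RunUntilCompletedAsync(
        Func<Task> runOnce, TimeSpan retryDelay, CancellationToken token = default)
    {
        var attempts = 0;
        while (true)
        {
            token.ThrowIfCancellationRequested();
            attempts++;
            try
            {
                await runOnce();
                return attempts;
            }
            catch (Exception) when (!token.IsCancellationRequested)
            {
                // Pause before the next attempt so that a persistent failure
                // does not turn into a tight loop.
                await Task.Delay(retryDelay, token);
            }
        }
    }
}
```

The `runOnce` delegate is where the worker would be obtained and run. It may be safer to create a new worker on every attempt, as the full exception-handling example earlier on this page does, rather than reuse a single worker instance across attempts.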