Data Subscriptions: Revisions Support
Data subscription supports subscribing not only on documents, but also on all their revisions.
Revision support should be defined in the subscription. It also requires revisions to be configured on the collection in question.
While regular subscriptions process single documents, subscription on documents revisions processes pairs of subsequent document revisions.
Such functionality allows keeping track of each change that was performed on a document, and even to compare two subsequent versions of a document.
Both document revisions are accessible in the filtering and the projection process.
In this page:
Revisions processing
Simple declaration and usage
Revisions processing and projection
Revisions processing order
Documents revisions feature allows tracking changes that were performed on a document, by storing the audit trail of its changes over time.
An audit trail entry is called a Document Revision and is comprised of a document snapshot.
In data subscription, Documents Revisions will be processed in pairs of subsequent entries.
Example:
Let us assume a user document that looks like:
{ Name:'James', Age:'21' }
We update the User document twice, in separate operations:
- We update the 'Age' field to the value of 22
- We update the 'Age' field to the value of 23
Data subscription's revision processing mechanism will receive pairs of revision in the following order:
# | Previous | Current |
---|---|---|
1 | null | { Name:'James', Age:'21' } |
2 | { Name:'James', Age:'21' } | { Name:'James', Age:'22' } |
3 | { Name:'James', Age:'22' } | { Name:'James', Age:'23' } |
As seen above, in order for subscriptions on revisions to work properly, it needs the revisions entries to be available, otherwise, there will be no data to process. Therfore, it's crucial to make sure that the revisions configuration allows storing documents revisions enough time, without discarding unprocessed revisions
Simple declaration and usage
Here we declare a simple revisions subscription that will send pairs of subsequent document revisions to the client:
Creation:
- Generic-syntax
- RQL-syntax
name = store.Subscriptions.Create(
new SubscriptionCreationOptions<Revision<Order>>());
name = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
Query = @"From Orders (Revisions = true)"
});
Consumption:
SubscriptionWorker<Revision<Order>> revisionsWorker =
// Specify <Revision<Order>> as the type of the processed items
store.Subscriptions.GetSubscriptionWorker<Revision<Order>>(subscriptionName);
await revisionsWorker.Run((SubscriptionBatch<Revision<Order>> batch) =>
{
foreach (var item in batch.Items)
{
// Access the previous revision via 'Result.Previous'
var previousRevision = item.Result.Previous;
// Access the current revision via 'Result.Current'
var currentRevision = item.Result.Current;
// Provide your own processing logic:
ProcessOrderRevisions(previousRevision, currentRevision);
}
});
Revisions processing and projection
Here we declare a revisions subscription that will filter and project data from revisions pairs:
Creation:
- Generic-syntax
- RQL-syntax
name = store.Subscriptions.Create(
new SubscriptionCreationOptions<Revision<Order>>()
{
Filter = tuple => tuple.Current.Lines.Count > tuple.Previous.Lines.Count,
Projection = tuple => new
{
PreviousRevenue = tuple.Previous.Lines.Sum(x=>x.PricePerUnit*x.Quantity),
CurrentRevenue = tuple.Current.Lines.Sum(x=>x.PricePerUnit*x.Quantity)
}
});
name = await store.Subscriptions.CreateAsync(new SubscriptionCreationOptions()
{
Query = @"declare function getOrderLinesSum(doc){
var sum = 0;
for (var i in doc.Lines) { sum += doc.Lines[i];}
return sum;
}
From Orders (Revisions = true)
Where getOrderLinesSum(this.Current) > getOrderLinesSum(this.Previous)
Select
{
PreviousRevenue: getOrderLinesSum(this.Previous),
CurrentRevenue: getOrderLinesSum(this.Current)
}"
});
Consumption:
SubscriptionWorker<Revision<Order>> revisionsWorker =
// Specify <Revision<Order>> as the type of the processed items
store.Subscriptions.GetSubscriptionWorker<Revision<Order>>(subscriptionName);
await revisionsWorker.Run((SubscriptionBatch<Revision<Order>> batch) =>
{
foreach (var item in batch.Items)
{
// Access the previous revision via 'Result.Previous'
var previousRevision = item.Result.Previous;
// Access the current revision via 'Result.Current'
var currentRevision = item.Result.Current;
// Provide your own processing logic:
ProcessOrderRevisions(previousRevision, currentRevision);
}
});