Bulk Insert: How to Work With Bulk Insert Operation
BulkInsert is useful when inserting a large quantity of data from the client to the server.
It is an optimized, time-saving approach with a few limitations, such as the possibility that the operation is interrupted midway.
Quick example
Open a bulk insert from the document store and store entities through it.
Each Store call is buffered and streamed to the server in batches; the operation finalizes
when the bulk insert is disposed.
using (BulkInsertOperation bulkInsert = store.BulkInsert())
{
    foreach (var employee in employees)
    {
        bulkInsert.Store(employee);
    }
}
For the full set of BulkInsert overloads and their parameters, see the Syntax section at the end of this page.
BulkInsertOperation
The following methods are available on a BulkInsertOperation instance.
Methods
| Signature | Description |
|---|---|
| void Abort() | Abort the operation. |
| string Store(object entity, IMetadataDictionary metadata = null) | Store the entity. The identifier is generated automatically on the client side. Optionally, metadata can be provided for the stored entity. |
| void Store(object entity, string id, IMetadataDictionary metadata = null) | Store the entity, using the id parameter to explicitly declare the entity identifier. Optionally, metadata can be provided for the stored entity. |
| Task<string> StoreAsync(object entity, IMetadataDictionary metadata = null) | Store the entity asynchronously. The identifier is generated automatically on the client side. Optionally, metadata can be provided for the stored entity. |
| Task StoreAsync(object entity, string id, IMetadataDictionary metadata = null) | Store the entity asynchronously, using the id parameter to explicitly declare the entity identifier. Optionally, metadata can be provided for the stored entity. |
| void Dispose() | Dispose of the bulk insert operation. |
| ValueTask DisposeAsync() | Dispose of the bulk insert operation asynchronously. |
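The sketch below shows how the three Store call shapes from the table might look side by side. It is an illustration under stated assumptions, not a reference implementation: the Employee class, the document IDs, and the "Imported-By" metadata key are hypothetical, and MetadataAsDictionary (from Raven.Client.Json) is assumed to be the IMetadataDictionary implementation available in your client version.

```csharp
using System.Collections.Generic;
using Raven.Client.Documents;
using Raven.Client.Json;

// Hypothetical entity type used only for this sketch.
public class Employee
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

public static class StoreOverloadsSketch
{
    public static void StoreWithIdAndMetadata(IDocumentStore store)
    {
        using (var bulkInsert = store.BulkInsert())
        {
            // No id given: the client generates the identifier and returns it as a string.
            string generatedId = bulkInsert.Store(
                new Employee { FirstName = "Ann", LastName = "Lee" });

            // Explicit id: the caller decides the document identifier.
            bulkInsert.Store(
                new Employee { FirstName = "Bob", LastName = "Ray" },
                "employees/custom-1");

            // Explicit id plus metadata. "Imported-By" is a made-up custom key.
            var metadata = new MetadataAsDictionary(new Dictionary<string, object>
            {
                ["Imported-By"] = "nightly-job"
            });
            bulkInsert.Store(
                new Employee { FirstName = "Cat", LastName = "Fox" },
                "employees/custom-2",
                metadata);
        }
    }
}
```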
Limitations
- BulkInsert is designed to efficiently push large volumes of data.
  Data is therefore streamed and processed by the server in batches.
  Each batch is fully transactional, but there are no transaction guarantees between the batches, and the operation as a whole is non-transactional.
  If the bulk insert operation is interrupted midway, some of your data might be persisted on the server while some of it might not.
  - Make sure that your logic accounts for the possibility of an interruption that leaves some of your data not yet persisted on the server.
  - If the operation was interrupted and you choose to re-insert the whole dataset in a new operation, you can set SkipOverwriteIfUnchanged to true so the operation overwrites existing documents only if they changed since the last insertion.
  - If you need full transactionality, using a session may be a better option.
    Note that if a session is used, all of the data is processed in a single transaction, so the server must have sufficient resources to handle the entire data set included in the transaction.
- Bulk insert is not thread-safe.
  A single bulk insert should not be accessed concurrently.
  - Using multiple bulk inserts concurrently on the same client is supported.
  - Usage in an async context is also supported.
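One way to respect the thread-safety rule while still inserting in parallel is to give every task its own bulk insert. The following is a minimal sketch under assumptions: the Employee class, the ParallelBulkInsertSketch helper, and the chunking strategy are hypothetical, and Enumerable.Chunk requires .NET 6 or later.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.BulkInsert;

// Hypothetical entity type used only for this sketch.
public class Employee
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

public static class ParallelBulkInsertSketch
{
    // Splits the data into chunks and runs one bulk insert per chunk,
    // so no single BulkInsertOperation is shared between tasks.
    public static Task InsertInParallelAsync(
        IDocumentStore store, IEnumerable<Employee> employees, int chunkSize)
    {
        List<Task> tasks = employees
            .Chunk(chunkSize)
            .Select(chunk => Task.Run(() => InsertChunkAsync(store, chunk)))
            .ToList();

        return Task.WhenAll(tasks);
    }

    private static async Task InsertChunkAsync(IDocumentStore store, Employee[] chunk)
    {
        BulkInsertOperation bulkInsert = null;
        try
        {
            // Each task opens, uses, and disposes its own operation.
            bulkInsert = store.BulkInsert();
            foreach (var employee in chunk)
            {
                await bulkInsert.StoreAsync(employee).ConfigureAwait(false);
            }
        }
        finally
        {
            if (bulkInsert != null)
            {
                await bulkInsert.DisposeAsync().ConfigureAwait(false);
            }
        }
    }
}
```

The sketch starts one operation per chunk with no upper bound, so a chunk size that keeps the number of parallel operations small is probably the safer choice. Each operation remains non-transactional as a whole, exactly as described above.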
Example
Create bulk insert
Here we create a bulk insert operation and insert a million documents of type Employee:
Sync:
using (BulkInsertOperation bulkInsert = store.BulkInsert())
{
    for (int i = 0; i < 1000 * 1000; i++)
    {
        bulkInsert.Store(new Employee
        {
            FirstName = "FirstName #" + i,
            LastName = "LastName #" + i
        });
    }
}
Async:
BulkInsertOperation bulkInsert = null;
try
{
    bulkInsert = store.BulkInsert();
    for (int i = 0; i < 1000 * 1000; i++)
    {
        await bulkInsert.StoreAsync(new Employee
        {
            FirstName = "FirstName #" + i,
            LastName = "LastName #" + i
        });
    }
}
finally
{
    if (bulkInsert != null)
    {
        await bulkInsert.DisposeAsync().ConfigureAwait(false);
    }
}
BulkInsertOptions
The following options can be configured for BulkInsert.
CompressionLevel
| Value | Type | Description |
|---|---|---|
| Optimal | CompressionLevel | Compresses the streamed data more thoroughly than Fastest, at the cost of additional compression time. |
| Fastest (Default) | CompressionLevel | Compresses the streamed data as quickly as possible, even if the result is larger. |
| NoCompression | CompressionLevel | Does not compress. |
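As a hedged illustration, the option could be set through a BulkInsertOptions initializer as sketched below. The CompressionSketch helper is hypothetical, and the sketch assumes the option is exposed as a CompressionLevel member on BulkInsertOptions that takes a System.IO.Compression.CompressionLevel value.

```csharp
using System.Collections.Generic;
using System.IO.Compression;
using Raven.Client.Documents;
using Raven.Client.Documents.BulkInsert;

public static class CompressionSketch
{
    // Builds options that trade compression time for a smaller payload.
    public static BulkInsertOptions SmallerPayloadOptions()
    {
        return new BulkInsertOptions
        {
            CompressionLevel = CompressionLevel.Optimal
        };
    }

    // Streams the given entities using the options above.
    public static void InsertCompressed(IDocumentStore store, IEnumerable<object> entities)
    {
        using (var bulkInsert = store.BulkInsert(SmallerPayloadOptions()))
        {
            foreach (var entity in entities)
            {
                bulkInsert.Store(entity);
            }
        }
    }
}
```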
SkipOverwriteIfUnchanged
Use this option to avoid overwriting documents when the inserted document is identical to the existing one.
Enabling this flag can spare the server many operations triggered by a document change,
such as re-indexing and updates to subscriptions or ETL tasks.
There is a slight potential cost in the additional comparison that has to be made between
the existing documents and the ones being inserted.
using (var bulk = store.BulkInsert(new BulkInsertOptions
{
    SkipOverwriteIfUnchanged = true
}))
{
    // ...
}
Track progress with OnProgress
A long-running bulk insert can take some time to complete. To track its progress -
for example, to show a progress bar or to log how many documents have been inserted
so far - subscribe to the OnProgress event.
The bulk insert delivers a progress snapshot to your handler each time the server
reports new progress, with counters such as DocumentsProcessed, BatchCount, and
the ID of the document processed most recently (LastProcessedId).
See Classes for the full property list.
- Updates arrive asynchronously over the Changes stream that the server opens for the operation.
  Your handler is invoked by the server's progress reports, not by your Store calls; a single progress event typically covers many Store calls at once.
- The subscription opens after the first Store call. Attach the handler before storing anything to avoid missing early updates.
- Each snapshot is cumulative from the start of the bulk insert.
  DocumentsProcessed, BatchCount, Total, and the other counters grow over time and do not reset between events.
  To find the number of documents added between two consecutive events, subtract the older snapshot's DocumentsProcessed from the newer one's. For example, if event A reports DocumentsProcessed = 1000 and event B reports DocumentsProcessed = 1500, then 500 documents were added in the interval.
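The subtraction described above can be wrapped in a small helper. This is a sketch only: ProgressDeltaTracker is a hypothetical class, not part of the RavenDB client, and it works on plain long values so it can be tried without a server.

```csharp
using System.Threading;

// Hypothetical helper: turns cumulative DocumentsProcessed values
// into the number of documents added since the previous event.
public sealed class ProgressDeltaTracker
{
    // Cumulative count seen in the previous event; 0 before the first event.
    private long _lastSeen;

    public long Next(long cumulativeDocumentsProcessed)
    {
        // Interlocked.Exchange keeps the swap atomic in case events
        // are raised on different threads.
        long previous = Interlocked.Exchange(ref _lastSeen, cumulativeDocumentsProcessed);
        return cumulativeDocumentsProcessed - previous;
    }
}
```

Inside an OnProgress handler, the value passed to Next would be args.Progress.DocumentsProcessed. With the figures from the example above, Next(1000) followed by Next(1500) returns 1000 and then 500.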
Example: Print progress to the console
using (BulkInsertOperation bulkInsert = store.BulkInsert())
{
    // Attach the handler before the first Store call so early updates are not missed.
    bulkInsert.OnProgress += (sender, args) =>
    {
        // Each event carries a cumulative snapshot since the bulk insert started.
        Console.WriteLine(
            $"Processed {args.Progress.DocumentsProcessed} documents " +
            $"(last: {args.Progress.LastProcessedId})");
    };
    // Each employee inserted here advances the counters reported to the handler above.
    foreach (var employee in employees)
    {
        bulkInsert.Store(employee);
    }
}
Syntax
Method signatures
- BulkInsert
- BulkInsert (with options)
- BulkInsert (options only)
Opens a bulk insert against the given database, or the document store's default database
when database is null.
public BulkInsertOperation BulkInsert(
    string database = null,
    CancellationToken token = default);
Usage:
using (var bulkInsert = store.BulkInsert())
{
    // ...
}
| Parameter | Type | Description |
|---|---|---|
| database | string | The name of the database to perform the bulk operation on. If null, the DocumentStore.Database is used. |
| token | CancellationToken | Cancellation token used to halt the worker operation. |
| Return value | |
|---|---|
| BulkInsertOperation | Instance of BulkInsertOperation used for interaction. |
Opens a bulk insert against the given database with a BulkInsertOptions instance to configure compression and overwrite behavior.
public BulkInsertOperation BulkInsert(
    string database,
    BulkInsertOptions options,
    CancellationToken token = default);
Usage:
using (var bulkInsert = store.BulkInsert("Northwind", new BulkInsertOptions
{
    SkipOverwriteIfUnchanged = true
}))
{
    // ...
}
| Parameter | Type | Description |
|---|---|---|
| database | string | The name of the database to perform the bulk operation on. If null, the DocumentStore.Database is used. |
| options | BulkInsertOptions | Options to configure BulkInsert. |
| token | CancellationToken | Cancellation token used to halt the worker operation. |
| Return value | |
|---|---|
| BulkInsertOperation | Instance of BulkInsertOperation used for interaction. |
Opens a bulk insert against the document store's default database with a BulkInsertOptions instance to configure compression and overwrite behavior.
public BulkInsertOperation BulkInsert(
    BulkInsertOptions options,
    CancellationToken token = default);
Usage:
using (var bulkInsert = store.BulkInsert(new BulkInsertOptions
{
    SkipOverwriteIfUnchanged = true
}))
{
    // ...
}
| Parameter | Type | Description |
|---|---|---|
| options | BulkInsertOptions | Options to configure BulkInsert. |
| token | CancellationToken | Cancellation token used to halt the worker operation. |
| Return value | |
|---|---|
| BulkInsertOperation | Instance of BulkInsertOperation used for interaction. |
Event signature
- OnProgress
Raised while a bulk insert is running, each time the server reports new progress. Each invocation carries a cumulative progress snapshot since the start of the operation.
public event EventHandler<BulkInsertOnProgressEventArgs> OnProgress;
Usage:
bulkInsert.OnProgress += (sender, args) =>
{
    // Read args.Progress for the current snapshot.
};
Classes
- BulkInsertOnProgressEventArgs
- BulkInsertProgress
The event arguments delivered to an OnProgress handler.
public class BulkInsertOnProgressEventArgs : EventArgs
{
    public BulkInsertProgress Progress { get; }
}
| Property | Type | Description |
|---|---|---|
| Progress | BulkInsertProgress | The cumulative progress snapshot for this update. |
A cumulative snapshot of the bulk insert's progress, reported by the server.
public class BulkInsertProgress
{
    public long Total { get; set; }
    public long BatchCount { get; set; }
    public string LastProcessedId { get; set; }
    public long DocumentsProcessed { get; set; }
    public long AttachmentsProcessed { get; set; }
    public long CountersProcessed { get; set; }
    public long TimeSeriesProcessed { get; set; }
}
| Property | Type | Description |
|---|---|---|
| Total | long | Total items processed since the bulk insert started, across documents, attachments, counters, and time series. |
| BatchCount | long | Number of server-side batches processed so far. |
| LastProcessedId | string | The identifier of the document that was processed most recently. |
| DocumentsProcessed | long | Number of documents inserted so far. |
| AttachmentsProcessed | long | Number of attachments stored so far. |
| CountersProcessed | long | Number of counters created or modified so far. |
| TimeSeriesProcessed | long | Number of time series entries appended so far. |