Maximizing The Throughput of Azure Cosmos DB Bulk Import Library
Before addressing the primary topic, it is worth noting that bulk-loading data into a distributed, fault-tolerant, auto-scaled, and auto-indexed database poses a completely different set of challenges than uploading data to a centralized database. The fact that each ingested data point needs to be stored, replicated four ways, indexed, and automatically load-balanced makes the process far more involved than loading data into centralized systems that defer index building. While a user is not expected to know every detail of Cosmos DB, it goes without saying that a basic understanding of the underlying architecture and of how the Bulk Executor tool works will immensely help one build efficient bulk import solutions.
With that note, the following are the key aspects one should keep in mind to realize the full potential of Azure Cosmos DB’s bulk import library.
1. How many partitions does the collection have at the beginning of the ingestion process?
2. Is the data sorted in the partition key order?
3. What is the configuration of the client machine?
4. Is the import process receiving throttles?
Why is the number of partitions important?
Reason 1: A higher number of partitions leads to higher throughput due to the increased amount of available parallelism. As an example, if you are trying to upload 10 GB of data, loading it into a collection with 10 partitions would be a lot faster than loading it into a collection with 1 partition. This is simply because the Bulk Executor tool will load those 10 partitions in parallel (roughly 1 GB per partition).
Reason 2: A more subtle reason is that with a lower number of partitions, Cosmos DB might be doing additional maintenance work that negatively affects the throughput of the bulk executor. To understand this better, let’s consider the following setup. The goal is to ingest 100 GB worth of data into a collection that has only one partition to start with (note that a single partition can hold up to 10 GB worth of data and index). In this scenario, after the ingestion process has started, Cosmos DB has to provision more partitions dynamically as the existing partitions start reaching their maximum capacity. Additionally, it will also need to re-balance data from older partitions to the newly provisioned ones. For example, to load the full 100 GB, data will be re-balanced from the initial 1 partition to 2 partitions, 2 to 4 partitions, 4 to 8 partitions, and so on, until there are enough partitions to hold all 100 GB worth of data (note that this description is anecdotal and the actual mechanism may differ; the philosophy, however, remains the same). It is easy to see that all this extra work could have been avoided if the collection had been set up with a sufficient number of partitions to begin with.
Tip 1: Cosmos DB doesn’t allow one to set the precise number of partitions in a collection directly. It only lets one specify the amount of RUs per collection. However, there is a neat trick one can use to achieve this. Cosmos DB determines the initial number of partitions based on the initially provisioned RUs. Roughly, if a collection is created with X RUs, the collection will have max(10, floor(X/6000)) partitions. Note that this formula was conceived through reverse engineering, and the actual formula may differ.
For now, one can use this formula to calculate the amount of RUs that will lead to the desired number of partitions at collection creation time. Once that is done, the RUs of the collection can be adjusted immediately as needed (follow the Replace Offer example for more details on this).
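As a minimal sketch, the reverse-engineered formula above can be wrapped in two helpers: one estimating the initial partition count from the provisioned RUs, and one inverting it to pick the RU value that yields a desired partition count. Both rest entirely on the unofficial max(10, floor(X/6000)) formula, so treat the results as estimates.

```python
def estimated_partitions(provisioned_rus: int) -> int:
    """Estimate the initial partition count of a collection created with
    the given RUs, per the reverse-engineered formula max(10, floor(X/6000))."""
    return max(10, provisioned_rus // 6000)


def rus_for_partitions(desired_partitions: int) -> int:
    """Smallest RU value that should yield at least the desired number of
    partitions (only meaningful above the 10-partition floor)."""
    return max(desired_partitions, 10) * 6000
```

For example, `rus_for_partitions(25)` suggests creating the collection with 150,000 RUs, after which the provisioned throughput can be lowered via Replace Offer.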
Tip 2: What is the right number of partitions to start with? To answer this question, one needs to think like a capacity planner. The key questions to ask are: (a) How much data needs to be ingested to bootstrap the application? (b) What is the expected organic growth rate of the data?
If the growth rate is low, the number of partitions calculated by Tip 1 is a good number. Typically, I like to add a 10–20% buffer, considering that the data may not be very evenly distributed.
On the other hand, if the data is expected to grow quickly, it is better to keep a larger buffer. A welcome side-effect of a larger buffer, not surprisingly, is higher throughput during the initial bulk ingestion.
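The capacity-planning arithmetic from Tip 2 can be sketched as follows. The 10 GB per-partition capacity comes from the text above; the buffer fraction is the 10–20% (or more, for fast-growing data) headroom just discussed.

```python
import math

PARTITION_CAPACITY_GB = 10  # a single partition holds up to ~10 GB of data and index


def partitions_to_provision(bootstrap_gb: float, buffer_fraction: float) -> int:
    """Partitions needed for the bootstrap data plus headroom:
    10-20% for mild skew, more when rapid organic growth is expected."""
    return math.ceil(bootstrap_gb * (1 + buffer_fraction) / PARTITION_CAPACITY_GB)
```

For instance, bootstrapping 100 GB with a 20% buffer calls for 12 partitions, which Tip 1’s formula then converts into an initial RU setting.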
Why is the order of the data important?
The order of the data is very important given how the Bulk Executor library works: it pulls a batch of data from the ingestion source, loads that batch into Cosmos DB partitions in parallel, and then moves on to ingest the next batch.
The ingestion throughput per batch heavily depends on how the data is distributed within each batch. A batch is considered evenly distributed if it contains a roughly equal number of items for each available partition of the collection. Now, if the data pulled from the source comes sorted by the partition key of the collection, it is almost certain that the items within individual batches won’t be evenly distributed. In that case the bulk loader will effectively be loading data serially rather than in parallel, thereby significantly lowering the overall throughput.
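The effect is easy to simulate. The toy model below (my own illustration, not the library’s actual code) deals 1,000 items over 10 partitions and counts how many distinct partitions each batch touches: sorted input pins every batch to a single partition, while shuffled input spreads each batch across nearly all of them.

```python
import random

NUM_PARTITIONS = 10
BATCH_SIZE = 100


def partition_of(key: int) -> int:
    # Toy stand-in for Cosmos DB's hashing of the partition key.
    return key % NUM_PARTITIONS


def partitions_touched(keys):
    """Number of distinct partitions hit by each consecutive batch."""
    return [len({partition_of(k) for k in keys[i:i + BATCH_SIZE]})
            for i in range(0, len(keys), BATCH_SIZE)]


# 1,000 items over 10 partition key values, sorted by partition key.
sorted_keys = [v for v in range(NUM_PARTITIONS) for _ in range(100)]
shuffled_keys = sorted_keys.copy()
random.seed(0)
random.shuffle(shuffled_keys)

# Sorted input: every batch lands on exactly one partition (serial load).
# Shuffled input: every batch spreads across (nearly) all partitions.
```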
Tip 3: If the incoming data is sorted in the partition key order, or the data distribution is skewed, please consider running multiple instances of the Bulk Executor in parallel. The idea is to split the data into multiple ranges or chunks, and then assign one instance of the bulk executor per chunk of the data.
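A sketch of the chunking idea from Tip 3: cut the sorted input into contiguous ranges, one per executor instance. Each instance then loads a different slice of the partition key space, so the instances together keep many partitions busy even though each one, on its own, still sees sorted data.

```python
def contiguous_chunks(items, num_instances):
    """Split the input into contiguous ranges, one per Bulk Executor
    instance, so each instance loads a different slice of the
    partition key space."""
    n = len(items)
    bounds = [n * i // num_instances for i in range(num_instances + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_instances)]
```

Each chunk would then be fed to a separate executor instance (ideally on a separate machine, as discussed later under host configuration).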
Please go through this article (Portal Metrics section) to investigate the data distribution of the collection. The trick is to ingest only one batch of data (or a few batches), and then look up the distribution in the portal. Please note that the portal metrics can take some time to update.
Wish 1: I wish that the Bulk Executor had exposed an API to inform about the distribution of the ingested data at run time. This would have made it a lot easier to debug performance issues related to the uneven data distribution, as compared to relying on the portal metrics.
Why is the host machine configuration important?
The configuration of the machine hosting the bulk executor tool can make a significant difference in the overall throughput. At a high level, the main reasons are (the list is not exhaustive):
Reason 1: Since the throughput of the tool heavily depends on loading multiple partitions in parallel, a host machine with a higher number of available cores will boost performance.
Reason 2: The bulk executor tool needs to pull all the data associated with a batch (referenced here as maxInMemorySortingBatchSize) into memory. Moreover, the tool performs better with larger batch sizes. So, increasing the amount of available system memory will lead to higher throughput.
Reason 3: Server GC is optimized for throughput as opposed to workstation GC. Turning server GC on will positively impact ingestion throughput.
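For a .NET Framework application hosting the importer, server GC is a standard runtime setting (not specific to the Bulk Executor) enabled in App.config:

```xml
<!-- App.config: enable server GC for the bulk import host process -->
<configuration>
  <runtime>
    <gcServer enabled="true"/>
    <gcConcurrent enabled="true"/>
  </runtime>
</configuration>
```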
Reason 4: The client should be located as close to the Cosmos DB instance as possible to minimize network delay. The ideal place to host the bulk executor library would be an Azure VM in the same region as the Cosmos DB instance.
Tip 4: Do consider running multiple instances of the bulk executor library, each on a separate machine, in order to allocate more CPU cores and system memory overall. This is especially handy when hosting the library in Azure VMs: one can employ a set of small instances instead of one or a few large ones. It should be easy to perform a cost-benefit analysis to come up with the right setup.
Why is it important to look at the throttling metrics?
This is perhaps self-explanatory; however, I still wanted to mention it for completeness.
At a high level, if a collection is being throttled during bulk ingestion, increasing the provisioned throughput of the collection will lead to higher throughput.
Follow this article to learn more about throttling, aka RequestRateTooLargeException, aka 429s.
Tip 5: The effect of changing provisioned throughput (programmatically or through the Azure portal) with Cosmos DB is instantaneous.
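Since RU changes take effect instantly, one could drive them from observed throttling. The policy below is a hypothetical sketch of my own (the threshold, step, and cap are all assumptions, not part of any Cosmos DB API); the returned value would be applied via a Replace Offer call.

```python
def next_ru_target(current_rus: int, throttled_requests: int, total_requests: int,
                   step: float = 0.2, max_rus: int = 100_000) -> int:
    """Hypothetical scaling policy: if more than 1% of the requests in the
    last window were throttled (429s), raise the provisioned RUs by `step`
    (capped at max_rus); otherwise leave them unchanged."""
    if total_requests and throttled_requests / total_requests > 0.01:
        return min(int(current_rus * (1 + step)), max_rus)
    return current_rus
```

A symmetric scale-down rule could be added for sources that yield data at a rate that varies over time, as mentioned in Wish 2 below.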
Wish 2: I wish that the Bulk Executor exposed an API reporting the number of throttles, and the amount of time spent on retries, at any point during the run time of the tool. This would allow one to debug performance issues related to throttling. Moreover, it would allow one to dynamically scale the RUs of a collection up or down, especially when the data source yields data at a rate that varies over time.