
An 11 Billion Documents Benchmark Story

Why a 10B Documents Benchmark in 2020

This chapter presents the reasons why we wanted to run a new benchmark campaign in 2020.

Not our First Benchmarks

In the past, we published several large scale benchmarks:

But this time, we wanted to do things a little differently:

We wanted the benchmark to be as realistic as possible, to give our customers and our ops team real answers about what a multi-billion-document repository architecture looks like and what the scalability steps are between 1B and 10B.

To achieve this goal, there are some additional constraints:

Production-Like, Not a Lab

Nuxeo Cloud Infrastructure

The goal is to leverage our Nuxeo Cloud automation to deploy the Nuxeo Application precisely the way we do for our customers.

In our case, it means:

Production Infrastructure

Not only is the AWS infrastructure provisioned automatically using Terraform, but we also comply with all the security and production rules:

Dashboard for Phase 1

Dashboard for Phase 2

The goal is to define architecture blueprints of what infrastructure we need to deploy a 1B, 2B, 3B, or 10B documents Nuxeo application inside Nuxeo Cloud.

Testing an Application - Not a Storage Service

To extract meaningful insights, we want to test an application that reliably reflects the challenges our customers will face in the future. We want to test a content-centric application with real use cases, not just benchmark a database.

Our starting point was to interview some customers to better understand their requirements and challenges regarding future scalability. The main takeaway is that while the existing benchmarks cover the standard Document Management and Digital Asset Management use cases well, some use cases in the Financial Services industry were not covered.

A typical use case would be building a large consolidated document repository, collecting all the data on customers for a bank or an insurance company.

In addition to the ability to scale to multiple billions of documents, the requirements were:


Phases and Methodology

This chapter presents how we structured our benchmarking effort.

Two Different Phases

We know from past experience that running a benchmark can be very time-consuming, so we decided to split the effort into two phases with different goals and deliverables.

Phase 1

This first phase aims to provide guidance to our customers who will reach 1B, 2B, or more documents in the next 18 months.

Since most of these customers have already deployed their Nuxeo application, we need to apply some constraints:

Deliverables for Phase 1:

Phase 2

We know that we will host multi-billion-document repositories within the next 18 months. We want to be ahead of these deployments: clear the way for these future projects and have data to guide them.

For phase 2:

Deliverables for Phase 2:

This last point means that we are not aiming for a “one-shot” benchmark. We want to run this benchmark continuously to verify that performance evolves with the platform in the right way.

We also want to have the capability to rebuild this 10B+ documents demo on-demand, so that people who need to “see it to believe it” can do it.

Phase 1: Step by Step Approach

We want to identify the scalability steps, which implies:

Volumization is part of the testing process. For each step:

We gather metrics for each technical task.

Once each step is reached, we run complete functional tests using Gatling:

For each major step (1B, 2B, 3B), we run additional tests such as:

Phase 2: Volumize and Test

Because of our time constraints, we could not afford the same step-by-step approach as in Phase 1.

So, we relied on what we learned during Phase 1 to define:

So, unlike Phase 1, the Phase 2 benchmark contains fewer steps:

We wanted this benchmark to be a learning experience, so we also used the long import process to test different import strategies:

It may seem obvious, but 11 billion documents is a lot, so we need to be careful about import throughput. This is why we chose to leverage cloud elasticity and define two different architectures:

  1. Import architecture: using enough hardware to be sure we can import 11B documents in a reasonable time
  2. Standard operation architecture: reduce the hardware cost by scaling down to a “good enough” infrastructure

Content Generation and Fileplan Strategies

This chapter focuses on how we generated the 10B documents and how we stored them in Nuxeo repositories.

Why Realistic Data Does Matter

To test a 10B documents repository, we need 10B documents that are:

Challenges with Generating 10B Files

Generating 10B unique documents and attachments (files) comes with a few challenges:

Generation time

Generation needs very high throughput for the system to be usable. For example, generating files at 1,000 files/s would still take more than 100 days for 10B files. This means we need a generator that produces both metadata and files at a much higher throughput.

During phase 2, the generation of the 11B documents in Kafka took about 4 days with an average throughput of 2 million doc/min.

NB: the plateaus on the plot correspond to times when it took us a few hours to trigger the next step of the initial data generation.
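These orders of magnitude are easy to check. The following back-of-the-envelope Python sketch assumes a constant rate, so it ignores the pauses between generation steps:

```python
SECONDS_PER_DAY = 86_400

def generation_days(total_docs: int, docs_per_second: float) -> float:
    """Days needed to generate total_docs at a constant rate (no pauses)."""
    return total_docs / docs_per_second / SECONDS_PER_DAY

# 10B files at 1,000 files/s
slow = generation_days(10_000_000_000, 1_000)
# 11B documents at 2 million docs/min, converted to docs/s
fast = generation_days(11_000_000_000, 2_000_000 / 60)

print(f"{slow:.1f} days")  # 115.7 days
print(f"{fast:.1f} days")  # 3.8 days
```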

Storage

Storage costs on S3 depend on volume but also on the number of requests you issue.

In our case, issuing 10B PUT requests to S3 would not be cost-efficient, which is why we wanted to leverage the AWS Snowball service.

However, at the time we wanted to initialize the data, we faced some issues:

Because of these uncertainties, we prepared a plan B: a custom Nuxeo Blob Store that generates the PDF files on the fly in a consistent and repeatable manner. This kept us from depending too much on the Snowball shipment, at the cost of more compute (generating a PDF on the fly takes more resources than reading it from S3).

Generation Approach

Random Yet Reproducible

The core of the system is a Java library that generates random metadata:

This data generation needs to be random (to avoid skewing the dataset), yet it needs to be reproducible so we can run the same generation process for several targets (Snowball, Importer, Gatling tests) and still have matching data.
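The following minimal Python sketch illustrates the "random yet reproducible" idea. It assumes one seed per record, derived from a shared global seed and the record index. The real library is written in Java, and the seed scheme, field names, and sample values here are hypothetical:

```python
import random

FIRST_NAMES = ["Alice", "Bob", "Chen", "Dana", "Emeka"]  # hypothetical sample data
GLOBAL_SEED = 42  # hypothetical seed shared by every client

def generate_customer(index: int) -> dict:
    """Generate metadata for customer number `index`.

    Each record gets its own Random instance seeded from (GLOBAL_SEED, index),
    so any client can regenerate record N without generating records 0..N-1.
    """
    rng = random.Random(GLOBAL_SEED * 1_000_003 + index)
    return {
        "id": f"customer-{index:010d}",
        "firstName": rng.choice(FIRST_NAMES),
        "accountNumber": rng.randrange(10**9, 10**10),
    }

# Two independent clients (for example, an importer and a load test) agree on record 123.
assert generate_customer(123) == generate_customer(123)
```

Seeding per record, rather than drawing everything from a single stream, is what allows different clients to generate disjoint slices of the dataset in parallel and still produce matching data.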

Rendering

We use the randomly generated metadata to generate different types of files:

NB: the initial plan was to generate the identity cards as TIFF, but we did not find a way to write TIFF fast enough from Java and had to fall back to JPEG to avoid waiting for weeks.

Different Clients

Multiple clients then use this data generation and rendition library:

NB: The data generation library is available in this GitHub repository.

File Plan

The complete target content model includes:

Phase 1 File Plan

For Phase 1, we started the tests before the Snowball was shipped: this means we did not have the data for Customers and Accounts.

So, we did the Phase 1 benchmark using only the Statement Document type and our ability to quickly generate as many statements as we wanted, both in terms of metadata and in terms of files.

Phase 2 File Plan

For phase 2, we wanted to use the full content model:

Based on what was generated on the Snowball:

Total = 89,997,827 x 2 + 179,998,862 x 62 = 11,339,925,098
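The total can be double-checked with a one-line computation. The factors below are copied as-is from the formula above:

```python
# Factors copied verbatim from the Phase 2 total.
total = 89_997_827 * 2 + 179_998_862 * 62
print(f"{total:,}")  # 11,339,925,098
```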

The data is split in 3 repositories:

This means:

Data Ingestion

Nuxeo Stream Importer

We used Nuxeo Stream importer to ingest the data.

The document import process runs in 2 steps:

This two-step process provides a few advantages:

For similar reasons, the import is initially done without indexing. Once the documents are imported, we use the Bulk Action Framework to trigger re-indexing.

Later during the Phase 2 benchmark, we used a new feature from 11.x to enable “on-the-fly” bulk indexing.
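The following minimal Python sketch shows the two-step principle. An in-memory `queue.Queue` stands in for Kafka and a dict stands in for the repository. For simplicity, step 1 finishes before step 2 starts, and the message fields are hypothetical. The point is that document creation runs with its own, independently configurable number of consumer threads:

```python
import queue
import threading

def produce(messages: "queue.Queue", count: int) -> None:
    """Step 1: write lightweight document messages to the log (Kafka stand-in)."""
    for i in range(count):
        messages.put({"id": f"doc-{i}", "parent": f"folder-{i % 10}"})

def consume(messages: "queue.Queue", repository: dict, lock: threading.Lock) -> None:
    """Step 2: read messages and create documents in the repository (database stand-in)."""
    while True:
        msg = messages.get()
        if msg is None:  # sentinel: no more messages for this consumer
            break
        with lock:
            repository[msg["id"]] = msg

def run_import(count: int, consumer_threads: int) -> dict:
    messages: queue.Queue = queue.Queue()
    produce(messages, count)  # in this sketch, step 1 completes before step 2 starts
    for _ in range(consumer_threads):
        messages.put(None)  # one sentinel per consumer thread, after all messages
    repository: dict = {}
    lock = threading.Lock()
    workers = [threading.Thread(target=consume, args=(messages, repository, lock))
               for _ in range(consumer_threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return repository

repo = run_import(count=1_000, consumer_threads=4)
print(len(repo))  # 1000
```

Because producing and consuming are decoupled, the number of consumer threads is the knob that controls how hard step 2 hits the database. Indexing would then be a separate pass over the created documents.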

Dedicated Importer Node

A standard Nuxeo deployment usually contains 2 types of nodes:

In the context of the benchmark, we added a 3rd type of Nuxeo node, “importer nodes”:

Having a single Nuxeo importer node was enough to get the best possible throughput in nearly all cases. We did, however, scale up this node (compared to the other Nuxeo nodes) to ensure that data generation was not limited by CPU.

In Phase 2, we used two concurrent importer nodes against a large sharded MongoDB cluster to achieve an import throughput of 25K docs/s.


Phase 1: Test and Results

This chapter provides an overview of the results for Phase 1 and the takeaways from our tests.

TL;DR:

  1. Reached 3B documents
  2. Proven elasticity
  3. Works in real life
  4. Using PaaS services truly makes life easier

Proven Elasticity

The main achievement is that from 0 to 3B documents, we were able to adjust the infrastructure to maintain the performance at the same level.

When you increase the number of documents, you expect that the performance will progressively decrease. The goal is to be sure that you can leverage the cloud elasticity to scale out or up, to maintain good performance for your workload.

Based on the tests we ran at each "volumization step", we were able to determine the limiting factor and scale out or up the relevant part:

Obviously, the different aspects of the application (CRUD, Navigation, Search) do not evolve the same way with volume, so we optimized for what seemed to be the best trade-off for each step.

Nuxeo Nodes

Nuxeo nodes can easily be added to the cluster dynamically when the workload needs it.

Typically:

Nuxeo nodes are grouped in “AutoScalingGroups” according to their node type to make the scale-out process easy.

For example, during Phase 1, we ran a full re-index of the 3B documents repository and scaled the number of worker nodes to 9 to maximize throughput.

Unsurprisingly, you can start with the default settings, but as the volume grows, you increasingly need to adjust the application settings to optimize for the workload you want to handle and to avoid adding hardware unless absolutely necessary.

Here are some examples of the parameters we adjusted:

The entire configuration is available in this git repository.

Data Tier

Scaling data nodes can be a little bit trickier than the Nuxeo nodes because it can imply moving data and can make the whole process slower.

So, while you don't want to scale the data tier up and down as quickly as the Nuxeo nodes, we really benefited from the full PaaS experience and were able to scale out or up easily and completely transparently for Nuxeo.

MongoDB

We used MongoDB Atlas on AWS as our primary database for all the tests.

MongoDB is the database that gives us the most options in terms of scalability and sharding.

For Phase 1, we used a single non-sharded MongoDB Atlas Cluster.

As expected, the write throughput decreases as volume grows. The main limitation comes from the size of the indexes relative to the available cluster memory.

If we were in a “test lab”, we could easily have reduced the number of indexes we create by default in the MongoDB collection, and it would have allowed us to speed up the import. However, in a real-life scenario, you still want to be able to use the document repository while you are importing content, so it is not possible to drop key indexes because it would create a lot of long-running queries that would cripple the database performance.

However, we did work on optimizing indexing by:

The repository configuration for Phase 1 is available in optimized-repository-config.xml.

To speed up the importer as much as possible, we wanted to accelerate IO on the MongoDB Atlas side. Increasing IOPS gave significant results, but we ended up using an NVMe cluster to maximize IOPS and get the most out of our MongoDB cluster.

For this reason, we started the import on an M60 NVMe cluster.

As expected, the raw import throughput decreased as volume grew. After 1B documents, IO started to slow the cluster down too much (CPU IO wait above 10%).

We scaled directly to M200 because:

With the M200 cluster, the throughput almost recovered the same performance we had with an empty database, and the data migration was completely transparent.

Elasticsearch

For Elasticsearch, the first limiting factor was CPU usage during indexing. After some profiling, we removed the html_strip character filter from the analyzers and scaled out the cluster.

When doing the full-reindex, we can adjust the number of Nuxeo workers and the number of threads. It allows us to choose how much pressure we want to put on the Elasticsearch cluster.

Elasticsearch has some built-in protection mechanisms (limited queues and circuit breakers). Thanks to these, when Nuxeo nodes sent too many indexing requests to the Elasticsearch cluster, the Nuxeo nodes received an error and stopped processing.

NB: this is where you need to think carefully about your retry policies.
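One common retry policy is capped exponential backoff with jitter. The sketch below is a generic Python illustration, not Nuxeo's actual retry mechanism. `Overloaded` is a hypothetical exception standing in for whatever error the client raises when Elasticsearch rejects a request:

```python
import random
import time

class Overloaded(Exception):
    """Hypothetical error for a rejected request (full queue, tripped circuit breaker)."""

def with_retries(send, max_attempts=5, base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call send(); on Overloaded, back off exponentially with full jitter and retry.

    Re-raises the last Overloaded once max_attempts calls have failed.
    """
    for attempt in range(max_attempts):
        try:
            return send()
        except Overloaded:
            if attempt == max_attempts - 1:
                raise
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: a random wait in [0, delay] keeps many workers
            # from retrying against the cluster at the same instant.
            sleep(random.uniform(0, delay))

# Usage: a request that is rejected twice, then accepted.
calls = {"n": 0}

def flaky_send():
    calls["n"] += 1
    if calls["n"] < 3:
        raise Overloaded()
    return "indexed"

print(with_retries(flaky_send, sleep=lambda seconds: None))  # indexed
```

Giving up after a bounded number of attempts matters as much as the backoff itself. Otherwise, an overloaded cluster turns into an ever-growing pile of pending retries.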

Then, around 1.5B documents, we started hitting the shard size limit: our total index was becoming too big for the initial number of shards, so we had to increase the number of shards from 35 to 100.

We saw 2 types of queries that could overwhelm Elasticsearch and trigger the circuit breakers:

Queries matching too many documents

The tests ran the same queries from 0 to 3B documents.

However, as the volume increased, some of the queries started to match too many documents: queries that initially retrieved a few hundred results were returning hundreds of thousands of results once the repository reached 2B documents.

As a side effect, it put much more pressure on Elasticsearch to:

The first consequence of this extra pressure was slower queries. Then, given the number of incoming queries, the amount of Lucene index data Elasticsearch needed to load into memory kept growing, which ended up triggering the circuit breaker.

The key point is that the queries causing the issue were actually pointless. The tests simulated a UI listing with aggregates and sorting, but a listing with hundreds of thousands of results is not usable by a human anyway. So we simply added query criteria to make the queries more selective, which solved the issue.

fielddata

The default Nuxeo Elasticsearch mapping used fielddata on the title field, which allows searching with the LIKE operator (i.e. dc:title LIKE “something%”).

As explained in the Elasticsearch documentation and in NXP-29357, this does not scale and consumes too much memory. Full-text search still works, and we considered that not being able to do a “like” search on 2B records was an acceptable limitation.

Scalability Steps

Hardware is only one aspect of the architecture. As explained earlier, some settings were adjusted at the Nuxeo level and the MongoDB or Elasticsearch level.

1B Documents Architecture

2B Documents Architecture

Main changes:

3B Documents Architecture

Main changes:

Works in Real-Life

Our goal is not only to validate that we can import 3B documents inside a single repository, but also, we want to verify that such a repository can still be used in real-life operations. We want users to have a good experience when using the application and make sure heavy operations like re-indexing or bulk operations can be efficiently executed.

Mixed Workloads

In most use cases described by our customers, bulk import does not only happen at the very beginning. Being able to import the data very fast for the initial migration is obviously a crucial advantage. But we also need to be able to import large amounts of data, potentially daily, without affecting the user experience.

Unlike during an initial import where we typically want to take as many resources as we can from the database, in a daily import, we want to be able to balance the resources so that users connected to the application do not have a degraded experience.

The idea is to have an architecture that allows controlling how much pressure we put on the database and throttling the injection if needed to preserve the user experience.

The two-step import process already allows us to import into Kafka as fast as possible without impacting the user experience. We can then configure how many threads to allocate to the DocumentConsumers and control how fast we import.

To verify that, we did several mixed workloads tests:

For example, we did a bulk import + index of 15M documents on a 2B documents repository while running the CRUD tests:

We also observed that depending on the limiting factor in the target platform, we can adjust the throttling:

Adjusting the Infrastructure for Temporary Workload

When the repository reaches a significant volume, some operations can become daunting. The fear is that they will last forever. Such operations include:

We cannot prevent such operations from becoming more expensive as the volume increases. However, we can scale out the processing to make them execute faster.

Re-Index @3B

To maximize the re-indexing speed, we did two things:

We then ran the re-index:

Once the re-indexing completed, we scaled down:

We executed the scale down while running a Gatling test and saw:

We reduced the number of CPUs by 60% and the memory by 25%, all while a workload was running. We consider this satisfactory proof that we can leverage cloud elasticity while keeping the platform running.

Bulk update @3B

We ran a bulk update operation and verified that we could adjust the effective throughput by changing the number of Nuxeo worker nodes.

Raw Performance Results

Resources


Phase 2 Principles: Building a Reproducible 11B Documents Repository Benchmark

This chapter explains how we designed Phase 2 of the benchmark and the underlying reasoning.

11B is Not Just About Doing x3.5 on the 3B Benchmark

The first difference with Phase 1 is that we want to reach 10B+, but we do not want to keep increasing the 3B architecture until it gets to the target volume.

While we’re confident that we could scale Nuxeo, MongoDB Atlas, and Elasticsearch to fit 10B+ documents inside a single non-sharded repository, it doesn’t seem to be the best approach:

This is why for phase 2 of the benchmark, we want to partition the data between:

The need to partition the data between “live/fresh” and “archive” was clearly expressed by most of our customers looking at a multi-billion document application. In most cases, you do not have multiple billions of documents you need to work with regularly. However, you can have billions of documents that you rarely access, but still need to be accessible and searchable in case you need them.

Partitioning the data impacts:

Full Content Model

Another significant change is that the content model contains all content types defined inside the Studio project since we can rely on the data that was imported via the Snowball.

The initial split and MongoDB sizing were derived from phase 1 of the benchmark:

Experiment with Various Parameters

Because in phase 2 we use the 11.x version, it is easier to prototype and test new solutions.

In this context, we did a few experiments:

Data partitioning in Kafka

When doing bulk ingestion (Document Consumer) at more than 10,000 documents/s, we know that even a small optimization can have a significant impact. When creating a document, the Nuxeo Core needs to fetch the parent document to do the parent/child association. Fetching the parent document is a very lightweight operation, but still requires a round trip between the JVM and MongoDB: again, at 10,000+ operations per second, these actions count.

The Nuxeo repository includes a caching system to reduce this kind of round trip when possible. However, when documents are randomly split across 48 Kafka partitions, there is no reason for documents created by the same CoreSession to share the same parent, so they rarely benefit from the cache.

After some experimentation, we saw that dispatching the document messages in a partition, based on groups sharing the same parent, does improve the cache hit ratio and almost doubles the import throughput.

See SUPINT-1772 for more details.
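The following minimal Python sketch illustrates the idea. It assumes the partition is derived from a stable hash of the parent path. Round-robin assignment stands in for the random split, and the per-partition cache is simplified to an unbounded set. The actual implementation is covered in SUPINT-1772 and is not reproduced here; paths and names are hypothetical:

```python
import zlib

NUM_PARTITIONS = 48

def partition_for(parent_path: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Route a document message by its parent so that all children of one
    folder land in the same partition (and therefore the same consumer)."""
    # crc32 is stable across processes, unlike Python's salted hash() for str
    return zlib.crc32(parent_path.encode("utf-8")) % num_partitions

def count_parent_fetches(assignments) -> int:
    """Count parent lookups that miss a per-partition cache (unbounded, for simplicity)."""
    seen = set()
    for partition, parent in assignments:
        seen.add((partition, parent))
    return len(seen)

# 10 folders with 48 children each, listed folder by folder
docs = [(f"/folder-{p}", f"doc-{p}-{c}") for p in range(10) for c in range(48)]
round_robin = [(i % NUM_PARTITIONS, parent) for i, (parent, _) in enumerate(docs)]
by_parent = [(partition_for(parent), parent) for parent, _ in docs]

print(count_parent_fetches(round_robin))  # 480
print(count_parent_fetches(by_parent))    # 10
```

With round-robin routing, every partition sees every parent, so each consumer has to fetch each parent. With keyed routing, each parent is fetched once and then served from the cache.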

MongoDB ReadPreferences

MongoDB allows setting ReadPreferences to read from secondary nodes.

In the context of bulk-indexing, it seemed like a good idea to try to leverage this capability in order to feed the Index Computation as fast as possible.

However, in the tests we ran, this setting created duplicates in the list of documents to re-index, probably because the read operations are not guaranteed to be repeatable. A few duplicates should not be a blocker, but they revealed a bug on our side (now fixed) and forced us to temporarily drop this solution.

Update the import process

We made a few improvements to the importer system.

First, we built a CSV importer that can be used via HTTP.

The principle is simple:

With this approach, a single Nuxeo injector node can upload and produce documents into Kafka at more than 80K documents/s.

See the DocumentProducers notebook for more details.

Import + bulk indexing on the fly

During Phase 1 and the first steps of Phase 2, we used the same approach:

As explained earlier, this approach has some advantages, but for volumizing the 9B+ archives repository, it did not seem to be the best one.

The goal was to chain the bulk import and bulk indexing without having to run a full scroller against the database. Thanks to NXP-29620, we have a single pipeline that allows us to chain importing and indexing.
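The following minimal Python sketch illustrates the chaining principle, with generators standing in for the Nuxeo streams. The import stage hands the ids it just created straight to the indexing stage, so no separate scroll over the database is needed to find what to index. It illustrates the idea only and is not the NXP-29620 implementation; all names are hypothetical:

```python
from typing import Iterable, Iterator

def import_batches(batches: Iterable[list]) -> Iterator[list]:
    """Create documents batch by batch and yield the ids that were just written."""
    for batch in batches:
        created_ids = [doc["id"] for doc in batch]  # stand-in for repository writes
        yield created_ids

def index_batches(id_batches: Iterable[list], index: dict) -> None:
    """Index each batch as soon as the import stage emits it."""
    for ids in id_batches:
        for doc_id in ids:
            index[doc_id] = True  # stand-in for a bulk indexing request

batches = [[{"id": f"doc-{b}-{i}"} for i in range(100)] for b in range(5)]
index: dict = {}
index_batches(import_batches(batches), index)
print(len(index))  # 500
```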

Below is the monitoring that corresponds to the last 2B documents that were imported inside the Archive repository.

We can see:

Continuous Improvement

We are very aware that running a full benchmark requires a significant effort to put all the pieces together.

We want to be able to capitalize on this effort and progressively integrate the work done for this 11B benchmark with our continuous integration and release pipeline, pretty much as we did with the default performance test suite and the site benchmarks.nuxeo.com.

Ideally, we want to be able to:

The challenge is that running a benchmark such as this requires coordination of a lot of different moving pieces:

To try to simplify this, we took the following steps:

The usage of Notebook allows us to achieve a few goals:

Import Architecture vs Run Architecture

For the import architecture, we used an M80 NVMe MongoDB cluster with 5 shards.

This big MongoDB cluster came with significant cost, but it allowed us to import the documents inside the Archive repository at a great throughput. We reached 25,000 documents/s when importing the first few billions of documents and using 2 Nuxeo injector nodes to maximize the throughput.

Once the import is completed, we can down-scale the cluster to reduce the cost since we do not need the write throughput anymore for the archive repository. We can also revert back from NVMe to simple EBS storage since we do not need the same amount of IO.


Phase 2 Results

This chapter presents the results of the second phase of our benchmark.

Browsing an 11B Documents Repository

Here is a video showing the Nuxeo application running with 11B documents:

Volumization Results

Bulk-Import

For Bulk-Import we were able to achieve 25K documents/s leveraging:

Compared to the tests done in Phase 1, using sharding nearly doubled the import throughput.

The graphic below shows the raw document insert throughput at the MongoDB level.

The two graphics below show two Nuxeo importer nodes, each running 24 threads, importing documents into the archives repository:

Bulk Re-index

We ran a full re-index of the archive repository at 3B documents.

The full re-index took about 14 hours, for an average throughput of about 57K documents/s.

Here again, we almost doubled the performance obtained in Phase 1:

The graphics below show:

Import + Index

With the new Import + Index pipeline leveraging the Bulk Action Framework's external scroll, we can import and index documents at a throughput of 15,000 documents/s.

Gatling Tests Results

We first ran the Gatling tests with a variable number of threads to identify the optimum trade-off between throughput and response time.

Once we had identified how many threads to run, we re-ran the tests for a longer duration to gather the results.

CRUD API test

Full test report

Navigation

Full test report

Search

NB: as visible on the graph, there was a transient network issue during the test.

Full test report

Results Summary

Downsizing Results

Motivations

The goal is to reduce the cost of the database used for the archives repository.

There are 3 steps in the downsizing:

Cluster Resizing

Going from Step 0 to Step 1 took about 4 days:

Several aspects can explain the relatively slow migration:

To verify the 3rd point, we ran a performance test during the cluster reconfiguration and did not notice any issue (this is actually the report linked earlier).

After the migration was completed, we re-ran the same tests to verify that downsizing the MongoDB cluster used for archives did not impact performance.

The tests gave very similar results, although the CPU usage of the MongoDB sharded cluster used for archives is higher than before.

Reference Architecture

Architecture used During Data Import

During the import:

Run Architecture

Once the data has been imported, we can reconfigure the platform to be more cost-efficient and allocate resources where needed:

Storage Statistics

Phase 2 Takeaways

These (Repositories) Go to Eleven (Billion documents)

The initial goal was 10B, and because of the way we ran the data generation, we ended up with 11.3B documents in three repositories.

Here the critical result is not so much the total number of documents, but rather the fact that we can:

By transparent, we mean:

Gains of Sharding with MongoDB

We leveraged MongoDB sharding to maximize write throughput and speed up the import of archives, which allowed us to reach 25,000 documents/s.

Elasticsearch Configuration

The Elasticsearch configuration we used for the 1B architecture had 9 data nodes. If we had applied the same configuration for 11B documents, we would have needed more than 100 Elasticsearch data nodes.

Thanks to the multi-repositories approach, we were able to use multiple indexes and adjust the configuration according to each repository so that the target hardware configuration was less than double (16 data nodes), whereas we multiplied the data volume by 11.
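The following quick check of these numbers makes a simplifying assumption: in a single-index layout, the data node count would grow linearly with the number of documents.

```python
nodes_for_1b = 9        # Elasticsearch data nodes in the 1B architecture
total_billions = 11.3   # documents across the three repositories, in billions
actual_nodes = 16       # data nodes in the multi-repository configuration

# Naive linear extrapolation: same node density as the 1B architecture.
linear_nodes = nodes_for_1b * total_billions
print(round(linear_nodes))                    # 102
print(round(actual_nodes / nodes_for_1b, 2))  # 1.78
```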

PaaS and Transparent Infrastructure Changes

Using AWS, Elasticsearch and MongoDB Atlas PaaS Services allowed us to resize the infrastructure dynamically, depending on our needs. With 11B documents you cannot expect any storage change to be fast, so it’s essential to be able to make these changes while the application is live. The truth is that migration speed becomes much less important than maintaining the availability of the service.

With AWS, Elasticsearch and MongoDB Atlas, we were able to validate this capability. This is very important for our Nuxeo Cloud operations team.

Resources