
Indexing Made Easy: Sharded Bulk Indexing with SolrCloud and MapReduce

At RiskIQ, we extract and aggregate large quantities of data from the billions of HTTP requests performed by our web crawlers each day—but the real magic is in how we make this data accessible, and valuable, to our customers.

It starts with using a MapReduce pipeline to handle this influx of information. Once per day, an extraction job is run on the results of that day’s crawls, separating desired data into several sets. Another set of jobs then runs over each dataset, merging the day’s newly extracted data with an all-time aggregate. These merge jobs also create a delta dataset which contains the new and updated records for that 24-hour period. Storing this wealth of data as files on our MapR cluster gives our engineers and analysts easy access, but it delivers little immediate value to our customers on its own.
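
As a rough illustration of the merge step, the hedged sketch below reduces the all-time aggregate and the day’s extraction together by record ID, writes every ID to the new aggregate, and writes only new or changed IDs to the delta. The class name, the named outputs, and the helper methods are placeholders rather than our actual pipeline code.

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Hypothetical reducer: values for each record ID come from both the all-time
    // aggregate and the day's extraction; "aggregate" and "delta" are named outputs
    // registered on the job with MultipleOutputs.addNamedOutput().
    public class AggregateMergeReducer extends Reducer<Text, Text, Text, Text> {
        private MultipleOutputs<Text, Text> out;

        @Override
        protected void setup(Context context) {
            out = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(Text id, Iterable<Text> records, Context context)
                throws IOException, InterruptedException {
            Text previous = null; // the record already in the all-time aggregate, if any
            Text latest = null;   // the newest record seen for this ID
            for (Text record : records) {
                if (isFromAggregate(record)) {            // placeholder source check
                    previous = new Text(record);          // copy: Hadoop reuses value objects
                }
                if (latest == null || isNewerThan(record, latest)) {
                    latest = new Text(record);
                }
            }
            out.write("aggregate", id, latest);           // every ID lands in the merged aggregate
            if (previous == null || !previous.equals(latest)) {
                out.write("delta", id, latest);           // only new or updated IDs land in the delta
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            out.close();
        }

        // Placeholders for whatever source tagging and recency comparison the job uses.
        private boolean isFromAggregate(Text record) { return record.toString().startsWith("A\t"); }
        private boolean isNewerThan(Text candidate, Text current) { return true; }
    }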

To better share this digital treasure trove with our users, we need to create indexes which can quickly serve up answers to their queries. Being the good, practical engineers we are, we attempted the simplest solution first: We set up an Elasticsearch (ES) cluster and used the utilities provided by the ES-Hadoop library to index our data. The ES nodes were running on bare metal on hefty machines (24 cores, 128 GB of RAM, SSD storage), which we thought would surely be up to the task. Unfortunately, with most of these datasets containing more than a billion records, it took well over 24 hours to index our daily aggregates using this method. Since we were creating a new version of each dataset every day, we clearly needed a different approach.
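
For reference, the ES-Hadoop attempt looked roughly like this: a map-only job whose output format streams each document to the cluster as part of a bulk request. The sketch below is illustrative rather than our actual job; the hosts, the index name, and the assumption of JSON-lines input are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;

    public class EsBulkIndexJob {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("es.nodes", "es-node-1:9200,es-node-2:9200"); // placeholder ES hosts
            conf.set("es.resource", "web_data/doc");               // placeholder target index/type
            conf.set("es.input.json", "true");                     // treat each input line as a JSON document

            Job job = Job.getInstance(conf, "es-bulk-index");
            job.setJarByClass(EsBulkIndexJob.class);
            job.setSpeculativeExecution(false);                    // avoid duplicate bulk writes
            job.setOutputFormatClass(EsOutputFormat.class);        // streams documents to ES as bulk requests
            job.setNumReduceTasks(0);                              // map-only: the default pass-through mapper
                                                                   // is enough for JSON-lines input
            FileInputFormat.addInputPath(job, new Path(args[0]));  // dataset to index
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }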

Enter Apache Solr and Solr MapReduce. While ES offers flexibility and ease of use, it requires that its indexes be created using a live cluster. Solr, in its relative simplicity, can load and use indexes that were generated offline by a separate process. Until version 6.6.0, the Lucene/Solr codebase included Solr MapReduce, a library which facilitates the offline creation of Lucene indexes using MapReduce jobs. Creating the indexes offline, copying them to a pair of machines running Solr, and loading the cores took at most a few hours per index. At that rate, we could afford to churn out new indexes each day, so we went ahead with this strategy.
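
The offline build itself is a single MapReduce job launched through the library’s MapReduceIndexerTool. One plausible invocation is sketched below; the paths, the morphline file, and the choice to pull the index configuration from ZooKeeper are placeholders, and the exact flags vary between Solr versions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.solr.hadoop.MapReduceIndexerTool;

    public class OfflineIndexBuild {
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(), new MapReduceIndexerTool(), new String[] {
                "--zk-host", "zk1:2181,zk2:2181,zk3:2181/solr",   // placeholder: where the index config lives
                "--collection", "web_data",                        // placeholder collection whose schema to use
                "--morphline-file", "/etc/indexer/web_data.conf",  // placeholder morphline mapping records to fields
                "--output-dir", "hdfs:///indexes/web_data/latest", // where the finished Lucene index lands
                "hdfs:///data/web_data/aggregate/latest"           // input dataset to index
                // --shards can set the shard count explicitly, and --go-live can merge the result
                // into a running cluster, but we copied and loaded the indexes ourselves.
            });
            System.exit(exitCode);
        }
    }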

While offline indexing was working for us, devoting three to six hours of cluster time per index per day to that process seemed a bit excessive—and since those datasets grow over time, it was only going to get worse. Although each full aggregate dataset contained billions of records, the delta datasets, representing the difference between yesterday’s aggregate and that of today, typically held only a few million documents. Due to their significantly smaller size, we were able to index the delta datasets to live Solr instances over HTTP in a matter of minutes. Taking advantage of this, and using Oozie to orchestrate our indexing jobs, we devised a system whereby a full index would be created once per week for each dataset, and each index would be updated using the deltas during the intervening days. This process significantly reduced the time and CPU cycles expended on indexing while still keeping our customers supplied with fresh data. Win!
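
Pushing a delta to a live core is just a stream of add and commit calls over HTTP. A minimal SolrJ sketch follows; the URL, the fields, and the single hard-coded document standing in for the delta files are placeholders.

    import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
    import org.apache.solr.common.SolrInputDocument;

    public class DeltaIndexer {
        public static void main(String[] args) throws Exception {
            try (ConcurrentUpdateSolrClient solr = new ConcurrentUpdateSolrClient.Builder(
                    "http://solr-host:8983/solr/web_data")   // placeholder core URL
                    .withQueueSize(10000)                     // buffer documents client-side
                    .withThreadCount(4)                       // stream them over several connections
                    .build()) {
                // In the real job the documents come from the day's delta files on the
                // cluster; a single hard-coded document stands in for that stream here.
                SolrInputDocument doc = new SolrInputDocument();
                doc.setField("id", "example-record-1");
                doc.setField("host", "example.com");
                solr.add(doc);
                solr.commit();                                // make the new records visible to queries
            }
        }
    }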

As is the case with many data engineering solutions, this worked perfectly until it suddenly failed catastrophically. A single Lucene index can only accommodate 2,147,483,647 documents (the maximum value of a signed 32-bit integer). When one of our datasets crossed that threshold, we were no longer able to perform daily updates or generate a new index. Solr does provide a mechanism to get around this limitation: sharding. With its concept of collections, SolrCloud makes sharding relatively easy, and Solr MapReduce supports the offline creation of indexes with multiple shards. Getting SolrCloud to recognize those shards as part of a collection would prove to be the difficult part.

After some trials, many errors, and no lack of expletives, we were able to trick SolrCloud into accepting our pre-constructed collection using the following workflow:

  1. Create the shards offline using Solr MapReduce.
  2. Copy the shards to the destination SolrCloud nodes using rsync.
  3. Use the solr script to upload the configuration and schema for the new index to ZooKeeper.
  4. Add each shard to SolrCloud individually using the CoreAdmin API’s CREATE call, like so (see the SolrJ sketch after this list):
    http://<host>:<port>/solr/admin/cores?action=CREATE&name=<shard name>&instanceDir=<shard directory>&configSet=<configuration name>&collection.configName=<configuration name>&collection=<collection name>&shard=<shard number>
  5. Create an alias for the new collection using SolrCloud’s Collections API.
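
In SolrJ terms, steps 4 and 5 boil down to something like the sketch below. The host, collection name, shard count, and paths are placeholders, and in practice each CREATE call goes to the node that holds that shard’s files.

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrRequest;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.client.solrj.request.GenericSolrRequest;
    import org.apache.solr.common.params.ModifiableSolrParams;

    public class RegisterShards {
        public static void main(String[] args) throws Exception {
            String collection = "web_data_20170601"; // placeholder name for this build of the index
            String configName = "web_data_config";   // config uploaded to ZooKeeper in step 3
            try (SolrClient solr = new HttpSolrClient.Builder("http://solr-host:8983/solr").build()) {
                for (int i = 1; i <= 4; i++) {        // one CREATE call per pre-built shard
                    ModifiableSolrParams params = new ModifiableSolrParams();
                    params.set("action", "CREATE");
                    params.set("name", collection + "_shard" + i);
                    params.set("instanceDir", "/solr/data/" + collection + "_shard" + i);
                    params.set("configSet", configName);
                    params.set("collection.configName", configName);
                    params.set("collection", collection);
                    params.set("shard", "shard" + i);
                    new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/cores", params).process(solr);
                }
                // Step 5: point a stable alias at the freshly registered collection so clients
                // never need to know which build they are querying.
                CollectionAdminRequest.createAlias("web_data", collection).process(solr);
            }
        }
    }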

Note that we never explicitly created a new collection. SolrCloud creates the new collection when the first core creation call is made, and then adds the new cores to the collection on each subsequent call. The alias creation step is optional, but we found it to be a very convenient mechanism for ensuring that clients were using the latest version of a given index as soon as it became available.

Problem solved... for now. Be sure to tune in next week for part two of this blog to read about another indexing problem we tackled head-on to make querying our internet telemetry data fast and easy for our customers.
