At FullStory, we run real-time ETL pipelines to power our search and analytics features: as data comes in, we instantly index it and make it available to our customers. Shipping new search features is then simply a matter of shipping new code to our production data ingestion and search pipelines. That raises some questions:

  • How do you backport a new feature to all of the customer’s old data?
  • Or even backport a bugfix that caused the search pipelines to add something like extra whitespace to some text fields?
  • How do you do that while avoiding downtime and keeping search and analytics available for the customer?
  • How do you do this without reanalyzing the customer’s entire dataset?

The way that existing large scale off-the-shelf search services handle this to actually drop and rebuild the indices for the affected database fields. This can be incredibly inefficient: you may only need to update parts of the indices since your backport may only affect a subset of the customer’s total dataset. Here we choose to reindex over dropping the indices wholesale: update the document in-place and merge those changes into existing indices. We combined this with our data migration expertise and built out a declarative reindexing framework to confidently and surgically reindex billions of documents with zero query downtime. Let’s go through a toy example!

Toy Example

As part of your normal search requirements, you may want to find documents in your dataset that happened within a certain time period. Naturally, you would very likely have a date field in your schema. There are a lot of date and time standards out there so one day when you were setting it all up you picked one, let’s say the RFC3339 date format, and then proceeded to forget about it.

2006-01-02T15:04:05Z07:00

Assume that many months go by and your systems have accumulated many terabytes of data across billions of documents and a multitude of customers. One day you and your team, due to some customer issue, realize that the RFC3339 standard is too precise and decide that it’s time to switch over to something like RFC822.

02 Jan 06 15:04 MST

No problem! Should be a quick 5 minute PR to switch out the formats and do a push to prod. But oh no! All customers’ existing data still have the old format, how do we go about fixing that? Well this seems like a task for our new reindexing framework! Okay, that sounds cool but how does it work? The key insight that helped drive the design for this was realizing that almost all reindexing tasks are going to have 3 core stages:

  1. Finding the documents that need to be updated
  2. Defining some transformation
  3. Sending the document back to be re-persisted
3 Core stages depicted here. 1. Documents to be updated are queried for and fetched. 2. The user provided transform is applied to all found documents. 3. The final stage sends them back for persisting and merging with existing indices.

Getting back to our toy example, we just successfully pushed out the new date format to prod which means that all documents that were last modified before this afternoon have the old format. Let’s define a query that finds those documents! N.B.: at FullStory, Golang is our backend language of choice, so all code snippets below reflect that.

Now that we’ve found our documents, let’s turn our attention to our document processor. This is the function that will be run across all matching documents returned by the above query and is responsible for handling the backport logic. In this example, it updates the date field to use the format we decided on earlier, RFC822.

Putting it all together, this is what a standalone reindex job binary might look like with the user defined query, the document processor that will update each document to the new date format and the job invocation which handles fetching documents and sending them back to the database!

Ok that’s great and all but what’s really going on behind the scenes?


Reindexing at scale

Each reindex job is deployed in its own container and all containers are running the same code under the hood. This means that every process in a complex multiprocess job initialization routine can operate deterministically where the decisions they make are the same given the same inputs. This is really important in resolving mutual exclusion conflicts and later we dive into how two processes decide what to do when they both want to do the same thing. Ultimately, this enables us to remove any hard affinity requirements: each process and each thread in the process can work on any job partition for any customer, increasing overall system throughput.

Each process begins by determining the customers to work on for that job: this information is always pulled from the user defined job specification. It then proceeds to inspect the external state shared across all active processes. Has the affected data for this customer been partitioned out into small parallelizable chunks? If not, let’s go ahead and do that.

The horizontal line above represents all of the customers data as a contiguous block of data. Each of the vertical lines represent delineations, or chunks, which break apart the larger block into smaller disjoint blocks.

Nice! But how do we enforce mutual exclusion? Earlier we noted that any process and any thread of execution can work on any customer in parallel and we wouldn’t want processes to partition out a customer's data more than once.

Multi-process job initialization mutual exclusion

Above we show how we use a distributed barrier algorithm to handle multiple processes attempting to initialize the job state for two customers. The first process to begin working on a customer assumes the leader role and all other processes wait on the leader to signal that the customer’s data is ready to be worked on. The leader is responsible for breaking out the total data for a customer into chunks that all processes/threads can work on in parallel. When it finishes, it marks the customer as ready, allowing for other processes to pass the barrier. It is possible for processes to simultaneously be leaders and followers as depicted with the differential leaders for customer A and customer B.

Once the job state for a customer has been initialized, each worker is free to claim a portion of that job to work on and the rest moves along naturally.

Each thread in a process can concurrently work on up to some max number of chunks. Mutual exclusion for chunks is handled at the coordination layer using distributed locking.

Conclusion

There you have it! What’s really important to note is that all of the resource management, including job partitioning, mutual exclusion, cursoring and error propagation across multiple containers running the same reindex job are all handled behind the scenes by the framework on behalf of the user. Moreover, the reduction depicted in the toy example is no exaggeration: we’ve been able to deploy reindex jobs to our K8s cluster that are less than 100 lines of code which reindex a customer’s entire dataset in a matter of hours. Now that’s cool!