As you can see, in the lead-up to the event we saw a 3x increase in demand for our computing platform as teams across the company worked hard to make sure everything was ready to go. Our systems were able to automatically scale to meet this demand, all while maintaining 100% reliability and zero outages of any mission-critical systems. None of that would have been possible without a robust, flexible, and highly scalable system that all teams can leverage.
So how does Aurora’s Compute Infrastructure team do all this, creating and managing the backend systems that enable our engineers, data scientists, and researchers to continue advancing self-driving technology?
At the heart of the Compute team’s platform is a batch-processing system that we developed in-house, known as Batch API. Its primary design philosophy, like that of any batch-processing system, is “throughput over latency.” However, Batch API has some unique features that allow us to flip that concept and provide “latency over throughput” for teams with stricter requirements around timeliness. It also has a strong focus on efficiency and cost-effectiveness, as it is designed to operate in both on-premises data centers and cloud environments.
Batch API is a collection of services and components (almost exclusively written in Go) that runs workloads (or “jobs”) on an elastic pool of Kubernetes pods (or “workers”). Aurora engineers, scientists, and developers (whom we refer to as “users”) create jobs that consist of a collection of processes to run, called “tasks,” which are then executed by the workers according to how the user has defined the task ordering, represented by a directed acyclic graph (DAG) of inter-task dependencies. Batch API creates and manages workers for each job and coordinates task execution based on a job’s DAG. Jobs can run across multiple computing clusters, or even multiple regions, completely transparently to the user who created them.
This level of automation is tricky at best, and it is compounded by the additional complication of managing the actual computing resources that Batch API uses to run its workers and execute its tasks. We leverage the power of Kubernetes and Amazon’s Elastic Kubernetes Service (EKS) to manage the raw computing resources and, as we’ll discuss, the Compute team has come up with clever ways to use these products to keep ahead of our teams’ demands.
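To make the job, task, and DAG vocabulary concrete, here is a minimal sketch in Python of how tasks could be grouped into waves that respect their dependencies. It is not Batch API's actual scheduler or data model: the `execution_waves` helper and the task names are hypothetical, and the ordering logic comes from Python's standard `graphlib` module.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies):
    """Group tasks into waves; each wave only depends on earlier waves.

    `dependencies` maps a task name to the set of tasks it depends on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


# Hypothetical job: four tasks with a diamond-shaped dependency graph.
job = {
    "extract": set(),
    "featurize_a": {"extract"},
    "featurize_b": {"extract"},
    "aggregate": {"featurize_a", "featurize_b"},
}

waves = execution_waves(job)
for index, wave in enumerate(waves):
    print(f"wave {index}: {wave}")
```

Tasks in the same wave have no dependencies on one another, so in principle they could run on different workers at the same time.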
Disclosure: Amazon has invested in Aurora.
Why build our own system?
The answer is simple. No existing technology could address all of our needs:
- Understandable tasks - Whenever you venture out into the world of highly parallel computing, you inevitably find yourself trying to debug a failure in one small piece of a much larger computing job. As these jobs get more complex, understanding the individual pieces gets harder and more abstract. Even for a job with a million tiny pieces, users should be able to find one task and understand it.
- Heterogeneous workloads - Our teams run a variety of workloads, many of which they haven’t even developed yet. If our system is unable to manage different kinds of jobs and tasks smoothly, we either force users to limit the kinds of workloads they run, or we end up creating hacks or bespoke solutions that incur tech debt.
- Fault tolerance - Our systems and our teams’ workloads must be resilient. A system that doesn’t break and shut down completely each time a fault occurs will save us valuable time and ensure that our teams are not constantly disrupted.
- Resource efficiency - Our systems must scale to meet our users’ demands, and we want to be able to make smart decisions on their behalf. Computing resources (and our colleagues’ time) are not infinite, so we need a system that can provide exactly the resources needed, at exactly the right time, and that has the tools to tell Aurora’s Compute Infrastructure team when this is not happening so that we can improve the system.
- Single entrypoint - Our teams shouldn’t have to learn and maintain many different systems. Rather, they should only need a single set of APIs and a common framework that they can leverage with confidence.
Our team has had extensive experience with some of the most popular open source batch systems over the years. Such systems have compelling feature sets and work well for certain use cases (such as running SQL queries over massive data sets). However, one of the great challenges with using many of these systems to run highly parallel computation is determining exactly where a failure has occurred.
For example, the common map operation (part of the ubiquitous MapReduce pattern) runs a user-defined function on each element of a dataframe. Such an operation can last several minutes or even hours, and it can fail in an unexpected way on a particular element.
Many open source batch systems do not provide workload-agnostic means of determining the precise element (or set of elements) upon which the function failed. The process of debugging the function on the input that causes it to fail is thus tedious and brittle.
The reason for this is that “task inputs” are often represented as collections of records rather than individual records. The high cardinality of records in the input of a typical job means that storing per-record debug information is impractical and slow. This speaks to the fact that many batch systems were designed to execute a small number of well-understood and well-behaved functions (such as those in the SQL standard) on trillions of small records (like click events).
For our purposes at Aurora, the batch system we use must instead execute novel algorithms on millions of large chunks of data (such as snippets of robotic sensor data), so it makes much more sense to structure the problem as “one function performed on one element equals one task.” The “function” in this case is typically a Unix process definition with parameters specified as Unix process arguments. In this way, when one task among thousands fails on some input data, the user can quickly run the process locally in a debugger on the exact element that produced the unexpected behavior.
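The “one function performed on one element equals one task” idea can be sketched in a few lines of Python. This is an illustration under stated assumptions rather than Batch API's real interface: the `Task` class, the `make_tasks` and `run_all` helpers, and the stand-in script that fails on the input `3` are all hypothetical.

```python
import shlex
import subprocess
import sys
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    """One task = one process invocation on one input element (hypothetical)."""
    task_id: str
    argv: tuple


# Stand-in for a user's program: exits non-zero on one particular input.
SCRIPT = "import sys; x = int(sys.argv[1]); sys.exit(1 if x == 3 else 0)"


def make_tasks(elements):
    """Create one task per element, passing the element as a process argument."""
    return [
        Task(f"task-{i}", (sys.executable, "-c", SCRIPT, str(element)))
        for i, element in enumerate(elements)
    ]


def run_all(tasks):
    """Run every task and return the ones whose process exited non-zero."""
    failed = []
    for task in tasks:
        result = subprocess.run(task.argv)
        if result.returncode != 0:
            failed.append(task)
    return failed


tasks = make_tasks([1, 2, 3, 4])
failed = run_all(tasks)
for task in failed:
    # The failing task carries the exact command line needed to reproduce it.
    print(task.task_id, "failed; rerun locally with:", shlex.join(task.argv))
```

Because each task records its full argument list, the failing input is identified by the task itself, with no need to search through a batch of records.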
A challenge with scaling many existing workflow systems beyond fairly simple, static DAGs is the granularity of dependencies. There are many cases where the parallelism needed for a given step is not known ahead of time, or is higher than what many systems can generally handle. This leads to situations where one system is used to define a relatively static DAG (the workflow component), and steps in this DAG negotiate with other large-scale batch systems to perform highly parallelized computation (the embarrassingly parallel component). This pattern, which we call coarse-grained dependency management, results in increased whole-DAG runtime (and thus the runtime of users’ code) in two ways.
First, the critical path becomes the sum of the durations of the longest-running task in each step, as illustrated below:
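The same effect can also be seen in a small numeric sketch. The durations below are made up, and the comparison assumes a hypothetical job in which element i of the second step depends only on element i of the first step. Both function names are illustrative, not part of any real system.

```python
def coarse_critical_path(steps):
    """Each step waits for the whole previous step: sum of per-step maxima."""
    return sum(max(step) for step in steps)


def per_element_critical_path(steps):
    """Element i of a step waits only for element i of the previous step:
    the critical path is the longest per-element chain."""
    return max(sum(chain) for chain in zip(*steps))


# Hypothetical task durations (in minutes) for two steps over three elements.
steps = [
    [10, 2, 3],  # step 1
    [1, 9, 2],   # step 2
]

coarse = coarse_critical_path(steps)
fine = per_element_critical_path(steps)
print("coarse-grained critical path:", coarse)
print("per-element critical path:", fine)
```

With these numbers the coarse-grained critical path is 10 + 9 = 19 minutes, while the longest per-element chain is 11 minutes, because the slowest task in each step belongs to a different element.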