Architecture

Armada is designed to manage millions of batch jobs across compute clusters made up of potentially hundreds of thousands of nodes, while providing near-constant uptime. Hence, the architecture of Armada must be highly resilient and scalable. The current architecture was chosen in early 2022 to achieve these goals while also ensuring that new features, e.g., advanced scheduling techniques, can be delivered.

At a high level, Armada is a so-called data stream system (sometimes referred to as an event sourcing system), for which there are two components responsible for tracking the state of the system:

  1. The log: a log-based message broker to which all state transitions are published.
  2. A set of databases, each holding a materialised view of part of the system state, derived from the messages published to the log.

The log is a publish-subscribe system consisting of multiple topics to which messages can be published. Those messages are eventually delivered to all subscribers of the topic. Important properties of the log are:

  1. Ordering: messages published to a topic are delivered to each subscriber in the order they were published.
  2. At-least-once delivery: published messages are stored durably and delivered to each subscriber at least once (which is why, as discussed in the notes on consistency below, updates derived from the log must be idempotent).

In Armada, the log is implemented using Apache Pulsar.
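
To make this concrete, the following sketch publishes a state-transition message to a Pulsar topic and consumes it from a subscription using the Apache Pulsar Go client. It is an illustration only, not Armada’s production code; the connection URL, topic, and subscription names are made up.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Connect to a Pulsar cluster (URL is illustrative).
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Publish a message representing a state transition to a topic.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "events"})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
	if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("job submitted"),
	}); err != nil {
		log.Fatal(err)
	}

	// Subscribe to the topic; each subscription eventually receives the published messages.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "events",
		SubscriptionName: "example-subscription",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("received: %s\n", string(msg.Payload()))
	consumer.Ack(msg)
}
```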

In a data stream system, the log is the source of truth and the databases are an optimisation to simplify querying: since the databases can be reconstructed by replaying messages from the log, they could in principle be omitted and the log replayed for each query, although doing so would be highly impractical. For example, in Armada there are separate PostgreSQL databases for storing the jobs to be scheduled and for the jobs shown in the UI, Lookout. Both derive their state from the log but are otherwise independent.

To change the state of the system, a message (e.g., corresponding to a job being submitted) is published to the log. Later, that message is picked up by a log processor, which updates some database accordingly (in the case of a job being submitted, by storing the new job in the database). Hence, the log serialises state transitions and the database is a materialised view of part of the state of the system, as derived from the state transitions submitted to the log. In effect, a data stream system is a bespoke distributed database with the log acting as the transaction log.
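
The sketch below illustrates this pattern in miniature (it is not Armada’s code; the message type, channel, and map are stand-ins for a log topic and a database): state transitions are read from the log in order and applied to a materialised view.

```go
package main

import "fmt"

// JobSubmitted is an illustrative state-transition message.
type JobSubmitted struct {
	JobID    string
	Priority int
}

// JobView is a toy materialised view of jobs, derived entirely from the log.
type JobView struct {
	jobs map[string]JobSubmitted
}

// Apply updates the view with a single transition read from the log.
func (v *JobView) Apply(e JobSubmitted) {
	v.jobs[e.JobID] = e
}

func main() {
	// A buffered channel stands in for a log topic: it delivers messages in order.
	logTopic := make(chan JobSubmitted, 10)
	logTopic <- JobSubmitted{JobID: "job-1", Priority: 1}
	logTopic <- JobSubmitted{JobID: "job-2", Priority: 3}
	close(logTopic)

	// The log processor consumes the transitions to build (or rebuild) the view.
	view := &JobView{jobs: make(map[string]JobSubmitted)}
	for e := range logTopic {
		view.Apply(e)
	}
	fmt.Println(len(view.jobs), "jobs in the materialised view")
}
```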

This approach has several benefits:

  1. Consistency: all databases derive their state from the same ordered stream of state transitions, so their views of the system agree with each other (eventually; see the notes on consistency below).
  2. Loose coupling: components communicate only via the log, so a new component (e.g., a new dashboard) can be added by subscribing to the log without affecting existing components.
  3. Resilience: because the log is the source of truth, any database can be reconstructed by replaying messages from the log, e.g., after a crash.

However, the approach also has some drawbacks:

  1. Eventual consistency: the databases lag slightly behind the log, so different views of the system may be momentarily out of sync with each other.
  2. Timeliness: there is a delay between a state transition being published to the log and that transition being reflected in the databases.

System overview

Besides the log, Armada consists of the following components:

  1. The submit API, which authenticates users, validates submitted jobs, and publishes them to the log.
  2. The scheduler, which assigns queued jobs to executors.
  3. Executors, which run jobs on Kubernetes worker clusters and reconcile the jobs actually running against the scheduler’s decisions.
  4. Log processors, which subscribe to the log and update the databases to reflect published state transitions.
  5. Databases, e.g., the PostgreSQL databases backing the scheduler and the Lookout UI.
  6. The streams API, over which job state transitions are streamed to clients as they happen.

Job submission logic

Here, we outline the sequence of actions resulting from submitting a job.

  1. A client submits a job, composed of a Kubernetes podspec and some Armada-specific metadata (e.g., the priority of the job), to the submit API.
  2. The submit API authenticates and authorizes the user, validates the submitted job, and, if valid, submits the job spec. to the log. The submit API annotates each job with a randomly generated UUID that uniquely identifies the job. This UUID is returned to the user.
  3. The scheduler receives the job spec. and stores it in-memory (discarding any data it doesn’t need, such as the pod spec.). The scheduler runs periodically, at which point it schedules queued jobs. At the start of each scheduling run, the scheduler queries each executor for its available resources. The scheduler uses this information in making scheduling decisions. When the scheduler assigns a job to an executor, it submits a message to the log indicating this state transition. It also updates its in-memory storage immediately to reflect the change (to avoid scheduling the same job twice).
  4. A log processor receives the message indicating the job was scheduled, and writes this decision to a database acting as the interface between the scheduler and the executor.
  5. Periodically, each executor queries the database for the list of jobs it should be running. It compares that list with the list of jobs it is actually running and makes any changes necessary to reconcile the differences (see the reconciliation sketch after this list).
  6. When a job has finished, the executor responsible for running it informs the scheduler, which submits a “job finished” message to the log on the executor’s behalf. The same log processor as in step 4 then updates its database to reflect that the job has finished.
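
Step 5 above is a reconciliation loop. The following sketch illustrates the core of that idea; it is not the actual executor code, and the function and variable names are invented for the example.

```go
package main

import "fmt"

// reconcile compares the set of job ids the executor should be running (as read
// from the database) with the set actually running on the cluster, and returns
// which jobs to start and which to stop.
func reconcile(desired, actual map[string]bool) (toStart, toStop []string) {
	for id := range desired {
		if !actual[id] {
			toStart = append(toStart, id) // assigned to this executor but not running yet
		}
	}
	for id := range actual {
		if !desired[id] {
			toStop = append(toStop, id) // running but no longer assigned (e.g., cancelled)
		}
	}
	return toStart, toStop
}

func main() {
	desired := map[string]bool{"job-1": true, "job-2": true}
	actual := map[string]bool{"job-2": true, "job-3": true}

	toStart, toStop := reconcile(desired, actual)
	fmt.Println("start:", toStart) // [job-1]
	fmt.Println("stop:", toStop)   // [job-3]
}
```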

Streams API

Armada does not maintain a user-queryable database of the current state of the system. This is by design, to avoid overloading the system with connections. For example, say there are one million active jobs in the system and that there are clients who want to track the state of all of those jobs. With a current-state-of-the-world database, those clients would need to resort to polling that database to catch any updates, thus opening a total of one million connections to the database, which, while not impossible to manage, would pose significant challenges.

Instead, users are expected to be notified of updates to their jobs via an event stream (i.e., the streams API): a client opens a single connection for all jobs in a so-called job set, over which all state transitions are streamed as they happen. This approach is highly scalable, since data is only sent when something happens and since a single connection can carry updates for thousands of jobs. Users who want to maintain a view of their jobs are thus responsible for maintaining that view themselves by subscribing to events.
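
As a sketch of what a consumer of the streams API does (the types and names below are hypothetical, not Armada’s actual client API), a client reads events for a whole job set from a single stream and folds them into its own view of the latest state of each job:

```go
package main

import "fmt"

// JobSetEvent is a hypothetical event describing a job state transition; the
// real streams API delivers similar events for a job set over a single stream.
type JobSetEvent struct {
	JobID string
	State string // e.g., "queued", "pending", "running", "succeeded"
}

// trackJobSet consumes events from one stream and maintains the client's own
// view of the latest state of every job in the job set.
func trackJobSet(events <-chan JobSetEvent) map[string]string {
	view := make(map[string]string)
	for e := range events {
		view[e.JobID] = e.State // one connection carries updates for all jobs in the set
	}
	return view
}

func main() {
	// A channel stands in for the event stream of a single job set.
	events := make(chan JobSetEvent, 8)
	events <- JobSetEvent{JobID: "job-1", State: "queued"}
	events <- JobSetEvent{JobID: "job-1", State: "running"}
	events <- JobSetEvent{JobID: "job-2", State: "queued"}
	events <- JobSetEvent{JobID: "job-1", State: "succeeded"}
	close(events)

	fmt.Println(trackJobSet(events)) // map[job-1:succeeded job-2:queued]
}
```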

Notes on consistency

The data stream approach taken by Armada is not the only way to maintain consistency across views. Here, we compare this approach with the two other possible solutions.

Armada stores its state across several databases. Whenever Armada receives an API call to update its state, all of those databases need to be updated. However, if each database were updated independently, it would be possible for some of those updates to succeed while others fail, leaving the application state inconsistent. Detecting and correcting such partial failures would require complex logic, and even then we could not guarantee that the application state is consistent: if Armada crashes before it has had time to correct a partial failure, the application may remain in an inconsistent state.
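
To make the failure mode concrete, consider the sketch below (purely illustrative; the function names are invented). The two databases are updated one after the other, so a crash between the two calls leaves the first database updated and the second not, with no record from which to repair the inconsistency.

```go
package main

import "fmt"

// Hypothetical stand-ins for updates to two independent databases.
func updateSchedulerDB(jobID string) error {
	fmt.Println("scheduler DB updated:", jobID)
	return nil
}

func updateLookoutDB(jobID string) error {
	fmt.Println("lookout DB updated:", jobID)
	return nil
}

// handleSubmit updates each database independently. If the process crashes
// after the first update but before the second, the two views disagree.
func handleSubmit(jobID string) error {
	if err := updateSchedulerDB(jobID); err != nil {
		return err
	}
	// <-- a crash here leaves the databases inconsistent with each other
	return updateLookoutDB(jobID)
}

func main() {
	if err := handleSubmit("job-1"); err != nil {
		fmt.Println("submit failed:", err)
	}
}
```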

There are three commonly used approaches to address this issue:

  1. Store all state in a single database and perform each update within a single transaction.
  2. Use distributed transactions to update all databases atomically.
  3. Publish all state transitions to an ordered log and have each database apply those transitions independently and idempotently (the approach taken by Armada).

The first approach results in tight coupling between components and would limit us to a single database technology. Adding a new component (e.g., a new dashboard) could break existing components, since all operations that are part of a transaction are rolled back if any one of them fails. The second approach allows us to use multiple databases (as long as they support the distributed transaction framework), but components are still tightly coupled since they have to take part in the same transaction. Further, there are performance concerns associated with both options, since transactions may not scale easily. Hence, we use the third approach, which we explain next.

First, note that if we can replay the sequence of state transitions that led to the current state, then, in case of a crash, we can recover the correct state by truncating the database and replaying all transitions from the beginning of time. Because the transitions are ordered, this always results in the same end state. If we also store, for each database, the id of the most recent transition successfully applied to that database, we only need to replay transitions more recent than that id. This saves us from having to start over from a clean database; because we know where we left off, we can continue from there. For this to work, each database must apply a transition and record its id atomically, which requires ordinary per-database transactions but not distributed transactions. In addition, applying a transition that has already been written to the database must result in a no-op, i.e., the updates must be idempotent (applying the same update twice has the same effect as applying it once).
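
The sketch below shows one way such a log processor could apply a transition (it is not Armada’s actual schema or code; the table and column names are invented). The transition and the updated offset are written in a single local transaction, and transitions at or below the stored offset are skipped, so replaying the log is idempotent.

```go
package processor

import "database/sql"

// applyTransition applies one state transition to a single database and records
// its id, all within one local (non-distributed) transaction. Transitions that
// have already been applied are skipped, so replaying the log is a no-op.
// The table and column names are illustrative, not Armada's actual schema.
func applyTransition(db *sql.DB, transitionID int64, jobID, newState string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no effect if the transaction was committed

	// Read the id of the most recent transition applied to this database.
	var lastApplied int64
	if err := tx.QueryRow(`SELECT last_applied FROM progress WHERE id = 1`).Scan(&lastApplied); err != nil {
		return err
	}
	if transitionID <= lastApplied {
		return nil // already applied: doing nothing keeps the update idempotent
	}

	// Apply the transition and advance the stored offset atomically.
	if _, err := tx.Exec(`UPDATE jobs SET state = $1 WHERE job_id = $2`, newState, jobID); err != nil {
		return err
	}
	if _, err := tx.Exec(`UPDATE progress SET last_applied = $1 WHERE id = 1`, transitionID); err != nil {
		return err
	}
	return tx.Commit()
}
```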

The two principal drawbacks of this approach are:

  1. Eventual consistency: because each database applies transitions from the log independently, queries against different databases may briefly return views that are out of sync with each other.
  2. Timeliness: there is a delay between a state transition being published to the log and that transition being visible in the databases.

Working around eventual consistency requires some care, but is not impossible. For example, it is fine for the UI to show a job as “running” for a few seconds after the job has finished before showing “completed”. Regarding timeliness, it is not a problem if there is a few seconds’ delay between a job being submitted and the job being considered for queueing. However, poor timeliness may mean that clients (i.e., the entities submitting jobs to the system) cannot read their own writes for some time, which may lead to confusion (e.g., there may be some delay between a client submitting a job and that job showing as “pending”). This issue can be worked around by storing the set of submitted jobs in memory, either at the client or at the API endpoint.
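
One possible shape for such a workaround is sketched below (purely illustrative; this is not an existing Armada API): the client remembers the jobs it has submitted and reports them as “pending” until the event stream confirms them.

```go
package main

import (
	"fmt"
	"sync"
)

// submittedJobs remembers jobs the client has submitted but not yet seen in the
// event stream, so the client can read its own writes while the databases catch up.
type submittedJobs struct {
	mu      sync.Mutex
	pending map[string]bool
}

func newSubmittedJobs() *submittedJobs {
	return &submittedJobs{pending: make(map[string]bool)}
}

// Remember is called as soon as a submit call returns a job id.
func (s *submittedJobs) Remember(jobID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending[jobID] = true
}

// Confirm is called once the event stream reports the job; from then on the
// authoritative state comes from the stream rather than this local overlay.
func (s *submittedJobs) Confirm(jobID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.pending, jobID)
}

// State returns "pending" for jobs submitted locally but not yet seen in events.
func (s *submittedJobs) State(jobID string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.pending[jobID] {
		return "pending", true
	}
	return "", false
}

func main() {
	jobs := newSubmittedJobs()
	jobs.Remember("job-1")
	if state, ok := jobs.State("job-1"); ok {
		fmt.Println("job-1:", state) // "pending", even before any event has arrived
	}
	jobs.Confirm("job-1")
}
```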