# Paper Reading - Ray

04 Aug 2021

In this post we’ll discuss the paper Ray: A Distributed Framework for Emerging AI Applications by Moritz et al. [1]. Ray is a framework aimed at Reinforcement Learning (RL) applications.

## Background

Since Ray specifically targets RL applications, it’s worth briefly going over what RL is.

### Reinforcement Learning

Reinforcement Learning is one of the three machine learning paradigms [2] (in addition to supervised and unsupervised learning). It’s usually modeled as a Markov decision process (MDP).

The model contains two entities: the agent and the environment.

The environment represents the real world (or some simulator of it). For example, in self-driving cars, the real world is basically the input to all the sensors and cameras in the car. At any given moment the environment is in one of the states from $S$, which could be GPS coordinates and speed, for example.

The environment can take in an action from a set $A$, which will move it to another state. This process is called simulation since it requires either performing the action in the real world (speeding the car up) or, more likely, simulating the outcome of the action to compute the resulting state. This transition is probabilistic, defined by $P_a(s, s')$, that is, the probability of transitioning from $s$ to $s'$ when action $a$ is performed.

There is a reward associated with the transition from state $s$ to $s'$ when action $a$ is performed, $R_a(s, s')$.

The action is chosen according to a probability function $\pi$, called the policy, defined as $\pi: A \times S \rightarrow [0, 1]$, where $\pi(a, s)$ represents the probability that action $a$ will be chosen when we’re in state $s$. This process is known as serving. The policy is the variable that the RL algorithm will modify.

In an ideal world, we’ll be able to tell the best action to perform from the environment either analytically or by simulating all possible actions, but in practice this is infeasible, hence we work with probabilistic functions.

We can evaluate a policy against an environment, a process called rollout, by starting the environment from an initial state, then iteratively choosing an action and getting the results (new state + reward) from the environment. The sequence of states and rewards resulting from this simulation is called a trajectory.
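A rollout can be sketched in a few lines of Python. This is only a minimal illustration under made-up assumptions: `ToyEnvironment` (a walk on positions 0 to 4 with a reward on reaching 4) and the fixed `policy` are hypothetical and do not come from the paper.

```python
import random


class ToyEnvironment:
    """Hypothetical environment: positions 0..goal, reward 1.0 on reaching goal."""

    def __init__(self, goal=4):
        self.goal = goal
        self.state = 0

    def reset(self):
        self.state = 0
        return self.state

    def step(self, action):
        """Apply an action (-1 or +1) and return (new_state, reward)."""
        self.state = max(0, min(self.goal, self.state + action))
        reward = 1.0 if self.state == self.goal else 0.0
        return self.state, reward

    def is_terminal(self):
        return self.state == self.goal


def policy(state, rng):
    """Stochastic policy: pi(+1, s) = 0.8 and pi(-1, s) = 0.2 for every s."""
    return 1 if rng.random() < 0.8 else -1


def rollout(policy, environment, rng, max_steps=100):
    """Evaluate the policy and return the trajectory of (state, reward) pairs."""
    trajectory = []
    state = environment.reset()
    for _ in range(max_steps):
        if environment.is_terminal():
            break
        action = policy(state, rng)               # serving
        state, reward = environment.step(action)  # simulation
        trajectory.append((state, reward))
    return trajectory


trajectory = rollout(policy, ToyEnvironment(), random.Random(0))
always_right = rollout(lambda state, rng: 1, ToyEnvironment(), random.Random(0))
print(always_right)  # [(1, 0.0), (2, 0.0), (3, 0.0), (4, 1.0)]
```

The two comments mark where the serving and simulation steps described above happen inside the loop.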

Given a set of trajectories evaluated previously, the goal of the algorithm is to find the policy $\pi$ that maximizes the rewards across all trajectories. This process is called training, and we keep repeating this process until the policy converges.
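One common way to write this goal down, using the symbols defined above, is as the policy with the highest expected total reward over a trajectory. The horizon $T$ and the indices $t$ are notation assumed here for illustration, not taken from the paper:

$$\pi^* = \arg\max_{\pi} \; \mathbb{E}\left[ \sum_{t=0}^{T-1} R_{a_t}(s_t, s_{t+1}) \right]$$

where each action $a_t$ is chosen with probability $\pi(a_t, s_t)$ and each next state $s_{t+1}$ is reached with probability $P_{a_t}(s_t, s_{t+1})$.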

## Requirements

To tailor the system to RL applications, Ray combines the Serving, Simulation and Training steps within itself, for the sake of reducing latency.

These steps vary in requirements: for example, performing an action might take a few milliseconds while training could take hours. It’s thus necessary to support heterogeneous computation, not only in terms of execution time but also hardware (GPUs, CPUs).

Some simulations require state to be carried from one function to the next, so the system should support stateful computation. In a similar vein, one function might spawn new functions to be executed, which cannot be known ahead of time, so the system should allow dynamic execution.

It should also enable integrating with existing simulators and deep learning frameworks.

After covering the architecture of the system, we’ll revisit these requirements to see how they’re satisfied.

## Components

These are some high-level entities the system uses.

There are two types of remote function execution:

• Task is stateless
• Actor is stateful

Both tasks and actors are executed by a remote worker. Both are non-blocking by default but the API provides a way to wait (i.e. block) for the computation to finish.

It’s possible for remote functions to invoke other remote functions (nested remote calls). In addition, actors can be passed as parameters to remote functions, so their internal state can be reused.

Perhaps we can make an analogy with programming: tasks are like pure functions and actors are like instances of classes. A more precise description would be that actor methods are the unit of stateful execution.
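To make the analogy concrete, here is a minimal sketch that mimics the two kinds of remote execution with Python's standard `concurrent.futures` module. It deliberately does not use Ray's API: the thread pool stands in for remote workers, and the `square` task and `Counter` actor are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the pool of remote workers (threads here, not a cluster).
workers = ThreadPoolExecutor(max_workers=4)


def square(x):
    """Task: a stateless function, so any worker can run it."""
    return x * x


class Counter:
    """Actor: owns state, and its methods run one at a time, in call order."""

    def __init__(self):
        self._value = 0
        # A single dedicated worker serializes the method calls.
        self._worker = ThreadPoolExecutor(max_workers=1)

    def increment(self, amount):
        return self._worker.submit(self._increment, amount)

    def _increment(self, amount):
        self._value += amount
        return self._value


# Both calls return immediately with a future (non-blocking) ...
task_future = workers.submit(square, 7)
counter = Counter()
actor_futures = [counter.increment(i) for i in (1, 2, 3)]

# ... and .result() blocks until the computation finishes.
task_result = task_future.result()
actor_results = [f.result() for f in actor_futures]
print(task_result, actor_results)  # 49 [1, 3, 6]
```

The results of the three `increment` calls depend on each other through the actor's state, whereas `square` could run on any worker in any order.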

The execution of both tasks and actor methods is automatically triggered by the system when their inputs become available. To know when this happens, Ray builds a DAG to track dependencies.

Nodes in this graph can be data, tasks or actor methods. There are 3 types of edges:

• Data (data → task) - when a task depends on some data as input
• Control (task → task) - when a task invokes another remote function (nested remote calls)
• Stateful (actor method → actor method) - sequential calls of methods within the same actor

This graph is dynamic in the sense that it changes during the execution of the program. As new tasks and actors are created, nodes and edges are added.
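The idea of triggering a task once its inputs become available, on a graph that grows while the program runs, can be sketched as follows. This is a toy, single-process illustration: the `TaskGraph` class and its methods are hypothetical and are not how Ray implements its graph.

```python
from collections import defaultdict


class TaskGraph:
    """Toy dependency tracker: a task runs once all of its inputs exist."""

    def __init__(self):
        self.objects = {}                 # object ID -> value (data nodes)
        self.waiting = defaultdict(list)  # object ID -> tasks blocked on it
        self.log = []                     # order in which tasks executed

    def put(self, object_id, value):
        """Add a data node and trigger any task that was waiting for it."""
        self.objects[object_id] = value
        for task in self.waiting.pop(object_id, []):
            self._try_run(task)

    def submit(self, output_id, fn, input_ids):
        """Add a task node with data edges from each of its inputs."""
        self._try_run((output_id, fn, input_ids))

    def _try_run(self, task):
        output_id, fn, input_ids = task
        missing = [i for i in input_ids if i not in self.objects]
        if missing:
            # Park the task on one missing input; it is re-checked later.
            self.waiting[missing[0]].append(task)
            return
        self.log.append(output_id)
        self.put(output_id, fn(*(self.objects[i] for i in input_ids)))


graph = TaskGraph()
# Tasks are submitted before their inputs exist.
graph.submit("c", lambda a, b: a + b, ["a", "b"])
graph.submit("d", lambda c: c * 10, ["c"])
graph.put("a", 1)
graph.put("b", 2)  # unblocks "c", whose output then unblocks "d"
print(graph.log, graph.objects["d"])  # ['c', 'd'] 30
```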

## Architecture

There are three types of nodes in the system: worker nodes, global schedulers and the global control state (GCS). Figure 3 shows an example of a configuration of the nodes.

Let’s look at each of these node types in more detail.

### Worker Node

Within each worker node we have a few processes and storage.

• Driver - a process executing the user program (the non-remote functions)
• Worker - a process executing a task
• Actor - a stateful process executing methods from an actor
• Local scheduler - when a task is created inside this node, the local scheduler either executes it locally (by sending it to a worker) or, if the node is overloaded, delegates it to the global scheduler to be routed to a different node.
• Object store - stores the input/output of tasks

Let’s cover the object store in more detail. It’s an in-memory storage shared between the worker processes in this node. The inputs to a function must be available in the object store before the function starts.

Suppose a worker in node $N_1$ needs data $a$ to execute, but doesn’t have it locally. It then asks the global control state, which knows which node, say $N_0$, has that input. Then $N_1$ copies the data directly from $N_0$ and stores it locally.

Each object has an associated ID and is immutable (mutating an object means creating a new ID), so keeping an object replicated in multiple places is safe from the consistency perspective. This replication mechanism also helps to distribute the load: if some data is hot, it’s likely replicated in multiple nodes, which the GCS can load-balance across.
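The lookup-then-copy flow described above can be sketched as follows. This is a simplified, single-process model: the `ObjectTable` and `Node` classes and their methods are hypothetical stand-ins, and always picking the first known location is just a placeholder for whatever load-balancing the real system does.

```python
import copy


class ObjectTable:
    """Stand-in for the GCS object table: object ID -> nodes holding a copy."""

    def __init__(self):
        self.locations = {}

    def add_location(self, object_id, node_name):
        self.locations.setdefault(object_id, []).append(node_name)


class Node:
    def __init__(self, name, object_table, cluster):
        self.name = name
        self.store = {}              # local in-memory object store
        self.object_table = object_table
        self.cluster = cluster       # node name -> Node, to reach peers
        cluster[name] = self

    def put(self, object_id, value):
        # Objects are immutable, so an ID is written at most once per node.
        assert object_id not in self.store
        self.store[object_id] = value
        self.object_table.add_location(object_id, self.name)

    def get(self, object_id):
        if object_id not in self.store:
            # Ask the metadata table where the object lives, then copy it
            # directly from that node; the table never holds the data itself.
            owner = self.object_table.locations[object_id][0]
            self.put(object_id, copy.deepcopy(self.cluster[owner].store[object_id]))
        return self.store[object_id]


table, cluster = ObjectTable(), {}
n0 = Node("N0", table, cluster)
n1 = Node("N1", table, cluster)
n0.put("a", [1, 2, 3])
value = n1.get("a")
print(value, table.locations["a"])  # [1, 2, 3] ['N0', 'N1']
```

After the fetch, the table lists both nodes as holders of `a`, which is what makes it possible to spread later reads over several replicas.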

My comment: One interesting aspect of Ray is that the process that executes the user program is co-located with the ones that execute the heavy load. In other distributed execution systems I’ve seen, they’re separated, for example a client sending some SQL input to an engine which will execute it in a backend.

### Global Scheduler

As discussed in the previous section, the global scheduler is only used if the local scheduler in a node decides not to schedule a task locally, hence the paper calls this scheduling strategy bottom-up scheduling.

The global scheduler uses a bunch of different criteria to determine which machine it will assign a task to, including:

• Estimated time the task will stay in the queue
• Transfer time (a function of input size and network bandwidth)

The scheduler gets information like the tasks’ inputs from the GCS. It also learns each worker node’s queue size and resources via heartbeats.
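Here is a minimal sketch of how the two criteria above could be combined into a single estimate. The exact formula, the `NodeStats` fields and all the numbers are assumptions made for illustration, not the scheduler's actual implementation.

```python
from dataclasses import dataclass


@dataclass
class NodeStats:
    name: str
    queue_size: int            # tasks waiting on the node (from heartbeats)
    avg_task_seconds: float    # average task execution time on the node
    local_objects: frozenset   # IDs of objects already in its object store


def estimated_wait(node, input_sizes, bandwidth_bytes_per_s):
    """Queueing delay plus the time to transfer the inputs the node lacks."""
    queue_time = node.queue_size * node.avg_task_seconds
    remote_bytes = sum(
        size for obj, size in input_sizes.items() if obj not in node.local_objects
    )
    return queue_time + remote_bytes / bandwidth_bytes_per_s


def pick_node(nodes, input_sizes, bandwidth_bytes_per_s=1e9):
    """Choose the node with the lowest estimated waiting time."""
    return min(
        nodes, key=lambda n: estimated_wait(n, input_sizes, bandwidth_bytes_per_s)
    )


nodes = [
    NodeStats("N0", queue_size=50, avg_task_seconds=0.1, local_objects=frozenset({"a"})),
    NodeStats("N1", queue_size=2, avg_task_seconds=0.1, local_objects=frozenset()),
]
# Input "a" is 2 GB: N0 holds it but has a long queue (about 5 s), while
# N1 must fetch it (about 2 s) after a short queue (about 0.2 s).
chosen = pick_node(nodes, {"a": 2_000_000_000})
print(chosen.name)  # N1
```

Note that data locality does not always win: a node that already holds the input can still lose to a lightly loaded node if the transfer is cheap enough.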

My question: does it need to probe all nodes or only a subset of them?

One thing that the scheduler does not do is task dispatching, i.e. retrieving the inputs for the task to execute. This is done by the GCS.

Because the global scheduler stays stateless, it’s easy to scale it horizontally.

### Global Control State (GCS)

The GCS holds the metadata of the system. As seen in Figure 3, it stores:

• Table of object metadata - which node contains which object (data). Note that all objects have an associated ID.

It’s worth calling out that the GCS does not keep the actual data, which is distributed across the worker nodes.

The GCS is implemented using a sharded key-value store (one Redis per shard). Sharding is done by object and task IDs. Each shard uses chain replication [5] for fault tolerance.

My question: why not let Redis handle the sharding? Perhaps the system wants more control over the replication strategy?

Periodically, the GCS flushes the data to disk, both to keep the memory usage capped and to serve as a snapshot for recovery.

## Results

The paper provides a bunch of micro-benchmarks testing the performance of specific features, like end-to-end scalability, delays from GCS, tasks and actors’ fault-tolerance mechanisms, etc.

It also compares the performance against multiple existing systems, including Clipper, Horovod, OpenMPI, etc., and outperforms them in many RL-specific tasks.

Finally, it compares features and designs with some of the systems above as well, and points out where they fall short of efficiently meeting RL application requirements.

## Analysis

As promised, we revisit the requirements listed earlier to see how they were addressed:

• Combines the Serving, Simulation and Training - it does so by having a single Python API for computing tasks and actors. It doesn’t distinguish between these steps.
• Support heterogeneous computation - tasks can be used for a variety of workloads and run on different hardware. Coupled with the scheduler, which knows these requirements and the hardware settings, heterogeneous tasks can be modeled transparently.
• Support stateful computation - actors model this use case.
• Integrates with existing simulators and deep learning frameworks - by virtue of allowing arbitrary Python execution it can leverage existing libraries.

## Conclusion

I really like the idea of co-locating data with the execution node. This is a natural way to distribute the data and avoids a lot of the latency of storing it in a centralized database.

I haven’t dealt with Reinforcement Learning before, so I’m sure I’m missing a lot of the details for the motivations behind this system, but I’m happy to have learned a bit about it and some other distributed concepts (in the Appendix).

I originally thought Ray was a framework for general Python distributed computing, and this is what motivated me to read the paper. Well, it may well be, but I didn’t know about its strong focus on RL applications.

## Appendix

Here we discuss some terminology and concepts I had to look up to understand the paper.

### Allreduce

Allreduce is a distributed protocol for nodes in a system to share their data among all the nodes, while consolidating (reducing) them in the process. One contrived example is one in which each node holds a number and the goal is for each node to end up holding the sum of the values from all nodes in the system.

One naïve way to do this is to have each node send its data to all other nodes requiring $O(n^2)$ network transmissions, then adding the values received locally, but it’s possible to do it with $O(n)$ transmissions.

This very informative article by Edir Garcia Lazo [3] provides a visual explanation of a popular implementation of the protocol called ring allreduce.
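As a rough illustration of ring allreduce, the sketch below simulates the nodes as Python lists inside one process. It assumes each of the $n$ nodes holds a vector with exactly $n$ entries (one entry per chunk) and that the reduction is a sum. The function name is hypothetical and no real network is involved.

```python
def ring_allreduce(vectors):
    """Sum the vectors held by n nodes so that every node ends with the total.

    Assumes each vector has exactly n entries, i.e. one entry per chunk.
    Node i only ever sends to its ring neighbour, node (i + 1) % n.
    """
    n = len(vectors)
    data = [list(v) for v in vectors]

    # Phase 1 (scatter-reduce): after n - 1 steps, node i holds the
    # complete sum for chunk (i + 1) % n.
    for step in range(n - 1):
        # Collect all messages first so the sends of a step are simultaneous.
        messages = [((i - step) % n, data[i][(i - step) % n]) for i in range(n)]
        for i, (chunk, value) in enumerate(messages):
            data[(i + 1) % n][chunk] += value

    # Phase 2 (allgather): circulate the completed chunks around the ring.
    for step in range(n - 1):
        messages = [((i + 1 - step) % n, data[i][(i + 1 - step) % n]) for i in range(n)]
        for i, (chunk, value) in enumerate(messages):
            data[(i + 1) % n][chunk] = value

    return data


reduced = ring_allreduce([[1, 2, 3], [10, 20, 30], [100, 200, 300]])
print(reduced[0])  # [111, 222, 333]
```

Each node sends $2(n - 1)$ chunks in total, each holding $1/n$ of its vector, so the amount of data a single node sends does not grow with the number of nodes.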

### Chain-replication

The basic idea is to have a chain (or linked list) of storages $s_1, s_2, \cdots, s_N$, where writes are performed in $s_1$ (head) but reads are served from $s_N$ (tail). When $s_i$ gets written to, it propagates the write to $s_{i+1}$.

Once $s_N$ is written to, it sends an ack back to $s_{N-1}$, which in turn sends it to the previous node, all the way up to $s_1$.

By reading from the tail we guarantee the data has been replicated in all nodes.

When a node fails it can be removed as if we were removing a node from a linked list. Writes could be lost if they didn’t have a chance to be propagated before the failure.

It’s interesting to note that when node $s_i$ fails, the pending acks from $s_{i+1}$ will now go to $s_{i-1}$ and from the perspective of the other nodes nothing happened. If node $s_i$ fails after receiving a write request but before sending it to $s_{i+1}$, that would be lost, but node $s_{i-1}$ could have a retry mechanism if it didn’t receive an ack in some time.
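The write path, the tail reads and the removal of a failed node can be sketched as below. This is a simplified, synchronous model: the `ChainNode` and `Chain` classes are hypothetical, the ack is modeled as the return value travelling back up the call chain, and in-flight writes and retries are not modeled.

```python
class ChainNode:
    def __init__(self, name):
        self.name = name
        self.data = {}
        self.next = None  # successor in the chain; None for the tail

    def write(self, key, value):
        """Apply the write, forward it down the chain, and return the ack."""
        self.data[key] = value
        if self.next is None:
            return True  # the tail acknowledges the write
        return self.next.write(key, value)  # the ack travels back up


class Chain:
    def __init__(self, names):
        self.nodes = [ChainNode(n) for n in names]
        self._relink()

    def _relink(self):
        for node, successor in zip(self.nodes, self.nodes[1:] + [None]):
            node.next = successor

    def write(self, key, value):
        return self.nodes[0].write(key, value)  # writes enter at the head

    def read(self, key):
        return self.nodes[-1].data.get(key)  # reads are served by the tail

    def remove(self, name):
        """Drop a failed node, like unlinking a node from a linked list."""
        self.nodes = [n for n in self.nodes if n.name != name]
        self._relink()


chain = Chain(["s1", "s2", "s3"])
acked = chain.write("x", 1)
chain.remove("s2")  # s1 now forwards directly to s3
chain.write("y", 2)
print(acked, chain.read("x"), chain.read("y"))  # True 1 2
```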

### Lineage Storage

To recover from failure, systems usually persist data from memory to disk. The idea is to, from time to time, persist snapshots of the state of the system and also the exact steps performed since that snapshot was taken.

One type of implementation, called global checkpoint, only relies on the snapshots, logging no steps. On recovery, it has to re-run the job from the latest checkpoint, and if the execution is non-deterministic, it might lead to different results.

At the other extreme we have what’s known as lineage, which only logs the steps but takes no snapshots, so on recovery it needs to replay the whole computation [6].

Steps can usually be logged more frequently than state snapshots because of the size, but for small tasks they might pose a bigger overhead.
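The combination of snapshots and logged steps can be illustrated with a toy in-memory store. The `RecoverableStore` class is hypothetical and "persisting" is simulated by keeping copies in memory. Never calling `checkpoint` corresponds to the pure lineage extreme, while dropping the log corresponds to the global checkpoint extreme.

```python
class RecoverableStore:
    """Toy key-value state with snapshots plus a log of the steps since."""

    def __init__(self):
        self.state = {}
        self.snapshot = {}  # last persisted copy of the state
        self.log = []       # steps applied after the snapshot

    def apply(self, key, delta):
        self.state[key] = self.state.get(key, 0) + delta
        self.log.append((key, delta))

    def checkpoint(self):
        self.snapshot = dict(self.state)
        self.log = []  # steps before the snapshot are no longer needed

    def recover(self):
        """Rebuild the state: load the snapshot, then replay the log."""
        state = dict(self.snapshot)
        for key, delta in self.log:
            state[key] = state.get(key, 0) + delta
        return state


store = RecoverableStore()
store.apply("x", 1)
store.checkpoint()
store.apply("x", 2)
store.apply("y", 5)
store.state = {}  # simulate losing the in-memory state
recovered = store.recover()
print(recovered)  # {'x': 3, 'y': 5}
```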

### Parameter Server

According to [4], parameter servers:

store the parameters of a machine learning model (e.g., the weights of a neural network) and to serve them to clients (clients are often workers that process data and compute updates to the parameters)

It’s usually implemented as a key-value store.
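A minimal, in-process sketch of that key-value view is shown below. The `ParameterServer` class, its method names and the gradient values are all hypothetical, and a real parameter server would be reached over the network rather than through direct method calls.

```python
class ParameterServer:
    """Toy parameter server: a key-value store of model parameters."""

    def __init__(self, initial):
        self.params = dict(initial)

    def get(self, keys):
        """Serve the current values of the requested parameters."""
        return {k: self.params[k] for k in keys}

    def update(self, gradients, learning_rate=0.1):
        """Apply an update computed by a worker (a plain gradient step)."""
        for key, grad in gradients.items():
            self.params[key] -= learning_rate * grad


server = ParameterServer({"w": 1.0, "b": 0.0})
weights = server.get(["w", "b"])           # a worker fetches the parameters
server.update({"w": 2.0, "b": -1.0})       # ... and pushes made-up gradients
updated = server.get(["w", "b"])
print(weights, updated)
```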

## References

• [1] Ray: A Distributed Framework for Emerging AI Applications
• [2] Reinforcement learning
• [3] Visual intuition on ring-Allreduce for distributed Deep Learning
• [4] Implementing A Parameter Server in 15 Lines of Python with Ray
• [5] Chain replication : how to build an effective KV-storage
• [6] Lineage Stash: Fault Tolerance Off the Critical Path