Wednesday, June 28th, 2023 (12 months ago)
TLDR: Xarray can now wrap a range of parallel computation backends, including Cubed, a distributed serverless framework designed to limit memory usage.
One of Xarray's most powerful features is parallel computing on larger-than-memory datasets through its integration with Dask. We love Dask - it's an incredibly powerful tool, a big part of Xarray's appeal, and central to the scalability of the Pangeo stack.
However, the vision of seamlessly processing any array workload at arbitrary scale is incredibly challenging, and its good to explore alternative implementations that might complement Dask's approach.
Cubed is one such alternative - a serverless framework for array processing developed by Tom White. Tom comes to the Xarray community via working on the statistical genetics toolkit, sgkit, and brings with him considerable experience with distributed systems. We will discuss how Cubed works, what its novel design can offer, and demo using it on a realistic problem.
We will also explain how Xarray has been generalized to support wrapping Cubed, opening the door to yet more alternative implementations of chunked parallel array computing in Xarray.
Managing RAM usage in big computations is very hard. In the wider world there are many different frameworks for performing distributed computations, but most of them can't make strict guarantees about limiting memory usage. In practice this means that configuring workers or adjusting details of the computation plan to avoid accidentally exceeding memory limits is often the bottleneck for getting real-world computations to run successfully.
The Dask developers have been making great progress on this problem, releasing big changes to the distributed scheduler, and recently a new approach to rechunking using a P2P algorithm.
We've been working on a different approach: Cubed's framework is specifically designed from the ground up to avoid this memory problem.
Cubed is a library for processing N-dimensional distributed arrays using a bounded-memory serverless model. What does that mean? Let's take these terms one at a time:
N-dimensional arrays
Cubed provides a cubed.Array
class which implements the Python Array API Standard. Like numpy
/cupy
/dask
arrays etc., Cubed arrays understand basic N-dimensional operations such as indexing, aggregations, and linear algebra. Note that Cubed does not understand data structures other than arrays, for example DataFrames.
Distributed
Cubed Arrays are divided into chunks, allowing the work of processing your dataset to be split up and distributed across many machines.
As well as exposing a .chunks
attribute in exactly the same way as Dask, cubed provides functions for applying chunk-aware operations such as map_blocks
, blockwise
, and apply_gufunc
. Cubed owes another huge debt to Dask here for developing and exposing these useful abstractions.
Bounded-memory
Cubed aims to perform operations without exceeding a pre-set bound on the total RAM usage of the computation. By building all common chunked array operations out of basic operations with known memory requirements, Cubed guarantees that the entire computation executes in a series of bounded-memory steps.
Cubed implements all chunked array operations in terms of just two "primitive operations": blockwise and rechunk. Blockwise applies an in-memory function across multiple blocks of multiple inputs, and can be used to express a range of tensor operations. Rechunk simply changes the chunking pattern of an array without altering its shape.
FIGURE: Dependency tree for chunked array operations within Cubed, from cubed's documentation. Notice that all array API operations (white) can be implemented for chunked arrays in terms of "core ops" (orange) and eventually just two "primitive ops" - blockwise and rechunk.
Both blockwise and rechunk can be performed using algorithms which only require loading one chunk from each input array into memory at any time. In the case of rechunk the algorithm comes from the Pangeo Rechunker package, whose model was a direct inspiration for Cubed. As each chunk is of known size, the RAM usage for processing each chunk can be reliably estimated in advance.
Therefore the expected RAM usage of any Cubed computation can be projected in advance, and compared against the known system resources available (specified via the cubed.Spec
object).
Serverless
Serverless computing is a model in which some service runs a users tasks for them on an on-demand basis, allowing the user to avoid worrying about the details of how in fact their tasks are being run on real servers somewhere.
Cubed breaks down every array operation into a set of independent tasks, each of which reads one chunk from cloud storage and operates on it. These tasks are a good fit for a serverless cloud computing model.
However, unlike on a cluster the serverless functions cannot communicate with each other directly over the network. Cubed instead writes an intermediate result to persistent storage between each array operation, saving a history of steps via Zarr. This approach is a generalisation of the way Rechunker's algorithm writes and reads data to an intermediate Zarr store as it executes. (Cubed actually implements some optimizations to avoid writing every intermediate array to disk, but more are possible.)
The benefit of serverless for analytics jobs is that instead of the user creating, managing, and shutting down a cluster manually (which lives either locally or on one or more remote servers), they simply specify the tasks they want performed and hand those tasks off to some paid-for serverless cloud computing service, such as Google Cloud Functions or AWS Lambda. The tradeoff is that this approach is much more I/O intensive, and serverless nodes are generally more expensive than cluster nodes.
Cubed interfaces with a range of serverless cloud services and uses them to execute the chunk-level operations required to process the whole array. This also provides chunk-level parallelism - all the tasks for creating one array are embarrasingly parallel so can be run simultaneously. In other words, if your Cubed operation is writing its result to a Zarr store with 1000 chunks, Cubed will compute that step with 1000 parallel processes simultaneously by default.
FIGURE: There are a large number of commercial cloud services which offer serverless computing services. All of the services pictured here are supported by the Lithops framework, which is only one of many tools Cubed can use to execute computations. So far, using the Lithops executor, Cubed has been tested on both AWS Lambda and Google Cloud Functions. (It has also been run on AWS using Modal and Dataflow using the Beam executor.)
You can try using Cubed arrays directly or you can use them seamlessly through Xarray, in the same way you use Dask.
First conda install -c conda-forge cubed-xarray
, which will install Cubed, the latest version of Xarray (v2023.5.0), and the glue package cubed-xarray
.
To run Cubed in serverless mode you need to set up integration with a cloud service. Alternatively you can just run Cubed locally, but you won't see many of the benefits (local execution is the default if you don't specify an executor
below).
Before creating a Cubed array you need a Spec
, which tells Cubed (a) how much RAM you think a process should need to compute any one chunk, and (b) where to store all the temporary files it will create as it executes. This location can be a local directory or a cloud bucket.
1from cubed import Spec 2 3spec = Spec(work_dir='tmp', allowed_mem='1GB') 4
Now you can create a cubed-backed Xarray object in the same way you would create a dask-backed one:
1ds = open_dataset( 2 'data.zarr', 3 chunked_array_type='cubed', 4 from_array_kwargs={'spec': spec}) 5 chunks={}, 6) 7
The only difference is the addition of the new chunked_array_type
kwarg (which defaults to 'dask'
), and the new from_array_kwargs
dict, which allows passing arbitrary kwargs down to the constructor that creates the underlying chunked array. The chunks
kwarg plays exactly the same role as it does when creating Dask arrays. (This also works with the .chunk
methods on Dataset
/DataArray
, using the same keyword arguments.)
Now you specify your analysis using normal Xarray code, right up until you need to compute the result. In a similar way to how Dask's .compute
method accepts a scheduler
argument, Cubed's .compute
accepts an executor
argument. You need to specify which executor to use by importing it, and configuration options for the cloud service would also be passed at this point.
1from cubed.runtime.executors.lithops import LithopsDagExecutor 2 3ds.compute(executor=LithopsDagExecutor()) 4
Xarray has been coupled to Dask since 2015. How did we generalize it?
Much of the integration works immediately because Cubed implements the Python Array API Standard, and Xarray can wrap any numpy-like array (see the blog post on wrapping pint.Quantity
arrays).
Chunked arrays implement additional attributes (e.g. .chunks
) and methods (e.g. .rechunk
), but also require special functions (e.g. blockwise
, map_blocks
) for mapping in-memory numpy functions over different chunks. These key functions are useful abstractions invented within Dask, and Cubed implements its own versions of each of them.
Chunked array implementations specify how to dispatch computations to the correct implementation of these functions by subclassing a new ChunkManagerEntrypoint
base class. Any array library implementing an array-like API and defining chunk-aware functions can hook in. Using entrypoints means the possible backends can be automatically discovered, which is why we don't need to explicitly import anything from the cubed-xarray
package.
To succinctly demonstrate the relative advantages of Cubed, we'll choose a workload which is representative of what happens when cluster-based frameworks struggle with memory management.
The "Quadratic Means" problem is a simplified version of calculating the cross-product mean from the climatological anomalies of two variables. It requires taking products of two chunked scalar variables $U$ and $V$ and then aggregating the result along the chunked dimension. It is a good example of problem that is simple to express, but creates a dask graph complex enough to trigger suboptimal performance of the distributed scheduler.
FIGURE: Dask's memory usage for a simplified version of the "quadratic means" workload, at a number of different scales. Notice the RAM usage increasing proportially with dataset size, until the workers spill to disk for the largest dataset (1.5TB). If the workload were much larger, or the cluster much smaller, this would have crashed. This was run on 20 16GB RAM workers using Coiled.
We ran this workload on Coiled, for datasets of increasing size, using the latest version of dask.distributed
, both with and without P2P rechunking. We see that as the problem size increases, the peak memory required by the dask cluster increases too. Once the peak memory required is larger than the RAM available to the workers, they continue loading data, but spill it to disk (spilled memory not shown in the plot). If the total memory available is smaller than the dataset size, the cluster will crash (here we used 20 workers to ensure there was enough disk space to complete the 1.5TB workload).
This type of failure occurs sometimes for real array workloads when using dask, but is a symptom of the general difficulty of memory management when using any cluster-based framework. (The performance of on this particular case could almost certainly be improved by changes to dask's distributed scheduler, and indeed dask.distributed
has gotten hugely better at memory management recently. Here though we are just using this case as a specific example of a type of failure that is very difficult to solve in general.)
Trying this problem with Cubed (using the Lithops executor and Google Cloud Functions), we find that Cubed successfully completed the workload at all scales, including the 1.5TB workload. Remarkably, it did so using only 1.3GB of RAM per serverless container, staying below the threshold of 2GB we set in the cubed.Spec
.
If we take just the largest version of this workload (1.5TB), we can visualize the relationship between the various types of memory Cubed accounts for over the course of the computation.
FIGURE: Cubed's actual memory usage vs projected memory usage, for the largest workload (1.5TB). Cubed successfully processed 1.5TB of data whilst only using a maximum of 1.3GB of RAM per serverless container, staying well below the given limit of 2GB of RAM. This was run using Lithops on Google Cloud Functions, which provisioned thousands of serverless containers each with 2048MB of RAM.
You can see again that the projected memory usage is below the allowed memory usage (else Cubed would have raised an exception before the job even started running), and the actual peak memory used was lower still. We've also plotted the reserved memory, which is a parameter intended to account for the memory usage of the executor itself (i.e. Lithops here), and was estimated by measuring beforehand using cubed.measure_reserved_memory()
.
One obvious tradeoff for this memory stability is that Cubed took a lot longer to complete - roughly 4x longer then dask for the 1.5TB workload (45m 22s vs 11m 26s). We will come back to discuss this shortly.
Finally it's interesting to look at Cubed's equivalent of the task graph. To calculate one array (the product $UV$ from the quadratic means problem), Cubed's "Plan" for processing 1.5TB of data looks like this:
FIGURE: Cubed Plan
(i.e. task graph) for processing 1.5TB of array data. This graph corresponds to a single xarray variable (UV
) in the "quadratic means" workload. Each node corresponds to an intermediate Zarr array that gets written to disk as the computation progresses, separated by either rechunk (rc) or blockwise (bw) steps. Notice that the number of nodes in the graph is not proportional to the number of chunks in the array.
The task graph is at the array-level, meaning that it does not grow proportionally to the number of chunks. The final linear series of steps does get longer with problem scale, but only logarithmically, as it represents the number of steps in a tree-like reduction necessary to reduce that quantity of data. (This Plan
is somewhat similar to Dask's "High Level Graph", but array-native, and designed to avoid the problem of over-burdening a scheduler.)
Cubed uses a completely different paradigm to Dask (and other frameworks), and so has various advantages and disadvantages. Let's discuss the obvious disadvantages first.
Overall one could think of different parallel array processing libraries as complementary, best-suited to different problems or to running on different systems.
There is a saying that there are only three important numbers in programming: 0, 1, and infinity. Having gone from 1 to 2 options of parallel backends, can we go to N?
That is the situation in the database analytics world. There SQL (Structured Query Language) queries form a common high-level way of expressing user intent, but the actual execution of a query on a database can be performed using a huge range of technologies, known as Query Engines. These projects compete to determine which can get the best performance on various standardised benchmarks. In the example above Xarray is playing a similar role to SQL as a high-level query language, allowing users to easily try out different array backends depending on their problem.
Here we've focused on Cubed, but this work opens the door to using other alternative distributed array processing frameworks such as Ramba, Arkouda, Aesara, or even others yet to be developed... We're discussing this dream in the Pangeo Distributed Arrays Working Group, so if this inspires you then please join us!
Tom Nicholas' work on this project was by the Gordon and Betty Moore Foundation Data-Driven Discovery program, via a grant to the Columbia Climate Data Science Lab.