How to efficiently load data to memory

Jorge C. Leitão
Jan 8, 2022


Together with QP Hou. Tradeoffs between formats, computer resources, and performance.

During the past months, we have been implementing “readers” and “writers” from scratch between a plethora of data formats and the Arrow in-memory format. To be specific, the formats we covered were Apache Arrow IPC, Apache Parquet, Apache Avro, JSON, CSV, and NDJSON. In all cases, the goal was to load them as fast as possible into a columnar in-memory format, which in this case was Apache Arrow (in Rust; see e.g. arrow2).

This post describes general observations we have drawn from this exercise. The core result presented here is that formats whose loading (an IO-bound task) is explicitly separated from deserialization (a CPU-bound task) are significantly faster to load into memory and allow for better resource utilization.

Loading data is a dreaded operation: everyone just wants to get it over with and do the fun stuff with the data — a bit like “my code’s compiling”

“My data’s loading” would work as well

Furthermore, loading is often the most important bottleneck in ETL and ML workloads. There are whole technological movements aimed at addressing this specific problem.

This post is divided into five (small) sections:

  • The first section introduces what loading data to memory is composed of
  • The second and third sections go through each of the components (CPU and IO tasks)
  • The fourth section describes why these tasks are usually incompatible with one another
  • The fifth section explains how, in general, we optimize data loading

IO-bound vs CPU-bound

Loading a file is usually composed of two types of operations:

  • loading bytes from a storage device (disk, cloud, distributed disk, etc.) to memory (RAM)
  • deserializing bytes to a target in-memory format (e.g. an array of strings or numbers)

The abstraction of each category is as follows: loading bytes is limited by our ability to copy bytes from storage to memory — think SSD or network utilization; deserializing bytes is limited by our ability to operate on those bytes — think processor utilization.

Virtually all data formats require both types of operations. For example, CSV requires reading bytes from the file on disk and computing where a row ends and a new one starts, whether a delimiter is part of a quoted expression, etc.

CPU tasks

Deserialization is essentially using a processor to convert bytes from one format to bytes representing something else. Examples:

  • given the two bytes [206, 160], can we interpret them as a valid string? (yes: it is the UTF-8 encoding of the symbol Π)
  • is there a float representation of the bytes [51, 46, 49, 52, 49, 53, 57]? (yes, 3.14159)
  • decompress bytes compressed with Snappy
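
In Rust, the first two examples map almost directly onto standard-library calls; a minimal sketch:

```rust
fn main() {
    // Can we interpret these two bytes as a valid UTF-8 string?
    let text = std::str::from_utf8(&[206, 160]).unwrap();
    assert_eq!(text, "Π"); // the Greek capital letter Pi

    // Is there a float representation of these bytes?
    let bytes = [51, 46, 49, 52, 49, 53, 57];
    let float: f64 = std::str::from_utf8(&bytes).unwrap().parse().unwrap();
    assert_eq!(float, 3.14159);
}
```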

IO tasks

Loading is something that a computer program usually waits for. Examples:

  • Load the schema of a Parquet file from s3
  • Load the next Avro block from a file locally
  • Download a CSV file from the internet

In all cases, the program executes: “send request, wait, wait, wait, …, wait, heyyy the file is available”.

It is wasteful to have a processor waiting if there are other important tasks available, and there are programming models dedicated to this type of “waiting” task. Here we focus on the async/await cooperative multi-tasking model that many programming languages implement. In this model, we can tell the processor to “switch to another task if waiting”. In the context of the s3 example above, the word “wait” would be replaced by “do something else”.
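
As an illustration, here is a minimal sketch of this model in Rust, assuming the tokio runtime, the reqwest HTTP client, and the futures crate; the URLs are hypothetical. Whenever one download waits, the runtime switches to another task:

```rust
use futures::future::try_join_all;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Hypothetical files to download.
    let urls = ["https://example.com/a.csv", "https://example.com/b.csv"];

    // Fire all requests at once; while one response is awaited ("wait"),
    // the runtime switches to another task ("do something else").
    let bodies = try_join_all(
        urls.iter()
            .map(|url| async move { reqwest::get(*url).await?.bytes().await }),
    )
    .await?;

    for (url, body) in urls.iter().zip(&bodies) {
        println!("{url}: {} bytes", body.len());
    }
    Ok(())
}
```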

CPU vs IO

The big catch is that CPU tasks block asynchronous execution. I.e. if one program needs to download 10 files, its main thread can concurrently download them all at once, since it can “fire first request, switch, fire second request, switch, …” and then wait for each of the files to arrive, asynchronously. However, if we had to perform a computation with one of the downloaded files, we would block that thread, even if a “heyyy the file is available” arrived for all the other ones — we blocked the thread from switching (see also Async: what is blocking).

So, let’s look at our CSV example and see how blocking affects reading a CSV file located in the cloud. A naive CSV reader is implemented as follows: read a byte, update its state (check whether it is a delimiter and, if it is, close the previous item of the row; check whether the byte is a quote; etc.), read a new byte, update its state; and repeat until we reach the end of the file. Reading a byte is an IO task; updating the state is a CPU task.
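
A stripped-down sketch of that loop in Rust, tracking only quotes and row boundaries (a real reader would also handle delimiters, escaped quotes, etc.):

```rust
use std::io::Read;

/// Count rows byte by byte: each `read` is an IO task,
/// each state update is a CPU task.
fn count_rows<R: Read>(mut reader: R) -> std::io::Result<usize> {
    let mut byte = [0u8; 1];
    let mut in_quotes = false;
    let mut rows = 0;
    // IO task: fetch the next byte (the program may wait here).
    while reader.read(&mut byte)? != 0 {
        // CPU task: update the parser state.
        match byte[0] {
            b'"' => in_quotes = !in_quotes,
            b'\n' if !in_quotes => rows += 1,
            _ => {}
        }
    }
    Ok(rows)
}
```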

As an astute reader, you can conclude that reading CSV has some challenges: a CPU task is mixed together with an IO task, which causes either the IO task to “wait” or the CPU task to block an asynchronous read.

There is an obvious solution to the example above: download the whole file to memory asynchronously in a single request, and then perform the deserialization into rows on a dedicated (blocking) thread. This way the thread can do other IO tasks while it waits for the download, and is not blocked while the file downloads. This solution is an extreme case of a more general principle:

trade memory for use of IO and CPU

If we use too much memory, we may crash the program with an out-of-memory error; if we use too little, we may never finish processing the file in reasonable time. And this is what most CSV readers offer: a chunk-size “slider” that we use to work out this tradeoff.
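
In Rust, the standard library exposes this knob directly; a minimal sketch, with a hypothetical file name:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};

fn main() -> std::io::Result<()> {
    let file = File::open("data.csv")?;
    // The "slider": a larger buffer means fewer IO calls but more memory;
    // a smaller buffer means the reverse.
    let reader = BufReader::with_capacity(1 << 20, file); // 1 MiB
    for line in reader.lines() {
        let _row = line?; // deserialization into items would happen here
    }
    Ok(())
}
```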

Even though the chunk size helps, CSV is still difficult to process. The root cause is that there is no way to decouple the IO task of reading bytes from the CPU task of deserializing bytes into rows. E.g. if we only wanted the first 1000 rows of a CSV file, we would not even know how many bytes those 1000 rows occupy, and we would thus need to “read and deserialize” some, then read some more, etc. In CSV this happens because the format must be parsed via a state machine (i.e. a CPU task).

Decouple CPU from IO

The necessary and sufficient condition to be able to decouple CPU and IO tasks in loading data is

loading a group of entities (e.g. rows, arrays) requires CPU tasks that are at most O(1) in the number of bytes being read

An equivalent way of stating this property is that deserializing a group of entities can be done without any loading (i.e. no IO).

Two examples where this condition is fulfilled:

  • A Parquet page is composed of a set of values from a column; it is not possible for an element of a column to be divided between two pages
  • An Avro block is composed of a set of rows; it is not possible for a row to be divided between two blocks

This property is crucial because it allows any block to be deserialized independently of other blocks, making such blocks “units of independent work” that can be moved between IO and CPU tasks. E.g. we can process a Parquet file in pages, an Avro file in blocks, an Arrow IPC file in buffers, etc.

In practice, this separation allows using a common thread synchronization pattern, multi-producer multi-consumer: have a (green) thread pool reading the file in chunks (so, IO only; non-blocking) and a second (normal) thread pool deserializing the chunks (so, CPU only; blocking). Common implementations of this pattern use bounded channels, which limit the number of chunks that can be allocated at any given time. In summary, the knobs of this implementation are:

  • the producer thread pool controls IO utilization
  • the channel bound controls memory usage
  • the consumer thread pool controls processor utilization

For systems where a single reader is sufficient to saturate IO, the multi-producer can be replaced by a single producer. For systems with limited memory, the channel bound can be set to 1. For systems with a low number of processors, the multi-consumer can be replaced by a single consumer. Yet, in general, an API should allow users to choose the setup that best fits their use case (e.g. am I reading from s3 or from an SSD?).
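
Using only the standard library, here is a minimal sketch of the simplest variant (single producer, single consumer). The fixed chunk size, file name, and deserialize placeholder are simplifications; a real reader would split at the format's block boundaries:

```rust
use std::fs::File;
use std::io::Read;
use std::sync::mpsc::sync_channel;
use std::thread;

fn deserialize(chunk: &[u8]) -> usize {
    // Placeholder for real, CPU-bound deserialization.
    chunk.iter().filter(|&&b| b == b'\n').count()
}

fn main() -> std::io::Result<()> {
    // The channel bound (here, 4 chunks) caps memory usage.
    let (sender, receiver) = sync_channel::<Vec<u8>>(4);

    // Producer: IO only. Read chunks and hand them over.
    let producer = thread::spawn(move || -> std::io::Result<()> {
        let mut file = File::open("data.bin")?;
        loop {
            let mut chunk = vec![0u8; 64 * 1024];
            let read = file.read(&mut chunk)?;
            if read == 0 {
                break; // end of file
            }
            chunk.truncate(read);
            if sender.send(chunk).is_err() {
                break; // the consumer hung up
            }
        }
        Ok(())
    });

    // Consumer: CPU only. Deserialize each chunk as it arrives.
    for chunk in receiver {
        let _rows = deserialize(&chunk);
    }
    producer.join().unwrap()?;
    Ok(())
}
```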

There are many data formats that do not fulfill this condition: CSV, FlatBuffers, Protocol Buffers, Thrift, etc. With these, we need to interleave CPU tasks with IO tasks, making it difficult to trade off CPU, IO, and memory usage. In contrast, Avro, Parquet, and Arrow IPC are all notably good in this regard because they allow a complete decoupling of IO tasks from CPU tasks. The Arrow IPC format in particular goes one step further: its bytes can be copied as-is to memory, requiring almost no CPU work at all.

Concluding remarks

Reading data into memory is a common bottleneck in many analytical engines. This post outlined some of the tradeoffs in loading data and how they relate to different computing resources (CPU, IO, and memory). Finally, it mapped how these parts interplay when loading data in a multi-threaded environment.
