The Ursa Labs team has been creating popular open source data science tools for over a decade.
One of our goals is to empower and accelerate the work of data scientists through more efficient and scalable in-memory computing. Many popular data science tools (such as the Python pandas library) do not effectively leverage modern hardware (large RAM, multicore CPUs, GPUs, etc.). Much of the ecosystem's effort has focused on deep learning and related machine learning problems, leaving fundamental issues in data access, data manipulation, exploratory data analysis, and feature engineering comparatively underattended.
So far, our work has been concentrated in the Apache Arrow project, which has a broader application scope than data science. In time, we may expand to create software artifacts focused more specifically on the data science domain.
In addition to providing the foundation for the technology stack described in this document, the Arrow project serves as a unifying, high-performance data interoperability layer beyond the data science world. Systems using Arrow as a native memory format are more interoperable with each other and have more opportunities for collaboration and code reuse. As more systems adopt Arrow, Arrow-based data science tools will reap the benefits. Read more about the Arrow project in general at http://arrow.apache.org.
The Arrow C++ project provides some of the initial building blocks of this technology stack.
The Arrow columnar memory format has been in development since 2015. The key components of this system (the “specification”) are:
We have refined and expanded many of these details since commencing development. As time goes on, we must grow the columnar format responsibly and sustainably to support our own needs as well as the needs of other Arrow users (which are developers of other data systems). The objective is to build a large, robust, and stable Arrow ecosystem.
We plan to work with the Apache Arrow community to reach binary format stability and make a 1.0 release in 2019. Many users (other OSS projects and companies) have indicated that they will begin to adopt Arrow more wholeheartedly once the binary format is declared stable.
There are some other things that we have contemplated adding to Arrow (a list that may grow over time), such as:
The Arrow specification describes the binary format for serializing schemas and representing units of data (“record batches”, chunks of tables). On top of this binary format, we have developed batch and streaming messaging formats for transmitting datasets.
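To make the framing idea concrete, here is a toy length-prefixed message stream in Python: a schema message followed by a "record batch" message. The 4-byte length prefix and JSON payloads are illustrative assumptions only; the actual Arrow streaming format uses its own binary metadata encoding, not what is shown here.

```python
import io
import json
import struct

def write_message(stream, payload: bytes):
    # Toy framing: 4-byte little-endian length prefix, then the payload
    stream.write(struct.pack("<I", len(payload)))
    stream.write(payload)

def read_messages(stream):
    # Yield payloads until the stream is exhausted
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (length,) = struct.unpack("<I", header)
        yield stream.read(length)

# One schema message, then one batch of data
buf = io.BytesIO()
write_message(buf, json.dumps({"fields": [{"name": "x", "type": "float64"}]}).encode())
write_message(buf, json.dumps({"x": [1.0, 2.0, 3.0]}).encode())

buf.seek(0)
msgs = list(read_messages(buf))
print(len(msgs))  # 2
```

The same framing works unchanged over a socket or file, which is the point of separating the message format from the transport.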
Thus far, message transport and RPC details have not been fully developed. We propose to implement efficient integration with well-established messaging frameworks, such as gRPC and ZeroMQ, to make it easy for our users to send and receive data among themselves and with data access layers supporting these native Arrow RPC protocols (as opposed to proprietary protocols, like ODBC, that must be deserialized to the Arrow format).
In C++, we have developed abstractions for managing memory lifetime using RAII and smart pointers (http://en.cppreference.com/w/cpp/language/raii). This also permits clean hierarchical memory references, shared buffer ownership and parent-child semantics, copy-on-write, and other benefits.
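The ownership model can be illustrated with a toy Python `Buffer` class (hypothetical, not the Arrow API): CPython reference counting plays the role that `shared_ptr` plays in C++, and a child slice keeps its parent's memory alive without copying.

```python
class Buffer:
    """A hypothetical buffer with parent-child ownership semantics."""
    def __init__(self, data, parent=None):
        self.data = memoryview(data)  # zero-copy view of the underlying bytes
        self.parent = parent          # the reference keeps the parent alive

    def slice(self, offset, length):
        # No bytes are copied; the child shares the parent's memory
        return Buffer(self.data[offset:offset + length], parent=self)

buf = Buffer(bytearray(b"0123456789"))
child = buf.slice(2, 4)
print(bytes(child.data))  # b'2345'
```

As long as `child` is reachable, the original allocation cannot be freed, which is the same hierarchical lifetime guarantee the C++ RAII design provides.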
To interface with the outside world, we must build bridges between low-level IO interfaces, filesystem-like storage layers, and Arrow’s C++ memory subsystem. These should support both synchronous and, where relevant, asynchronous access. We wish to maximize throughput from each kind of data source while providing a consistent, robust, fast, and full-featured API for each. Among other things, this includes:
With Arrow as our runtime memory format for analytics and interchange, we must be able to convert data from various popular storage and raw input formats to Arrow, and back.
For file formats, these include but are not limited to:
In addition to supporting various file-based data storage formats, we will want to support some other kinds of binary data protocols in popular use.
Some data sources may benefit from evaluating boolean conditions while reading from the files to avoid deserializing data unnecessarily. Parquet and ORC are great examples – in addition to providing cached statistics about row groups, we can also evaluate simple predicates like FOO > 5 and filter columns as we go rather than after the fact.
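A sketch of the idea in Python, using hypothetical row groups with cached min/max statistics (the field names and layout below are illustrative, not Parquet's actual metadata structures): for a predicate like FOO > 5, any group whose maximum is at most 5 can be skipped without ever deserializing it.

```python
# Hypothetical row groups carrying cached min/max statistics
row_groups = [
    {"min": 1, "max": 4, "values": [1, 2, 3, 4]},
    {"min": 6, "max": 9, "values": [6, 7, 8, 9]},
]

def scan_greater_than(groups, threshold):
    """Evaluate `value > threshold` while scanning, skipping whole groups."""
    out = []
    for g in groups:
        if g["max"] <= threshold:
            continue  # statistics prove no row can match; never deserialize
        # Filter rows as we go rather than after the fact
        out.extend(v for v in g["values"] if v > threshold)
    return out

print(scan_greater_than(row_groups, 5))  # [6, 7, 8, 9]
```

Here the first row group is eliminated by its statistics alone; only the second is read and filtered.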
In 2017 Apache Arrow acquired a component called Plasma (http://incubator.apache.org/ip-clearance/arrow-plasma-object-store.html), developed to support the Ray project (https://github.com/ray-project/ray), a machine learning development platform from the UC Berkeley RISELab.
The Plasma store can assist with developing applications involving multiple processes that need to share data, which may reside in CPU or GPU memory. Computational processes live separately from the Plasma store, which runs as a standalone daemon. These processes access data managed by Plasma through zero-copy shared memory, and so, by employing the Arrow columnar format to encode structural information, they can describe complex datasets and make them available with minimal serialization overhead.
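The zero-copy shared memory mechanism can be sketched with Python's standard `multiprocessing.shared_memory` module, shown here within a single process for brevity. This is not Plasma's actual client API; it only illustrates how a named shared segment lets an independent process view the same memory pages without copying.

```python
from multiprocessing import shared_memory

# A "store" creates an object in shared memory...
store = shared_memory.SharedMemory(create=True, size=4)
store.buf[:4] = b"\x01\x02\x03\x04"

# ...and a "client" attaches to the same segment by name.
# The view maps the same pages: no bytes are copied on access.
client = shared_memory.SharedMemory(name=store.name)
view = memoryview(client.buf)
data = bytes(view[:4])  # copy happens here only for display
print(data)

view.release()
client.close()
store.close()
store.unlink()
```

In the real system, the Arrow columnar layout is what makes this useful: structured datasets, not just raw bytes, can be described in place.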
We wish to provide strong support for managing datasets used by multiple processes living on the CPU or GPU.
Historically, data manipulation libraries in data science languages like Python and R have been bespoke, vertically integrated systems developed by small groups of developers. These include
In some cases, a deferred evaluation system has been created. A good example is dplyr (https://dplyr.tidyverse.org/), which is specific to the R language and R’s data frame representation.
We plan to develop a portable, embeddable computation engine in C++ with the following features:
Initially, development will focus on efficient computing on a single node. We would be interested in developing a distributed runtime in the future.
An architecturally similar system that operates on a different data model (numerical multidimensional array data) is the TensorFlow project from Google. Here we are processing data frames, not tensors.
There is some prior art in building an Arrow-based computation engine. One example is Dremio (See http://dremio.com/ and https://github.com/dremio/dremio-oss), which started out as a fork of the Apache Drill SQL-on-Hadoop system.
To build an Arrow-native data processing engine, we must implement functions which perform computations on the Arrow memory format without conversion to some other intermediate representation.
The lowest-level unit of computation we call a kernel. A particular kernel corresponds to an operator, such as “add”. In J and other APL derivatives, operators are called “verbs”. An operator performs computations on input data structures having well-defined types. Given an operator and input types, such as Add(Float64, Float64), the kernel that performs the operation on those particular input types is selected.
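Type-based kernel selection can be sketched as a registry keyed by operator name and input types (the registry layout and names below are hypothetical, for illustration only):

```python
# Hypothetical kernel registry: (operator, input types) -> concrete kernel
kernels = {
    ("add", ("float64", "float64")): lambda x, y: [a + b for a, b in zip(x, y)],
    ("add", ("int64", "int64")):     lambda x, y: [a + b for a, b in zip(x, y)],
}

def dispatch(op, in_types):
    """Select the kernel matching the operator and its input types."""
    return kernels[(op, in_types)]

kernel = dispatch("add", ("float64", "float64"))
print(kernel([1.0, 2.0], [3.0, 4.0]))  # [4.0, 6.0]
```

In a real engine the registry would also encode hardware variants (SIMD, GPU) of each kernel, as discussed below.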
Data processing runtimes may break data into chunks and perform multiple kernel evaluations in separate threads. For example, the operation x + y can be computed by any number of threads in parallel.
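A minimal sketch of chunked parallel evaluation using Python's standard thread pool (chunk size and worker count are arbitrary choices for the example):

```python
from concurrent.futures import ThreadPoolExecutor

def add_chunk(args):
    # The "add" kernel applied to one chunk of each input
    xs, ys = args
    return [a + b for a, b in zip(xs, ys)]

x = list(range(8))
y = list(range(8))
# Split x + y into two independent chunked evaluations
chunks = [(x[i:i + 4], y[i:i + 4]) for i in range(0, 8, 4)]

with ThreadPoolExecutor(max_workers=2) as pool:
    partial = list(pool.map(add_chunk, chunks))

# Reassemble the chunk results in order
result = [v for part in partial for v in part]
print(result)  # [0, 2, 4, 6, 8, 10, 12, 14]
```

Because the chunks are independent, correctness does not depend on which thread evaluates which chunk; only the reassembly order matters.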
To build a functionally-complete data processing library such as pandas, we require operator kernel implementations for a wide variety of operations, such as (incomplete listing):
For a given operator, there may be many implementations: Non-SIMD CPU, SIMD CPU, GPU (CUDA or OpenCL), etc. The computation engine can select the appropriate kernel implementation at runtime.
In addition to array-based mathematical functions and other manipulations, we must also be able to perform relational algebra (joins) on tables (semantically collections of arrays): inner, outer, left, right, anti, semi.
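A toy inner hash join over rows-as-dicts illustrates the core mechanism (build a hash table on one input, probe with the other); the other join types differ mainly in how unmatched rows are handled. This is an illustrative sketch, not an Arrow API, and a columnar engine would operate on arrays rather than row dicts.

```python
def hash_inner_join(left, right, key):
    """Toy inner join: build a hash table on `right`, probe with `left`."""
    table = {}
    for row in right:
        table.setdefault(row[key], []).append(row)
    out = []
    for row in left:
        for match in table.get(row[key], []):  # unmatched rows are dropped
            merged = dict(row)
            merged.update({k: v for k, v in match.items() if k != key})
            out.append(merged)
    return out

left = [{"id": 1, "x": 10}, {"id": 2, "x": 20}]
right = [{"id": 2, "y": 200}, {"id": 3, "y": 300}]
print(hash_inner_join(left, right, "id"))  # [{'id': 2, 'x': 20, 'y': 200}]
```

A left join would additionally emit left rows with no match (null-filled), an anti join would emit only those, and so on.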
Given an operator and its input types and any parameters, we can resolve its output type. An operator’s output, therefore, can be viewed as an unevaluated input to another operator. Similar to TensorFlow and other graph-based computation engines, we can compose our operators to create computational graphs having well-specified input types and output types.
As an example, consider the expression log(x + 1) - 1. This can be described as the graph:
Subtract ( Log ( Add ( x, 1 ) ), 1 )
Such logical graphs or “programs” do not indicate the execution strategy. In this graph, nodes are operators and edges are data dependencies.
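A minimal sketch of such a graph in Python, with output types resolved from input types via a rule table (all class and rule names here are hypothetical, not an actual engine API):

```python
import math

# Output-type resolution rules: (operator, input types) -> output type
TYPE_RULES = {
    ("add", ("float64", "float64")): "float64",
    ("log", ("float64",)):           "float64",
    ("sub", ("float64", "float64")): "float64",
}

KERNELS = {"add": lambda a, b: a + b, "log": math.log, "sub": lambda a, b: a - b}

class Expr:
    """One operator node; its unevaluated output can feed another operator."""
    def __init__(self, op, args):
        self.op, self.args = op, args
        self.out_type = TYPE_RULES[(op, tuple(a.out_type for a in args))]
    def evaluate(self, env):
        return KERNELS[self.op](*(a.evaluate(env) for a in self.args))

class Literal:
    def __init__(self, value, out_type):
        self.value, self.out_type = value, out_type
    def evaluate(self, env):
        return self.value

class Field:
    def __init__(self, name, out_type):
        self.name, self.out_type = name, out_type
    def evaluate(self, env):
        return env[self.name]

# log(x + 1) - 1 as Subtract(Log(Add(x, 1)), 1)
one = Literal(1.0, "float64")
x = Field("x", "float64")
graph = Expr("sub", [Expr("log", [Expr("add", [x, one])]), one])
print(graph.out_type)              # float64
print(graph.evaluate({"x": 0.0}))  # -1.0
```

Note that the types of the whole graph are known before any data is touched, which is what allows such a "program" to be validated, optimized, or serialized for a separate execution engine.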
We additionally propose to specify an implementation-independent serialization format for describing such operator graphs. This will eventually permit multiple implementations of execution engines for these graphs to exist. We will probably develop a reference execution engine in C++ as described in the next section.
Note that the most naive “execution engine” or “runtime” is one that evaluates each operator serially, using one or more threads, fully materializing the result of each operator as input to subsequent operators. This is what the current pandas implementation does (as do many of the other systems currently used by data scientists):
We propose to develop a faster and more efficient execution engine for our analytical operator graphs. The primary strategies we will use when possible are:
These techniques have been widely studied and employed in production systems in the analytic database world, so little novel research should be necessary.
Note that each operator should be able to evaluate “eagerly” to emulate the existing “one operator at a time” execution approach.
Operator pipelining (combining multiple tasks into one, elimination of temporaries) yields some of the most significant performance improvements over the current naive approaches.
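The effect of pipelining can be shown by contrasting the naive one-operator-at-a-time evaluation of log(x + 1) - 1 with a fused single pass (a Python sketch of the concept; a real engine would fuse compiled kernels, not list comprehensions):

```python
import math

def naive(x):
    """One operator at a time: each step materializes a full temporary."""
    t1 = [v + 1.0 for v in x]       # temporary array for Add
    t2 = [math.log(v) for v in t1]  # temporary array for Log
    return [v - 1.0 for v in t2]    # result of Subtract

def fused(x):
    """Pipelined: the three kernels are combined into one pass,
    so no intermediate arrays are allocated."""
    return [math.log(v + 1.0) - 1.0 for v in x]

data = [0.0, 1.0, 2.0]
assert naive(data) == fused(data)
print(fused(data)[0])  # -1.0
```

For large inputs, the fused version also keeps each value in cache (or a register) across all three operations instead of streaming whole temporaries through memory three times.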
Runtime code-generation with LLVM for specialized code paths is the next level of performance. This is essentially the approach that the TensorFlow team has taken in building XLA (https://www.tensorflow.org/performance/xla/) for a different class of computations (tensor math / linear algebra).
Most modern query engines feature some amount of code-generation, mostly using LLVM as the compiler infrastructure.
The interpreted / virtual machine approach has some drawbacks which can be remedied through code-generation / JIT compilation with LLVM:
Consider f(x).all(), where f is some function returning a boolean array. In a JIT’d version, f(x) can be computed in the same loop as the boolean reduction, enabling short-circuiting on the first observation of a false value.
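The short-circuiting behavior can be sketched in Python, with a trivial predicate `v < 3` standing in for the hypothetical f (in the JIT'd case the fusion would happen in generated machine code, not hand-written source):

```python
def all_naive(x):
    # Materializes the full boolean array f(x) before reducing it
    return all([v < 3 for v in x])

def all_fused(x):
    # The predicate runs inside the reduction loop,
    # so evaluation stops at the first False
    for v in x:
        if not (v < 3):
            return False
    return True

# The fused version returns after 3 elements; the naive one scans a million
data = [0, 1, 5] + list(range(1_000_000))
print(all_naive(data), all_fused(data))  # False False
```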
APL interpreters, prior to the advent of compiler technology like LLVM, would partially address compute subgraph acceleration by special-casing common operator patterns (for example, the J expression &/ a = b might be recognized by the interpreter and rewritten as a specialized all-equals operator).
There is substantial prior art in databases and other data processing systems for using runtime code generation to accelerate hot code paths. The Weld project (https://cs.stanford.edu/~matei/papers/2017/cidr_weld.pdf), for example, is currently being developed by the Apache Spark creator’s research group at Stanford to accelerate internal code paths in Spark that form part of the Databricks Runtime (https://databricks.com/product/databricks-runtime).
We must develop comprehensive interoperability with the Python programming language and other widely deployed 3rd party libraries like NumPy and pandas. Priorities will depend on the needs of Python users.
The R language interface will aim to provide functionality equivalent to the Python bindings (but R-flavored), with the roadmap driven by the needs of R users.
Pursuing this technology roadmap wholeheartedly will require a significant amount of development on build systems, packaging, continuous integration and testing, benchmarking, etc. Failing to adequately support developer productivity and packaging and deployment for users will slow the project’s progress.
In time, we will grow an increasingly large stack of 3rd party C and C++ dependencies for building the core libraries. While the build system and its dependencies may grow more complex, developers and users must be protected from this increased complexity as much as possible.
To put it simply, being a developer on this project must not be painful. In particular, this means that:
The workflow for making the software available to normal users must be as painless and streamlined as possible for the developers of the project. This includes:
Maintaining the packaging toolchain and builds is quite a lot of work. As of this writing, there are still many manual steps. We are publishing a limited set of nightly builds, and only for conda users on Linux.
We must develop reliable continuous integration (CI) services to test the software that is able to scale in two key dimensions:
Thus far, we have been getting by using the Apache Software Foundation’s Travis CI (Linux, macOS) and AppVeyor (Windows) capacity for CI.
In addition to endogenous testing of the Arrow-based data science library stack, we must have confidence on a nightly or ad hoc basis that each code patch merged does not break downstream users of the projects. This includes other OSS projects like Apache Spark, Dremio, pandas, Dask, and others.
Running such integration tests with downstream systems may be too onerous in general to include with the standard continuous integration processes. We propose to provide a straightforward process for developers to create integration tests which can be added to a nightly validation process, so that any breakages will be made known to the development team within 24 hours.
This nightly integration testing process should also be available for ad hoc validation of “sensitive” patches.
In addition to validating the correctness and well-functioning of this software, we must also continuously monitor and improve its performance in different execution environments. Frequently, the performance of critical operations may become degraded or suffer major regressions by accident, even though the calculations are still correct. Our benchmarking systems must provide:
A well-documented, straightforward way to write new benchmarks in the implementation (C++) and user (Python, R, etc.) languages.
Nightly benchmarking and continuous monitoring. The data produced by this process should be browsable, and the system should send alerts when there are performance deviations outside some acceptable tolerance. See https://github.com/airspeed-velocity/asv for one example of a tool to assist with automating benchmark data collection and analysis.