The Ursa Labs team has been creating popular open source data science tools for over a decade.
One of our goals is to empower and accelerate the work of data scientists through more efficient and scalable in-memory computing. Many popular data science tools (such as the Python pandas library) do not, in general, effectively leverage modern hardware (large amounts of RAM, multicore CPUs, GPUs, etc.). Much of the effort in the ecosystem has focused on deep learning and other machine learning problems, leaving fundamental issues in data access, data manipulation, exploratory data analysis, and feature engineering comparatively underattended.
So far, our work has been concentrated in the Apache Arrow project, which has a broader application scope than data science. In time, we may expand to create software artifacts focused more specifically on the data science domain.
In addition to providing the foundation for the technology stack described in this document, the Arrow project serves as a unifying, high-performance data interoperability layer beyond the data science world. Systems using Arrow as a native memory format will be more interoperable with each other and have more opportunities for collaboration and code reuse. As more systems adopt Arrow, Arrow-based data science tools will reap the benefits. Read more about the Arrow project in general at http://arrow.apache.org.
Apache Arrow Platform
The Arrow C++ project provides some of the initial building blocks of this technology stack:
- Standardized in-memory columnar («data frame») format for analytical processing
- Schema and value type descriptions
- Batch and Streaming messaging (RPC or shared memory IPC) with zero-copy access
- Low-level memory management
- Low-level storage / filesystem interfaces, including shared memory
Columnar format stability & feature hardening
The Arrow columnar memory format has been in development since 2015. The key components of this system (the «specification») are:
- Physical memory layout for primitive and nested types
- Logical type definitions (e.g. “what is a timestamp”)
- Serialized metadata representation (we are using Google’s Flatbuffers project to serialize Arrow metadata)
- Protocol for messaging (described below), which together with the above is generally called the «Arrow binary format» or the «wire format»
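As a rough illustration of what a physical memory layout specifies, the following Python sketch builds the three buffers used for a nullable variable-length string column: a validity bitmap (least-significant-bit order, where 1 means non-null), an int32 offsets buffer with n + 1 entries, and one contiguous UTF-8 data buffer. It is a simplification: alignment and padding rules and the Flatbuffers metadata are omitted, and `encode_string_column` / `decode_string_column` are hypothetical helpers, not Arrow APIs.

```python
import struct
from typing import List, Optional, Tuple


def encode_string_column(values: List[Optional[str]]) -> Tuple[bytes, bytes, bytes]:
    """Return (validity bitmap, int32 offsets, UTF-8 data) for a string column."""
    n = len(values)
    bitmap = bytearray((n + 7) // 8)  # one bit per slot, all null initially
    offsets = [0]
    data = bytearray()
    for i, v in enumerate(values):
        if v is not None:
            bitmap[i // 8] |= 1 << (i % 8)  # LSB numbering: slot i -> bit i % 8
            data += v.encode("utf-8")
        # Null slots still get an offset entry; their length is zero
        offsets.append(len(data))
    offsets_buf = struct.pack(f"<{n + 1}i", *offsets)  # little-endian int32
    return bytes(bitmap), offsets_buf, bytes(data)


def decode_string_column(bitmap: bytes, offsets_buf: bytes, data: bytes,
                         n: int) -> List[Optional[str]]:
    offsets = struct.unpack(f"<{n + 1}i", offsets_buf)
    out: List[Optional[str]] = []
    for i in range(n):
        valid = bitmap[i // 8] >> (i % 8) & 1
        out.append(data[offsets[i]:offsets[i + 1]].decode("utf-8") if valid else None)
    return out


bitmap, offsets, data = encode_string_column(["foo", None, "ba", ""])
print(bitmap.hex(), struct.unpack("<5i", offsets), data)  # 0d (0, 3, 3, 5, 5) b'fooba'
```

Note that the empty string is valid (its bit is set) but has zero length, which is how the layout distinguishes it from a null.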
We have refined and expanded many of these details since commencing development at the beginning of 2016. As time goes on, we must grow the columnar format responsibly and sustainably to support our own needs as well as the needs of other Arrow users (who are developers of other data systems). The objective is to build a large, robust, and stable Arrow ecosystem.
We plan to work with the Apache Arrow community to reach binary format stability and make a 1.0 release later in 2018 or in 2019. Many users (other OSS projects and companies) have indicated that they will begin to adopt Arrow more wholeheartedly once the binary format is stable.
We have also contemplated adding other features to Arrow (a list that may grow over time), such as:
- Column-wise compression in binary/on-wire format
- Record batch-level column statistics
- Richer messaging / RPC system
The Arrow specification describes the binary format for serializing schemas and representing units of data (“record batches”, chunks of tables). On top of this binary format, we have developed batch and streaming messaging formats for transmitting datasets.
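To make the idea of a message stream concrete, here is a deliberately simplified, hypothetical framing in Python: each message is a little-endian int32 length prefix followed by its payload, a schema message comes first, and a zero-length message marks the end of the stream. JSON payloads stand in for real metadata and buffers. This is not the actual Arrow IPC wire format; it only illustrates why a reader can consume record batches incrementally without knowing the size of the whole dataset in advance.

```python
import io
import json
import struct
from typing import BinaryIO, Iterator


def write_message(sink: BinaryIO, payload: bytes) -> None:
    # Hypothetical framing: int32 length prefix, then the payload bytes
    sink.write(struct.pack("<i", len(payload)))
    sink.write(payload)


def write_stream(sink: BinaryIO, schema: dict, batches: list) -> None:
    write_message(sink, json.dumps(schema).encode())
    for batch in batches:
        write_message(sink, json.dumps(batch).encode())
    write_message(sink, b"")  # zero-length message marks end of stream


def read_stream(source: BinaryIO) -> Iterator[dict]:
    """Yield the schema first, then each batch, stopping at the end-of-stream marker."""
    while True:
        header = source.read(4)
        if len(header) < 4:
            raise EOFError("stream ended without an end-of-stream marker")
        (length,) = struct.unpack("<i", header)
        if length == 0:
            return
        yield json.loads(source.read(length))


buf = io.BytesIO()
write_stream(buf, {"fields": [["x", "int64"]]}, [{"x": [1, 2]}, {"x": [3]}])
buf.seek(0)
messages = list(read_stream(buf))
print(messages)
```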
Thus far, message transport and RPC details have not been fully developed. We propose to implement efficient integration with well-established messaging frameworks, like gRPC and ZeroMQ, to make it easy for our users to send and receive data among themselves and with data access layers that support these native Arrow RPC protocols (vs. other protocols, like ODBC, whose data must be deserialized into Arrow format).
Low-level Storage Interfaces
In C++, we have developed abstractions for managing memory lifetime using RAII and smart pointers (http://en.cppreference.com/w/cpp/language/raii). This also permits clean hierarchical memory references, shared buffer ownership and parent-child semantics, copy-on-write, and other benefits.
To interface with the outside world, we must build bridges between Arrow’s C++ memory subsystem on one side and low-level IO interfaces and filesystem-like storage layers on the other. These should support both synchronous and, where relevant, asynchronous access. We wish to maximize throughput from each kind of data source, while providing a consistent, robust, fast, and full-featured API for each data source. Among other things, this includes:
- Local files / shared memory
  - Memory mapped files
  - POSIX shared memory segments (or named shared memory in Windows)
  - Normal operating system files
- GPU: CUDA or OpenCL-enabled devices
- Local network interfaces
  - UNIX or Windows sockets
- Hadoop Filesystem (HDFS)
  - HDFS RPC connectivity using libhdfs
- Amazon S3
- Google Cloud Storage
- Azure Blob Storage
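One way to keep the API consistent across these data sources is a small abstract interface that every backend implements. The Python sketch below is hypothetical (the class names are illustrative, not Arrow's): an in-memory buffer and a normal OS file sit behind the same `size` / `read_at` methods, so a format reader never needs to know which backend it is using. Remote backends such as HDFS or S3 would implement the same two methods on top of their client libraries.

```python
import os
import tempfile
from abc import ABC, abstractmethod


class RandomAccessSource(ABC):
    """Hypothetical uniform read interface over heterogeneous storage backends."""

    @abstractmethod
    def size(self) -> int: ...

    @abstractmethod
    def read_at(self, offset: int, nbytes: int) -> bytes: ...


class BufferSource(RandomAccessSource):
    """Bytes already resident in memory."""

    def __init__(self, data: bytes):
        self._view = memoryview(data)

    def size(self) -> int:
        return len(self._view)

    def read_at(self, offset: int, nbytes: int) -> bytes:
        # Slicing the memoryview is zero-copy; tobytes() copies only the requested range
        return self._view[offset:offset + nbytes].tobytes()


class OSFileSource(RandomAccessSource):
    """A normal operating system file, read with positional reads."""

    def __init__(self, path: str):
        self._fd = os.open(path, os.O_RDONLY)

    def size(self) -> int:
        return os.fstat(self._fd).st_size

    def read_at(self, offset: int, nbytes: int) -> bytes:
        # os.pread (POSIX only) reads at an absolute offset without moving a shared cursor
        return os.pread(self._fd, nbytes, offset)

    def close(self) -> None:
        os.close(self._fd)


def read_footer(source: RandomAccessSource, nbytes: int) -> bytes:
    """A format reader depends only on the interface, not on the backend."""
    return source.read_at(source.size() - nbytes, nbytes)


with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"hello world")
    path = f.name

file_src = OSFileSource(path)
mem_src = BufferSource(b"hello world")
footers = (read_footer(mem_src, 5), read_footer(file_src, 5))
file_src.close()
os.remove(path)
print(footers)
```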
Serialization Formats and other Data Access
With Arrow as our runtime memory format for analytics and interchange, we must be able to convert between various popular storage and raw data input formats, and back.
For file formats, these include, but are not limited to:
- CSV / Delimited Files
- Apache Parquet: Popular columnar file format
- Apache ORC: Another Parquet-esque columnar file format, with good support in …
- Apache Avro: Dynamic serialization format designed for RPC in the Hadoop ecosystem
- HDF5: Popular scientific data format for storing large array data
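As a small illustration of the first item, converting row-oriented CSV into columns means transposing rows and inferring a type for each column. This Python sketch uses only the standard `csv` module; its inference rule (try int, then float, else keep strings, with empty fields as nulls) is an assumption for illustration and far simpler than what a production CSV reader would need.

```python
import csv
import io
from typing import Dict, List


def infer_column(values: List[str]) -> list:
    """Convert raw strings to int or float if every non-empty value parses, else keep str.

    Empty fields become None (nulls). This rule is a simplification for illustration.
    """
    present = [v for v in values if v != ""]
    for cast in (int, float):
        try:
            converted = iter([cast(v) for v in present])
        except ValueError:
            continue  # at least one value does not parse: try the next type
        return [None if v == "" else next(converted) for v in values]
    return [None if v == "" else v for v in values]


def read_csv_columnar(text: str) -> Dict[str, list]:
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    columns: List[List[str]] = [[] for _ in header]
    for row in reader:  # assumes every row has as many fields as the header
        for column, value in zip(columns, row):
            column.append(value)
    return {name: infer_column(col) for name, col in zip(header, columns)}


table = read_csv_columnar("id,price,name\n1,2.5,a\n2,,b\n3,4,c\n")
print(table)  # {'id': [1, 2, 3], 'price': [2.5, None, 4.0], 'name': ['a', 'b', 'c']}
```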
In addition to supporting various file-based data storage formats, we will want to support some other kinds of binary data protocols in popular use:
- ODBC: Supported by many SQL databases (Prior art for Arrow-ODBC in https://github.com/blue-yonder/turbodbc)
- SQLite: Bind natively to libsqlite3/4
- Apache Kudu: Distributed columnar data store
- Hive / Impala (HiveServer2 protocol)
Supporting predicate pushdown
Some data sources may benefit from evaluating boolean conditions while reading from the files, to avoid deserializing data unnecessarily. Parquet and ORC are great examples: in addition to using the statistics they store about row groups, we can also evaluate simple predicates like FOO > 5 and filter rows as we go rather than after the fact.
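The Python sketch below shows both levels of pushdown for a predicate like FOO > 5: row groups whose stored maximum proves that no row can match are skipped without being decoded, and the predicate is then applied to the surviving rows while they are read. The `RowGroup` structure is a hypothetical stand-in for the per-row-group statistics kept in Parquet and ORC metadata.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class RowGroup:
    min_foo: int     # statistics stored in the file metadata: cheap to read
    max_foo: int
    foo: List[int]   # column data: decoding it is the expensive step to avoid


def make_group(values: List[int]) -> RowGroup:
    # A writer computes the statistics once, when the file is written
    return RowGroup(min(values), max(values), values)


def scan_greater_than(groups: List[RowGroup], threshold: int) -> Tuple[List[int], int]:
    """Return the values satisfying FOO > threshold and how many row groups were decoded."""
    out: List[int] = []
    decoded = 0
    for group in groups:
        if group.max_foo <= threshold:
            continue  # statistics prove no row can match: skip the whole group
        decoded += 1
        out.extend(v for v in group.foo if v > threshold)  # filter while reading
    return out, decoded


groups = [make_group([1, 3, 5]), make_group([2, 7, 9]), make_group([0, 4])]
print(scan_greater_than(groups, 5))  # ([7, 9], 1)
```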
Shared Memory / IPC Object Store
In 2017 Apache Arrow acquired a component called Plasma (http://incubator.apache.org/ip-clearance/arrow-plasma-object-store.html), developed to support the Ray project (https://github.com/ray-project/ray), a machine learning development platform from the UC Berkeley RISELab.
The Plasma store can assist with developing applications involving multiple processes that need to share data, which may reside in CPU or GPU memory. Computational processes live separately from the Plasma store, which runs as its own daemon process. The processes access data managed by Plasma through zero-copy shared memory and, by using the Arrow columnar format to encode structural information, can describe complex datasets and make them available to each other with minimal serialization overhead.
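Plasma's own API is not reproduced here. As a minimal sketch of the underlying mechanism only, Python's standard `multiprocessing.shared_memory` module (Python 3.8+) can create a named shared memory segment that another process attaches to by name and reads through a typed view without copying. The int64 payload is an illustrative assumption, and for brevity both the producer and the consumer run in one process.

```python
import struct
from multiprocessing import shared_memory

values = [10, 20, 30]
nbytes = 8 * len(values)

# Producer: allocate a named segment and write native-endian int64 values into it
producer = shared_memory.SharedMemory(create=True, size=nbytes)
struct.pack_into(f"{len(values)}q", producer.buf, 0, *values)

# Consumer: attach by name (normally from another process); no data is copied
consumer = shared_memory.SharedMemory(name=producer.name)
view = consumer.buf[:nbytes].cast("q")  # zero-copy int64 view over the shared bytes
result = list(view)

# Release views before closing, then unlink the segment exactly once
view.release()
consumer.close()
producer.close()
producer.unlink()
print(result)  # [10, 20, 30]
```

The segment may be rounded up to a page size on some platforms, which is why the consumer slices exactly `nbytes` before casting.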
We wish to provide strong support for managing datasets used by multiple processes living on the CPU or GPU.
Arrow-native Computation Engine
Historically, data manipulation libraries in data science languages like Python and R have been bespoke, vertically integrated systems developed by small groups of developers. These include:
- Custom in-memory data representations (example: R’s data frames, pandas’s data representation implemented on top of NumPy arrays)
- Libraries of algorithms targeting that data representation (see pandas’s internal algorithms library: https://github.com/pandas-dev/pandas/tree/master/pandas/_libs )
- Naive eagerly-evaluated “runtime” (i.e. eager function evaluation in the host programming language)
- Data access / IO specific to data representation (see: R’s various data ingest functions, pandas’s equivalent functions)
In some cases, a deferred evaluation system has been created. A good example is dplyr (http://dplyr.tidyverse.org/ and https://github.com/tidyverse/dplyr), which is specific to the R language and R’s data frame representation.
We propose to develop a portable, embeddable computation engine in C++ with the following features:
- Core system ships as portable C or C++ shared libraries, with bindings for each host language (Python, R, etc.)
- Runtime in-memory format is Arrow columnar format, and auxiliary data structures that can be described by composing Arrow data structures
- Reusable operator “kernel” library containing functions utilizing Arrow format as input and output. This includes pandas-style array functions as well as SQL-style relational operations (joins, aggregations, etc.)
- Multithreaded graph dataflow-based execution engine for efficient evaluation of lazy data frame expressions created in the host language
- Subgraph compilation using LLVM; optimize commonly occurring operator patterns
- Support for user-defined operators and function kernels
Initially, development will focus on efficient computing on a single node. We would be interested in developing a distributed runtime in the future.
An architecturally similar system that operates on a different data model (numerical multidimensional array data) is the TensorFlow project from Google. Here we are processing data frames, not tensors.
There is some prior art in building an Arrow-based computation engine. One example is Dremio (See http://dremio.com/ and https://github.com/dremio/dremio-oss), which started out as a fork of the Apache Drill SQL-on-Hadoop system.
Arrow-native Operator / Kernel Library
To build an Arrow-native data processing engine, we must implement functions which perform computations on the Arrow memory format without conversion to some other intermediate representation.
The lowest-level unit of computation we call a kernel. A particular kernel corresponds to an operator, which may be something like “add”. In J and other APL derivatives, operators are called “verbs”. An operator performs computations on input data structures having well-defined types. Given an operator and its input types, such as Add(Float64, Float64), we select the kernel that performs the operation on those particular input types.
Data processing runtimes may break data into chunks and perform multiple kernel evaluations in separate threads. For example, the operation x + y can be computed by any number of threads in parallel, since each output element depends only on the input elements at the same position.
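A minimal sketch of that chunked strategy using Python's `concurrent.futures`: the inputs are split into fixed-size chunks, the same kernel runs on each chunk in a worker thread, and the results are concatenated in order. The chunk size and worker count are arbitrary assumptions. In pure Python the GIL limits any real speedup, so this only illustrates the decomposition that a C++ engine would execute truly in parallel.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def add_kernel(x: List[float], y: List[float]) -> List[float]:
    return [a + b for a, b in zip(x, y)]


def parallel_add(x: List[float], y: List[float],
                 chunk_size: int = 4, workers: int = 4) -> List[float]:
    if len(x) != len(y):
        raise ValueError("inputs must have equal length")
    bounds = [(i, min(i + chunk_size, len(x))) for i in range(0, len(x), chunk_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() yields results in input order, so chunks can simply be concatenated
        chunks = pool.map(lambda b: add_kernel(x[b[0]:b[1]], y[b[0]:b[1]]), bounds)
        return [v for chunk in chunks for v in chunk]


x = [float(i) for i in range(10)]
y = [1.0] * 10
print(parallel_add(x, y))
```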
To build a functionally-complete data processing library such as pandas, we require operator kernel implementations for a wide variety of operations, such as (incomplete listing):
- Type casting: conversions from one data type to another
- Binary mathematical operators: numeric arithmetic, comparisons, bit-wise operations, etc.
- Mathematical functions (like sqrt, exp, and log; in NumPy these are called “ufuncs”)
- Aggregation functions (“reductions”)
  - Streaming-friendly aggregators (count, sum, mean, min, max, standard deviation, hyperloglog, count min sketch, etc.)
  - Order statistics (quantiles, median, etc.)
  - Attainment statistics (value at min/max, index of min/max, first crossing point, etc.)
  - Aggregation on data resulting from inner kernel or user-defined function (“UDF”) evaluation. For example, “max(length(x))”
- General array functions (such as those found in APL / J), including:
  - Integer selection (“take”, called “From” in J)
  - Integer assignment (“put”, called “Amend” in J / immutable APL)
  - Boolean selection (“take where”)
  - Boolean assignment (“put where”)
  - Vectorized conditionals / if-else
  - Substitute values
  - Substitute / fill nulls
  - Fill (“pad”) forward / backward
  - Sort values
  - Compute sort indices (called “argsort” in NumPy, and “grade up” in J)
  - Sort array by indirect sort key (see /: dyad in J)
  - Repeating values (called “repeat” in NumPy, “Copy” in J)
- String functions (length, uppercase, lowercase, splitting, concatenation, regular expressions, and so forth)
- Hash table-based functions: unique, match (called “index-of” in J), dictionary-encode (called “factorize” in pandas), value counts, and isin (“member of”)
- Categorical data manipulations
  - Conform categories
  - Incremental dictionary-encode
- Nested data manipulations
  - List flatten, nest
  - Kernel evaluate (e.g. f(x) for each x in List)
  - Struct (or union) group or split
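Several of the hash table-based functions listed above can share a single pass over the data. This hypothetical Python sketch computes a dictionary encoding (pandas' `factorize`) and value counts together, using a dict as the hash table. Nulls (None) get index -1, a convention borrowed from pandas; in Arrow, a null slot would instead be marked in the validity bitmap.

```python
from typing import Dict, Hashable, List, Optional, Tuple


def dictionary_encode(values: List[Optional[Hashable]]) -> Tuple[List[int], list, List[int]]:
    """Return (indices, dictionary, counts); the dictionary keeps first-occurrence order."""
    table: Dict[Hashable, int] = {}  # hash table: value -> position in the dictionary
    dictionary: list = []
    counts: List[int] = []
    indices: List[int] = []
    for v in values:
        if v is None:
            indices.append(-1)  # null sentinel
            continue
        code = table.get(v)
        if code is None:  # first time we see this value
            code = len(dictionary)
            table[v] = code
            dictionary.append(v)
            counts.append(0)
        counts[code] += 1
        indices.append(code)
    return indices, dictionary, counts


indices, dictionary, counts = dictionary_encode(["b", "a", None, "b", "c", "a", "b"])
print(indices, dictionary, counts)  # [0, 1, -1, 0, 2, 1, 0] ['b', 'a', 'c'] [3, 2, 1]
```

Here `dictionary` is the result of “unique”, and “isin” reduces to a membership test against the same hash table.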
For a given operator, there may be many implementations: Non-SIMD CPU, SIMD CPU, GPU (CUDA or OpenCL), etc. The computation engine can select the appropriate kernel implementation at runtime.
In addition to array-based mathematical functions and other manipulations, we must also be able to perform relational algebra (joins) on tables (semantically collections of arrays): inner, outer, left, right, anti, semi.
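A common way to implement these joins is a hash join: build a hash table on the key column of one table, then probe it with each row of the other. The Python sketch below represents tables as dicts of equal-length column lists (an assumption for illustration) and covers inner, left, semi, and anti joins. Right and full outer joins follow the same pattern by also tracking which build-side rows were matched. Unlike SQL, None keys would match each other here.

```python
from collections import defaultdict
from typing import Dict, List

Table = Dict[str, list]


def hash_join(left: Table, right: Table, key: str, how: str = "inner") -> Table:
    """Join two columnar tables on `key`; how is one of inner, left, semi, anti."""
    # Build phase: map each key value to the row positions holding it in `right`
    index: Dict[object, List[int]] = defaultdict(list)
    for pos, k in enumerate(right[key]):
        index[k].append(pos)

    right_cols = [c for c in right if c != key]
    out: Table = {c: [] for c in left}
    if how in ("inner", "left"):
        out.update({c: [] for c in right_cols})  # assumes no other overlapping names
    elif how not in ("semi", "anti"):
        raise ValueError(f"unsupported join type: {how}")

    # Probe phase: look up each row of `left`
    for i, k in enumerate(left[key]):
        matches = index.get(k, [])
        if how in ("semi", "anti"):
            # semi keeps left rows that have a match; anti keeps rows that do not
            if bool(matches) == (how == "semi"):
                for c in left:
                    out[c].append(left[c][i])
            continue
        if not matches and how == "left":
            matches = [None]  # emit one row padded with nulls
        for j in matches:
            for c in left:
                out[c].append(left[c][i])
            for c in right_cols:
                out[c].append(None if j is None else right[c][j])
    return out


left = {"id": [1, 2, 3], "x": ["a", "b", "c"]}
right = {"id": [2, 3, 3, 4], "y": [20, 30, 31, 40]}
print(hash_join(left, right, "id"))  # {'id': [2, 3, 3], 'x': ['b', 'c', 'c'], 'y': [20, 30, 31]}
```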
Logical Operator Graphs
Given an operator and its input types and any parameters, we can resolve its output type. An operator’s output, therefore, can be viewed as an unevaluated input to another operator. Similar to TensorFlow and other graph-based computation engines, we can compose our operators to create computational graphs having well-specified input types and output types.
As an example, consider the expression log(x + 1) - 1. This can be described as the graph:
Subtract ( Log ( Add ( x, 1 ) ), 1 )
Such logical graphs or “programs” do not indicate the execution strategy. In this graph, nodes are operators and edges are data dependencies.
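A logical graph of this kind can be represented with plain node objects whose output types are resolved from their inputs when the graph is built, before any data is evaluated. In the hypothetical Python sketch below, the node classes and type rules are assumptions for illustration (Log of any numeric input yields float64, and mixing int64 with float64 promotes to float64). Evaluation is a naive eager walk of the graph.

```python
import math
from dataclasses import dataclass
from typing import Dict, Tuple, Union


@dataclass(frozen=True)
class Input:
    name: str
    type: str  # "int64" or "float64"


@dataclass(frozen=True)
class Literal:
    value: Union[int, float]

    @property
    def type(self) -> str:
        return "float64" if isinstance(self.value, float) else "int64"


@dataclass(frozen=True)
class Call:
    op: str                   # "add", "subtract", or "log"
    args: Tuple["Node", ...]

    @property
    def type(self) -> str:
        # The output type is resolved from the input types alone, before any data exists
        if self.op == "log":
            return "float64"
        return "float64" if any(a.type == "float64" for a in self.args) else "int64"


Node = Union[Input, Literal, Call]

KERNELS = {"add": lambda a, b: a + b, "subtract": lambda a, b: a - b, "log": math.log}


def evaluate(node: Node, env: Dict[str, list], length: int) -> list:
    """Naive eager evaluation: every intermediate result is fully materialized."""
    if isinstance(node, Input):
        return env[node.name]
    if isinstance(node, Literal):
        return [node.value] * length  # broadcast the scalar to the column length
    args = [evaluate(a, env, length) for a in node.args]
    return [KERNELS[node.op](*row) for row in zip(*args)]


# log(x + 1) - 1, i.e. Subtract(Log(Add(x, 1)), 1)
x = Input("x", "int64")
expr = Call("subtract", (Call("log", (Call("add", (x, Literal(1))),)), Literal(1)))
print(expr.type)  # float64
print(evaluate(expr, {"x": [0, 1]}, 2))
```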
We additionally propose to specify an implementation-independent serialization format for describing such operator graphs. This will eventually permit multiple implementations of execution engines for these graphs to exist. We will probably develop a reference execution engine in C++ as described in the next section.
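As a hedged illustration of an implementation-independent format, the sketch below encodes an operator graph as nested JSON, using plain tuples as a stand-in in-memory graph. The JSON schema (`kind`, `op`, `args`, and so on) is invented for this example; a real format would more likely use a binary schema language such as Flatbuffers, which Arrow already uses for its metadata.

```python
import json
from typing import Any

# Hypothetical in-memory graph: ("input", name, type) | ("literal", value) | ("call", op, [args])


def to_json(node: tuple) -> Any:
    kind = node[0]
    if kind == "input":
        return {"kind": "input", "name": node[1], "type": node[2]}
    if kind == "literal":
        return {"kind": "literal", "value": node[1]}
    if kind == "call":
        return {"kind": "call", "op": node[1], "args": [to_json(a) for a in node[2]]}
    raise ValueError(f"unknown node kind: {kind}")


def from_json(obj: Any) -> tuple:
    kind = obj["kind"]
    if kind == "input":
        return ("input", obj["name"], obj["type"])
    if kind == "literal":
        return ("literal", obj["value"])
    if kind == "call":
        return ("call", obj["op"], [from_json(a) for a in obj["args"]])
    raise ValueError(f"unknown node kind: {kind}")


# log(x + 1) - 1
graph = ("call", "subtract", [
    ("call", "log", [("call", "add", [("input", "x", "int64"), ("literal", 1)])]),
    ("literal", 1),
])
wire = json.dumps(to_json(graph))         # what one engine would send to another
restored = from_json(json.loads(wire))    # what a different engine would reconstruct
print(restored == graph)  # True
```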
Dataflow Runtime / Virtual Machine
Note that the most naive “execution engine” or “runtime” is one that evaluates each operator serially, using one or more threads, and fully materializes the result of each operator before passing it as input to subsequent operators. This is what the current pandas implementation does (as do many of the other systems currently used by data scientists):
- Results of operators are fully materialized as temporary arrays
- Most operators are single-threaded
- There is little to no optimization across operators (with some exceptions, such as specialized implementations of certain “group by” aggregations)
We propose to develop a faster and more efficient execution engine for our analytical operator graphs. The primary strategies we will use when possible are:
- Avoid temporary memory allocations, reusing memory between operators
- Use all available processor cores
- Optimize task sizes to improve CPU cache efficiency (this is sometimes called “pipelining”)
- Employ SIMD or GPU acceleration when feasible
- Perform cross-operator optimizations
These techniques have been widely studied and employed in production systems in the analytic database world, so little novel research should be necessary.
Note that each operator should be able to evaluate “eagerly” to emulate the existing “one operator at a time” execution approach.
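The difference between the naive runtime and a pipelined one can be shown on log(x + 1) - 1. In this Python sketch, `eager` materializes a full-length temporary array after every operator, while `pipelined` walks the input in chunks (the chunk size is an assumption) and runs all three operators back to back on each element, writing into a single preallocated output. Both produce identical results. In pure Python the chunking has no real cache benefit; it only mirrors the loop structure a compiled engine would use.

```python
import math
from typing import List


def eager(x: List[float]) -> List[float]:
    t1 = [v + 1 for v in x]          # full-length temporary #1
    t2 = [math.log(v) for v in t1]   # full-length temporary #2
    return [v - 1 for v in t2]       # result


def pipelined(x: List[float], chunk_size: int = 1024) -> List[float]:
    out = [0.0] * len(x)  # single preallocated output buffer
    for start in range(0, len(x), chunk_size):
        stop = min(start + chunk_size, len(x))
        # All three operators run on one chunk while it would be hot in cache;
        # no full-length temporaries are materialized
        for i in range(start, stop):
            out[i] = math.log(x[i] + 1) - 1
    return out


data = [float(i) for i in range(5000)]
print(pipelined(data, chunk_size=256) == eager(data))  # True
```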
Subgraph Compilation / Code Generation
Operator pipelining (combining multiple tasks into one, elimination of temporaries) yields some of the most significant performance improvements over the current naive approaches.
Runtime code generation with LLVM for specialized code paths is the next level of performance. This is essentially the approach that the TensorFlow team has taken by building XLA (https://www.tensorflow.org/performance/xla/) for a different class of computations (tensor math / linear algebra).
Most modern query engines feature some amount of code-generation, mostly using LLVM as the compiler infrastructure.
The interpreted / virtual machine approach has some drawbacks which can be remedied through code-generation / JIT compilation with LLVM:
- Portable C++ shared libraries may contain less optimized code than what LLVM can generate at runtime
- It may not be practical to compile libraries with -march on the target host. What some projects have done is pre-compile non-SIMD, …
- Reduced virtual machine and (in some cases) dynamic dispatch overhead in kernel evaluation
- Some classes of analytic optimizations are more difficult or impossible to implement. A classic example is ALL(PREDICATE). An example in pandas would be f(x).all(), where f is some function returning a boolean array. In a JIT’d version, f(x) can be computed in the same loop as the boolean reduction, enabling short-circuiting on the first observation of a false value
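The short-circuiting benefit can be seen even in plain Python, where a generator plays the role of the fused loop. The sketch counts how many times a predicate `f` (an arbitrary example function) is evaluated when its boolean array is fully materialized first, as with `f(x).all()`, versus when the predicate is fused with the reduction.

```python
from typing import List

calls = 0


def f(v: int) -> bool:
    global calls
    calls += 1
    return v < 100


x: List[int] = [500] + list(range(999))  # the first element already fails the predicate

calls = 0
materialized = all([f(v) for v in x])  # like f(x).all(): evaluates f on every element
materialized_calls = calls

calls = 0
fused = all(f(v) for v in x)  # fused loop: stops at the first False
fused_calls = calls

print(materialized, materialized_calls, fused, fused_calls)  # False 1000 False 1
```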
APL interpreters, prior to the advent of compiler technology like LLVM, would partially address compute subgraph acceleration by special-casing common operator patterns (for example, the J expression *./ a = b might be recognized by the interpreter and rewritten as a specialized all-equals operator).
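That kind of special-casing amounts to a rewrite rule over the operator graph. In this hypothetical Python sketch, expressions are nested tuples, and a recursive pass replaces every `all(equal(a, b))` subtree with a single fused `all_equal(a, b)` operator whose kernel compares element pairs and stops at the first mismatch. The operator names are illustrative only.

```python
from typing import Any, Dict, List


def rewrite(expr: Any) -> Any:
    """Recursively replace ("all", ("equal", a, b)) with ("all_equal", a, b)."""
    if not isinstance(expr, tuple):
        return expr  # leaf: a column name
    op, *args = expr
    args = [rewrite(a) for a in args]
    if op == "all" and isinstance(args[0], tuple) and args[0][0] == "equal":
        _, a, b = args[0]
        return ("all_equal", a, b)
    return (op, *args)


def evaluate(expr: Any, env: Dict[str, List[int]]) -> Any:
    if not isinstance(expr, tuple):
        return env[expr]
    op, *args = expr
    vals = [evaluate(a, env) for a in args]
    if op == "equal":
        return [p == q for p, q in zip(*vals)]  # materializes a boolean array
    if op == "all":
        return all(vals[0])
    if op == "all_equal":
        return all(p == q for p, q in zip(*vals))  # fused, short-circuits
    raise ValueError(f"unknown op {op}")


expr = ("all", ("equal", "a", "b"))
env = {"a": [1, 2, 3], "b": [1, 2, 4]}
print(rewrite(expr))  # ('all_equal', 'a', 'b')
print(evaluate(expr, env), evaluate(rewrite(expr), env))  # False False
```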
There is substantial prior art in databases and other data processing systems for using runtime code generation to accelerate hot code paths. The Weld project (https://cs.stanford.edu/~matei/papers/2017/cidr_weld.pdf), for example, is currently being developed by the Apache Spark creator’s research group at Stanford to accelerate internal code paths in Spark that form part of the Databricks Runtime (https://databricks.com/product/databricks-runtime).
We must develop comprehensive interoperability with the Python programming language and other widely deployed 3rd party libraries like NumPy and pandas. Priorities will depend on the needs of Python users. This work includes:
- Conversions between Python built-in data structures and Arrow format
- Conversions between NumPy and pandas data structures and Arrow format
- Python API for declaring operations to be executed by the C++ native execution engine
- Machinery for implementing new Arrow operators in Python, also called “user-defined functions” or UDFs. This may include a NumPy or pandas-based API (for convenience) as well as an Arrow-based API.
- Python bindings / API for other components of the Arrow ecosystem: file formats, IO interfaces, etc.
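For the first item, converting Python built-in data structures means transposing row-oriented records into columns and recording missing values as nulls. This is a hypothetical Python sketch of that round trip using only dicts and lists. It is not pyarrow's API, and it represents nulls as None rather than with a validity bitmap.

```python
from typing import Dict, List


def records_to_columns(records: List[dict]) -> Dict[str, list]:
    """Transpose a list of row dicts into a dict of columns; missing keys become None."""
    names = list(dict.fromkeys(k for rec in records for k in rec))  # first-seen order
    return {name: [rec.get(name) for rec in records] for name in names}


def columns_to_records(columns: Dict[str, list]) -> List[dict]:
    """Inverse conversion: one dict per row."""
    n = len(next(iter(columns.values()), []))
    return [{name: col[i] for name, col in columns.items()} for i in range(n)]


rows = [{"a": 1, "b": "x"}, {"a": 2}, {"b": "z", "c": 3.5}]
cols = records_to_columns(rows)
print(cols)  # {'a': [1, 2, None], 'b': ['x', None, 'z'], 'c': [None, None, 3.5]}
```

The round trip is lossy in one respect: a key that was absent from a row comes back present with a None value.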
The R language interface will aim to provide functionality equivalent to that of the Python bindings (but R-flavored), with the roadmap driven by the needs of R users. This work includes:
- Conversions between R’s built-in data structures (vectors, data frames) and Arrow format
- R API for building and executing computation graphs
- dplyr / tidyverse-compatible interface
- data.table interface
- API for file formats, IO, other Arrow-based components
Developer Productivity and User Support
Build Systems, Continuous Integration, and Software Delivery
Pursuing this technology roadmap wholeheartedly will require a significant amount of development on build systems, packaging, continuous integration and testing, benchmarking, etc. Failing to adequately support developer productivity and packaging and deployment for users will slow the project’s progress.
In time, we will grow an increasingly large stack of 3rd party C and C++ dependencies for building the core libraries. While the build system and its dependencies may grow more complex, developers and users must be protected from this increased complexity as much as possible.
To put it simply, being a developer on this project must not be painful. In particular, this means that:
- It must be straightforward to set up a development environment on any of the supported platforms (Windows, macOS, and major Linux distributions)
- Build dependencies must be available either as pre-compiled binaries (for either dynamic or static linking) or for source build
- The process for adding new build dependencies must be well-documented
- As of this writing (April 2018), we have been relying on the conda-forge packaging platform (https://conda-forge.org/) and separate Dockerfiles (for non-conda Python builds) for maintaining our binary toolchain for developers. We will have to do better.
Packaging, Releases, Nightly Builds
The workflow for making the software available to normal users must be as painless and streamlined as possible for the developers of the project. This includes:
- Versioned, production release builds on all major platforms (Windows, macOS, major Linux distributions), and any required binary dependencies
- Nightly snapshot builds on all platforms
- Painless, automated installation in downstream consumer languages (Python, R, and so forth)
Maintaining the packaging toolchain and builds is quite a lot of work. As of this writing, there are still many manual steps. We are publishing a limited set of nightly builds, and only for conda users on Linux.
We must develop reliable continuous integration (CI) services for testing the software. These services must be able to scale in two key dimensions:
- The number of concurrent developers
- The complexity of the codebase(s)
Thus far, we have been getting by using the Apache Software Foundation’s Travis CI (Linux, macOS) and Appveyor (Windows) capacity for CI.
Downstream Integration Testing
In addition to endogenous testing of the Arrow-based data science library stack, we must have confidence on a nightly or ad hoc basis that each code patch merged does not break downstream users of the projects. This includes other OSS projects like Apache Spark, Dremio, pandas, Dask, and others.
Running such integration tests with downstream systems may be too onerous in general to include with the standard continuous integration processes. We propose to provide a straightforward process for developers to create integration tests which can be added to a nightly validation process, so that any breakages will be made known to the development team within 24 hours.
This nightly integration testing process should also be available for ad hoc validation of «sensitive» patches.
Performance Testing and Benchmarking
In addition to validating the correctness and proper functioning of this software, we must also continuously monitor and improve its performance in different execution environments. The performance of critical operations can degrade by accident, sometimes with major regressions, even while the calculations remain correct. Our benchmarking systems must provide:
- A well-documented, straightforward way to write new benchmarks in the implementation (C++) and user (Python, R, etc.) languages.
- Nightly benchmarking and continuous monitoring. The data produced by this process should be browsable, and the system should send “alerts” when there are any performance deviations outside some acceptable tolerance. See https://github.com/airspeed-velocity/asv as one example tool to assist with automating benchmark data collection and analysis.
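The alerting step can be reduced to comparing each benchmark's latest timing against a baseline with a relative tolerance. The following Python sketch is hypothetical: the benchmark names, timings, and 10% tolerance are made up, and it is not asv's API. Using the median of recent runs as the baseline is one reasonable choice, since it damps noise from a single unusually slow or fast run.

```python
import statistics
from typing import Dict, List


def find_regressions(history: Dict[str, List[float]], latest: Dict[str, float],
                     tolerance: float = 0.10) -> Dict[str, float]:
    """Return benchmarks whose latest time exceeds the baseline by more than `tolerance`.

    history maps benchmark name -> recent timings in seconds; the baseline is their median.
    Each returned value is the relative slowdown (0.25 means 25% slower).
    """
    alerts: Dict[str, float] = {}
    for name, current in latest.items():
        past = history.get(name)
        if not past:
            continue  # new benchmark: nothing to compare against yet
        baseline = statistics.median(past)
        slowdown = current / baseline - 1.0
        if slowdown > tolerance:
            alerts[name] = slowdown
    return alerts


history = {"csv_read": [1.00, 1.02, 0.98], "sum_int64": [0.20, 0.21, 0.19]}
latest = {"csv_read": 1.01, "sum_int64": 0.25, "new_bench": 0.5}
alerts = find_regressions(history, latest)
print(alerts)
```

Here only `sum_int64` is flagged (about 25% slower than its median baseline); `csv_read` is within tolerance and `new_bench` has no history yet.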