Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
- Apache Arrow memory model and compute kernels for efficient processing of data.
- Apache Arrow Flight Protocol for efficient data transfer between processes.
- Google Protocol Buffers for serializing query plans.
- Docker for packaging up executors along with user-defined code.
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.
This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.
There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the deployment section of the Ballista User Guide
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
With these crates installed, it is now possible to start a scheduler process.
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050
by default.
Next, start an executor processes in a new terminal session with the specified concurrency level.
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051
by default. Additional executors can be started by
manually specifying a bind port. For example:
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
Ballista provides a BallistaContext
as a starting point for creating queries. DataFrames can be created
by invoking the read_csv
, read_parquet
, and sql
methods.
To build a simple ballista example, run the following command to add the dependencies to your Cargo.toml
file:
cargo add ballista datafusion tokio
The following example runs a simple aggregate SQL query against a Parquet file (yellow_tripdata_2022-01.parquet
) from the
New York Taxi and Limousine Commission
data set. Download the file and add it to the testdata
folder before running the example.
use ballista::prelude::*;
use datafusion::common::Result;
use datafusion::prelude::{col, SessionContext, ParquetReadOptions};
use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, average::avg};
#[tokio::main]
async fn main() -> Result<()> {
// connect to Ballista scheduler
let ctx = SessionContext::remote("df://localhost:50050").await?;
let filename = "testdata/yellow_tripdata_2022-01.parquet";
// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename, ParquetReadOptions::default())
.await?
.select_columns(&["passenger_count", "fare_amount"])?
.aggregate(
vec![col("passenger_count")],
vec![
min(col("fare_amount")),
max(col("fare_amount")),
avg(col("fare_amount")),
sum(col("fare_amount")),
],
)?
.sort(vec![col("passenger_count").sort(true, true)])?;
// this is equivalent to the following SQL
// SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
// FROM tripdata
// GROUP BY passenger_count
// ORDER BY passenger_count
// print the results
df.show().await?;
Ok(())
}
The output should look similar to the following table.
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| | -159.5 | 285.2 | 17.60577640099004 | 1258865.829999991 |
| 0 | -115 | 500 | 11.794859107585335 | 614052.1600000001 |
| 1 | -480 | 401092.32 | 12.61028389876563 | 22623542.879999973 |
| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 |
| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 |
| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 |
| 5 | -52 | 668 | 12.160378472086954 | 624289.51 |
| 6 | -52 | 252.5 | 12.576583325529857 | 402916 |
| 7 | 7 | 79 | 61.77777777777778 | 556 |
| 8 | 8.3 | 115 | 79.9125 | 639.3 |
| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
More examples can be found in the arrow-ballista repository.