The purpose of this project is to create the Execution Engine (EE) for a distributed OLAP database.

We will be taking heavy inspiration from [DataFusion](https://arrow.apache.org/datafusion/), [Velox](https://velox-lib.io/), and [InfluxDB](https://github.com/influxdata/influxdb) (which itself is built on top of DataFusion).

There are two subgoals. The first is to develop a functional EE, with a sufficient number of operators working and tested. Since all other components will have to rely on us to see any sort of outputs, it would be ideal if we had naive implementations of operators that just work (even if they are inefficient).
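
As a concrete example of "naive but working," a first-cut Filter operator could lean entirely on an existing Arrow compute kernel (the `arrow` crate is described below). This is a minimal sketch that assumes the predicate has already been evaluated into a boolean selection mask; the function name is ours, purely for illustration:

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Naive Filter: keep only the rows of `batch` where `mask` is true.
/// Correctness first; performance can come later.
fn naive_filter(batch: &RecordBatch, mask: &BooleanArray) -> Result<RecordBatch, ArrowError> {
    filter_record_batch(batch, mask)
}
```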

The second is to add interesting features, optimize the engine to be more performant, or both. Since it is unlikely that we will outperform an off-the-shelf EE like DataFusion, we will likely experiment with a new feature that these engines do not implement themselves.

We will be creating a vectorized push-based EE. This means operators will push batches of data to their parent operators.

We will implement a subset of the operators that [Velox implements](https://facebookincubator.github.io/velox/develop/operators.html):
- TableScan
- Filter (Completed)
- Project (Completed)
- HashAggregation (In-progress)
- HashProbe + HashBuild (In-progress)
- MergeJoin
- NestedLoopJoin
- Limit
- Values
- OrderBy (Completed)
- TopN (Completed)
- Exchange

The `trait` / interface used to define these operators is not finalized right now. We will likely follow whatever DataFusion outputs from its [`ExecutionPlan::execute()`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute) method, as sketched below.
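
Here is a minimal sketch of what our operator trait might look like if we mirror DataFusion's `execute()` signature. The trait name and shape are placeholders, not a settled design:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Hypothetical operator interface, mirroring DataFusion's
/// `ExecutionPlan::execute()`: each call returns a stream of Arrow
/// `RecordBatch`es for one partition of the operator's output.
pub trait Operator {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
}
```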

The difference is a bit subtle, but it comes down to using `tokio` for interacting with the asynchronous outside world (IO) and `rayon` for the CPU-bound work on our own compute threads.

There is a Rust-native [`arrow`](https://arrow.apache.org/rust/arrow/index.html) crate that gives us bindings into the [Apache Arrow data format](https://arrow.apache.org/overview/). Notably, it provides a [`RecordBatch`](https://arrow.apache.org/rust/arrow/array/struct.RecordBatch.html) type that can hold vectorized batches of columnar data. This will be the main form of data communicated between the EE and other components, as well as between the operators of the EE itself.
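
For illustration, constructing one of these batches by hand is straightforward; the schema and values below are arbitrary:

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Build a two-column `RecordBatch`, the columnar unit our operators
/// will pass to one another.
fn example_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
    .expect("column types match the schema")
}
```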

Finally, we will be making heavy use of [`datafusion`](https://docs.rs/datafusion/latest/datafusion/) itself. Since we are using its physical plan representation, we will rely heavily on its native datatypes, and we will fall back on DataFusion's implementations of the operators we do not plan to implement ourselves for the time being.

---

### Interface and API

Each EE can be treated as a mathematical function: it receives a physical plan from the scheduler as input, and it outputs Apache Arrow data as a `RecordBatch` or a `SendableRecordBatchStream`.

The physical plan tree it receives as input has nodes corresponding to the operators listed above. These plans will be handed to each EE by the Scheduler / Coordinator as DataFusion query plan fragments. More specifically, the EE will walk the relational tree and parse out each specific node, as sketched below.
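
A minimal sketch of that parsing step, assuming we receive DataFusion `ExecutionPlan` trait objects; the set of node types matched here is illustrative, not exhaustive:

```rust
use std::sync::Arc;

use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;

/// Walk a DataFusion physical plan and dispatch on the node types we
/// recognize. DataFusion identifies concrete nodes by downcasting
/// through `Any`.
fn parse_node(node: &Arc<dyn ExecutionPlan>) {
    let any = node.as_any();
    if let Some(_filter) = any.downcast_ref::<FilterExec>() {
        // Build our Filter operator from `_filter.predicate()`.
    } else if let Some(_project) = any.downcast_ref::<ProjectionExec>() {
        // Build our Project operator from `_project.expr()`.
    }
    // Recurse into the children until we reach the leaves (scans).
    for child in node.children() {
        parse_node(&child);
    }
}
```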

Once the EE parses the plan, it will figure out what data it requires. From there, it will make a high-level request to the IO Service for the data it needs (e.g. logical columns from a table). For the first stages of this project, we will just request entire columns of data from specific tables. Later in the semester, we may want more granular columnar data, or even point lookups.
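
A minimal sketch of such a whole-column request, assuming the IO Service exposes a `storage_client::BlobData` request type; the fields and the table/column names below are placeholders for illustration:

```rust
/// Stand-in for the IO Service's request type. The real definition belongs
/// to the IO Service team; these fields are hypothetical.
mod storage_client {
    pub struct BlobData {
        pub table: String,
        pub columns: Vec<String>,
    }
}

/// Ask for entire logical columns from one table, the granularity we plan
/// to use in the first stage of the project.
fn create_column_request() -> storage_client::BlobData {
    storage_client::BlobData {
        table: "lineitem".to_string(),
        columns: vec!["l_orderkey".to_string(), "l_extendedprice".to_string()],
    }
}
```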


Once the EE receives the stream, it has all the information it needs, and it can start executing the physical plan from the bottom, pushing the output of each operator to the next. This way, `tokio` does not necessarily have to wait for multiple operators to finish before starting the next level's operator. The execution nodes themselves will run on their own dedicated threads (either `tokio` threads that call into `rayon` but make no IO or `async` calls, or plain OS threads).
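
A minimal sketch of that hybrid, assuming the common oneshot-channel pattern for handing CPU-bound work from a `tokio` task to `rayon` (the doubling computation is a stand-in for real operator work):

```rust
use tokio::sync::oneshot;

/// Run a CPU-bound computation on rayon's thread pool without blocking any
/// tokio executor thread.
async fn process_on_rayon(batch: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // CPU-intensive work happens on a rayon worker thread.
        let result: Vec<i32> = batch.into_iter().map(|x| x * 2).collect();
        // If the receiver was dropped, there is nobody left to notify.
        let _ = tx.send(result);
    });
    // Awaiting the channel suspends the task instead of blocking the thread.
    rx.await.expect("rayon worker dropped its result channel")
}
```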


After finishing execution, the EE can notify the scheduler that it has finished executing the query fragment, and it can send the data (in the form of Apache Arrow data) to wherever the scheduler needs it to be sent. This could be to another EE node, the Catalog, or even the Optimizer.

---
The integration test will be TPC-H, or something similar to TPC-H.
> If you are introducing new concepts or giving unintuitive names to components, write them down here.
- "Vectorized execution" is the name given to the concept of outputting batches of data. But since there is a `Vec`tor type in Rust, we'll likely be calling everything Batches instead of Vectors.
