From 07e633ed98e4e8ac74de3a12e82c4a0fb1258d9b Mon Sep 17 00:00:00 2001 From: Kyle Booker Date: Wed, 28 Feb 2024 00:33:15 -0500 Subject: [PATCH 1/7] minor proposal changes --- proposal/presentation.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/proposal/presentation.md b/proposal/presentation.md index 0efec1f..0335f59 100644 --- a/proposal/presentation.md +++ b/proposal/presentation.md @@ -5,7 +5,7 @@ class: invert # Remove this line for light mode paginate: true --- -# Execution Engine: KCS +# Execution Engine: Eggstrain
@@ -105,9 +105,10 @@ Need to spill the data to local disk. # Step 3: Implement operators * TableScan -* FilterProject -* HashAggregation -* HashProbe + HashBuild +* Filter (Completed) +* Projection (Completed) +* HashAggregation (In-Progress) +* HashProbe + HashBuild (In-Progress) * MergeJoin * NestedLoopJoin * OrderBy @@ -218,4 +219,4 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -``` \ No newline at end of file +``` From 69c9a8debf4c6d1bca452f0b04475516c97f2064 Mon Sep 17 00:00:00 2001 From: Kyle Booker Date: Wed, 28 Feb 2024 00:34:51 -0500 Subject: [PATCH 2/7] minor proposal changes pt2 --- proposal/presentation.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proposal/presentation.md b/proposal/presentation.md index 0335f59..5823a1c 100644 --- a/proposal/presentation.md +++ b/proposal/presentation.md @@ -94,7 +94,7 @@ Finalize API with other teams: ![bg right:50% 80%](./images/bufferpool.png) -Need to spill the data to local disk. +Need to spill the data to local disk. This will be done after the first operators are implemented. * Can potentially rip out the [`memory_pool`](https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/index.html) From 13bc967945d63b1b91a551210d8d96ff0c3dbb1e Mon Sep 17 00:00:00 2001 From: Kyle Booker Date: Wed, 28 Feb 2024 14:11:10 -0500 Subject: [PATCH 3/7] Update proposal.md --- proposal/proposal.md | 69 +++++++------------------------------------- 1 file changed, 10 insertions(+), 59 deletions(-) diff --git a/proposal/proposal.md b/proposal/proposal.md index 3f57f21..05a2b76 100644 --- a/proposal/proposal.md +++ b/proposal/proposal.md @@ -13,7 +13,7 @@ The purpose of this project is to create the Execution Engine (EE) for a distrib We will be taking heavy inspiration from [DataFusion](https://arrow.apache.org/datafusion/), [Velox](https://velox-lib.io/), and [InfluxDB](https://github.com/influxdata/influxdb) (which itself is built on top of DataFusion). -There are two subgoals. The first is to develop a functional EE, with a sufficient number of operators working and tested. Since all other components will have to rely on us to see any sort of outputs, it would be ideal if we had naive implementations of operators that just work (even if they are slow). +There are two subgoals. The first is to develop a functional EE, with a sufficient number of operators working and tested. Since all other components will have to rely on us to see any sort of outputs, it would be ideal if we had naive implementations of operators that just work (even if they are inefficient). The second is to add either interesting features or optimize the engine to be more performant (or both). Since it is unlikely that we will outperform any off-the-shelf EEs like DataFusion, we will likely try to test some new feature that these engines do not use themselves. @@ -30,15 +30,14 @@ We will be creating a vectorized push-based EE. This means operators will push b We will implement a subset of the operators that [Velox implements](https://facebookincubator.github.io/velox/develop/operators.html): - TableScan -- FilterProject -- HashAggregation -- HashProbe + HashBuild +- Filter (Completed) +- Project (Completed) +- HashAggregation (In-progress) +- HashProbe + HashBuild (In-progress) - MergeJoin -- NestedLoopJoin -- OrderBy -- TopN -- Limit -- Values +- OrderBy (Completed) +- TopN (Completed) +- Exchange The `trait` / interface to define these operators is unknown right now. 
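One shape this interface could take (a sketch only: the trait and type names below are placeholders rather than a settled design, and the snippet assumes the `arrow` and `futures` crates as dependencies) is an `execute`-style method that returns a stream of record batches:

```rust
use std::pin::Pin;

use arrow::record_batch::RecordBatch;
use futures::stream::Stream;

/// Placeholder error type; a real implementation would likely use `anyhow` or a custom enum.
type EngineError = Box<dyn std::error::Error + Send + Sync>;

/// A pinned, sendable stream of record batches, mirroring the general shape of
/// DataFusion's `SendableRecordBatchStream`.
type BatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch, EngineError>> + Send>>;

/// Hypothetical operator interface: every operator produces its output as a stream.
trait PhysicalOperator {
    fn execute(&self) -> BatchStream;
}
```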
We will likely follow whatever DataFusion is outputting from their [`ExecutionPlan::execute()`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute) methods. @@ -56,7 +55,7 @@ The difference is a bit subtle, but it comes down to using `tokio` for interacti There is a Rust-native [`arrow`](https://arrow.apache.org/rust/arrow/index.html) crate that gives us bindings into the [Apache Arrow data format](https://arrow.apache.org/overview/). Notably, it provides a [`RecordBatch`](https://arrow.apache.org/rust/arrow/array/struct.RecordBatch.html) type that can hold vectorized batches of columnar data. This will be the main form of data communicated between the EE and other components, as well as between the operators of the EE itself. -Finally, there is a [`substrait`](https://docs.rs/substrait/latest/substrait/) crate that provides a way to decode query plan fragments encoded in the substrait format. +We will also be making heavy use of [`datafusion`](https://docs.rs/datafusion/latest/datafusion/). Since we are using their physical plan representation we will make heavy use of their proprietary datatypes. We will also use the DataFusion implementations for the operators we do not plan to implement ourselves for the time being. --- @@ -64,33 +63,7 @@ Finally, there is a [`substrait`](https://docs.rs/substrait/latest/substrait/) c Each EE can be treated as a mathematical function. It receives as input a physical plan from the scheduler, and outputs an Apache Arrow `RecordBatch` or `SendableRecordBatchStream`. -The physical plan tree it receives as input has nodes corresponding to one of the operators listed above. These plans will be given to each EE by the Scheduler / Coordinator as [substrait](https://substrait.io/) query plan fragments. More specifically, the EE will parse out the specific nodes in the relational tree, nodes represented in this [`substrait::proto::rel::RelType` enum](https://docs.rs/substrait/latest/substrait/proto/rel/enum.RelType.html). - -```rust -pub enum RelType { - Read(Box), - Filter(Box), - Fetch(Box), - Aggregate(Box), - Sort(Box), - Join(Box), - Project(Box), - Set(SetRel), - ExtensionSingle(Box), - ExtensionMulti(ExtensionMultiRel), - ExtensionLeaf(ExtensionLeafRel), - Cross(Box), - Reference(ReferenceRel), - Write(Box), - Ddl(Box), - HashJoin(Box), - MergeJoin(Box), - NestedLoopJoin(Box), - Window(Box), - Exchange(Box), - Expand(Box), -} -``` +The physical plan tree it receives as input has nodes corresponding to one of the operators listed above. These plans will be given to each EE by the Scheduler / Coordinator as DataFusion query plan fragments. More specifically, the EE will parse out the specific nodes in the relational tree. Once the EE parses the plan, it will figure out what data it requires. From there, it will make a high-level request for data it needs from the IO Service (e.g. logical columns from a table). For the first stages of this project, we will just request entire columns of data from specific tables. Later in the semester, we may want more granular columnar data, or even point lookups. @@ -199,8 +172,6 @@ fn create_column_request() -> storage_client::BlobData { Once the EE receives the stream, it has all the information it needs, and can start executing the physical plan from the bottom, pushing the output of operators to the next operator. This way, `tokio` does not necessarily have to wait for multiple operators to finish before starting the next level's operator.
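To make this push-based flow concrete, here is a minimal sketch of two operator tasks wired together with `tokio` channels. The operator, data, and channel sizes are all made up for illustration (plain `Vec<i64>` batches stand in for Arrow `RecordBatch`es), and it assumes `tokio` with the `macros`, `rt-multi-thread`, and `sync` features:

```rust
use tokio::sync::mpsc;

/// Stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i64>;

/// A "filter" operator task: receives batches from its child and immediately
/// pushes the filtered batches up to its parent.
async fn filter_operator(mut child: mpsc::Receiver<Batch>, parent: mpsc::Sender<Batch>) {
    while let Some(batch) = child.recv().await {
        let filtered: Batch = batch.into_iter().filter(|v| *v % 2 == 0).collect();
        if parent.send(filtered).await.is_err() {
            break; // the parent hung up, so stop early
        }
    }
}

#[tokio::main]
async fn main() {
    let (scan_tx, filter_rx) = mpsc::channel::<Batch>(8);
    let (filter_tx, mut sink_rx) = mpsc::channel::<Batch>(8);

    // Each operator runs as its own tokio task; batches flow bottom-up.
    tokio::spawn(filter_operator(filter_rx, filter_tx));
    tokio::spawn(async move {
        // A fake "scan" producing a few batches.
        for i in 0..3i64 {
            let batch: Batch = (i * 100..i * 100 + 100).collect();
            let _ = scan_tx.send(batch).await;
        }
    });

    while let Some(batch) = sink_rx.recv().await {
        println!("received a batch of {} rows", batch.len());
    }
}
```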
The execution nodes themselves will be on their own dedicated threads (either `tokio` threads that call into `rayon`, but do not make any IO or `async` calls, or OS threads). -The operator interface will probably end up being something similar to how DataFusion outputs data from their [`ExecutionPlan::execute()`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute) method. - After finishing execution, the EE can notify the scheduler that it has finished executing the query fragment, and it can send the data (in the form of Apache Arrow data) to wherever the scheduler needs it to be sent. This could be to another EE node, the Catalog, or even the Optimizer. --- @@ -241,23 +212,3 @@ The integration test will be TPC-H, or something similar to TPC-H. This is a str > If you are introducing new concepts or giving unintuitive names to components, write them down here. - "Vectorized execution" is the name given to the concept of outputting batches of data. But since there is a `Vec`tor type in Rust, we'll likely be calling everything Batches instead of Vectors. - - - - - - - - - - - - - - - - - - - - From 5b244d6c758f51a9de20500e336280af1fa10d47 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Sun, 3 Mar 2024 23:48:44 -0500 Subject: [PATCH 4/7] New design document draft --- README.md | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..9d8b26d --- /dev/null +++ b/README.md @@ -0,0 +1,82 @@ +# Eggstrain + +`eggstrain` is a discrete Execution Engine targeting analytical workloads. Written in Rust, `eggstrain` is built on top of and inspired by [DataFusion](https://arrow.apache.org/datafusion/). + +`eggstrain` is being built by Connor Tsui, Kyle Booker, and Sarvesh Tandon. + + +## Architectural Design + +`eggstrain` is closely tied to the high-performance asynchronous runtime `tokio`, which implements a work-stealing scheduler. By relying on `tokio` to manage all dataflow, `eggstrain` is able to offload the complexity of managing dataflow between operators to an asynchronous scheduler, while focusing on high parallel performance by leveraging the `rayon` crate. + +`eggstrain` is neither a push- nor pull-based Execution Engine in the traditional sense. Since data flow is asynchronous, the `tokio` scheduler gets to decide whether a call to `execute` from a parent operator results in the parent waiting for data to be pushed from the child operator, or if another operator gets to run while the parent operator yields. More specifically, it is possible that data is pushed from the bottom to the top of a pipeline on a single thread, without any interference, but it is also possible that data gets pushed to a parent operator without the parent operator running for a long time. + +The integration with `tokio` channels, `rayon`, and other crates is explained later in [3rd-party Crates](#3rd-party-crates). + +--- + +### Operators + +`eggstrain` supports a few operators, and there are more to come. + +- `TableScan` +- `Filter` (Completed) +- `Project` (Completed) +- `HashAggregate` (in progress) +- `HashProbe` + `HashBuild` (in progress) +- `MergeJoin` +- `Sort` (Completed) +- `Exchange` + +All intermediate operators (non-leaf operators) must implement either the `UnaryOperator` or `BinaryOperator` trait, depending on the number of logical children an operator has.
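The exact definitions of these traits are still in flux, so the following is only a sketch of one plausible shape. The method signatures, the choice of `broadcast` channels, and the `Batch` alias are assumptions for illustration (using the `async-trait` and `tokio` crates), not the real `eggstrain` API:

```rust
use async_trait::async_trait;
use tokio::sync::broadcast;

/// Stand-in for the Arrow `RecordBatch` type that flows between operators.
type Batch = Vec<i64>;

/// Hypothetical trait for operators with one logical child.
#[async_trait]
trait UnaryOperator: Send + Sync {
    async fn execute(&self, child: broadcast::Receiver<Batch>, parent: broadcast::Sender<Batch>);
}

/// Hypothetical trait for operators with two logical children (e.g. a hash join).
#[async_trait]
trait BinaryOperator: Send + Sync {
    async fn execute(
        &self,
        left: broadcast::Receiver<Batch>,
        right: broadcast::Receiver<Batch>,
        parent: broadcast::Sender<Batch>,
    );
}

/// A trivial operator that just forwards batches, to show how an impl reads.
struct Forward;

#[async_trait]
impl UnaryOperator for Forward {
    async fn execute(&self, mut child: broadcast::Receiver<Batch>, parent: broadcast::Sender<Batch>) {
        // `recv` returns `Err(RecvError::Closed)` once the child is done sending.
        while let Ok(batch) = child.recv().await {
            // A real operator would transform the batch; this sketch forwards it as-is.
            let _ = parent.send(batch);
        }
    }
}
```

Whether the channels end up being `broadcast`, `mpsc`, or something else entirely is still an open design question.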
Special nodes like `TableScan` and `Exchange` are dealt with separately. Both of these traits rely on the `async-trait` crate, as asynchronous sendable trait objects are not stable as of Rust 1.76. + +--- + +### 3rd-party Crates + +`eggstrain` makes heavy use of `tokio` and `rayon`. The `tracing` crate may be implemented in the future for logging and tracking statistics. + +[**`tokio`**](https://tokio.rs/) is a high-performance asynchronous runtime that provides a well-tested work-stealing scheduler over a lightweight thread pool of asynchronous tasks (otherwise known as coroutines). `tokio` is open-source, very well-documented, and has many features and a high degree of support. + +Other than the runtime provided by `tokio::main`, `tokio` provides asynchronous channels that allow data to be sent between threads in a completely thread-safe, efficient manner. `tokio` channels come in 4 flavors, but `eggstrain` only cares about the `oneshot` and `broadcast` (it is possible that in the future, `broadcast` will be replaced with a combination of `mpsc` and an `Exchange` operator). See the [Data Flow](#data-flow) section for more information on how channels are used in `eggstrain`. + +One final thing to note is that the `tokio` scheduler has a feature called a LIFO slot. Since `tokio` has a thread pool that takes work off of a shared work queue, it is possible that in a naive implementation, data is sent to a parent operator, but the parent operator gets placed at the back of the queue, and thus the parent operator does not get to run for a "long" time. If this were to happen, all memory locality would be completely lost, since other operators may run and replace the pages in the CPU cache _and_ TLB. + +The LIFO slot is included to prevent this from happening. When data is passed through a channel, the task on the receiving channel is placed in the LIFO slot. Before worker threads look at the work queue, they will check if there is a task in the LIFO slot, and if there is, they will run that task instead of the first item on the work queue. This reduces the number of cache and TLB misses that might take place if the parent task runs a "long" time after the child task has sent the data upwards. See this [blog](https://tokio.rs/blog/2019-10-scheduler#optimizing-for-message-passing-patterns) from `tokio` for more details. + +--- + +[**`rayon`**](https://docs.rs/rayon/latest/rayon/) is a parallelism crate that allows us to write parallelized and safe code using an OS thread pool with almost no developer cost. As opposed to `tokio`, which uses lightweight tasks (coroutines) which are completely in userspace, `rayon` is much more "heavyweight", as OS threads will incur system calls and other synchronization-related costs. + +The reason `eggstrain` needs both `tokio` and `rayon` is subtle, but it comes down to using `tokio` for interacting with anything that might need to wait for a bit of time without doing computation (the storage client from the I/O Service component, disk reads and writes, and even data movement between threads), and using `rayon` for any CPU-intensive workloads (actual execution and computation). Since `tokio` provides the scheduling for `eggstrain`, all `eggstrain` needs to focus on is making operators run as fast as possible by leveraging `rayon`. + +--- + +There is a Rust-native [`arrow`](https://arrow.apache.org/rust/arrow/index.html) crate that gives bindings into the [Apache Arrow data format](https://arrow.apache.org/overview/).
Notably, it provides a [`RecordBatch`](https://arrow.apache.org/rust/arrow/array/struct.RecordBatch.html) type that can hold vectorized batches of columnar data. This will be the main form of intermediate data communicated between `eggstrain` and other components, as well as between the operators of the EE itself. + +`eggstrain` is very similar to and makes heavy use of code from [`datafusion`](https://docs.rs/datafusion/latest/datafusion/). `eggstrain` takes as input a DataFusion [`ExecutionPlan`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html), and outputs a stream of `RecordBatch`es. + +_Note: Since we are using their physical plan representation we will make heavy use of their proprietary data types. We will also use the DataFusion implementations for the operators we do not plan to implement ourselves for the time being._ + +--- + +### Data Flow + +TODO ASCII diagram + + +### Interface and API + +`eggstrain` can be treated as a mathematical function. It receives as input an `ExecutionPlan` physical plan from the scheduler, and outputs a stream of Apache Arrow `RecordBatch`es, or a `SendableRecordBatchStream`. + +The `ExecutionPlan` that `eggstrain` receives as input has nodes corresponding to one of the operators listed above. These plans will be given to each `eggstrain` instance by a Scheduler / Coordinator as DataFusion query plan fragments. More specifically, `eggstrain` will parse out the specific nodes in the relational DAG, and construct its own DAG of operators, connected by channels instead of pointers. + +Once `eggstrain` parses the plan, it will figure out what data it requires. From there, it will make a high-level request for data it needs from the IO Service (e.g. logical columns from a table). + +TODO something about `TableScan` + +--- + +### Other Sections + +TODO From 31b32001c5dfcad2fe6d45e550cebb43ab30d77f Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Mon, 4 Mar 2024 12:39:25 -0500 Subject: [PATCH 5/7] add ascii diagrams --- README.md | 158 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 132 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 9d8b26d..88f204a 100644 --- a/README.md +++ b/README.md @@ -7,15 +7,14 @@ ## Architectural Design -`eggstrain` is closely tied to the high-performance asynchronous runtime `tokio`, which implements a work-stealing scheduler. By relying on `tokio` to manage all dataflow, `eggstrain` is able to offload the complexity of managing dataflow between operators to an asynchronous scheduler, while focusing on high parallel performance by leveraging the `rayon` crate. +`eggstrain` is closely tied to the high-performance asynchronous runtime `tokio`. By relying on `tokio` to manage all dataflow, `eggstrain` is able to offload the complexity of managing dataflow between operators to an asynchronous scheduler, while focusing on high parallel performance. -`eggstrain` is neither a push- nor pull-based Execution Engine in the traditional sense. Since data flow is asynchronous, the `tokio` scheduler gets to decide whether a call to `execute` from a parent operator results in the parent waiting for data to be pushed from the child operator, or if another operator gets to run while the parent operator yields. 
More specifically, it is possible that data is pushed from the bottom to the top of a pipeline on a single thread, without any interference, but it is also possible that data gets pushed to a parent operator without the parent operator running for a long time. +`eggstrain` is neither a push- nor pull-based Execution Engine in the traditional sense. Since data flow is asynchronous, the `tokio` scheduler gets to decide whether a call to `execute` from a parent operator results in the parent waiting for data to be pushed from the child operator, or if another operator gets to run while the parent operator yields. More specifically, it is possible that data is pushed all the way from the bottom to the top of a pipeline on a single thread (without any contention from other threads), but it is also possible that data gets pushed to a parent operator without the parent operator running with that data for a long time. The integration with `tokio` channels, `rayon`, and other crates is explained later in [3rd-party Crates](#3rd-party-crates). ---- -### Operators +## Operators `eggstrain` supports a few operators, and there are more to come. @@ -30,53 +29,160 @@ The integration with `tokio` channels, `rayon`, and other crates is explained la All intermediate operators (non-leaf operators) must implement either the `UnaryOperator` or `BinaryOperator` trait, depending on the number of logical children an operator has. Special nodes like `TableScan` and `Exchange` are dealt with separately. Both of these traits rely on the `async-trait` crate, as asynchronous sendable trait objects are not stable as of Rust 1.76. ---- -### 3rd-party Crates +## 3rd-party Crates -`eggstrain` makes heavy use of `tokio` and `rayon`. The `tracing` crate may be implemented in the future for logging and tracking statistics. +`eggstrain` is tightly coupled to `tokio` and `datafusion`. `rayon` and `arrow` are also used heavily in `eggstrain`. The `tracing` crate may be implemented in the future for logging and tracking statistics. [**`tokio`**](https://tokio.rs/) is a high-performance asynchronous runtime that provides a well-tested work-stealing scheduler over a lightweight thread pool of asynchronous tasks (otherwise known as coroutines). `tokio` is open-source, very well-documented, and has many features and a high degree of support. +[**`tokio`**](https://tokio.rs/) is a high-performance asynchronous runtime that provides a well-tested work-stealing scheduler over a lightweight thread pool of asynchronous tasks (otherwise known as coroutines). `tokio` is open-source, very well-documented, and has many features and a high degree of community support. Other than the runtime provided by `tokio::main`, `tokio` provides asynchronous channels that allow data to be sent between threads in a completely thread-safe, efficient manner. `tokio` channels come in 4 flavors, but `eggstrain` only cares about the `oneshot` and `broadcast` (it is possible that in the future, `broadcast` will be replaced with a combination of `mpsc` and an `Exchange` operator). See the [Data Flow](#data-flow) section for more information on how channels are used in `eggstrain`. -One final thing to note is that the `tokio` scheduler has a feature called a LIFO slot. Since `tokio` has a thread pool that takes work off of a shared work queue, it is possible that in a naive implementation, data is sent to a parent operator, but the parent operator gets placed at the back of the queue, and thus the parent operator does not get to run for a "long" time.
If this were to happen, all memory locality would be completely lost, since other operators may run and replace the pages in the CPU cache _and_ TLB. +One final thing to note is that the `tokio` scheduler has a feature called a LIFO slot. Since `tokio` has a thread pool that takes work off of a shared work queue, it is possible that in a naive implementation, data is sent upwards to a parent task, but the parent task gets placed at the back of the queue, and thus the parent task does not get to run for a "long" time ("long" meaning however long all other queued tasks take to run). If this were to happen, all memory locality would be completely lost, since other tasks may run and replace the pages in the CPU cache _and_ TLB. -The LIFO slot is included to prevent this from happening. When data is passed through a channel, the task on the receiving channel is placed in the LIFO slot. Before worker threads look at the work queue, they will check if there is a task in the LIFO slot, and if there is, they will run that task instead of the first item on the work queue. This reduces the number of cache and TLB misses that might take place if the parent task runs a "long" time after the child task has sent the data upwards. See this [blog](https://tokio.rs/blog/2019-10-scheduler#optimizing-for-message-passing-patterns) from `tokio` for more details. +The LIFO slot is implemented on `tokio`'s channels to prevent this from happening. When data is passed through a channel, the task on the receiving end of the channel is placed in the LIFO slot. Before worker threads look at the work queue, they will check if there is a task in the LIFO slot, and if there is, they will run that task instead of the first item on the work queue. This reduces the number of cache and TLB misses that might take place if the parent task runs a "long" time after the child task has sent the data upwards. See this [blog](https://tokio.rs/blog/2019-10-scheduler#optimizing-for-message-passing-patterns) from `tokio` for more details. --- [**`rayon`**](https://docs.rs/rayon/latest/rayon/) is a parallelism crate that allows us to write parallelized and safe code using an OS thread pool with almost no developer cost. As opposed to `tokio`, which uses lightweight tasks (coroutines) which are completely in userspace, `rayon` is much more "heavyweight", as OS threads will incur system calls and other synchronization-related costs. -The reason `eggstrain` needs both `tokio` and `rayon` is subtle, but it comes down to using `tokio` for interacting with anything that might need to wait for a bit of time without doing computation (the storage client from the I/O Service component, disk reads and writes, and even data movement between threads), and using `rayon` for any CPU-intensive workloads (actual execution and computation). Since `tokio` provides the scheduling for `eggstrain`, all `eggstrain` needs to focus on is making operators run as fast as possible by leveraging `rayon`. +The reason `eggstrain` needs both `tokio` and `rayon` is subtle, but it comes down to using `tokio` for interacting with anything that might need to wait for a bit of time without doing computation (the storage client from the I/O Service component, disk reads and writes, and even data movement between threads), and using `rayon` for any CPU-intensive workloads (actual execution and computation). Since `tokio` provides the scheduling for `eggstrain`, all `eggstrain` needs to focus on is making operators run as fast as possible.
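As a rough illustration of that division of labor (a sketch with made-up data, assuming `tokio` and `rayon` as dependencies), CPU-heavy work can be handed to a `rayon` thread while the `tokio` task only waits on a `oneshot` channel:

```rust
use tokio::sync::oneshot;

/// Offload a CPU-heavy computation to rayon's OS thread pool and await the result
/// without blocking the tokio worker thread.
async fn sum_of_squares(data: Vec<i64>) -> i64 {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // Runs on a rayon worker thread, not inside the async runtime.
        let result = data.iter().map(|v| v * v).sum::<i64>();
        // `send` is synchronous, so it is callable from a plain OS thread.
        let _ = tx.send(result);
    });
    // While we await, the tokio scheduler is free to run other operator tasks.
    rx.await.expect("rayon task dropped the sender")
}

#[tokio::main]
async fn main() {
    let total = sum_of_squares((1..=1_000).collect()).await;
    println!("sum of squares: {total}");
}
```

The `Sort` example later in this document uses exactly this `oneshot` handoff.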
--- There is a Rust-native [`arrow`](https://arrow.apache.org/rust/arrow/index.html) crate that gives bindings into the [Apache Arrow data format](https://arrow.apache.org/overview/). Notably, it provides a [`RecordBatch`](https://arrow.apache.org/rust/arrow/array/struct.RecordBatch.html) type that can hold vectorized batches of columnar data. This will be the main form of intermediate data communicated between `eggstrain` and other components, as well as between the operators of the EE itself. +There is a Rust-native [`arrow`](https://arrow.apache.org/rust/arrow/index.html) crate that gives bindings into the [Apache Arrow data format](https://arrow.apache.org/overview/). Notably, it provides a [`RecordBatch`](https://arrow.apache.org/rust/arrow/array/struct.RecordBatch.html) type that can hold vectorized batches of columnar data. This will be the main form of intermediate data communicated between `eggstrain` and other components, as well as between the operators of `eggstrain` itself. `eggstrain` is very similar to and makes heavy use of code from [`datafusion`](https://docs.rs/datafusion/latest/datafusion/). `eggstrain` takes as input a DataFusion [`ExecutionPlan`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html), and outputs a stream of `RecordBatch`es. -_Note: Since we are using their physical plan representation we will make heavy use of their proprietary data types. We will also use the DataFusion implementations for the operators we do not plan to implement ourselves for the time being._ - ---- - -### Data Flow - -TODO ASCII diagram - - -### Interface and API - -`eggstrain` can be treated as a mathematical function. It receives as input an `ExecutionPlan` physical plan from the scheduler, and outputs a stream of Apache Arrow `RecordBatch`es, or a `SendableRecordBatchStream`. +_Note: Since `eggstrain` is using `datafusion`'s physical plan representation, we will make heavy use of their proprietary data types (for now). We will also use the DataFusion implementations for the operators we do not plan to implement ourselves._ + + +## Data Flow + +`eggstrain` takes as input a `datafusion` `ExecutionPlan`, which is a DAG of physical plan nodes, connected by pointers (`Arc`). `eggstrain` mimics this DAG by creating a DAG of operator tasks, connected by `tokio` channels. + +Every intermediate operator (non-leaf) runs in its own `tokio` task (lightweight thread). Depending on whether the logical node has 1 or 2 children (a unary or binary node), the task will have one or two input channel receivers over the `RecordBatch` type. For non-pipeline breakers, the operator will process `RecordBatch`es immediately, and then send the data up via an input channel sender, which sends the data to the parent operator's task. For pipeline breakers such as `Sort` or `HashBuild`, data is gathered until all input data has been sent from the child, and from there all data is processed together. + + +### `Project` Example + +Here is a diagram of the `Project` operator, an intermediate operator that is _not_ a pipeline breaker.
+ +```text + ▲ + │ + │ + │ + │ RecordBatch + Project Operator Task │ + │ + ┌──────────┬────┴──────┬────────────┐ + │ │ │ │ + │ │ tx │ │ + │ │ │ │ + │ └────▲──────┘ │ + │ │ │ + │ │ + │ project(record_batch); │ + │ ▲ │ + │ │ │ + │ ┌────┴──────┐ │ + │ │ │ │ + │ │ rx │ │ + │ │ │ │ + └──────────┴────▲──────┴────────────┘ + │ + │ + │ + │ RecordBatch + │ + │ + │ +``` + +Since `Project` is not a pipeline breaker, it will apply a projection to every `RecordBatch` it encounters from `rx`, and then immediately send it to its parent via `tx`. + + +### `Sort` Example + +Here is a diagram of the `Sort` operator, which _is_ a pipeline breaker (as it needs all input data before it can sort anything). + +```text + │ + │ + ▲ Asynchronous Land │ Synchronous Land + │ (tokio) │ (rayon) + │ │ + │ RecordBatch │ + Sort Operator Task │ │ + │ │ +┌──────────────────┬────┴─────┬──────────────────────┐ │ +│ │ │ │ │ +│ │ tx │ │ │ +│ │ │ │ │ +│ └────▲─────┘ │ │ Rayon Thread +│ │ ┌─────────────┤ │ ┌─────────────┐ +│ │ │ │ │ │ │ +│ └──────────────┤ oneshot::rx ◄───┼──┤ oneshot::tx │ +│ │ │ │ │ │ +│ └─────────────┤ │ ├──────▲──────┤ +│ Temporary Buffer │ │ │ │ │ +│ ┌───────────────────────────┐ │ │ │ │ │ +│ │ │ │ │ │ │ │ +│ │ Buffer of RecordBatches │ rayon::spawn(); │ │ │ │ +│ │ ├────────────────────┼───┼──► sort(data); │ +│ │ Stored in memory and │ │ │ │ │ +│ │ spilled to disk │ │ │ └─────────────┘ +│ │ │ │ │ +│ └─────────────▲─────────────┘ │ │ +│ │ │ │ +│ ┌──────┘ │ │ +│ │ │ │ +│ ┌────┴─────┐ │ │ +│ │ │ │ │ +│ │ rx │ │ │ +│ │ │ │ │ +└─────┴────▲─────┴───────────────────────────────────┘ │ + │ │ + │ │ + │ RecordBatch │ + │ │ + │ +``` + + +`Sort` will continuously poll its child task by calling `rx.recv()` on the input channel, until it returns a `RecvError::Closed`. All `RecordBatch`es are stored in some temporary buffer, either completely in memory, or with some contents spilled to temporary files on disk. See the [async I/O](#asynchronous-io-and-disk-manager) section for more details. + +Once all data has been buffered, the `Sort` operator can begin the sorting phase. Instead of computing the computationally heavy sort algorithm in the `tokio` task, `Sort` will instead spawn a `rayon`-managed OS thread to offload the computational workload to a more "heavyweight" worker. Since `tokio` is a completely userspace thread library, this is incredibly important, since work done by a `Sort` operator within a `tokio` task would cause other `tokio` tasks to block on the `sort` computation. By instead sending the data to a synchronous OS thread, other lightweight tasks do not need to block on the `sort` computation, and the CPU running those threads can choose to run something else. + +This is only possible via the `oneshot::channel`, which provides a synchronous method `tx.send()` to send data from a synchronous thread to an asynchronous task. On the synchronous side, the OS thread only needs to call `tx.send(sorted_data)`, while the asynchronous task only needs to call `rx.await`. This is key, since the `.await` call will allow the `tokio` scheduler to run other tasks while the `sort` is running. + +Once the `rx.await` call returns with the sorted data, `Sort` can send data up to the next operator in fixed-sized batches. + + +## Asynchronous I/O and Buffer Pool Manager + +TODO: This section is in progress. + +Since `eggstrain` is so tightly coupled with an asynchronous runtime, it makes sense to use that to our benefit. Typically, we have to wait for disk reads and writes to finish before we can get access to a memory page. 
In synchronous land, there is nothing that we can do but yield while the OS runs a system call. The OS is allowed to context switch while this disk I/O is happening, meaning the read/write might take a "long" time to return. + +It is very possible we could see large performance benefits from using asynchronous I/O for a buffer pool manager. More specifically, we could implement a buffer pool manager that has both synchronous (blocking) and asynchronous methods for reading and writing, which would give as plenty of flexibility in when we want to use one or the other. + +This would probably take more time, but it might also be a great contribution we could make, since as of right now, our individual operators will never be able to outperform `datafusion`. + + +## Interface and API + +`eggstrain` can be treated as a mathematical function. It receives as input an `ExecutionPlan` physical plan, and outputs a stream of Apache Arrow `RecordBatch`es, or a `SendableRecordBatchStream`. The `ExecutionPlan` that `eggstrain` receives as input has nodes corresponding to one of the operators listed above. These plans will be given to each `eggstrain` instance by a Scheduler / Coordinator as DataFusion query plan fragments. More specifically, `eggstrain` will parse out the specific nodes in the relational DAG, and construct its own DAG of operators, connected by channels instead of pointers. Once `eggstrain` parses the plan, it will figure out what data it requires. From there, it will make a high-level request for data it needs from the IO Service (e.g. logical columns from a table). -TODO something about `TableScan` +TODO something about `TableScan`, need to talk more with the I/O teams. --- -### Other Sections +## Other Sections TODO From eabc4b44e59790763a706ed7e5fc73eb91161d9b Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Mon, 4 Mar 2024 12:42:02 -0500 Subject: [PATCH 6/7] update --- README.md | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 88f204a..a47e877 100644 --- a/README.md +++ b/README.md @@ -151,7 +151,7 @@ Here is a diagram of the `Sort` operator, which _is_ a pipeline breaker (as it n ``` -`Sort` will continuously poll its child task by calling `rx.recv()` on the input channel, until it returns a `RecvError::Closed`. All `RecordBatch`es are stored in some temporary buffer, either completely in memory, or with some contents spilled to temporary files on disk. See the [async I/O](#asynchronous-io-and-disk-manager) section for more details. +`Sort` will continuously poll its child task by calling `rx.recv()` on the input channel, until it returns a `RecvError::Closed`. All `RecordBatch`es are stored in some temporary buffer, either completely in memory, or with some contents spilled to temporary files on disk. See the [async I/O](#asynchronous-io-and-buffer-pool-manager) section for more details. Once all data has been buffered, the `Sort` operator can begin the sorting phase. Instead of computing the computationally heavy sort algorithm in the `tokio` task, `Sort` will instead spawn a `rayon`-managed OS thread to offload the computational workload to a more "heavyweight" worker. Since `tokio` is a completely userspace thread library, this is incredibly important, since work done by a `Sort` operator within a `tokio` task would cause other `tokio` tasks to block on the `sort` computation. 
By instead sending the data to a synchronous OS thread, other lightweight tasks do not need to block on the `sort` computation, and the CPU running those threads can choose to run something else. @@ -159,10 +159,15 @@ This is only possible via the `oneshot::channel`, which provides a synchronous m Once the `rx.await` call returns with the sorted data, `Sort` can send data up to the next operator in fixed-sized batches. +
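Putting the pieces of this walkthrough together, a heavily simplified version of the pattern looks roughly like the sketch below. Plain `Vec<i64>` batches stand in for `RecordBatch`es, spilling to disk is omitted, `mpsc` channels are used instead of the `broadcast` channels described above to keep the sketch short, and the channel and batch sizes are arbitrary:

```rust
use tokio::sync::{mpsc, oneshot};

type Batch = Vec<i64>;

/// A simplified pipeline breaker: buffer every input batch, sort the whole buffer
/// on a rayon thread, then push fixed-size batches up to the parent.
async fn sort_operator(mut child: mpsc::Receiver<Batch>, parent: mpsc::Sender<Batch>) {
    // 1. Buffering phase: drain the child until its side of the channel closes.
    let mut buffer: Vec<i64> = Vec::new();
    while let Some(batch) = child.recv().await {
        buffer.extend(batch);
    }

    // 2. Sorting phase: offload the heavy work to rayon and await a oneshot result.
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        buffer.sort_unstable();
        let _ = tx.send(buffer);
    });
    let sorted = rx.await.expect("sort task dropped the sender");

    // 3. Output phase: send the sorted data upward in fixed-size batches.
    for chunk in sorted.chunks(1024) {
        if parent.send(chunk.to_vec()).await.is_err() {
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let (child_tx, child_rx) = mpsc::channel::<Batch>(8);
    let (parent_tx, mut parent_rx) = mpsc::channel::<Batch>(8);

    tokio::spawn(sort_operator(child_rx, parent_tx));
    tokio::spawn(async move {
        for batch in [vec![3, 1, 2], vec![9, 7, 8], vec![6, 4, 5]] {
            let _ = child_tx.send(batch).await;
        }
    });

    while let Some(batch) = parent_rx.recv().await {
        println!("sorted batch: {batch:?}");
    }
}
```

The real operator works over `RecordBatch`es and spills its buffer to temporary files on disk when it grows too large, as described above, but the overall shape of the three phases is the same.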
+
+
-## Asynchronous I/O and Buffer Pool Manager +# In Progress Sections + +All of these sections are incomplete, but give some insight into our plans for the future. -TODO: This section is in progress. +## Asynchronous I/O and Buffer Pool Manager Since `eggstrain` is so tightly coupled with an asynchronous runtime, it makes sense to use that to our benefit. Typically, we have to wait for disk reads and writes to finish before we can get access to a memory page. In synchronous land, there is nothing that we can do but yield while the OS runs a system call. The OS is allowed to context switch while this disk I/O is happening, meaning the read/write might take a "long" time to return. @@ -180,9 +185,3 @@ The `ExecutionPlan` that `eggstrain` receives as input has nodes corresponding t Once `eggstrain` parses the plan, it will figure out what data it requires. From there, it will make a high-level request for data it needs from the IO Service (e.g. logical columns from a table). TODO something about `TableScan`, need to talk more with the I/O teams. - ---- - -## Other Sections - -TODO From cf3a750c187331c2f9ec59544f4e5392ef275d1e Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Mon, 4 Mar 2024 12:47:10 -0500 Subject: [PATCH 7/7] include readme in eggstrain --- eggstrain/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eggstrain/src/lib.rs b/eggstrain/src/lib.rs index f4cc39a..bba75fd 100644 --- a/eggstrain/src/lib.rs +++ b/eggstrain/src/lib.rs @@ -1,3 +1,5 @@ +#![doc = include_str!("../../README.md")] + use arrow::record_batch::RecordBatch; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*;