Skip to content

Commit

Permalink
Chore: update wasm-supported crates
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Jan 4, 2025
1 parent ab9ff56 commit 6234193
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 12 deletions.
9 changes: 8 additions & 1 deletion datafusion/wasmtest/Cargo.toml
Original file line number Diff line number Diff line change
datafusion = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-optimizer = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }

datafusion-physical-expr-common = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-table = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common-runtime = { workspace = true }
# getrandom must be compiled with js feature
getrandom = { version = "0.2.8", features = ["js"] }

Expand Down
12 changes: 10 additions & 2 deletions datafusion/wasmtest/README.md
Original file line number Diff line number Diff line change
The following DataFusion crates are verified to work in a wasm-pack environment:
- `datafusion-physical-expr`
- `datafusion-physical-plan`
- `datafusion-sql`

The difficulty with getting the remaining DataFusion crates compiled to WASM is that they have non-optional dependencies on the [`parquet`](https://docs.rs/crate/parquet/) crate with its default features enabled. Several of the default parquet crate features require native dependencies that are not compatible with WASM, in particular the `lz4` and `zstd` features. If we can arrange our feature flags to make it possible to depend on parquet with these features disabled, then it should be possible to compile the core `datafusion` crate to WASM as well.
- `datafusion-expr-common`
- `datafusion-physical-expr-common`
- `datafusion-functions`
- `datafusion-functions-aggregate`
- `datafusion-functions-aggregate-common`
- `datafusion-functions-table`
- `datafusion-catalog`
- `datafusion-common-runtime`

The `datafusion-ffi` crate cannot compile for the wasm32-unknown-unknown target because it relies on lzma-sys, which depends on native C libraries (liblzma). The wasm32-unknown-unknown target lacks a standard C library (stdlib.h) and POSIX-like environment, preventing the native code from being compiled.
70 changes: 61 additions & 9 deletions datafusion/wasmtest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use std::sync::Arc;
use wasm_bindgen::prelude::*;

pub fn set_panic_hook() {
// When the `console_error_panic_hook` feature is enabled, we can call the
// `set_panic_hook` function at least once during initialization, and then
Expand Down Expand Up @@ -77,7 +76,14 @@ pub fn basic_parse() {
#[cfg(test)]
mod test {
use super::*;
use datafusion::execution::context::SessionContext;
use datafusion::{
arrow::{
array::{ArrayRef, Int32Array, RecordBatch, StringArray},
datatypes::{DataType, Field, Schema},
},
datasource::MemTable,
execution::context::SessionContext,
};
use datafusion_execution::{
config::SessionConfig, disk_manager::DiskManagerConfig,
runtime_env::RuntimeEnvBuilder,
Expand All @@ -95,19 +101,21 @@ mod test {
basic_parse();
}

#[wasm_bindgen_test(unsupported = tokio::test)]
async fn basic_execute() {
let sql = "SELECT 2 + 2;";

// Execute SQL (using datafusion)
/// Build a `SessionContext` suitable for the wasm32 target:
/// the disk manager is disabled (no filesystem available in the browser
/// sandbox) and execution is limited to a single partition.
fn get_ctx() -> Arc<SessionContext> {
    let rt = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::Disabled)
        .build_arc()
        .unwrap();
    let session_config = SessionConfig::new().with_target_partitions(1);
    // Return the context as the final expression (no dangling binding).
    Arc::new(SessionContext::new_with_config_rt(session_config, rt))
}
#[wasm_bindgen_test(unsupported = tokio::test)]
async fn basic_execute() {
let sql = "SELECT 2 + 2;";

// Execute SQL (using datafusion)

let session_context = get_ctx();
let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();

let logical_plan = session_context
Expand All @@ -124,4 +132,48 @@ mod test {
let task_ctx = session_context.task_ctx();
let _ = collect(physical_plan, task_ctx).await.unwrap();
}

#[wasm_bindgen_test(unsupported = tokio::test)]
async fn basic_df_function_execute() {
    // Run a scalar function (`abs`) end to end: parse, plan, execute, collect.
    let sql = "SELECT abs(-1.0);";
    let ctx = get_ctx();

    let stmt = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
    let plan = ctx.state().statement_to_plan(stmt).await.unwrap();
    let frame = ctx.execute_logical_plan(plan).await.unwrap();
    let exec_plan = frame.create_physical_plan().await.unwrap();

    let _ = collect(exec_plan, ctx.task_ctx()).await.unwrap();
}

#[wasm_bindgen_test(unsupported = tokio::test)]
async fn test_basic_aggregate() {
    // Exercise a window/aggregate function (FIRST_VALUE) over a one-row
    // in-memory table registered with the wasm-friendly context.
    let sql =
        "SELECT FIRST_VALUE(value) OVER (ORDER BY id) as first_val FROM test_table;";

    let fields = vec![
        Field::new("id", DataType::Int32, false),
        Field::new("value", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));

    let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
    let value_col: ArrayRef = Arc::new(StringArray::from(vec!["a"]));
    let batch =
        RecordBatch::try_new(schema.clone(), vec![id_col, value_col]).unwrap();

    let ctx = get_ctx();
    let mem_table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
    ctx.register_table("test_table", Arc::new(mem_table)).unwrap();

    let stmt = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
    let plan = ctx.state().statement_to_plan(stmt).await.unwrap();
    let frame = ctx.execute_logical_plan(plan).await.unwrap();
    let exec_plan = frame.create_physical_plan().await.unwrap();

    let _ = collect(exec_plan, ctx.task_ctx()).await.unwrap();
}
}

0 comments on commit 6234193

Please sign in to comment.