From 62341930f9460341371ab013abce1cf9287b6546 Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Fri, 3 Jan 2025 20:03:55 -0800
Subject: [PATCH] Chore: update wasm-supported crates

---
 datafusion/wasmtest/Cargo.toml |  9 ++++-
 datafusion/wasmtest/README.md  | 12 +++++-
 datafusion/wasmtest/src/lib.rs | 70 +++++++++++++++++++++++++++++-----
 3 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml
index 69b9bd61a341..26fe4f796fbb 100644
--- a/datafusion/wasmtest/Cargo.toml
+++ b/datafusion/wasmtest/Cargo.toml
@@ -46,11 +46,18 @@ datafusion = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-expr-common = { workspace = true }
 datafusion-optimizer = { workspace = true, default-features = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-sql = { workspace = true }
-
+datafusion-physical-expr-common = { workspace = true }
+datafusion-functions = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
+datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-table = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common-runtime = { workspace = true }
 # getrandom must be compiled with js feature
 getrandom = { version = "0.2.8", features = ["js"] }
 
diff --git a/datafusion/wasmtest/README.md b/datafusion/wasmtest/README.md
index 2e525ee909f3..8843eed697ec 100644
--- a/datafusion/wasmtest/README.md
+++ b/datafusion/wasmtest/README.md
@@ -87,5 +87,13 @@ The following DataFusion crates are verified to work in a wasm-pack environment
 - `datafusion-physical-expr`
 - `datafusion-physical-plan`
 - `datafusion-sql`
-
-The difficulty with getting the remaining DataFusion crates compiled to WASM is that they have non-optional dependencies on the [`parquet`](https://docs.rs/crate/parquet/) crate with its default features enabled. Several of the default parquet crate features require native dependencies that are not compatible with WASM, in particular the `lz4` and `zstd` features. If we can arrange our feature flags to make it possible to depend on parquet with these features disabled, then it should be possible to compile the core `datafusion` crate to WASM as well.
+- `datafusion-expr-common`
+- `datafusion-physical-expr-common`
+- `datafusion-functions`
+- `datafusion-functions-aggregate`
+- `datafusion-functions-aggregate-common`
+- `datafusion-functions-table`
+- `datafusion-catalog`
+- `datafusion-common-runtime`
+
+The `datafusion-ffi` crate cannot be compiled for the `wasm32-unknown-unknown` target because it depends on `lzma-sys`, which builds against the native C library liblzma. The `wasm32-unknown-unknown` target provides no C standard library (`stdlib.h`) or POSIX-like environment, so that native code cannot be compiled.
diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs
index 54b662514c88..c7c620d1be3a 100644
--- a/datafusion/wasmtest/src/lib.rs
+++ b/datafusion/wasmtest/src/lib.rs
@@ -26,7 +26,6 @@ use datafusion_sql::sqlparser::dialect::GenericDialect;
 use datafusion_sql::sqlparser::parser::Parser;
 use std::sync::Arc;
 use wasm_bindgen::prelude::*;
-
 pub fn set_panic_hook() {
     // When the `console_error_panic_hook` feature is enabled, we can call the
     // `set_panic_hook` function at least once during initialization, and then
@@ -77,7 +76,14 @@ pub fn basic_parse() {
 
 #[cfg(test)]
 mod test {
     use super::*;
-    use datafusion::execution::context::SessionContext;
+    use datafusion::{
+        arrow::{
+            array::{ArrayRef, Int32Array, RecordBatch, StringArray},
+            datatypes::{DataType, Field, Schema},
+        },
+        datasource::MemTable,
+        execution::context::SessionContext,
+    };
     use datafusion_execution::{
         config::SessionConfig, disk_manager::DiskManagerConfig, runtime_env::RuntimeEnvBuilder,
@@ -95,19 +101,21 @@ mod test {
         basic_parse();
     }
 
-    #[wasm_bindgen_test(unsupported = tokio::test)]
-    async fn basic_execute() {
-        let sql = "SELECT 2 + 2;";
-
-        // Execute SQL (using datafusion)
+    fn get_ctx() -> Arc<SessionContext> {
         let rt = RuntimeEnvBuilder::new()
            .with_disk_manager(DiskManagerConfig::Disabled)
            .build_arc()
            .unwrap();
         let session_config = SessionConfig::new().with_target_partitions(1);
-        let session_context =
-            Arc::new(SessionContext::new_with_config_rt(session_config, rt));
+        Arc::new(SessionContext::new_with_config_rt(session_config, rt))
+    }
+    #[wasm_bindgen_test(unsupported = tokio::test)]
+    async fn basic_execute() {
+        let sql = "SELECT 2 + 2;";
+
+        // Execute SQL (using datafusion)
+        let session_context = get_ctx();
 
         let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
 
         let logical_plan = session_context
@@ -124,4 +132,48 @@ mod test {
         let task_ctx = session_context.task_ctx();
         let _ = collect(physical_plan, task_ctx).await.unwrap();
     }
+
+    #[wasm_bindgen_test(unsupported = tokio::test)]
+    async fn basic_df_function_execute() {
+        let sql = "SELECT abs(-1.0);";
+        let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
+        let ctx = get_ctx();
+        let logical_plan = ctx.state().statement_to_plan(statement).await.unwrap();
+        let data_frame = ctx.execute_logical_plan(logical_plan).await.unwrap();
+        let physical_plan = data_frame.create_physical_plan().await.unwrap();
+
+        let task_ctx = ctx.task_ctx();
+        let _ = collect(physical_plan, task_ctx).await.unwrap();
+    }
+
+    #[wasm_bindgen_test(unsupported = tokio::test)]
+    async fn test_basic_aggregate() {
+        let sql =
+            "SELECT FIRST_VALUE(value) OVER (ORDER BY id) as first_val FROM test_table;";
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("value", DataType::Utf8, false),
+        ]));
+
+        let data: Vec<ArrayRef> = vec![
+            Arc::new(Int32Array::from(vec![1])),
+            Arc::new(StringArray::from(vec!["a"])),
+        ];
+
+        let batch = RecordBatch::try_new(schema.clone(), data).unwrap();
+        let table = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
+
+        let ctx = get_ctx();
+        ctx.register_table("test_table", Arc::new(table)).unwrap();
+
+        let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
+
+        let logical_plan = ctx.state().statement_to_plan(statement).await.unwrap();
+        let data_frame = ctx.execute_logical_plan(logical_plan).await.unwrap();
+        let physical_plan = data_frame.create_physical_plan().await.unwrap();
+
+        let task_ctx = ctx.task_ctx();
+        let _ = collect(physical_plan, task_ctx).await.unwrap();
+    }
 }
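
Note: the new dev-dependencies also pull in `datafusion-functions-table`, which none of the tests above calls directly. A minimal sketch of one more smoke test that could sit alongside the others inside `mod test` in src/lib.rs, following the same parse/plan/collect pattern; it assumes `generate_series` (provided by `datafusion-functions-table`) is registered by default in `SessionContext`, and the test name is hypothetical:

    #[wasm_bindgen_test(unsupported = tokio::test)]
    async fn basic_table_function_execute() {
        // Sketch only, not part of this patch: exercise a table function
        // (`generate_series` from datafusion-functions-table) in the wasm build.
        let sql = "SELECT * FROM generate_series(1, 5);";

        let ctx = get_ctx();
        let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
        let logical_plan = ctx.state().statement_to_plan(statement).await.unwrap();
        let data_frame = ctx.execute_logical_plan(logical_plan).await.unwrap();
        let physical_plan = data_frame.create_physical_plan().await.unwrap();

        let task_ctx = ctx.task_ctx();
        let _ = collect(physical_plan, task_ctx).await.unwrap();
    }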