From 1c6b0fd67924b5a690e1053feec7418833ba3535 Mon Sep 17 00:00:00 2001
From: Xiangpeng Hao
Date: Mon, 23 Dec 2024 17:12:50 -0600
Subject: [PATCH] use native parquet reader

---
 Cargo.lock         |   2 +
 Cargo.toml         |   2 +
 src/file_reader.rs |  46 +++++++++++-----
 src/main.rs        | 131 +++++++++++++++++++++++++++++----------------
 src/query_input.rs |  57 +++++---------------
 5 files changed, 134 insertions(+), 104 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b7c5468..40b2f8f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2561,6 +2561,7 @@ dependencies = [
  "arrow",
  "arrow-array",
  "arrow-schema",
+ "async-trait",
  "bytes",
  "chrono",
  "codee",
@@ -2570,6 +2571,7 @@
  "leptos",
  "leptos-use",
  "leptos_router",
+ "object_store",
  "opendal",
  "parquet",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 4144130..34b94f5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -44,6 +44,8 @@ leptos-use = { version = "0.14.0", features = [
 ] }
 codee = { version = "0.2", features = ["msgpack_serde"] }
 leptos_router = { version = "0.7.0", features = ["nightly"] }
+object_store = "0.11.1"
+async-trait = "0.1.83"
 
 [profile.release]
 strip = true
diff --git a/src/file_reader.rs b/src/file_reader.rs
index 1b2ae8e..7415383 100644
--- a/src/file_reader.rs
+++ b/src/file_reader.rs
@@ -2,9 +2,13 @@ use bytes::Bytes;
 use leptos::prelude::*;
 use leptos::wasm_bindgen::{prelude::Closure, JsCast};
 use leptos_router::hooks::query_signal;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
 use opendal::{services::Http, services::S3, Operator};
 use web_sys::{js_sys, Url};
 
+use crate::INMEMORY_STORE;
+
 const S3_ENDPOINT_KEY: &str = "s3_endpoint";
 const S3_ACCESS_KEY_ID_KEY: &str = "s3_access_key_id";
 const S3_SECRET_KEY_KEY: &str = "s3_secret_key";
@@ -29,6 +33,20 @@ fn save_to_storage(key: &str, value: &str) {
     }
 }
 
+async fn update_file(
+    bytes: Bytes,
+    bytes_setter: WriteSignal<Option<Bytes>>,
+    file_name: &String,
+    file_name_setter: WriteSignal<String>,
+) {
+    let object_store = &*INMEMORY_STORE;
+    let path = Path::parse(format!("{}.parquet", file_name)).unwrap();
+    let payload = PutPayload::from_bytes(bytes.clone());
+    object_store.put(&path, payload).await.unwrap();
+    bytes_setter.set(Some(bytes));
+    file_name_setter.set(file_name.clone());
+}
+
 #[component]
 pub fn FileReader(
     set_error_message: WriteSignal<Option<String>>,
@@ -142,24 +160,27 @@
         let file_reader = web_sys::FileReader::new().unwrap();
         let file_reader_clone = file_reader.clone();
 
+        let table_name = file
+            .name()
+            .strip_suffix(".parquet")
+            .unwrap_or(&file.name())
+            .to_string();
+
         let onload = Closure::wrap(Box::new(move |_: web_sys::Event| {
+            let table_name = table_name.clone();
             let result = file_reader_clone.result().unwrap();
             let array_buffer = result.dyn_into::<js_sys::ArrayBuffer>().unwrap();
             let uint8_array = js_sys::Uint8Array::new(&array_buffer);
             let bytes = bytes::Bytes::from(uint8_array.to_vec());
-            set_file_bytes.set(Some(bytes.clone()));
-            set_is_folded.set(true);
+            leptos::task::spawn_local(async move {
+                update_file(bytes.clone(), set_file_bytes, &table_name, set_file_name).await;
+                set_is_folded.set(true);
+            });
         }) as Box<dyn FnMut(_)>);
 
         file_reader.set_onload(Some(onload.as_ref().unchecked_ref()));
        file_reader.read_as_array_buffer(&file).unwrap();
         onload.forget();
-        let table_name = file
-            .name()
-            .strip_suffix(".parquet")
-            .unwrap_or(&file.name())
-            .to_string();
-        set_file_name.set(table_name);
     };
 
     let on_url_submit = move || {
@@ -182,7 +203,6 @@
             .strip_suffix(".parquet")
             .unwrap_or("uploaded")
             .to_string();
-        set_file_name.set(table_name);
 
         leptos::task::spawn_local(async move {
             let builder = Http::default().endpoint(&endpoint);
@@ -194,7 +214,7 @@
 
             match op.read(&path).await {
                 Ok(bs) => {
-                    set_file_bytes.set(Some(bs.to_bytes()));
+                    update_file(bs.to_bytes(), set_file_bytes, &table_name, set_file_name).await;
                     set_is_folded.set(true);
                 }
                 Err(e) => {
@@ -226,9 +246,8 @@
             .strip_suffix(".parquet")
             .unwrap_or("uploaded")
             .to_string();
-        set_file_name.set(file_name);
 
-        wasm_bindgen_futures::spawn_local(async move {
+        leptos::task::spawn_local(async move {
             let cfg = S3::default()
                 .endpoint(&endpoint)
                 .access_key_id(&access_key_id)
@@ -241,7 +260,8 @@
             let operator = op.finish();
             match operator.read(&s3_file_path.get()).await {
                 Ok(bs) => {
-                    set_file_bytes.set(Some(bs.to_bytes()));
+                    update_file(bs.to_bytes(), set_file_bytes, &file_name, set_file_name)
+                        .await;
                     set_is_folded.set(true);
                 }
                 Err(e) => {
diff --git a/src/main.rs b/src/main.rs
index 8b76683..d711973 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,11 +1,13 @@
 mod schema;
 
 use datafusion::physical_plan::ExecutionPlan;
 use file_reader::{get_stored_value, FileReader};
+use futures::{future::BoxFuture, FutureExt};
 use leptos_router::{
     components::Router,
     hooks::{query_signal, use_query_map},
 };
+use object_store::memory::InMemory;
 use query_results::{export_to_csv_inner, export_to_parquet_inner, QueryResults};
 use schema::SchemaSection;
 
@@ -16,13 +18,13 @@ mod row_group_column;
 
 mod metadata;
 use metadata::MetadataSection;
-use std::sync::Arc;
+use std::{ops::Range, sync::Arc, sync::LazyLock};
 
 use arrow::datatypes::SchemaRef;
 use bytes::Bytes;
 use leptos::{logging, prelude::*};
 use parquet::{
-    arrow::parquet_to_arrow_schema,
+    arrow::{async_reader::AsyncFileReader, parquet_to_arrow_schema},
     errors::ParquetError,
     file::metadata::{ParquetMetaData, ParquetMetaDataReader},
 };
@@ -33,6 +35,60 @@ use query_input::{execute_query_inner, QueryInput};
 mod settings;
 use settings::{Settings, ANTHROPIC_API_KEY};
 
+pub(crate) static INMEMORY_STORE: LazyLock<Arc<InMemory>> =
+    LazyLock::new(|| Arc::new(InMemory::new()));
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct ParquetReader {
+    bytes: Bytes,
+    parquet_info: ParquetInfo,
+}
+
+impl ParquetReader {
+    pub fn new(bytes: Bytes) -> Result<Self, ParquetError> {
+        let mut footer = [0_u8; 8];
+        footer.copy_from_slice(&bytes[bytes.len() - 8..]);
+        let metadata_len = ParquetMetaDataReader::decode_footer(&footer)?;
+
+        let mut metadata_reader = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .with_column_indexes(true)
+            .with_offset_indexes(true);
+        metadata_reader.try_parse(&bytes)?;
+        let metadata = metadata_reader.finish()?;
+
+        let parquet_info = ParquetInfo::from_metadata(metadata, metadata_len as u64)?;
+
+        Ok(Self {
+            bytes,
+            parquet_info,
+        })
+    }
+
+    fn info(&self) -> &ParquetInfo {
+        &self.parquet_info
+    }
+}
+
+impl AsyncFileReader for ParquetReader {
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let rt = ranges.iter().map(|r| self.bytes.slice(r.clone())).collect();
+        async move { Ok(rt) }.boxed()
+    }
+
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let sliced = self.bytes.slice(range);
+        async move { Ok(sliced) }.boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        async move { Ok(self.parquet_info.metadata.clone()) }.boxed()
+    }
+}
+
 #[derive(Debug, Clone, PartialEq)]
 struct ParquetInfo {
     file_size: u64,
@@ -70,8 +126,17 @@ impl ParquetInfo {
         let first_row_group = metadata.row_groups().first();
         let first_column = first_row_group.and_then(|rg| rg.columns().first());
 
-        let has_column_index = metadata.column_index().map(|ci| ci.first().map(|c| c.len()> 0)).flatten().unwrap_or(false);
-        let has_page_index = metadata.offset_index().map(|ci| ci.first().map(|c| c.len()> 0)).flatten().unwrap_or(false);
+        let has_column_index = metadata
+            .column_index()
+            .map(|ci| ci.first().map(|c| c.len() > 0))
+            .flatten()
+            .unwrap_or(false);
+        let has_page_index = metadata
+            .offset_index()
+            .map(|ci| ci.first().map(|c| c.len() > 0))
+            .flatten()
+            .unwrap_or(false);
+
         Ok(Self {
             file_size: compressed_size,
             uncompressed_size,
@@ -94,23 +159,6 @@
     }
 }
 
-fn get_parquet_info(bytes: Bytes) -> Result<ParquetInfo, ParquetError> {
-    let mut footer = [0_u8; 8];
-    footer.copy_from_slice(&bytes[bytes.len() - 8..]);
-    let metadata_len = ParquetMetaDataReader::decode_footer(&footer)?;
-
-    let mut metadata_reader = ParquetMetaDataReader::new()
-        .with_page_indexes(true)
-        .with_column_indexes(true)
-        .with_offset_indexes(true);
-    metadata_reader.try_parse(&bytes)?;
-    let metadata = metadata_reader.finish()?;
-
-    let parquet_info = ParquetInfo::from_metadata(metadata, metadata_len as u64)?;
-
-    Ok(parquet_info)
-}
-
 fn format_rows(rows: u64) -> String {
     let mut result = rows.to_string();
     let mut i = result.len();
@@ -156,11 +204,9 @@ impl std::fmt::Display for ParquetInfo {
 
 async fn execute_query_async(
     query: String,
-    bytes: Bytes,
     table_name: String,
-    parquet_info: ParquetInfo,
 ) -> Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>), String> {
-    let (results, physical_plan) = execute_query_inner(&table_name, parquet_info, bytes, &query)
+    let (results, physical_plan) = execute_query_inner(&table_name, &query)
         .await
         .map_err(|e| format!("Failed to execute query: {}", e))?;
 
@@ -182,14 +228,14 @@ fn App() -> impl IntoView {
     let (show_settings, set_show_settings) = signal(false);
     let api_key = get_stored_value(ANTHROPIC_API_KEY, "");
 
-    let parquet_info = Memo::new(move |_| {
+    let parquet_reader = Memo::new(move |_| {
         file_bytes
            .get()
            .and_then(|bytes| get_parquet_info(bytes.clone()).ok())
    });

    Effect::watch(
        parquet_info,
        move |info, _, _| {
            if let Some(info) = info {
                match user_input.get() {
                let query = query.clone();
                let export_to = export_to.clone();
 
                leptos::task::spawn_local(async move {
-                    match execute_query_async(query.clone(), bytes, table_name, parquet_info).await
-                    {
+                    match execute_query_async(query.clone(), table_name).await {
                        Ok((results, physical_plan)) => {
                            set_physical_plan.set(Some(physical_plan));
                            if let Some(export_to) = export_to {
@@ -368,9 +405,9 @@
                file_bytes
                    .get()
                    .map(|_| {
-                        match parquet_info() {
+                        match parquet_reader() {
                            Some(info) => {
-                                if info.row_group_count > 0 {
+                                if info.info().row_group_count > 0 {
                                    view! {
 impl IntoView {
            {move || {
-                let info = parquet_info();
+                let info = parquet_reader();
                match info {
                    Some(info) => {
                        view! {
-
+
-
+
 }
diff --git a/src/query_input.rs b/src/query_input.rs
index b2ea904..73cc3d2 100644
--- a/src/query_input.rs
+++ b/src/query_input.rs
@@ -2,54 +2,25 @@ use std::sync::Arc;
 
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
-use bytes::Bytes;
 use datafusion::{
     error::DataFusionError,
-    execution::{SendableRecordBatchStream, TaskContext},
-    physical_plan::{
-        collect, stream::RecordBatchStreamAdapter, streaming::PartitionStream, ExecutionPlan,
-    },
-    prelude::SessionConfig,
+    execution::object_store::ObjectStoreUrl,
+    physical_plan::{collect, ExecutionPlan},
+    prelude::{ParquetReadOptions, SessionConfig},
 };
 use leptos::{logging, prelude::*};
 use leptos::{
     reactive::wrappers::write::SignalSetter,
     wasm_bindgen::{JsCast, JsValue},
 };
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use serde_json::json;
 use wasm_bindgen_futures::JsFuture;
 use web_sys::{js_sys, Headers, Request, RequestInit, RequestMode, Response};
 
-use crate::ParquetInfo;
-
-use futures::StreamExt;
-#[derive(Debug)]
-struct DummyStreamPartition {
-    schema: SchemaRef,
-    bytes: Bytes,
-}
-
-impl PartitionStream for DummyStreamPartition {
-    fn schema(&self) -> &SchemaRef {
-        &self.schema
-    }
-
-    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        let parquet_builder = ParquetRecordBatchReaderBuilder::try_new(self.bytes.clone()).unwrap();
-        let reader = parquet_builder.build().unwrap();
-        Box::pin(RecordBatchStreamAdapter::new(
-            self.schema.clone(),
-            futures::stream::iter(reader)
-                .map(|batch| batch.map_err(|e| DataFusionError::ArrowError(e, None))),
-        ))
-    }
-}
+use crate::INMEMORY_STORE;
 
 pub(crate) async fn execute_query_inner(
     table_name: &str,
-    parquet_info: ParquetInfo,
-    data: Bytes,
     query: &str,
 ) -> Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>), DataFusionError> {
     let mut config = SessionConfig::new();
@@ -57,17 +28,15 @@
 
     let ctx = datafusion::prelude::SessionContext::new_with_config(config);
 
-    let schema = parquet_info.schema.clone();
-
-    let streaming_table = datafusion::datasource::streaming::StreamingTable::try_new(
-        schema.clone(),
-        vec![Arc::new(DummyStreamPartition {
-            schema: schema.clone(),
-            bytes: data.clone(),
-        })],
-    )?;
-
-    ctx.register_table(table_name, Arc::new(streaming_table))?;
+    let object_store_url = ObjectStoreUrl::parse("mem://").unwrap();
+    let object_store = INMEMORY_STORE.clone();
+    ctx.register_object_store(object_store_url.as_ref(), object_store);
+    ctx.register_parquet(
+        table_name,
+        &format!("mem:///{}.parquet", table_name),
+        ParquetReadOptions::default(),
+    )
+    .await?;
 
     let plan = ctx.sql(query).await?;
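
The patch above replaces the custom PartitionStream with DataFusion's native Parquet path: the uploaded bytes are staged in an in-memory object store, the store is registered under the mem:// URL, and register_parquet reads the file back. Below is a minimal standalone sketch of that flow, assuming the same object_store (0.11) and DataFusion APIs the diff uses; the helper name query_in_memory_parquet is hypothetical and not part of the patch.

use std::sync::Arc;

use bytes::Bytes;
use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};

// Hypothetical helper that mirrors what update_file + execute_query_inner do together:
// stage the uploaded Parquet bytes in an in-memory store, expose that store to DataFusion
// under the mem:// URL, and query the file through the registered table name.
async fn query_in_memory_parquet(bytes: Bytes, table_name: &str, sql: &str) -> Result<()> {
    let store = Arc::new(InMemory::new());

    // Write the bytes under "<table_name>.parquet", matching the path format used in update_file.
    let path = Path::parse(format!("{}.parquet", table_name)).unwrap();
    store.put(&path, PutPayload::from_bytes(bytes)).await.unwrap();

    // Register the store for mem:// URLs, then let register_parquet read the file back
    // via DataFusion's native async Parquet reader.
    let ctx = SessionContext::new();
    let url = ObjectStoreUrl::parse("mem://").unwrap();
    ctx.register_object_store(url.as_ref(), store);
    ctx.register_parquet(
        table_name,
        &format!("mem:///{}.parquet", table_name),
        ParquetReadOptions::default(),
    )
    .await?;

    let batches = ctx.sql(sql).await?.collect().await?;
    println!("query returned {} record batches", batches.len());
    Ok(())
}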