diff --git a/src/file_reader.rs b/src/file_reader.rs
index 7415383..de93b4c 100644
--- a/src/file_reader.rs
+++ b/src/file_reader.rs
@@ -1,4 +1,3 @@
-use bytes::Bytes;
 use leptos::prelude::*;
 use leptos::wasm_bindgen::{prelude::Closure, JsCast};
 use leptos_router::hooks::query_signal;
@@ -7,7 +6,7 @@
 use object_store::{ObjectStore, PutPayload};
 use opendal::{services::Http, services::S3, Operator};
 use web_sys::{js_sys, Url};
-use crate::INMEMORY_STORE;
+use crate::{ParquetTable, INMEMORY_STORE};
 
 const S3_ENDPOINT_KEY: &str = "s3_endpoint";
 const S3_ACCESS_KEY_ID_KEY: &str = "s3_access_key_id";
@@ -34,24 +33,20 @@ fn save_to_storage(key: &str, value: &str) {
 }
 
 async fn update_file(
-    bytes: Bytes,
-    bytes_setter: WriteSignal>,
-    file_name: &String,
-    file_name_setter: WriteSignal,
+    parquet_table: ParquetTable,
+    parquet_table_setter: WriteSignal>,
 ) {
     let object_store = &*INMEMORY_STORE;
-    let path = Path::parse(format!("{}.parquet", file_name)).unwrap();
-    let payload = PutPayload::from_bytes(bytes.clone());
+    let path = Path::parse(format!("{}.parquet", parquet_table.table_name)).unwrap();
+    let payload = PutPayload::from_bytes(parquet_table.bytes.clone());
     object_store.put(&path, payload).await.unwrap();
-    bytes_setter.set(Some(bytes));
-    file_name_setter.set(file_name.clone());
+    parquet_table_setter.set(Some(parquet_table));
 }
 
 #[component]
 pub fn FileReader(
     set_error_message: WriteSignal>,
-    set_file_bytes: WriteSignal>,
-    set_file_name: WriteSignal,
+    set_parquet_table: WriteSignal>,
 ) -> impl IntoView {
     let (active_tab, set_active_tab) = query_signal::("tab");
 
@@ -172,8 +167,9 @@ pub fn FileReader(
                 let array_buffer = result.dyn_into::().unwrap();
                 let uint8_array = js_sys::Uint8Array::new(&array_buffer);
                 let bytes = bytes::Bytes::from(uint8_array.to_vec());
+                let parquet_table = ParquetTable { bytes, table_name };
                 leptos::task::spawn_local(async move {
-                    update_file(bytes.clone(), set_file_bytes, &table_name, set_file_name).await;
+                    update_file(parquet_table, set_parquet_table).await;
                     set_is_folded.set(true);
                 });
             }) as Box);
@@ -214,7 +210,11 @@ pub fn FileReader(
             match op.read(&path).await {
                 Ok(bs) => {
-                    update_file(bs.to_bytes(), set_file_bytes, &table_name, set_file_name).await;
+                    let parquet_table = ParquetTable {
+                        bytes: bs.to_bytes(),
+                        table_name,
+                    };
+                    update_file(parquet_table, set_parquet_table).await;
                     set_is_folded.set(true);
                 }
                 Err(e) => {
@@ -260,8 +260,11 @@ pub fn FileReader(
             let operator = op.finish();
             match operator.read(&s3_file_path.get()).await {
                 Ok(bs) => {
-                    update_file(bs.to_bytes(), set_file_bytes, &file_name, set_file_name)
-                        .await;
+                    let parquet_table = ParquetTable {
+                        bytes: bs.to_bytes(),
+                        table_name: file_name,
+                    };
+                    update_file(parquet_table, set_parquet_table).await;
                     set_is_folded.set(true);
                 }
                 Err(e) => {
diff --git a/src/main.rs b/src/main.rs
index ebd0ea5..0d0c68e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,6 @@
 mod schema;
 use datafusion::physical_plan::ExecutionPlan;
 use file_reader::{get_stored_value, FileReader};
-use futures::{future::BoxFuture, FutureExt};
 use leptos_router::{
     components::Router,
     hooks::{query_signal, use_query_map},
 };
@@ -18,13 +17,13 @@ mod row_group_column;
 mod metadata;
 use metadata::MetadataSection;
 
-use std::{ops::Range, sync::Arc, sync::LazyLock};
+use std::{sync::Arc, sync::LazyLock};
 
 use arrow::datatypes::SchemaRef;
 use bytes::Bytes;
 use leptos::{logging, prelude::*};
 use parquet::{
-    arrow::{async_reader::AsyncFileReader, parquet_to_arrow_schema},
+    arrow::parquet_to_arrow_schema,
     errors::ParquetError,
     file::metadata::{ParquetMetaData, ParquetMetaDataReader},
 };
@@ -40,13 +39,14 @@ pub(crate) static INMEMORY_STORE: LazyLock> =
 
 #[derive(Debug, Clone, PartialEq)]
 pub(crate) struct ParquetReader {
-    bytes: Bytes,
+    parquet_table: ParquetTable,
     parquet_info: ParquetInfo,
 }
 
 impl ParquetReader {
-    pub fn new(bytes: Bytes) -> Result {
+    pub fn new(table: ParquetTable) -> Result {
         let mut footer = [0_u8; 8];
+        let bytes = &table.bytes;
         footer.copy_from_slice(&bytes[bytes.len() - 8..]);
 
         let metadata_len = ParquetMetaDataReader::decode_footer(&footer)?;
@@ -54,13 +54,13 @@ impl ParquetReader {
             .with_page_indexes(true)
             .with_column_indexes(true)
             .with_offset_indexes(true);
-        metadata_reader.try_parse(&bytes)?;
+        metadata_reader.try_parse(bytes)?;
         let metadata = metadata_reader.finish()?;
 
         let parquet_info = ParquetInfo::from_metadata(metadata, metadata_len as u64)?;
 
         Ok(Self {
-            bytes,
+            parquet_table: table,
             parquet_info,
         })
     }
@@ -70,26 +70,11 @@ impl ParquetReader {
     }
 
     fn bytes(&self) -> &Bytes {
-        &self.bytes
-    }
-}
-
-impl AsyncFileReader for ParquetReader {
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec>,
-    ) -> BoxFuture<'_, parquet::errors::Result>> {
-        let rt = ranges.iter().map(|r| self.bytes.slice(r.clone())).collect();
-        async move { Ok(rt) }.boxed()
-    }
-
-    fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> {
-        let sliced = self.bytes.slice(range);
-        async move { Ok(sliced) }.boxed()
+        &self.parquet_table.bytes
     }
 
-    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> {
-        async move { Ok(self.parquet_info.metadata.clone()) }.boxed()
+    fn table_name(&self) -> &str {
+        &self.parquet_table.table_name
     }
 }
@@ -215,74 +200,85 @@ async fn execute_query_async(
     Ok((results, physical_plan))
 }
 
+#[derive(Debug, Clone, PartialEq)]
+struct ParquetTable {
+    bytes: Bytes,
+    table_name: String,
+}
+
 #[component]
 fn App() -> impl IntoView {
     let (error_message, set_error_message) = signal(Option::::None);
-    let (file_bytes, set_file_bytes) = signal(None::);
+    let (parquet_table, set_parquet_table) = signal(None::);
     let (user_input, set_user_input) = query_signal::("query");
     let export_to = use_query_map().with(|map| map.get("export").map(|v| v.to_string()));
 
     let (sql_query, set_sql_query) = signal(String::new());
     let (query_result, set_query_result) = signal(Vec::::new());
-    let (file_name, set_file_name) = signal(String::from("uploaded"));
     let (physical_plan, set_physical_plan) = signal(None::>);
     let (show_settings, set_show_settings) = signal(false);
-    let api_key = get_stored_value(ANTHROPIC_API_KEY, "");
 
     let parquet_reader = Memo::new(move |_| {
-        file_bytes
+        parquet_table
            .get()
-            .and_then(|bytes| ParquetReader::new(bytes.clone()).ok())
+            .and_then(|table| ParquetReader::new(table).ok())
     });
 
     Effect::watch(
         parquet_reader,
-        move |info, _, _| {
-            if let Some(info) = info {
-                match user_input.get() {
+        move |reader, old_reader, _| {
+            let Some(reader) = reader else { return };
+
+            match old_reader.flatten() {
+                Some(old_reader) => {
+                    if old_reader.table_name() != reader.table_name() {
+                        let default_query =
+                            format!("select * from \"{}\" limit 10", reader.table_name());
+                        set_user_input.set(Some(default_query));
+                    }
+                }
+                None => match user_input.get() {
                     Some(user_input) => {
-                        set_user_input.set(Some(user_input));
+                        // force update
+                        let new_input = format!("{} ", user_input);
+                        set_user_input.set(Some(new_input));
                     }
                     None => {
-                        logging::log!("{}", info.info().to_string());
+                        logging::log!("{}", reader.info().to_string());
                         let default_query =
format!("select * from \"{}\" limit 10", file_name.get_untracked()); + format!("select * from \"{}\" limit 10", reader.table_name()); set_user_input.set(Some(default_query)); } - } + }, } }, true, ); Effect::watch( - move || (user_input.get(), parquet_reader.get()), - move |(user_input, parquet), _, _| { + user_input, + move |user_input, _, _| { let Some(user_input_str) = user_input else { return; }; - if parquet.is_none() { - return; - } - set_user_input.set(Some(user_input_str.clone())); + let user_input = user_input_str.clone(); - let api_key = api_key.clone(); leptos::task::spawn_local(async move { - let Some(parquet_info) = parquet_reader.get() else { + let Some(parquet_reader) = parquet_reader.get() else { return; }; + let api_key = get_stored_value(ANTHROPIC_API_KEY, ""); let sql = match query_input::user_input_to_sql( &user_input, - &parquet_info.info().schema, - &file_name(), + &parquet_reader.info().schema, + parquet_reader.table_name(), &api_key, ) .await { Ok(response) => response, Err(e) => { - logging::log!("{}", e); set_error_message.set(Some(e)); return; } @@ -297,17 +293,17 @@ fn App() -> impl IntoView { Effect::watch( sql_query, move |query, _, _| { - let bytes_opt = file_bytes.get(); - let table_name = file_name.get(); + let bytes_opt = parquet_table.get(); set_error_message.set(None); if query.trim().is_empty() { return; } - if let Some(_bytes) = bytes_opt { + if let Some(parquet_table) = bytes_opt { let query = query.clone(); let export_to = export_to.clone(); + let table_name = parquet_table.table_name; leptos::task::spawn_local(async move { match execute_query_async(query.clone(), table_name).await { @@ -374,8 +370,7 @@ fn App() -> impl IntoView {
@@ -404,7 +399,7 @@ fn App() -> impl IntoView {
                 }}
                 {move || {
-                    file_bytes
+                    parquet_table
                         .get()
                         .map(|_| {
                             match parquet_reader() {
diff --git a/src/metadata.rs b/src/metadata.rs
index fb7d4b0..a7cf4e7 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -109,8 +109,8 @@ pub fn MetadataSection(parquet_reader: super::ParquetReader) -> impl IntoView {
             Some(
                 view! {
-
+
                 },
             )
diff --git a/src/query_results.rs b/src/query_results.rs
index e37ac4f..397b5c9 100644
--- a/src/query_results.rs
+++ b/src/query_results.rs
@@ -17,6 +17,8 @@ use web_sys::wasm_bindgen::JsCast;
 
 pub(crate) fn export_to_csv_inner(query_result: &[RecordBatch]) {
     let mut csv_data = String::new();
+
+    // Headers remain the same as they're based on schema
     let headers: Vec = query_result[0]
         .schema()
         .fields()
@@ -25,20 +27,26 @@ pub(crate) fn export_to_csv_inner(query_result: &[RecordBatch]) {
         .collect();
     csv_data.push_str(&headers.join(","));
     csv_data.push('\n');
-    for row_idx in 0..query_result[0].num_rows() {
-        let row: Vec = (0..query_result[0].num_columns())
-            .map(|col_idx| {
-                let column = query_result[0].column(col_idx);
-                if column.is_null(row_idx) {
-                    "NULL".to_string()
-                } else {
-                    column.as_ref().value_to_string(row_idx)
-                }
-            })
-            .collect();
-        csv_data.push_str(&row.join(","));
-        csv_data.push('\n');
+
+    // Process all record batches
+    for batch in query_result {
+        for row_idx in 0..batch.num_rows() {
+            let row: Vec = (0..batch.num_columns())
+                .map(|col_idx| {
+                    let column = batch.column(col_idx);
+                    if column.is_null(row_idx) {
+                        "NULL".to_string()
+                    } else {
+                        column.as_ref().value_to_string(row_idx)
+                    }
+                })
+                .collect();
+            csv_data.push_str(&row.join(","));
+            csv_data.push('\n');
+        }
     }
+
+    // Rest of the function remains the same
     let blob = web_sys::Blob::new_with_str_sequence(&js_sys::Array::of1(&csv_data.into())).unwrap();
     let url = web_sys::Url::create_object_url_with_blob(&blob).unwrap();
     let a = web_sys::window()
@@ -54,10 +62,8 @@ pub(crate) fn export_to_csv_inner(query_result: &[RecordBatch]) {
 }
 
 pub(crate) fn export_to_parquet_inner(query_result: &[RecordBatch]) {
-    // Create an in-memory buffer to write the parquet data
     let mut buf = Vec::new();
 
-    // Create a parquet writer with LZ4 compression
     let props = parquet::file::properties::WriterProperties::builder()
         .set_compression(parquet::basic::Compression::LZ4)
         .build();
 
     let mut writer = ArrowWriter::try_new(&mut buf, query_result[0].schema(), Some(props))
         .expect("Failed to create parquet writer");
 
-    // Write the record batch
-    writer
-        .write(&query_result[0])
-        .expect("Failed to write record batch");
+    // Write all record batches
+    for batch in query_result {
+        writer
+            .write(batch)
+            .expect("Failed to write record batch");
+    }
 
-    // Close the writer to flush the data
     writer.close().expect("Failed to close writer");
 
-    // Create a blob from the buffer
     let array = js_sys::Uint8Array::from(&buf[..]);
     let blob = web_sys::Blob::new_with_u8_array_sequence(&js_sys::Array::of1(&array))
         .expect("Failed to create blob");
diff --git a/src/row_group_column.rs b/src/row_group_column.rs
index 1caf9fa..c55d6d0 100644
--- a/src/row_group_column.rs
+++ b/src/row_group_column.rs
@@ -136,13 +136,11 @@ pub fn RowGroupColumn(parquet_reader: super::ParquetReader) -> impl IntoView {
             SerializedPageReader::new(parquet_bytes, col, row_count as usize, None).unwrap();
 
         let mut page_info = Vec::new();
-        for page in page_reader {
-            if let Ok(page) = page {
-                let page_type = page.page_type();
-                let page_size = page.buffer().len() as f64 / 1024.0;
-                let num_values = page.num_values();
-                page_info.push((page_type, page_size, num_values, page.encoding()));
-            }
+        for page in page_reader.flatten() {
+            let page_type = page.page_type();
+            let page_size = page.buffer().len() as f64 / 1024.0;
+            let num_values = page.num_values();
+            page_info.push((page_type, page_size, num_values, page.encoding()));
         }
 
         (
@@ -227,7 +225,8 @@ pub fn RowGroupColumn(parquet_reader: super::ParquetReader) -> impl IntoView {
                         .set(event_target_value(&ev).parse::().unwrap_or(0))
                 }
             >
-                {parquet_reader.info()
+                {parquet_reader
+                    .info()
                     .schema
                     .fields
                     .iter()
@@ -244,13 +243,7 @@ pub fn RowGroupColumn(parquet_reader: super::ParquetReader) -> impl IntoView {
             {move || {
-                let (
-                    compressed_size,
-                    uncompressed_size,
-                    compression,
-                    statistics,
-                    page_info,
-                ) = column_info();
+                let (compressed_size, uncompressed_size, compression, statistics, page_info) = column_info();
                 view! {
@@ -300,12 +293,8 @@ pub fn RowGroupColumn(parquet_reader: super::ParquetReader) -> impl IntoView {
                                 {format!("{} KB", size.round() as i64)}
-
-                                    {format_rows(values as u64)}
-
-
-                                    {format!("{:?}", encoding)}
-
+                                {format_rows(values as u64)}
+                                {format!("{:?}", encoding)}
                 }
             })