Commit
use native parquet reader
XiangpengHao committed Dec 23, 2024
1 parent e69b4a2 commit 1c6b0fd
Showing 5 changed files with 134 additions and 104 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -44,6 +44,8 @@ leptos-use = { version = "0.14.0", features = [
] }
codee = { version = "0.2", features = ["msgpack_serde"] }
leptos_router = { version = "0.7.0", features = ["nightly"] }
object_store = "0.11.1"
async-trait = "0.1.83"

[profile.release]
strip = true
46 changes: 33 additions & 13 deletions src/file_reader.rs
@@ -2,9 +2,13 @@ use bytes::Bytes;
use leptos::prelude::*;
use leptos::wasm_bindgen::{prelude::Closure, JsCast};
use leptos_router::hooks::query_signal;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use opendal::{services::Http, services::S3, Operator};
use web_sys::{js_sys, Url};

use crate::INMEMORY_STORE;

const S3_ENDPOINT_KEY: &str = "s3_endpoint";
const S3_ACCESS_KEY_ID_KEY: &str = "s3_access_key_id";
const S3_SECRET_KEY_KEY: &str = "s3_secret_key";
@@ -29,6 +33,20 @@ fn save_to_storage(key: &str, value: &str) {
}
}

async fn update_file(
bytes: Bytes,
bytes_setter: WriteSignal<Option<Bytes>>,
file_name: &String,
file_name_setter: WriteSignal<String>,
) {
let object_store = &*INMEMORY_STORE;
let path = Path::parse(format!("{}.parquet", file_name)).unwrap();
let payload = PutPayload::from_bytes(bytes.clone());
object_store.put(&path, payload).await.unwrap();
bytes_setter.set(Some(bytes));
file_name_setter.set(file_name.clone());
}

#[component]
pub fn FileReader(
set_error_message: WriteSignal<Option<String>>,
@@ -142,24 +160,27 @@ pub fn FileReader(
let file_reader = web_sys::FileReader::new().unwrap();
let file_reader_clone = file_reader.clone();

let table_name = file
.name()
.strip_suffix(".parquet")
.unwrap_or(&file.name())
.to_string();

let onload = Closure::wrap(Box::new(move |_: web_sys::Event| {
let table_name = table_name.clone();
let result = file_reader_clone.result().unwrap();
let array_buffer = result.dyn_into::<js_sys::ArrayBuffer>().unwrap();
let uint8_array = js_sys::Uint8Array::new(&array_buffer);
let bytes = bytes::Bytes::from(uint8_array.to_vec());
set_file_bytes.set(Some(bytes.clone()));
set_is_folded.set(true);
leptos::task::spawn_local(async move {
update_file(bytes.clone(), set_file_bytes, &table_name, set_file_name).await;
set_is_folded.set(true);
});
}) as Box<dyn FnMut(_)>);

file_reader.set_onload(Some(onload.as_ref().unchecked_ref()));
file_reader.read_as_array_buffer(&file).unwrap();
onload.forget();
let table_name = file
.name()
.strip_suffix(".parquet")
.unwrap_or(&file.name())
.to_string();
set_file_name.set(table_name);
};

let on_url_submit = move || {
@@ -182,7 +203,6 @@
.strip_suffix(".parquet")
.unwrap_or("uploaded")
.to_string();
set_file_name.set(table_name);

leptos::task::spawn_local(async move {
let builder = Http::default().endpoint(&endpoint);
@@ -194,7 +214,7 @@

match op.read(&path).await {
Ok(bs) => {
set_file_bytes.set(Some(bs.to_bytes()));
update_file(bs.to_bytes(), set_file_bytes, &table_name, set_file_name).await;
set_is_folded.set(true);
}
Err(e) => {
@@ -226,9 +246,8 @@
.strip_suffix(".parquet")
.unwrap_or("uploaded")
.to_string();
set_file_name.set(file_name);

wasm_bindgen_futures::spawn_local(async move {
leptos::task::spawn_local(async move {
let cfg = S3::default()
.endpoint(&endpoint)
.access_key_id(&access_key_id)
Expand All @@ -241,7 +260,8 @@ pub fn FileReader(
let operator = op.finish();
match operator.read(&s3_file_path.get()).await {
Ok(bs) => {
set_file_bytes.set(Some(bs.to_bytes()));
update_file(bs.to_bytes(), set_file_bytes, &file_name, set_file_name)
.await;
set_is_folded.set(true);
}
Err(e) => {
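
The new `update_file` helper above is the hinge of this change on the write side: all three ingest paths (local file, HTTP URL, S3) now funnel their bytes through a single function that puts them into the shared in-memory `object_store` under `<file_name>.parquet` and then updates both reactive signals. That is also why the separate `set_file_name.set(...)` calls were removed from the individual handlers. As a minimal sketch of the reverse path, here is how the stored bytes could be read back out of the same store; the `load_file` helper is hypothetical, not part of this commit:

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

// Hypothetical read-back helper: fetch bytes previously stored by
// `update_file`. Assumes `table_name` is the same name it was stored under.
async fn load_file(table_name: &str) -> object_store::Result<Bytes> {
    let store = &*crate::INMEMORY_STORE;
    let path = Path::parse(format!("{}.parquet", table_name)).unwrap();
    // `get` returns a `GetResult`; `bytes()` buffers the whole payload.
    store.get(&path).await?.bytes().await
}
```
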
131 changes: 84 additions & 47 deletions src/main.rs
@@ -1,11 +1,13 @@
mod schema;
use datafusion::physical_plan::ExecutionPlan;
use file_reader::{get_stored_value, FileReader};
use futures::{future::BoxFuture, FutureExt};
use leptos_router::{
components::Router,
hooks::{query_signal, use_query_map},
};

use object_store::memory::InMemory;
use query_results::{export_to_csv_inner, export_to_parquet_inner, QueryResults};
use schema::SchemaSection;

@@ -16,13 +18,13 @@ mod row_group_column;
mod metadata;
use metadata::MetadataSection;

use std::sync::Arc;
use std::{ops::Range, sync::Arc, sync::LazyLock};

use arrow::datatypes::SchemaRef;
use bytes::Bytes;
use leptos::{logging, prelude::*};
use parquet::{
arrow::parquet_to_arrow_schema,
arrow::{async_reader::AsyncFileReader, parquet_to_arrow_schema},
errors::ParquetError,
file::metadata::{ParquetMetaData, ParquetMetaDataReader},
};
@@ -33,6 +35,60 @@ use query_input::{execute_query_inner, QueryInput};
mod settings;
use settings::{Settings, ANTHROPIC_API_KEY};

pub(crate) static INMEMORY_STORE: LazyLock<Arc<InMemory>> =
LazyLock::new(|| Arc::new(InMemory::new()));

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ParquetReader {
bytes: Bytes,
parquet_info: ParquetInfo,
}

impl ParquetReader {
pub fn new(bytes: Bytes) -> Result<Self> {
let mut footer = [0_u8; 8];
footer.copy_from_slice(&bytes[bytes.len() - 8..]);
let metadata_len = ParquetMetaDataReader::decode_footer(&footer)?;

let mut metadata_reader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_column_indexes(true)
.with_offset_indexes(true);
metadata_reader.try_parse(&bytes)?;
let metadata = metadata_reader.finish()?;

let parquet_info = ParquetInfo::from_metadata(metadata, metadata_len as u64)?;

Ok(Self {
bytes,
parquet_info,
})
}

fn info(&self) -> &ParquetInfo {
&self.parquet_info
}
}

impl AsyncFileReader for ParquetReader {
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
let rt = ranges.iter().map(|r| self.bytes.slice(r.clone())).collect();
async move { Ok(rt) }.boxed()
}

fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
let sliced = self.bytes.slice(range);
async move { Ok(sliced) }.boxed()
}

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move { Ok(self.parquet_info.metadata.clone()) }.boxed()
}
}
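
This `AsyncFileReader` implementation is what the commit title refers to: parquet's native async Arrow reader can now pull byte ranges and metadata straight out of the in-memory `Bytes` buffer, with `get_metadata` handing back the footer metadata already decoded once in `ParquetReader::new`. A minimal sketch of how such a reader drives `ParquetRecordBatchStreamBuilder`, assuming the parquet crate's `async_reader` API; the `read_all_batches` helper is illustrative and not part of this diff:

```rust
use futures::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

// Illustrative only: stream every record batch out of a `ParquetReader`.
async fn read_all_batches(
    reader: ParquetReader,
) -> parquet::errors::Result<Vec<arrow::array::RecordBatch>> {
    // The builder calls `get_metadata`, then `get_bytes`/`get_byte_ranges`,
    // on our reader; no file system or network is involved.
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .build()?;
    let mut batches = Vec::new();
    while let Some(batch) = stream.next().await {
        batches.push(batch?);
    }
    Ok(batches)
}
```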

#[derive(Debug, Clone, PartialEq)]
struct ParquetInfo {
file_size: u64,
@@ -70,8 +126,17 @@ impl ParquetInfo {
let first_row_group = metadata.row_groups().first();
let first_column = first_row_group.and_then(|rg| rg.columns().first());

let has_column_index = metadata.column_index().map(|ci| ci.first().map(|c| c.len()> 0)).flatten().unwrap_or(false);
let has_page_index = metadata.offset_index().map(|ci| ci.first().map(|c| c.len()> 0)).flatten().unwrap_or(false);
let has_column_index = metadata
.column_index()
.map(|ci| ci.first().map(|c| c.len() > 0))
.flatten()
.unwrap_or(false);
let has_page_index = metadata
.offset_index()
.map(|ci| ci.first().map(|c| c.len() > 0))
.flatten()
.unwrap_or(false);

Ok(Self {
file_size: compressed_size,
uncompressed_size,
@@ -94,23 +159,6 @@
}
}

fn get_parquet_info(bytes: Bytes) -> Result<ParquetInfo, ParquetError> {
let mut footer = [0_u8; 8];
footer.copy_from_slice(&bytes[bytes.len() - 8..]);
let metadata_len = ParquetMetaDataReader::decode_footer(&footer)?;

let mut metadata_reader = ParquetMetaDataReader::new()
.with_page_indexes(true)
.with_column_indexes(true)
.with_offset_indexes(true);
metadata_reader.try_parse(&bytes)?;
let metadata = metadata_reader.finish()?;

let parquet_info = ParquetInfo::from_metadata(metadata, metadata_len as u64)?;

Ok(parquet_info)
}

fn format_rows(rows: u64) -> String {
let mut result = rows.to_string();
let mut i = result.len();
@@ -156,11 +204,9 @@ impl std::fmt::Display for ParquetInfo {

async fn execute_query_async(
query: String,
bytes: Bytes,
table_name: String,
parquet_info: ParquetInfo,
) -> Result<(Vec<arrow::array::RecordBatch>, Arc<dyn ExecutionPlan>), String> {
let (results, physical_plan) = execute_query_inner(&table_name, parquet_info, bytes, &query)
let (results, physical_plan) = execute_query_inner(&table_name, &query)
.await
.map_err(|e| format!("Failed to execute query: {}", e))?;

@@ -182,22 +228,22 @@ fn App() -> impl IntoView {
let (show_settings, set_show_settings) = signal(false);
let api_key = get_stored_value(ANTHROPIC_API_KEY, "");

let parquet_info = Memo::new(move |_| {
let parquet_reader = Memo::new(move |_| {
file_bytes
.get()
.and_then(|bytes| get_parquet_info(bytes.clone()).ok())
.and_then(|bytes| ParquetReader::new(bytes.clone()).ok())
});

Effect::watch(
parquet_info,
parquet_reader,
move |info, _, _| {
if let Some(info) = info {
match user_input.get() {
Some(user_input) => {
set_user_input.set(Some(user_input));
}
None => {
logging::log!("{}", info.to_string());
logging::log!("{}", info.info().to_string());
let default_query =
format!("select * from \"{}\" limit 10", file_name.get_untracked());
set_user_input.set(Some(default_query));
@@ -209,7 +255,7 @@
);

Effect::watch(
move || (user_input.get(), parquet_info.get()),
move || (user_input.get(), parquet_reader.get()),
move |(user_input, parquet), _, _| {
let Some(user_input_str) = user_input else {
return;
@@ -221,12 +267,12 @@
let user_input = user_input_str.clone();
let api_key = api_key.clone();
leptos::task::spawn_local(async move {
let Some(parquet_info) = parquet_info.get() else {
let Some(parquet_info) = parquet_reader.get() else {
return;
};
let sql = match query_input::user_input_to_sql(
&user_input,
&parquet_info.schema,
&parquet_info.info().schema,
&file_name(),
&api_key,
)
@@ -257,21 +303,12 @@
return;
}

if let Some(bytes) = bytes_opt {
let parquet_info = match parquet_info() {
Some(content) => content,
None => {
set_error_message.set(Some("Failed to get file schema".into()));
return;
}
};

if let Some(_bytes) = bytes_opt {
let query = query.clone();
let export_to = export_to.clone();

leptos::task::spawn_local(async move {
match execute_query_async(query.clone(), bytes, table_name, parquet_info).await
{
match execute_query_async(query.clone(), table_name).await {
Ok((results, physical_plan)) => {
set_physical_plan.set(Some(physical_plan));
if let Some(export_to) = export_to {
@@ -368,9 +405,9 @@
file_bytes
.get()
.map(|_| {
match parquet_info() {
match parquet_reader() {
Some(info) => {
if info.row_group_count > 0 {
if info.info().row_group_count > 0 {
view! {
<QueryInput
user_input=user_input
@@ -407,16 +444,16 @@

<div class="mt-8">
{move || {
let info = parquet_info();
let info = parquet_reader();
match info {
Some(info) => {
view! {
<div class="space-y-6">
<div class="w-full">
<MetadataSection parquet_info=info.clone() />
<MetadataSection parquet_info=info.info().clone() />
</div>
<div class="w-full">
<SchemaSection parquet_info=info.clone() />
<SchemaSection parquet_info=info.info().clone() />
</div>
</div>
}