diff --git a/src/arrow1/ffi.rs b/src/arrow1/ffi.rs deleted file mode 100644 index 815f1052..00000000 --- a/src/arrow1/ffi.rs +++ /dev/null @@ -1,181 +0,0 @@ -use arrow::array::{Array, StructArray}; -use arrow::datatypes::{Field, Schema}; -use arrow::ffi::{self, from_ffi, to_ffi}; -use arrow::record_batch::RecordBatch; -use wasm_bindgen::prelude::*; - -use crate::arrow1::error::Result; - -/// Wrapper around an ArrowArray FFI struct in Wasm memory. -#[wasm_bindgen] -pub struct FFIArrowArray(Box); - -#[wasm_bindgen] -impl FFIArrowArray { - #[wasm_bindgen] - pub fn addr(&self) -> *const ffi::FFI_ArrowArray { - self.0.as_ref() as *const _ - } - - #[wasm_bindgen] - pub fn free(self) { - drop(self.0) - } - - #[wasm_bindgen] - pub fn drop(self) { - drop(self.0) - } -} - -/// Wrapper around an ArrowSchema FFI struct in Wasm memory. -#[wasm_bindgen] -pub struct FFIArrowField(Box); - -#[wasm_bindgen] -impl FFIArrowField { - #[wasm_bindgen] - pub fn addr(&self) -> *const ffi::FFI_ArrowSchema { - self.0.as_ref() as *const _ - } -} - -impl From<&Field> for FFIArrowField { - fn from(_value: &Field) -> Self { - todo!() - } -} - -/// Wrapper around a collection of FFI ArrowSchema structs in Wasm memory -#[wasm_bindgen] -pub struct FFIArrowSchema(Vec); - -#[wasm_bindgen] -impl FFIArrowSchema { - /// The number of fields in this schema - #[wasm_bindgen] - pub fn length(&self) -> usize { - self.0.len() - } - - #[wasm_bindgen] - pub fn addr(&self, i: usize) -> *const ffi::FFI_ArrowSchema { - self.0.get(i).unwrap().addr() - } -} - -impl From<&Schema> for FFIArrowSchema { - fn from(value: &Schema) -> Self { - for _field in value.fields.into_iter() {} - todo!() - } -} - -/// Wrapper to represent an Arrow Chunk in Wasm memory, e.g. a collection of FFI ArrowArray -/// structs -#[wasm_bindgen] -pub struct FFIArrowRecordBatch { - array: Box, - field: Box, -} - -#[wasm_bindgen] -impl FFIArrowRecordBatch { - /// Get the pointer to one ArrowSchema FFI struct - /// @param i number the index of the field in the schema to use - #[wasm_bindgen(js_name = schemaAddr)] - pub fn field_addr(&self) -> *const ffi::FFI_ArrowSchema { - self.field.as_ref() as *const _ - } - - /// Get the pointer to one ArrowArray FFI struct for a given chunk index and column index - /// @param column number The column index to use - /// @returns number pointer to an ArrowArray FFI struct in Wasm memory - #[wasm_bindgen(js_name = arrayAddr)] - pub fn array_addr(&self) -> *const ffi::FFI_ArrowArray { - self.array.as_ref() as *const _ - } -} - -impl From for FFIArrowRecordBatch { - fn from(value: RecordBatch) -> Self { - let intermediate = StructArray::from(value).into_data(); - let (out_array, out_schema) = to_ffi(&intermediate).unwrap(); - Self { - array: Box::new(out_array), - field: Box::new(out_schema), - } - } -} - -impl From for RecordBatch { - fn from(value: FFIArrowRecordBatch) -> Self { - let array_data = from_ffi(*value.array, &value.field).unwrap(); - let intermediate = StructArray::from(array_data); - - RecordBatch::from(intermediate) - } -} - -/// Wrapper around an Arrow Table in Wasm memory (a list of FFI ArrowSchema structs plus a list of -/// lists of ArrowArray FFI structs.) -#[wasm_bindgen] -pub struct FFIArrowTable(Vec); - -impl From> for FFIArrowTable { - fn from(value: Vec) -> Self { - let mut batches = Vec::with_capacity(value.len()); - for batch in value { - batches.push(batch.into()); - } - Self(batches) - } -} - -#[wasm_bindgen] -impl FFIArrowTable { - #[wasm_bindgen(js_name = numBatches)] - pub fn num_batches(&self) -> usize { - self.0.len() - } - - /// Get the pointer to one ArrowSchema FFI struct - #[wasm_bindgen(js_name = schemaAddr)] - pub fn schema_addr(&self) -> *const ffi::FFI_ArrowSchema { - self.0[0].field_addr() - } - - /// Get the pointer to one ArrowArray FFI struct for a given chunk index and column index - /// @param chunk number The chunk index to use - /// @returns number pointer to an ArrowArray FFI struct in Wasm memory - #[wasm_bindgen(js_name = arrayAddr)] - pub fn array_addr(&self, chunk: usize) -> *const ffi::FFI_ArrowArray { - self.0[chunk].array_addr() - } -} - -impl From> for FFIArrowTable { - fn from(batches: Vec) -> Self { - Self(batches) - } -} - -impl FFIArrowTable { - pub fn from_iterator(value: impl IntoIterator) -> Self { - let mut batches = vec![]; - for batch in value.into_iter() { - batches.push(batch.into()); - } - Self(batches) - } - - pub fn try_from_iterator( - value: impl IntoIterator>, - ) -> Result { - let mut batches = vec![]; - for batch in value.into_iter() { - batches.push(batch?.into()); - } - Ok(Self(batches)) - } -} diff --git a/src/arrow1/mod.rs b/src/arrow1/mod.rs index 0d27c0fe..e9edc85c 100644 --- a/src/arrow1/mod.rs +++ b/src/arrow1/mod.rs @@ -1,8 +1,6 @@ #[cfg(feature = "reader")] pub mod reader; -pub mod ffi; - pub mod wasm; #[cfg(feature = "writer")] diff --git a/src/arrow1/reader.rs b/src/arrow1/reader.rs index fb46b700..928797e9 100644 --- a/src/arrow1/reader.rs +++ b/src/arrow1/reader.rs @@ -1,5 +1,4 @@ use crate::arrow1::error::Result; -use crate::arrow1::ffi::FFIArrowTable; use arrow_wasm::arrow1::Table; use bytes::Bytes; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -22,14 +21,3 @@ pub fn read_parquet(parquet_file: Vec) -> Result { Ok(Table::new(batches)) } - -pub fn read_parquet_ffi(parquet_file: Vec) -> Result { - // Create Parquet reader - let cursor: Bytes = parquet_file.into(); - let builder = ParquetRecordBatchReaderBuilder::try_new(cursor).unwrap(); - - // Create Arrow reader - let reader = builder.build().unwrap(); - - FFIArrowTable::try_from_iterator(reader) -} diff --git a/src/arrow2/ffi.rs b/src/arrow2/ffi.rs deleted file mode 100644 index 8f41f370..00000000 --- a/src/arrow2/ffi.rs +++ /dev/null @@ -1,254 +0,0 @@ -use crate::arrow2::error::ParquetWasmError; -use crate::arrow2::error::Result; -use arrow2::array::{Array, StructArray}; -use arrow2::chunk::Chunk; -use arrow2::datatypes::{DataType, Field, Schema}; -use arrow2::ffi; -use wasm_bindgen::prelude::*; - -type ArrowTable = Vec>>; - -/// Wrapper around an ArrowArray FFI struct in Wasm memory. -#[wasm_bindgen] -pub struct FFIArrowArray(Box); - -impl From> for FFIArrowArray { - fn from(array: Box) -> Self { - Self(Box::new(ffi::export_array_to_c(array))) - } -} - -impl FFIArrowArray { - fn import(self, data_type: DataType) -> Result> { - let imported = unsafe { ffi::import_array_from_c(*self.0, data_type) }; - Ok(imported?) - } -} - -#[wasm_bindgen] -impl FFIArrowArray { - #[wasm_bindgen] - pub fn addr(&self) -> *const ffi::ArrowArray { - self.0.as_ref() as *const _ - } - - #[wasm_bindgen] - pub fn free(self) { - drop(self.0) - } - - #[wasm_bindgen] - pub fn drop(self) { - drop(self.0) - } -} - -/// Wrapper around an ArrowSchema FFI struct in Wasm memory. -#[wasm_bindgen] -pub struct FFIArrowField(Box); - -impl From<&Field> for FFIArrowField { - fn from(field: &Field) -> Self { - Self(Box::new(ffi::export_field_to_c(field))) - } -} - -impl TryFrom<&FFIArrowField> for arrow2::datatypes::Field { - type Error = ParquetWasmError; - - fn try_from(field: &FFIArrowField) -> Result { - let imported = unsafe { ffi::import_field_from_c(&field.0) }; - Ok(imported?) - } -} - -#[wasm_bindgen] -impl FFIArrowField { - #[wasm_bindgen] - pub fn addr(&self) -> *const ffi::ArrowSchema { - self.0.as_ref() as *const _ - } -} - -/// Wrapper an Arrow RecordBatch stored as FFI in Wasm memory. -/// -/// Refer to {@linkcode readParquetFFI} for instructions on how to use this. -#[wasm_bindgen] -pub struct FFIArrowRecordBatch { - field: Box, - array: Box, -} - -impl FFIArrowRecordBatch { - pub fn new(field: Box, array: Box) -> Self { - Self { field, array } - } - - pub fn from_chunk(chunk: Chunk>, schema: Schema) -> Self { - let data_type = DataType::Struct(schema.fields); - let struct_array = StructArray::try_new(data_type.clone(), chunk.to_vec(), None).unwrap(); - let field = Field::new("", data_type, false).with_metadata(schema.metadata); - - Self { - field: Box::new(ffi::export_field_to_c(&field)), - array: Box::new(ffi::export_array_to_c(struct_array.boxed())), - } - } -} - -#[wasm_bindgen] -impl FFIArrowRecordBatch { - #[wasm_bindgen(js_name = arrayAddr)] - pub fn array_addr(&self) -> *const ffi::ArrowArray { - self.array.as_ref() as *const _ - } - - #[wasm_bindgen(js_name = schemaAddr)] - pub fn field_addr(&self) -> *const ffi::ArrowSchema { - self.field.as_ref() as *const _ - } -} - -/// Wrapper to represent an Arrow Chunk in Wasm memory, e.g. a collection of FFI ArrowArray -/// structs -#[wasm_bindgen] -pub struct FFIArrowChunk(Vec); - -impl From>> for FFIArrowChunk { - fn from(chunk: Chunk>) -> Self { - // TODO: is this clone necessary here? - let ffi_arrays: Vec = - chunk.iter().map(|array| array.clone().into()).collect(); - Self(ffi_arrays) - } -} - -impl FFIArrowChunk { - pub fn import(self, data_types: &[&DataType]) -> Result>> { - let mut arrays: Vec> = vec![]; - for (i, ffi_array) in self.0.into_iter().enumerate() { - arrays.push(ffi_array.import(data_types[i].clone())?); - } - - Ok(Chunk::new(arrays)) - } -} - -#[wasm_bindgen] -impl FFIArrowChunk { - #[wasm_bindgen] - pub fn length(&self) -> usize { - self.0.len() - } - - #[wasm_bindgen] - pub fn addr(&self, i: usize) -> *const ffi::ArrowArray { - self.0.get(i).unwrap().addr() - } -} - -/// Wrapper around a collection of FFI ArrowSchema structs in Wasm memory -#[wasm_bindgen] -pub struct FFIArrowSchema(Vec); - -impl From<&Schema> for FFIArrowSchema { - fn from(schema: &Schema) -> Self { - let ffi_fields: Vec = - schema.fields.iter().map(|field| field.into()).collect(); - Self(ffi_fields) - } -} - -impl FFIArrowSchema { - pub fn import(&self, i: usize) -> Result { - let ffi_arrow_field = &self.0[i]; - ffi_arrow_field.try_into() - } -} - -impl TryFrom<&FFIArrowSchema> for Schema { - type Error = ParquetWasmError; - - fn try_from(schema: &FFIArrowSchema) -> Result { - let mut fields: Vec = vec![]; - for i in 0..schema.length() { - fields.push(schema.import(i)?); - } - - Ok(fields.into()) - } -} - -#[wasm_bindgen] -impl FFIArrowSchema { - #[wasm_bindgen] - pub fn length(&self) -> usize { - self.0.len() - } - - #[wasm_bindgen] - pub fn addr(&self, i: usize) -> *const ffi::ArrowSchema { - self.0.get(i).unwrap().addr() - } -} - -/// Wrapper around an Arrow Table in Wasm memory (a list of FFIArrowRecordBatch objects.) -/// -/// Refer to {@linkcode readParquetFFI} for instructions on how to use this. -#[wasm_bindgen] -pub struct FFIArrowTable(Vec); - -impl FFIArrowTable { - pub fn new(batches: Vec) -> Self { - Self(batches) - } -} - -#[wasm_bindgen] -impl FFIArrowTable { - /// Get the total number of record batches in the table - #[wasm_bindgen(js_name = numBatches)] - pub fn num_batches(&self) -> usize { - self.0.len() - } - - /// Get the pointer to one ArrowSchema FFI struct - #[wasm_bindgen(js_name = schemaAddr)] - pub fn schema_addr(&self) -> *const ffi::ArrowSchema { - // Note: this assumes that every record batch has the same schema - self.0[0].field_addr() - } - - /// Get the pointer to one ArrowArray FFI struct for a given chunk index and column index - /// @param chunk number The chunk index to use - /// @returns number pointer to an ArrowArray FFI struct in Wasm memory - #[wasm_bindgen(js_name = arrayAddr)] - pub fn array_addr(&self, chunk: usize) -> *const ffi::ArrowArray { - self.0[chunk].array_addr() - } - - #[wasm_bindgen] - pub fn drop(self) { - drop(self.0); - } -} - -impl FFIArrowTable { - pub fn import(self) -> Result<(Schema, ArrowTable)> { - todo!() - // let schema: Schema = self.schema.as_ref().try_into()?; - // let data_types: Vec<&DataType> = schema - // .fields - // .iter() - // .map(|field| field.data_type()) - // .collect(); - - // let mut chunks: Vec>> = vec![]; - // for chunk in self.chunks.into_iter() { - // let imported = chunk.import(&data_types)?; - // chunks.push(imported); - // } - - // Ok((schema, chunks)) - } -} diff --git a/src/arrow2/mod.rs b/src/arrow2/mod.rs index 7b9b8c54..2fdd16a1 100644 --- a/src/arrow2/mod.rs +++ b/src/arrow2/mod.rs @@ -1,5 +1,3 @@ -pub mod ffi; - #[cfg(feature = "reader")] pub mod reader;