Improve deserialize_to_struct example #13958

Merged · 3 commits · Jan 4, 2025
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
@@ -58,7 +58,7 @@ cargo run --example dataframe
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
-- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
+- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
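The updated README entry summarizes the technique this PR documents: query results arrive column-by-column, and row structs are rebuilt by zipping the columns together. The core pattern can be sketched without DataFusion at all; in this dependency-free sketch, `Data` and the column names mirror the example, while `columns_to_structs` is a hypothetical helper (plain slices stand in for Arrow arrays):

```rust
// Sketch: rebuild row-oriented structs from column-oriented results.
// `Data` mirrors the struct in the example; plain slices stand in for
// the typed Arrow arrays a real query would produce.
#[derive(Debug, PartialEq)]
struct Data {
    int_col: i32,
    double_col: f64,
}

// Hypothetical helper: zip two parallel columns into one row struct per index.
fn columns_to_structs(int_col: &[i32], double_col: &[f64]) -> Vec<Data> {
    int_col
        .iter()
        .zip(double_col)
        .map(|(i, f)| Data {
            int_col: *i,
            double_col: *f,
        })
        .collect()
}

fn main() {
    let rows = columns_to_structs(&[0, 1], &[0.0, 10.1]);
    assert_eq!(rows.len(), 2);
    assert_eq!(
        rows[1],
        Data {
            int_col: 1,
            double_col: 10.1
        }
    );
    println!("{rows:#?}");
}
```

In the real example, `int_col.values()` on an Arrow `PrimitiveArray` yields exactly this kind of native slice, so the zip-and-push loop is identical.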
158 changes: 116 additions & 42 deletions datafusion-examples/examples/deserialize_to_struct.rs
@@ -15,62 +15,136 @@
 // specific language governing permissions and limitations
 // under the License.

-use arrow::array::AsArray;
+use arrow::array::{AsArray, PrimitiveArray};
 use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_common::assert_batches_eq;
 use futures::StreamExt;

-/// This example shows that it is possible to convert query results into Rust structs .
+/// This example shows how to convert query results into Rust structs by using
+/// the Arrow APIs to convert the results into Rust native types.
+///
+/// This is a bit tricky initially as the results are returned as columns stored
+/// as [ArrayRef]
+///
+/// [ArrayRef]: arrow::array::ArrayRef
 #[tokio::main]
 async fn main() -> Result<()> {
-    let data_list = Data::new().await?;
-    println!("{data_list:#?}");
-    Ok(())
-}
+    // Run a query that returns two columns of data
+    let ctx = SessionContext::new();
+    let testdata = datafusion::test_util::parquet_test_data();
+    ctx.register_parquet(
+        "alltypes_plain",
+        &format!("{testdata}/alltypes_plain.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+    let df = ctx
+        .sql("SELECT int_col, double_col FROM alltypes_plain")
+        .await?;

-#[derive(Debug)]
-struct Data {
-    #[allow(dead_code)]
-    int_col: i32,
-    #[allow(dead_code)]
-    double_col: f64,
-}
+    // print out the results showing we have an int32 and a float64 column
+    let results = df.clone().collect().await?;
+    assert_batches_eq!(
+        [
+            "+---------+------------+",
+            "| int_col | double_col |",
+            "+---------+------------+",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "| 0       | 0.0        |",
+            "| 1       | 10.1       |",
+            "+---------+------------+",
+        ],
+        &results
+    );

-impl Data {
-    pub async fn new() -> Result<Vec<Self>> {
-        // this group is almost the same as the one you find it in parquet_sql.rs
-        let ctx = SessionContext::new();
+    // We will now convert the query results into a Rust struct
+    let mut stream = df.execute_stream().await?;
+    let mut list = vec![];

-        let testdata = datafusion::test_util::parquet_test_data();
+    // DataFusion produces data in chunks called `RecordBatch`es, which are
+    // typically 8000 rows each. This loop processes each `RecordBatch` as it is
+    // produced by the query plan and adds it to the list
+    while let Some(b) = stream.next().await.transpose()? {
+        // Each `RecordBatch` has one or more columns. Each column is stored as
+        // an `ArrayRef`. To interact with data using Rust native types we need to
+        // convert these `ArrayRef`s into concrete array types using APIs from
+        // the arrow crate.

-        ctx.register_parquet(
-            "alltypes_plain",
-            &format!("{testdata}/alltypes_plain.parquet"),
-            ParquetReadOptions::default(),
-        )
-        .await?;
+        // In this case, we know that each batch has two columns of the Arrow
+        // types Int32 and Float64, so first we cast the two columns to the
+        // appropriate Arrow PrimitiveArray (this is a fast / zero-copy cast):
+        let int_col: &PrimitiveArray<Int32Type> = b.column(0).as_primitive();
+        let float_col: &PrimitiveArray<Float64Type> = b.column(1).as_primitive();

-        let df = ctx
-            .sql("SELECT int_col, double_col FROM alltypes_plain")
-            .await?;
+        // With PrimitiveArrays, we can access the values as native Rust
+        // types i32 and f64, and form the desired `Data` structs
+        for (i, f) in int_col.values().iter().zip(float_col.values()) {
+            list.push(Data {
+                int_col: *i,
+                double_col: *f,
+            })
+        }
+    }

-        df.clone().show().await?;
+    // Finally, we have the results in the list of Rust structs
+    let res = format!("{list:#?}");
+    assert_eq!(
+        res,
+        r#"[
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+    Data {
+        int_col: 0,
+        double_col: 0.0,
+    },
+    Data {
+        int_col: 1,
+        double_col: 10.1,
+    },
+]"#
+    );

-        let mut stream = df.execute_stream().await?;
-        let mut list = vec![];
-        while let Some(b) = stream.next().await.transpose()? {
-            let int_col = b.column(0).as_primitive::<Int32Type>();
-            let float_col = b.column(1).as_primitive::<Float64Type>();
+    // Use the fields in the struct to avoid clippy complaints
+    let int_sum = list.iter().fold(0, |acc, x| acc + x.int_col);
+    let double_sum = list.iter().fold(0.0, |acc, x| acc + x.double_col);
+    assert_eq!(int_sum, 4);
+    assert_eq!(double_sum, 40.4);

-            for (i, f) in int_col.values().iter().zip(float_col.values()) {
-                list.push(Data {
-                    int_col: *i,
-                    double_col: *f,
-                })
-            }
-        }
+    Ok(())
+}

-        Ok(list)
-    }
+/// This is the target struct where we want the query results.
+#[derive(Debug)]
+struct Data {
+    int_col: i32,
+    double_col: f64,
 }
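The trickiest step in the diff above is the cast from a type-erased column (Arrow's `ArrayRef` is an `Arc<dyn Array>`) to a concrete `PrimitiveArray` before native values can be read. The same pattern can be shown with only the standard library; everything below (`Column`, `Int32Column`, `as_any`) is a hypothetical stand-in for the Arrow types, not an Arrow API:

```rust
use std::any::Any;
use std::sync::Arc;

// A dynamically typed "column", loosely analogous to Arrow's ArrayRef.
// These names are illustrative stand-ins, not real Arrow types.
trait Column: Any {
    fn as_any(&self) -> &dyn Any;
    fn len(&self) -> usize;
}

// One concrete column type, analogous to PrimitiveArray<Int32Type>.
struct Int32Column(Vec<i32>);

impl Column for Int32Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn len(&self) -> usize {
        self.0.len()
    }
}

fn main() {
    // A query engine hands back type-erased columns...
    let col: Arc<dyn Column> = Arc::new(Int32Column(vec![0, 1, 0, 1]));
    assert_eq!(col.len(), 4);

    // ...so we downcast to the concrete type to reach native i32 values,
    // much as `as_primitive::<Int32Type>()` does for an ArrayRef.
    let ints = col
        .as_any()
        .downcast_ref::<Int32Column>()
        .expect("column was not Int32");
    let sum: i32 = ints.0.iter().sum();
    assert_eq!(sum, 2);
    println!("sum = {sum}");
}
```

Arrow's `as_primitive` performs this kind of downcast for you (panicking if the column's type does not match), which is why the example can then read plain `i32`/`f64` values directly.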