Skip to content

Commit

Permalink
Consolidate csv_opener.rs and json_opener.rs into a single example (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Zhukov committed Jan 2, 2025
1 parent 9eca7d1 commit b01293c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 105 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ cargo run --example dataframe
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Demonstrate scanning an Arrow data source (CSV/JSON) and fetching results
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,34 @@
// specific language governing permissions and limitations
// under the License.

use std::{sync::Arc, vec};
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

/// This example demonstrates a scanning against an Arrow data source (CSV) and
/// This example demonstrates scanning an Arrow data source (CSV/JSON) and
/// fetching results
/// Entry point: runs the CSV scan demo, then the JSON scan demo.
#[tokio::main]
async fn main() -> Result<()> {
    csv_opener().await?;
    json_opener().await
}

async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();

Expand All @@ -59,18 +65,17 @@ async fn main() -> Result<()> {

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
.unwrap()
.map(|b| b.unwrap())
.collect::<Vec<_>>()
.await;
let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));

let mut result = vec![];
let mut stream =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
while let Some(batch) = stream.next().await.transpose()? {
result.push(batch);
}
assert_batches_eq!(
&[
"+--------------------------------+----+",
Expand All @@ -87,3 +92,54 @@ async fn main() -> Result<()> {
);
Ok(())
}

/// Demonstrates scanning newline-delimited JSON held in an in-memory object
/// store via a `JsonOpener`, projecting two columns (`str`, `num`), limiting
/// the row count, and verifying the collected record batches.
async fn json_opener() -> Result<()> {
    // Stage a small NDJSON payload in an in-memory object store.
    let object_store = InMemory::new();
    let path = object_store::path::Path::from("demo.json");
    let data = bytes::Bytes::from(
        r#"{"num":5,"str":"test"}
{"num":2,"str":"hello"}
{"num":4,"str":"foo"}"#,
    );
    // Capture the payload length before `data` is moved into the store: the
    // size registered with the scan should describe the actual object (the
    // previous hard-coded `10` understated the real payload size).
    let object_size = data.len() as u64;

    object_store.put(&path, data.into()).await?;

    let schema = Arc::new(Schema::new(vec![
        Field::new("num", DataType::Int64, false),
        Field::new("str", DataType::Utf8, false),
    ]));

    // The opener decodes into the *projected* schema: (str, num).
    let projected = Arc::new(schema.clone().project(&[1, 0])?);

    let opener = JsonOpener::new(
        8192, // target batch size in rows
        projected,
        FileCompressionType::UNCOMPRESSED,
        Arc::new(object_store),
    );

    let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
        .with_projection(Some(vec![1, 0]))
        .with_limit(Some(5))
        .with_file(PartitionedFile::new(path.to_string(), object_size));

    // Drive the file stream to completion, propagating any decode errors.
    let mut stream =
        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
    let mut result = vec![];
    while let Some(batch) = stream.next().await.transpose()? {
        result.push(batch);
    }
    assert_batches_eq!(
        &[
            "+-------+-----+",
            "| str   | num |",
            "+-------+-----+",
            "| test  | 5   |",
            "| hello | 2   |",
            "| foo   | 4   |",
            "+-------+-----+",
        ],
        &result
    );
    Ok(())
}
88 changes: 0 additions & 88 deletions datafusion-examples/examples/json_opener.rs

This file was deleted.

0 comments on commit b01293c

Please sign in to comment.