
Commit

Find a way to communicate the ordering of a file back with the existing listing table implementation
zhuqi-lucas committed Dec 28, 2024
1 parent a08dc0a commit f43782a
Showing 28 changed files with 364 additions and 15 deletions.
13 changes: 11 additions & 2 deletions datafusion/common/src/stats.rs
@@ -17,6 +17,7 @@

//! This module provides data structures to represent statistics
use std::collections::HashMap;
use std::fmt::{self, Debug, Display};

use crate::{Result, ScalarValue};
@@ -223,6 +224,12 @@ pub struct Statistics {
/// Statistics on a column level. It contains a [`ColumnStatistics`] for
/// each field in the schema of the table to which the [`Statistics`] refer.
pub column_statistics: Vec<ColumnStatistics>,
/// Additional metadata about the statistics, as key/value pairs.
/// For example:
///   key:   "sort_by"
///   value: "timestamp ASC NULLS LAST"
/// It can be used to optimize query execution when the data is known to be sorted.
pub metadata: HashMap<String, String>,
}

impl Statistics {
@@ -233,6 +240,7 @@ impl Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(schema),
metadata: HashMap::new(),
}
}

@@ -419,8 +427,8 @@ impl Display for Statistics {

write!(
f,
"Rows={}, Bytes={}, [{}]",
self.num_rows, self.total_byte_size, column_stats
"Rows={}, Bytes={}, [{}], metadata: {:?}",
self.num_rows, self.total_byte_size, column_stats, self.metadata
)?;

Ok(())
@@ -638,6 +646,7 @@ mod tests {
num_rows: Precision::Exact(42),
total_byte_size: Precision::Exact(500),
column_statistics: counts.into_iter().map(col_stats_i64).collect(),
metadata: HashMap::new(),
}
}

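To make the intent of the new field concrete, here is a small self-contained sketch of how a consumer could branch on the `sort_by` entry. `Stats` and `sorted_by` are mirror types for illustration, not the actual DataFusion structs; the key and value format follow the doc comment in the hunk above.

```rust
use std::collections::HashMap;

// Stand-in for the `Statistics` struct above; only the new field is mirrored.
struct Stats {
    metadata: HashMap<String, String>,
}

// Returns the recorded sort order, if the statistics carry a non-empty one.
fn sorted_by(stats: &Stats) -> Option<&str> {
    stats
        .metadata
        .get("sort_by")
        .map(String::as_str)
        .filter(|v| !v.is_empty())
}

fn main() {
    let stats = Stats {
        metadata: HashMap::from([(
            "sort_by".to_string(),
            "timestamp ASC NULLS LAST".to_string(),
        )]),
    };
    // A planner could use this to skip a redundant sort.
    assert_eq!(sorted_by(&stats), Some("timestamp ASC NULLS LAST"));
}
```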
170 changes: 170 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -1977,6 +1977,10 @@ mod tests {

use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
use arrow::array::Int32Array;
use arrow_array::TimestampNanosecondArray;
use arrow_schema::TimeUnit;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::instant::Instant;
use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::expr::WindowFunction;
@@ -1989,6 +1993,7 @@
use datafusion_functions_window::nth_value::first_value_udwf;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
use rand::Rng;
use sqlparser::ast::NullTreatment;
use tempfile::TempDir;

@@ -4327,4 +4332,169 @@
);
Ok(())
}

#[tokio::test]
async fn write_data_gen() -> Result<()> {
// Define the schema
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let ctx = SessionContext::new_with_config(
SessionConfig::default()
.set_bool("datafusion.execution.collect_statistics", true),
);

println!("ctx catalog: {:?}", ctx.catalog_names());

// Generate random data
let num_rows = 5_000_000; // 5 million rows
let mut rng = rand::thread_rng();
let ids: Vec<i64> = (0..num_rows).collect();
let timestamps: Vec<i64> = (0..num_rows)
.map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
.collect();

let id_array = Arc::new(Int64Array::from(ids));
let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));

let batch =
RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;
// Write the Parquet file
let file = std::path::Path::new("/tmp/testSorted.parquet");
let write_df = ctx.read_batch(batch)?;

write_df
.write_parquet(
file.to_str().unwrap(),
DataFrameWriteOptions::new().with_sort_by(vec![
col("timestamp").sort(true, false),
col("id").sort(false, false),
]),
Some(TableParquetOptions::new()),
)
.await?;

println!("Parquet 文件已写入完成");

let location = std::path::Path::new("/tmp/testSorted.parquet");
let sql_str =
"create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location '"
.to_owned()
+ location.to_str().unwrap()
+ "'";

ctx.sql(sql_str.as_str()).await?.collect().await?;

println!("创建排序表成功");
// // 第一次查询:
// let start = Instant::now();
// let df = ctx.sql("SELECT * FROM sortData").await?;
// let results = df.collect().await?;
// println!("第一次查询排序表耗时: {:?}", start.elapsed());

let start = Instant::now();
let df = ctx
.read_parquet(
location.to_str().unwrap().to_string(),
ParquetReadOptions::default()
.file_sort_order(vec![vec![col("timestamp").sort(true, false)]]),
)
.await?;

let df_1 = df
.sort(vec![col("timestamp").sort(true, false)])?
.limit(0, Some(5))?;
df_1.show().await?;
println!("First sorted query against the sorted table took: {:?}", start.elapsed());

// First query: ORDER BY
let start = Instant::now();
let df = ctx
.sql("SELECT * FROM sortData order by timestamp limit 5")
.await?;

let formatted = df.logical_plan().display_indent_schema().to_string();

println!("logic result {}", formatted);
let _ = df.collect().await?;

println!(
"explain: {:?}",
ctx.sql("explain SELECT * FROM sortData order by timestamp limit 5")
.await?
.collect()
.await?
);

println!("第一次查询排序 加排序表耗时: {:?}", start.elapsed());

Ok(())
}

// #[tokio::test]
// async fn write_data_query() -> Result<()> {
// let ctx = SessionContext::new_with_config(SessionConfig::default().
// set_bool("datafusion.execution.collect_statistics", true));
// let location = std::path::Path::new("/tmp/test.parquet");
//
// let sql_str =
// "create external table data(id INT, timestamp TIMESTAMP) stored as parquet location '"
// .to_owned()
// + location.to_str().unwrap()
// + "'";
//
// ctx.sql(sql_str.as_str()).await?.collect().await?;
//
// println!("External table created");
//
// // First query: ORDER BY
// let start = Instant::now();
// let df = ctx.sql("SELECT * FROM data limit 5").await?;
// let results = df.collect().await?;
// println!("First query took: {:?}", start.elapsed());
//
// // // Second query: ORDER BY
// // let start = Instant::now();
// // let df = ctx.sql("SELECT * FROM data").await?;
// // let results = df.collect().await?;
// // println!("Second query took: {:?}", start.elapsed());
//
// let location = std::path::Path::new("/tmp/testSorted.parquet");
// let sql_str =
// "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location '"
// .to_owned()
// + location.to_str().unwrap()
// + "'";
//
// ctx.sql(sql_str.as_str()).await?.collect().await?;
//
// println!("Sorted external table created");
// // // First query against the sorted table:
// // let start = Instant::now();
// // let df = ctx.sql("SELECT * FROM sortData").await?;
// // let results = df.collect().await?;
// // println!("First query against the sorted table took: {:?}", start.elapsed());
//
// // First query: ORDER BY
// let start = Instant::now();
// let df = ctx.sql("SELECT * FROM sortData order by timestamp limit 5").await?;
//
// let formatted = df.logical_plan().display_indent_schema().to_string();
//
// println!("logic result {}", formatted);
// let results = df.collect().await?;
//
// println!("First ORDER BY query against the sorted table took: {:?}", start.elapsed());
// Ok(())
// }
}
24 changes: 20 additions & 4 deletions datafusion/core/src/dataframe/parquet.rs
@@ -66,10 +66,26 @@ impl DataFrame {
);
}

let format = if let Some(parquet_opts) = writer_options {
Arc::new(ParquetFormatFactory::new_with_options(parquet_opts))
} else {
Arc::new(ParquetFormatFactory::new())
let format = match writer_options {
Some(mut parquet_opts) => {
if !options.sort_by.is_empty() {
parquet_opts.key_value_metadata.insert(
"sort_by".to_string(),
Some(
options
.sort_by
.iter()
.map(|sort| sort.to_string())
.collect::<Vec<String>>()
.join(", "),
),
);
}
println!("parquet_opts: {:?}", parquet_opts);

Arc::new(ParquetFormatFactory::new_with_options(parquet_opts))
}
None => Arc::new(ParquetFormatFactory::new()),
};

let file_type = format_as_file_type(format);
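For reference, a standalone sketch of the string the writer path above produces: each sort expression is rendered and the results joined with ", ". `SortSpec` and `render` are illustrative stand-ins for DataFusion's `SortExpr` and its `Display` output, which this diff assumes has the `<col> <ASC|DESC> NULLS <FIRST|LAST>` shape.

```rust
// Illustrative stand-in for a sort expression; not the DataFusion type.
struct SortSpec {
    column: &'static str,
    asc: bool,
    nulls_first: bool,
}

// Renders one expression the way the parser on the read side expects it.
fn render(s: &SortSpec) -> String {
    format!(
        "{} {} NULLS {}",
        s.column,
        if s.asc { "ASC" } else { "DESC" },
        if s.nulls_first { "FIRST" } else { "LAST" },
    )
}

fn main() {
    let sort_by = [
        SortSpec { column: "timestamp", asc: true, nulls_first: false },
        SortSpec { column: "id", asc: false, nulls_first: false },
    ];
    let value = sort_by.iter().map(render).collect::<Vec<_>>().join(", ");
    // Stored under the "sort_by" key in the Parquet key/value metadata.
    assert_eq!(value, "timestamp ASC NULLS LAST, id DESC NULLS LAST");
}
```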
5 changes: 5 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -573,6 +573,11 @@ pub fn statistics_from_parquet_meta_calc(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)?;

statistics.metadata = file_schema.metadata().clone();

println!("file_schema: {:?}", file_schema);

if let Some(merged) = coerce_file_schema_to_string_type(&table_schema, &file_schema) {
file_schema = merged;
}
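A hedged sketch of the hand-off this hunk performs: the Arrow schema decoded from the Parquet footer carries the writer's key/value pairs, and copying its metadata map into `Statistics` is what lets the listing table see `sort_by` later. This uses `arrow_schema` directly; the schema contents are invented for illustration, the key is the one from the diff.

```rust
use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Pretend this schema was decoded from a Parquet footer whose key/value
    // metadata contained the writer's "sort_by" entry.
    let mut md = HashMap::new();
    md.insert("sort_by".to_string(), "timestamp ASC NULLS LAST".to_string());
    let file_schema = Schema::new_with_metadata(
        vec![Field::new("timestamp", DataType::Int64, false)],
        md,
    );

    // Mirrors `statistics.metadata = file_schema.metadata().clone()` above.
    let stats_metadata: HashMap<String, String> = file_schema.metadata().clone();
    assert_eq!(
        stats_metadata.get("sort_by").map(String::as_str),
        Some("timestamp ASC NULLS LAST"),
    );
}
```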
49 changes: 47 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -35,7 +35,7 @@ use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{col, utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};

@@ -54,6 +54,7 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::expr::Sort;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
@@ -860,7 +861,51 @@ impl TableProvider for ListingTable {
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let output_ordering = self.try_create_output_ordering()?;
let output_ordering;
if let Some(sort_by_value) = statistics.metadata.get("sort_by") {
if !sort_by_value.is_empty() {
// Split the input into individual sort expressions separated by commas
let sort_expressions: Vec<&str> =
sort_by_value.split(',').map(str::trim).collect();

let mut sort_order = vec![];

for sort_expr in sort_expressions {
// Split each expression into components (e.g., "timestamp ASC NULLS LAST")
let tokens: Vec<&str> = sort_expr.split_whitespace().collect();
if tokens.is_empty() {
continue; // Skip empty tokens
}
// Parse the expression, direction, and nulls ordering
let expr = tokens[0].to_string();
let asc = tokens
.get(1)
.map_or(true, |&t| t.eq_ignore_ascii_case("ASC")); // Default to ASC
// "NULLS FIRST" spans two whitespace-separated tokens, so join the
// remaining tokens before comparing. Defaults to NULLS LAST.
let nulls_first = tokens.get(2..).map_or(false, |rest| {
rest.join(" ").eq_ignore_ascii_case("NULLS FIRST")
});

// Create a Sort object
let sort = Sort::new(col(expr), asc, nulls_first);
sort_order.push(sort);
}

if sort_order.is_empty() {
output_ordering = self.try_create_output_ordering()?;
} else {
output_ordering = create_ordering(&self.table_schema, &[sort_order])?;
}
} else {
output_ordering = self.try_create_output_ordering()?;
}
} else {
output_ordering = self.try_create_output_ordering()?;
}

println!("output_ordering: {:?}", output_ordering);

println!("Statistics : {:?}", statistics);

match state
.config_options()
.execution
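Since the parsing above is the heart of the round trip, here is a standalone sketch of it with the two-token fix applied (a single whitespace-split token can never equal "NULLS FIRST"). `ParsedSort` and `parse_sort_by` are illustrative names, not DataFusion API.

```rust
#[derive(Debug, PartialEq)]
struct ParsedSort {
    column: String,
    asc: bool,
    nulls_first: bool,
}

// Parses a comma-separated "sort_by" value such as
// "timestamp ASC NULLS LAST, id DESC" into per-column sort specs.
fn parse_sort_by(value: &str) -> Vec<ParsedSort> {
    value
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|expr| {
            let tokens: Vec<&str> = expr.split_whitespace().collect();
            let asc = tokens
                .get(1)
                .map_or(true, |t| t.eq_ignore_ascii_case("ASC")); // default: ASC
            let nulls_first = tokens.get(2..).map_or(false, |rest| {
                rest.join(" ").eq_ignore_ascii_case("NULLS FIRST") // default: NULLS LAST
            });
            ParsedSort {
                column: tokens[0].to_string(),
                asc,
                nulls_first,
            }
        })
        .collect()
}

fn main() {
    let parsed = parse_sort_by("timestamp ASC NULLS LAST, id DESC");
    assert_eq!(
        parsed,
        vec![
            ParsedSort { column: "timestamp".into(), asc: true, nulls_first: false },
            ParsedSort { column: "id".into(), asc: false, nulls_first: false },
        ]
    );
}
```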
@@ -247,6 +247,7 @@ impl FileScanConfig {
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
metadata: HashMap::new(),
};

let projected_schema = Arc::new(Schema::new_with_metadata(
@@ -701,6 +702,7 @@ mod tests {
})
.collect(),
total_byte_size: Precision::Absent,
metadata: HashMap::new(),
},
to_partition_cols(vec![(
"date".to_owned(),
@@ -1190,6 +1192,7 @@
statistics: Some(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
metadata: HashMap::new(),
column_statistics: file
.statistics
.into_iter()