
Commit

[comet-parquet-exec] Fix compilation errors in Rust tests, remove some debug logging (#1080)

* update tests, remove some debug logging

* update tests, remove some debug logging

* update tests, remove some debug logging

* remove unused import
andygrove authored Nov 13, 2024
1 parent bd68db8 commit 33d2b23
Showing 3 changed files with 3 additions and 46 deletions.
18 changes: 3 additions & 15 deletions native/core/src/execution/datafusion/planner.rs
@@ -949,21 +949,15 @@ impl PhysicalPlanner {
))
}
OpStruct::NativeScan(scan) => {
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

println!("NATIVE: SCAN: {:?}", scan);
let data_schema = parse_message_type(&*scan.data_schema).unwrap();
let required_schema = parse_message_type(&*scan.required_schema).unwrap();
println!("data_schema: {:?}", data_schema);
println!("required_schema: {:?}", required_schema);

let data_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
let data_schema_arrow = Arc::new(
parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
.unwrap(),
);
println!("data_schema_arrow: {:?}", data_schema_arrow);

let required_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
@@ -974,8 +968,6 @@ impl PhysicalPlanner {
)
.unwrap(),
);
println!("required_schema_arrow: {:?}", required_schema_arrow);

assert!(!required_schema_arrow.fields.is_empty());

let mut projection_vector: Vec<usize> =
@@ -984,7 +976,6 @@ impl PhysicalPlanner {
required_schema_arrow.fields.iter().for_each(|field| {
projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
});
println!("projection_vector: {:?}", projection_vector);

assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

@@ -1006,9 +997,6 @@ impl PhysicalPlanner {
))
});

println!("data_filters: {:?}", data_filters);
println!("test_data_filters: {:?}", test_data_filters);

let object_store_url = ObjectStoreUrl::local_filesystem();
let paths: Vec<Url> = scan
.path
@@ -2300,7 +2288,7 @@ mod tests {
let input_array = DictionaryArray::new(keys, Arc::new(values));
let input_batch = InputBatch::Batch(vec![Arc::new(input_array)], row_count);

let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();
scans[0].set_input_batch(input_batch);

let session_ctx = SessionContext::new();
@@ -2381,7 +2369,7 @@ mod tests {
let input_array = DictionaryArray::new(keys, Arc::new(values));
let input_batch = InputBatch::Batch(vec![Arc::new(input_array)], row_count);

let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();

// Scan's schema is determined by the input batch, so we need to set it before execution.
scans[0].set_input_batch(input_batch);
@@ -2453,7 +2441,7 @@ mod tests {
let op = create_filter(op_scan, 0);
let planner = PhysicalPlanner::default();

let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();

let scan = &mut scans[0];
scan.set_input_batch(InputBatch::EOF);
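
For context on the scan-planning code touched above: the NativeScan branch parses the serialized Parquet message types back into Arrow schemas and then builds a projection vector by locating each required field in the full data schema. Below is a minimal standalone sketch of that flow using the parquet and arrow-schema crates; the helper name and the example schema strings are illustrative, not taken from Comet.

use std::sync::Arc;

use arrow_schema::SchemaRef;
use parquet::arrow::schema::parquet_to_arrow_schema;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

// Illustrative helper: turn a serialized Parquet message type into an Arrow schema.
fn message_type_to_arrow(message_type: &str) -> SchemaRef {
    let parquet_type = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(parquet_type));
    Arc::new(parquet_to_arrow_schema(&descriptor, None).unwrap())
}

fn main() {
    // Hypothetical schemas standing in for scan.data_schema / scan.required_schema.
    let data_schema = message_type_to_arrow(
        "message spark_schema { required int32 id; required binary name (UTF8); }",
    );
    let required_schema =
        message_type_to_arrow("message spark_schema { required binary name (UTF8); }");

    // Map each required field to its index in the full data schema, mirroring the
    // projection vector built in the planner.
    let projection: Vec<usize> = required_schema
        .fields()
        .iter()
        .map(|field| data_schema.index_of(field.name()).unwrap())
        .collect();

    assert_eq!(projection, vec![1]);
}
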
7 changes: 0 additions & 7 deletions native/core/src/execution/jni_api.rs
@@ -20,7 +20,6 @@
use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
@@ -379,12 +378,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(

let plan = exec_context.root_op.as_ref().unwrap();

println!(
"Executing partition {} of {}",
partition_id,
plan.output_partitioning().partition_count()
);

let stream = plan.execute(partition_id as usize, task_ctx)?;
exec_context.stream = Some(stream);
} else {
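
The debug print removed here was the only call to output_partitioning() in this file, and that method is supplied by the ExecutionPlanProperties extension trait, which is why the import at the top of the file also became unused. A minimal sketch of that relationship under recent DataFusion versions (the helper below is illustrative, not Comet code):

use std::sync::Arc;

use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Illustrative helper: output_partitioning() is only available on the plan while
// ExecutionPlanProperties is in scope; drop the call and the import becomes unused.
fn partition_count(plan: &Arc<dyn ExecutionPlan>) -> usize {
    plan.output_partitioning().partition_count()
}
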
24 changes: 0 additions & 24 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2493,32 +2493,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
// Sink operators don't have children
result.clearChildren()

// scalastyle:off println
// System.out.println(op.simpleStringWithNodeId())
// System.out.println(scanTypes.asJava) // Spark types for output.
System.out.println(scan.output) // This is the names of the output columns.
// System.out.println(cometScan.requiredSchema); // This is the projected columns.
System.out.println(
scan.dataFilters
); // This is the filter expressions that have been pushed down.

val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)
// System.out.println(cometScan.relation.location.inputFiles(0))
// System.out.println(cometScan.partitionFilters);
// System.out.println(cometScan.relation.partitionSchema)
// System.out.println(cometScan.metadata);

// System.out.println("requiredSchema:")
// cometScan.requiredSchema.fields.foreach(field => {
// System.out.println(field.dataType)
// })

// System.out.println("relation.dataSchema:")
// cometScan.relation.dataSchema.fields.foreach(field => {
// System.out.println(field.dataType)
// })
// scalastyle:on println

val requiredSchemaParquet =
new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
