Revert "wip - CometNativeScan (#1076)"
This reverts commit 38e32f7.
andygrove committed Nov 12, 2024
1 parent 38e32f7 commit 311bc9e
Showing 9 changed files with 148 additions and 716 deletions.
10 changes: 0 additions & 10 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,16 +77,6 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(true)

val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.native.scan.enabled")
.internal()
.doc(
"Whether to enable the fully native scan. When this is turned on, Spark will use Comet to " +
"read supported data sources (currently only Parquet is supported natively)." +
" By default, this config is true.")
.booleanConf
.createWithDefault(true)

val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
.doc(
186 changes: 94 additions & 92 deletions native/core/src/execution/datafusion/planner.rs
@@ -948,116 +948,118 @@ impl PhysicalPlanner {
Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)),
))
}
OpStruct::NativeScan(scan) => {
OpStruct::Scan(scan) => {
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

println!("NATIVE: SCAN: {:?}", scan);
let data_schema = parse_message_type(&*scan.data_schema).unwrap();
let required_schema = parse_message_type(&*scan.required_schema).unwrap();
println!("data_schema: {:?}", data_schema);
println!("required_schema: {:?}", required_schema);

let data_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
let data_schema_arrow = Arc::new(
parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
if scan.source == "CometScan parquet (unknown)" {
let data_schema = parse_message_type(&scan.data_schema).unwrap();
let required_schema = parse_message_type(&scan.required_schema).unwrap();
println!("data_schema: {:?}", data_schema);
println!("required_schema: {:?}", required_schema);

let data_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
let data_schema_arrow = Arc::new(
parquet::arrow::schema::parquet_to_arrow_schema(
&data_schema_descriptor,
None,
)
.unwrap(),
);
println!("data_schema_arrow: {:?}", data_schema_arrow);

let required_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
let required_schema_arrow = Arc::new(
parquet::arrow::schema::parquet_to_arrow_schema(
&required_schema_descriptor,
None,
)
.unwrap(),
);
println!("required_schema_arrow: {:?}", required_schema_arrow);

assert!(!required_schema_arrow.fields.is_empty());

let mut projection_vector: Vec<usize> =
Vec::with_capacity(required_schema_arrow.fields.len());
// TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
required_schema_arrow.fields.iter().for_each(|field| {
projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
});
println!("projection_vector: {:?}", projection_vector);
);
println!("data_schema_arrow: {:?}", data_schema_arrow);

let required_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
let required_schema_arrow = Arc::new(
parquet::arrow::schema::parquet_to_arrow_schema(
&required_schema_descriptor,
None,
)
.unwrap(),
);
println!("required_schema_arrow: {:?}", required_schema_arrow);

assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
assert!(!required_schema_arrow.fields.is_empty());

// Convert the Spark expressions to Physical expressions
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
.data_filters
.iter()
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
.collect();
let mut projection_vector: Vec<usize> =
Vec::with_capacity(required_schema_arrow.fields.len());
// TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
required_schema_arrow.fields.iter().for_each(|field| {
projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
});
println!("projection_vector: {:?}", projection_vector);

// Create a conjunctive form of the vector because ParquetExecBuilder takes
// a single expression
let data_filters = data_filters?;
let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
Arc::new(BinaryExpr::new(
left,
datafusion::logical_expr::Operator::And,
right,
))
});
assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

println!("data_filters: {:?}", data_filters);
println!("test_data_filters: {:?}", test_data_filters);
// Convert the Spark expressions to Physical expressions
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
.data_filters
.iter()
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
.collect();

// Create a conjunctive form of the vector because ParquetExecBuilder takes
// a single expression
let data_filters = data_filters?;
let test_data_filters =
data_filters.clone().into_iter().reduce(|left, right| {
Arc::new(BinaryExpr::new(
left,
datafusion::logical_expr::Operator::And,
right,
))
});

println!("data_filters: {:?}", data_filters);
println!("test_data_filters: {:?}", test_data_filters);

let object_store_url = ObjectStoreUrl::local_filesystem();
let paths: Vec<Url> = scan
.path
.iter()
.map(|path| Url::parse(path).unwrap())
.collect();

let object_store_url = ObjectStoreUrl::local_filesystem();
let paths: Vec<Url> = scan
.path
.iter()
.map(|path| Url::parse(path).unwrap())
.collect();
let object_store = object_store::local::LocalFileSystem::new();
// register the object store with the runtime environment
let url = Url::try_from("file://").unwrap();
self.session_ctx
.runtime_env()
.register_object_store(&url, Arc::new(object_store));

let object_store = object_store::local::LocalFileSystem::new();
// register the object store with the runtime environment
let url = Url::try_from("file://").unwrap();
self.session_ctx
.runtime_env()
.register_object_store(&url, Arc::new(object_store));
let files: Vec<PartitionedFile> = paths
.iter()
.map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
.collect();

let files: Vec<PartitionedFile> = paths
.iter()
.map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
.collect();
// partition the files
// TODO really should partition the row groups

// partition the files
// TODO really should partition the row groups
let mut file_groups = vec![vec![]; partition_count];
files.iter().enumerate().for_each(|(idx, file)| {
file_groups[idx % partition_count].push(file.clone());
});

let mut file_groups = vec![vec![]; partition_count];
files.iter().enumerate().for_each(|(idx, file)| {
file_groups[idx % partition_count].push(file.clone());
});
let file_scan_config =
FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
.with_file_groups(file_groups)
.with_projection(Some(projection_vector));

let file_scan_config =
FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
.with_file_groups(file_groups)
.with_projection(Some(projection_vector));
let mut table_parquet_options = TableParquetOptions::new();
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;

let mut table_parquet_options = TableParquetOptions::new();
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;
let mut builder = ParquetExecBuilder::new(file_scan_config)
.with_table_parquet_options(table_parquet_options);

let mut builder = ParquetExecBuilder::new(file_scan_config)
.with_table_parquet_options(table_parquet_options);
if let Some(filter) = test_data_filters {
builder = builder.with_predicate(filter);
}

if let Some(filter) = test_data_filters {
builder = builder.with_predicate(filter);
let scan = builder.build();
return Ok((vec![], Arc::new(scan)));
}

let scan = builder.build();
return Ok((vec![], Arc::new(scan)));
}
OpStruct::Scan(scan) => {
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

// If it is not test execution context for unit test, we should have at least one
// input source
if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {
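For reference, below is a minimal standalone sketch of the two pieces of logic at the heart of the reverted scan path: turning the Parquet message-type strings (data_schema / required_schema) that the scan carries into Arrow schemas plus a projection vector, and folding the pushed-down filters into the single conjunctive predicate that ParquetExecBuilder::with_predicate expects. This is an illustration only, not code from the commit; the function names required_projection and conjunction are made up here, and it assumes the same parquet and datafusion crates that planner.rs already uses.

use std::sync::Arc;

use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::BinaryExpr;
use datafusion::physical_expr::PhysicalExpr;
use parquet::arrow::schema::parquet_to_arrow_schema;
use parquet::errors::Result;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

// Map each field of the required (projected) schema to its ordinal in the full
// Parquet data schema, mirroring the projection_vector built in the hunk above.
fn required_projection(data_schema: &str, required_schema: &str) -> Result<Vec<usize>> {
    let data = SchemaDescriptor::new(Arc::new(parse_message_type(data_schema)?));
    let required = SchemaDescriptor::new(Arc::new(parse_message_type(required_schema)?));

    let data_arrow = parquet_to_arrow_schema(&data, None)?;
    let required_arrow = parquet_to_arrow_schema(&required, None)?;

    Ok(required_arrow
        .fields()
        .iter()
        .map(|field| {
            data_arrow
                .index_of(field.name())
                .expect("required field must exist in the data schema")
        })
        .collect())
}

// Fold the per-column filters into one AND-ed predicate, because
// ParquetExecBuilder::with_predicate accepts a single expression.
fn conjunction(filters: Vec<Arc<dyn PhysicalExpr>>) -> Option<Arc<dyn PhysicalExpr>> {
    filters
        .into_iter()
        .reduce(|left, right| Arc::new(BinaryExpr::new(left, Operator::And, right)))
}

The reverted branch then hands the projection and the combined predicate to a FileScanConfig / ParquetExecBuilder pair, as shown in the hunk above.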
9 changes: 0 additions & 9 deletions native/proto/src/proto/operator.proto
@@ -43,7 +43,6 @@ message Operator {
SortMergeJoin sort_merge_join = 108;
HashJoin hash_join = 109;
Window window = 110;
NativeScan native_scan = 111;
}
}

@@ -53,14 +52,6 @@ message Scan {
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
}

message NativeScan {
repeated spark.spark_expression.DataType fields = 1;
// The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
repeated string path = 3;
string required_schema = 4;
string data_schema = 5;
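One detail worth noting from this hunk: the comment describes `source` as purely informational, yet with the dedicated NativeScan message removed, the restored planner path above dispatches on that string to decide when to build a native ParquetExec. A trivial sketch of that check (the function name is illustrative, not part of the codebase):

// Sketch: recognize a fully native Parquet scan by matching the Scan
// message's informational `source` string, as the restored planner does.
fn is_native_parquet_scan(source: &str) -> bool {
    source == "CometScan parquet (unknown)"
}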
spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -189,22 +189,6 @@ class CometSparkSessionExtensions
}

// data source V1
case scanExec @ FileSourceScanExec(
HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
_: Seq[_],
requiredSchema,
_,
_,
_,
_,
_,
_)
if CometNativeScanExec.isSchemaSupported(requiredSchema)
&& CometNativeScanExec.isSchemaSupported(partitionSchema)
&& COMET_FULL_NATIVE_SCAN_ENABLED.get =>
logInfo("Comet extension enabled for v1 Scan")
CometNativeScanExec(scanExec, session)
// data source V1
case scanExec @ FileSourceScanExec(
HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
_: Seq[_],
@@ -1221,8 +1205,7 @@ object CometSparkSessionExtensions extends Logging {
}

def isCometScan(op: SparkPlan): Boolean = {
op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] ||
op.isInstanceOf[CometNativeScanExec]
op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec]
}

private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
