wip - CometNativeScan #1078

Merged
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,6 +77,16 @@ object CometConf extends ShimCometConf {
    .booleanConf
    .createWithDefault(true)

  val COMET_FULL_NATIVE_SCAN_ENABLED: ConfigEntry[Boolean] = conf(
    "spark.comet.native.scan.enabled")
    .internal()
    .doc(
      "Whether to enable the fully native scan. When this is turned on, Spark will use Comet to " +
        "read supported data sources (currently only Parquet is supported natively). " +
        "By default, this config is true.")
    .booleanConf
    .createWithDefault(true)

  val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
    conf("spark.comet.parquet.read.parallel.io.enabled")
      .doc(
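For reference, a minimal usage sketch of the new flag from a Spark session. The plugin class name and the other config keys shown here are assumptions about how Comet is normally wired in and are not part of this diff; only spark.comet.native.scan.enabled comes from the change above.

// Hypothetical sketch: start a local session with Comet and turn the fully
// native scan off, falling back to the existing CometScanExec path.
// The plugin class name below is assumed, not taken from this PR.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.plugins", "org.apache.spark.CometPlugin")
  .config("spark.comet.enabled", "true")
  .getOrCreate()

// New flag introduced in this PR (defaults to true):
spark.conf.set("spark.comet.native.scan.enabled", "false")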
186 changes: 92 additions & 94 deletions native/core/src/execution/datafusion/planner.rs
@@ -948,118 +948,116 @@ impl PhysicalPlanner {
                    Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)),
                ))
            }
            OpStruct::NativeScan(scan) => {
                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

                println!("NATIVE: SCAN: {:?}", scan);
                let data_schema = parse_message_type(&*scan.data_schema).unwrap();
                let required_schema = parse_message_type(&*scan.required_schema).unwrap();
                println!("data_schema: {:?}", data_schema);
                println!("required_schema: {:?}", required_schema);

                let data_schema_descriptor =
                    parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
                let data_schema_arrow = Arc::new(
                    parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
                        .unwrap(),
                );
                println!("data_schema_arrow: {:?}", data_schema_arrow);

                let required_schema_descriptor =
                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
                let required_schema_arrow = Arc::new(
                    parquet::arrow::schema::parquet_to_arrow_schema(
                        &required_schema_descriptor,
                        None,
                    )
                    .unwrap(),
                );
                println!("required_schema_arrow: {:?}", required_schema_arrow);

                assert!(!required_schema_arrow.fields.is_empty());

                let mut projection_vector: Vec<usize> =
                    Vec::with_capacity(required_schema_arrow.fields.len());
                // TODO: could be faster with a hashmap rather than iterating over
                // data_schema_arrow with index_of.
                required_schema_arrow.fields.iter().for_each(|field| {
                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
                });
                println!("projection_vector: {:?}", projection_vector);

                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

                // Convert the Spark expressions to Physical expressions
                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
                    .data_filters
                    .iter()
                    .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
                    .collect();

                // Create a conjunctive form of the vector because ParquetExecBuilder takes
                // a single expression
                let data_filters = data_filters?;
                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
                    Arc::new(BinaryExpr::new(
                        left,
                        datafusion::logical_expr::Operator::And,
                        right,
                    ))
                });

                println!("data_filters: {:?}", data_filters);
                println!("test_data_filters: {:?}", test_data_filters);

                let object_store_url = ObjectStoreUrl::local_filesystem();
                let paths: Vec<Url> = scan
                    .path
                    .iter()
                    .map(|path| Url::parse(path).unwrap())
                    .collect();

                let object_store = object_store::local::LocalFileSystem::new();
                // register the object store with the runtime environment
                let url = Url::try_from("file://").unwrap();
                self.session_ctx
                    .runtime_env()
                    .register_object_store(&url, Arc::new(object_store));

                let files: Vec<PartitionedFile> = paths
                    .iter()
                    .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
                    .collect();

                // partition the files
                // TODO really should partition the row groups

                let mut file_groups = vec![vec![]; partition_count];
                files.iter().enumerate().for_each(|(idx, file)| {
                    file_groups[idx % partition_count].push(file.clone());
                });

                let file_scan_config =
                    FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
                        .with_file_groups(file_groups)
                        .with_projection(Some(projection_vector));

                let mut table_parquet_options = TableParquetOptions::new();
                table_parquet_options.global.pushdown_filters = true;
                table_parquet_options.global.reorder_filters = true;

                let mut builder = ParquetExecBuilder::new(file_scan_config)
                    .with_table_parquet_options(table_parquet_options);

                if let Some(filter) = test_data_filters {
                    builder = builder.with_predicate(filter);
                }

                let scan = builder.build();
                return Ok((vec![], Arc::new(scan)));
            }
            OpStruct::Scan(scan) => {
                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

                // If it is not test execution context for unit test, we should have at least one
                // input source
                if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {
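One detail of the new arm that is easy to miss is the round-robin assignment of input files into partition_count groups before the FileScanConfig is built (the planner notes a TODO to partition by row group instead). The Scala sketch below is illustrative only, using plain path strings and a hypothetical groupFiles helper in place of the Rust loop over PartitionedFile values.

// Illustrative sketch of the planner's `idx % partition_count` grouping,
// using plain file paths instead of DataFusion PartitionedFile values.
def groupFiles(files: Seq[String], partitionCount: Int): Seq[Seq[String]] = {
  val groups = Array.fill(partitionCount)(Vector.empty[String])
  files.zipWithIndex.foreach { case (file, idx) =>
    groups(idx % partitionCount) = groups(idx % partitionCount) :+ file
  }
  groups.toSeq
}

// groupFiles(Seq("a.parquet", "b.parquet", "c.parquet"), 2)
//   => Seq(Vector("a.parquet", "c.parquet"), Vector("b.parquet"))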
9 changes: 9 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -43,6 +43,7 @@ message Operator {
    SortMergeJoin sort_merge_join = 108;
    HashJoin hash_join = 109;
    Window window = 110;
    NativeScan native_scan = 111;
  }
}

@@ -52,6 +53,14 @@ message Scan {
  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
  // is purely for informational purposes when viewing native query plans in
  // debug mode.
  string source = 2;
}

message NativeScan {
  repeated spark.spark_expression.DataType fields = 1;
  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
  // is purely for informational purposes when viewing native query plans in
  // debug mode.
  string source = 2;
  repeated string path = 3;
  string required_schema = 4;
  string data_schema = 5;
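To give a sense of how the JVM side might populate the new message, here is a hedged sketch using the Java builders that protoc generates for this file. The org.apache.comet.serde.OperatorOuterClass package name and the example schema strings are assumptions, only fields declared above are set, and the repeated fields entry is left empty for brevity.

// Hedged sketch: building a NativeScan operator with the generated protobuf
// builders. Class/package names and the schema strings below are assumed.
import org.apache.comet.serde.OperatorOuterClass

val dataSchema = "message spark_schema { required int32 id; required binary name (UTF8); }"
val requiredSchema = "message spark_schema { required int32 id; }"

val nativeScan = OperatorOuterClass.NativeScan
  .newBuilder()
  .setSource("CometScan parquet (example)") // informational only
  .addPath("file:///tmp/t1/part-00000.parquet")
  .setRequiredSchema(requiredSchema)
  .setDataSchema(dataSchema)
  .build()

val op = OperatorOuterClass.Operator
  .newBuilder()
  .setNativeScan(nativeScan)
  .build()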
@@ -189,6 +189,22 @@ class CometSparkSessionExtensions
        }

        // data source V1
        case scanExec @ FileSourceScanExec(
              HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
              _: Seq[_],
              requiredSchema,
              _,
              _,
              _,
              _,
              _,
              _)
            if CometNativeScanExec.isSchemaSupported(requiredSchema)
              && CometNativeScanExec.isSchemaSupported(partitionSchema)
              && COMET_FULL_NATIVE_SCAN_ENABLED.get =>
          logInfo("Comet extension enabled for v1 Scan")
          CometNativeScanExec(scanExec, session)
        // data source V1
        case scanExec @ FileSourceScanExec(
              HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
              _: Seq[_],
@@ -1205,7 +1221,8 @@ object CometSparkSessionExtensions extends Logging {
  }

  def isCometScan(op: SparkPlan): Boolean = {
    op.isInstanceOf[CometBatchScanExec] || op.isInstanceOf[CometScanExec] ||
      op.isInstanceOf[CometNativeScanExec]
  }

  private def shouldApplySparkToColumnar(conf: SQLConf, op: SparkPlan): Boolean = {
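A quick way to see whether the new rule fired is to run a small Parquet query and look for CometNativeScanExec in the executed plan. A hedged end-to-end sketch, assuming a session configured with Comet as in the earlier sketch; the temporary path is arbitrary.

// Hedged check: write and read a small Parquet table, then look for
// CometNativeScanExec in the executed plan (matched by simple class name so
// the sketch does not depend on Comet's package layout).
val path = "/tmp/comet_native_scan_demo"
spark.range(0, 1000).selectExpr("id", "id % 10 AS bucket").write.mode("overwrite").parquet(path)

val df = spark.read.parquet(path).filter("bucket = 3")
df.collect()

val usedNativeScan = df.queryExecution.executedPlan.collectFirst {
  case p if p.getClass.getSimpleName == "CometNativeScanExec" => p
}.isDefined
println(s"CometNativeScanExec used: $usedNativeScan")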