diff --git a/.github/actions/test_sqllogic_stage/action.yml b/.github/actions/test_sqllogic_stage/action.yml index 30b34b1c5afd..4627eb936232 100644 --- a/.github/actions/test_sqllogic_stage/action.yml +++ b/.github/actions/test_sqllogic_stage/action.yml @@ -45,18 +45,9 @@ runs: aws --endpoint-url http://127.0.0.1:9900/ s3 mb s3://testbucket aws --endpoint-url http://127.0.0.1:9900/ s3 cp tests/data s3://testbucket/data --recursive --no-progress - - name: Run sqllogic Tests with Standalone mode parquet2 lib - shell: bash - env: - TEST_HANDLERS: ${{ inputs.handlers }} - TEST_STAGE_STORAGE: ${{ inputs.storage }} - TEST_STAGE_PARQUET_LIB: parquet2 - run: bash ./scripts/ci/ci-run-sqllogic-tests-without-sandbox.sh ${{ inputs.dirs }} - - name: Run sqllogic Tests with Standalone mode parquet_rs lib shell: bash env: TEST_HANDLERS: ${{ inputs.handlers }} TEST_STAGE_STORAGE: ${{ inputs.storage }} - TEST_STAGE_PARQUET_LIB: parquet_rs run: bash ./scripts/ci/ci-run-sqllogic-tests-without-sandbox.sh ${{ inputs.dirs }} diff --git a/Cargo.lock b/Cargo.lock index 5bff8df3e107..f8de86ae8ba3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3738,7 +3738,6 @@ dependencies = [ "databend-common-meta-app", "databend-common-metrics", "databend-common-pipeline-core", - "databend-common-pipeline-sources", "databend-common-sql", "databend-common-storage", "databend-storages-common-pruner", diff --git a/src/query/catalog/src/plan/datasource/datasource_info/data_source_info.rs b/src/query/catalog/src/plan/datasource/datasource_info/data_source_info.rs index 1d471300f1c1..a67399fda5a1 100644 --- a/src/query/catalog/src/plan/datasource/datasource_info/data_source_info.rs +++ b/src/query/catalog/src/plan/datasource/datasource_info/data_source_info.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use databend_common_expression::TableSchema; use databend_common_meta_app::schema::TableInfo; -use crate::plan::Parquet2TableInfo; use crate::plan::ParquetTableInfo; use crate::plan::ResultScanTableInfo; use crate::plan::StageTableInfo; @@ -30,7 +29,6 @@ pub enum DataSourceInfo { StageSource(StageTableInfo), // stage source with parquet format used for select. 
ParquetSource(ParquetTableInfo), - Parquet2Source(Parquet2TableInfo), // Table Function Result_Scan ResultScanSource(ResultScanTableInfo), } @@ -41,7 +39,6 @@ impl DataSourceInfo { DataSourceInfo::TableSource(table_info) => table_info.schema(), DataSourceInfo::StageSource(table_info) => table_info.schema(), DataSourceInfo::ParquetSource(table_info) => table_info.schema(), - DataSourceInfo::Parquet2Source(table_info) => table_info.schema(), DataSourceInfo::ResultScanSource(table_info) => table_info.schema(), } } @@ -51,7 +48,6 @@ impl DataSourceInfo { DataSourceInfo::TableSource(table_info) => table_info.desc.clone(), DataSourceInfo::StageSource(table_info) => table_info.desc(), DataSourceInfo::ParquetSource(table_info) => table_info.desc(), - DataSourceInfo::Parquet2Source(table_info) => table_info.desc(), DataSourceInfo::ResultScanSource(table_info) => table_info.desc(), } } diff --git a/src/query/catalog/src/plan/datasource/datasource_info/mod.rs b/src/query/catalog/src/plan/datasource/datasource_info/mod.rs index 4a98da119d96..2068b27f3dc4 100644 --- a/src/query/catalog/src/plan/datasource/datasource_info/mod.rs +++ b/src/query/catalog/src/plan/datasource/datasource_info/mod.rs @@ -14,7 +14,6 @@ mod data_source_info; mod parquet; -mod parquet2; mod parquet_read_options; mod result_scan; mod stage; @@ -23,7 +22,6 @@ pub use data_source_info::DataSourceInfo; pub use parquet::FullParquetMeta; pub use parquet::ParquetTableInfo; pub use parquet::ParquetTableInfo as ParquetTableInfoV2; -pub use parquet2::Parquet2TableInfo; pub use parquet_read_options::ParquetReadOptions; pub use result_scan::ResultScanTableInfo; pub use stage::StageTableInfo; diff --git a/src/query/catalog/src/plan/datasource/datasource_info/parquet2.rs b/src/query/catalog/src/plan/datasource/datasource_info/parquet2.rs deleted file mode 100644 index bc1be25eed95..000000000000 --- a/src/query/catalog/src/plan/datasource/datasource_info/parquet2.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
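The hunks above drop the Parquet2Source variant from DataSourceInfo together with its schema()/desc() arms. Because the enum is matched exhaustively, deleting the variant makes the compiler flag every remaining parquet2 call site (privilege checks, query context, binder), which is what the later hunks in this patch clean up. A minimal standalone sketch of that property, using hypothetical types rather than the Databend definitions:

// Minimal sketch (hypothetical types, not the Databend definitions) of why dropping an
// enum variant is a self-checking refactor: every non-wildcard match on the enum stops
// compiling until its parquet2 arm is deleted, so no stale code path can survive.
enum SourceInfo {
    Table(String),
    Stage(String),
    // Parquet2(String), // the removed variant; while it existed, `desc` below needed an arm for it
    ParquetRs(String),
}

fn desc(info: &SourceInfo) -> &str {
    // Exhaustive match, like DataSourceInfo::schema()/desc() in the hunks above.
    match info {
        SourceInfo::Table(d) => d,
        SourceInfo::Stage(d) => d,
        SourceInfo::ParquetRs(d) => d,
    }
}

fn main() {
    let sources = vec![
        SourceInfo::Table("default.t".to_string()),
        SourceInfo::Stage("my_stage".to_string()),
        SourceInfo::ParquetRs("parquet_rs stage".to_string()),
    ];
    for source in &sources {
        println!("{}", desc(source));
    }
}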
- -use std::sync::Arc; - -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::parquet::metadata::SchemaDescriptor; -use databend_common_expression::TableSchema; -use databend_common_meta_app::principal::StageInfo; -use databend_common_meta_app::schema::TableInfo; -use databend_common_storage::StageFileInfo; -use databend_common_storage::StageFilesInfo; - -use crate::plan::datasource::datasource_info::parquet_read_options::ParquetReadOptions; -use crate::table::Parquet2TableColumnStatisticsProvider; - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] -pub struct Parquet2TableInfo { - pub read_options: ParquetReadOptions, - pub stage_info: StageInfo, - pub files_info: StageFilesInfo, - - pub table_info: TableInfo, - pub arrow_schema: ArrowSchema, - pub schema_descr: SchemaDescriptor, - pub files_to_read: Option>, - pub schema_from: String, - pub compression_ratio: f64, - - pub column_statistics_provider: Parquet2TableColumnStatisticsProvider, -} - -impl Parquet2TableInfo { - pub fn schema(&self) -> Arc { - self.table_info.schema() - } - - pub fn desc(&self) -> String { - self.stage_info.stage_name.clone() - } -} diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 04563db148a2..9e63338068fd 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -526,35 +526,6 @@ pub struct NavigationDescriptor { use std::collections::HashMap; -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] -pub struct Parquet2TableColumnStatisticsProvider { - column_stats: HashMap>, - num_rows: u64, -} - -impl Parquet2TableColumnStatisticsProvider { - pub fn new(column_stats: HashMap, num_rows: u64) -> Self { - let column_stats = column_stats - .into_iter() - .map(|(column_id, stat)| (column_id, stat.get_useful_stat(num_rows))) - .collect(); - Self { - column_stats, - num_rows, - } - } -} - -impl ColumnStatisticsProvider for Parquet2TableColumnStatisticsProvider { - fn column_statistics(&self, column_id: ColumnId) -> Option<&BasicColumnStatistics> { - self.column_stats.get(&column_id).and_then(|s| s.as_ref()) - } - - fn num_rows(&self) -> Option { - Some(self.num_rows) - } -} - #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] pub struct ParquetTableColumnStatisticsProvider { column_stats: HashMap>, diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index f4b1e806c487..1d74c1a667af 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -576,9 +576,6 @@ impl AccessChecker for PrivilegeAccess { DataSourceInfo::StageSource(stage_info) => { self.validate_stage_access(&stage_info.stage_info, UserPrivilegeType::Read).await?; } - DataSourceInfo::Parquet2Source(stage_info) => { - self.validate_stage_access(&stage_info.stage_info, UserPrivilegeType::Read).await?; - } DataSourceInfo::ParquetSource(stage_info) => { self.validate_stage_access(&stage_info.stage_info, UserPrivilegeType::Read).await?; } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 6830f536a2a8..8144642c0baa 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -84,7 +84,6 @@ use databend_common_storage::StorageMetrics; use databend_common_storages_delta::DeltaTable; use databend_common_storages_fuse::TableContext; use 
databend_common_storages_iceberg::IcebergTable; -use databend_common_storages_parquet::Parquet2Table; use databend_common_storages_parquet::ParquetRSTable; use databend_common_storages_result_cache::ResultScan; use databend_common_storages_stage::StageTable; @@ -332,7 +331,6 @@ impl TableContext for QueryContext { stage_info, plan.tbl_args.clone(), ), - DataSourceInfo::Parquet2Source(table_info) => Parquet2Table::from_info(table_info), DataSourceInfo::ParquetSource(table_info) => ParquetRSTable::from_info(table_info), DataSourceInfo::ResultScanSource(table_info) => ResultScan::from_info(table_info), } diff --git a/src/query/service/tests/it/parquet_rs/prune_pages.rs b/src/query/service/tests/it/parquet_rs/prune_pages.rs index 9d0d4a68cf1e..e8d9f708ac11 100644 --- a/src/query/service/tests/it/parquet_rs/prune_pages.rs +++ b/src/query/service/tests/it/parquet_rs/prune_pages.rs @@ -26,7 +26,7 @@ use parquet::arrow::arrow_reader::RowSelector; use crate::parquet_rs::data::make_test_file_page; use crate::parquet_rs::data::Scenario; -use crate::parquet_rs::utils::create_parquet2_test_fixture; +use crate::parquet_rs::utils::create_parquet_test_fixture; use crate::parquet_rs::utils::get_data_source_plan; async fn test(scenario: Scenario, predicate: &str, expected_selection: RowSelection) { @@ -34,7 +34,7 @@ async fn test(scenario: Scenario, predicate: &str, expected_selection: RowSelect let file_path = file.path().to_string_lossy(); let sql = format!("select * from 'fs://{file_path}' where {predicate}"); - let fixture = create_parquet2_test_fixture().await; + let fixture = create_parquet_test_fixture().await; let plan = get_data_source_plan(fixture.new_query_ctx().await.unwrap(), &sql) .await .unwrap(); diff --git a/src/query/service/tests/it/parquet_rs/prune_row_groups.rs b/src/query/service/tests/it/parquet_rs/prune_row_groups.rs index 23cfc42dcfef..52159e4d164e 100644 --- a/src/query/service/tests/it/parquet_rs/prune_row_groups.rs +++ b/src/query/service/tests/it/parquet_rs/prune_row_groups.rs @@ -23,7 +23,7 @@ use databend_common_storages_parquet::ParquetRSPruner; use super::data::make_test_file_rg; use super::data::Scenario; use super::utils::get_data_source_plan; -use crate::parquet_rs::utils::create_parquet2_test_fixture; +use crate::parquet_rs::utils::create_parquet_test_fixture; /// Enable row groups pruning and test. 
async fn test(scenario: Scenario, predicate: &str, expected_rgs: Vec) { @@ -40,7 +40,7 @@ async fn test_impl(scenario: Scenario, predicate: &str, expected_rgs: Vec let file_path = file.path().to_string_lossy(); let sql = format!("select * from 'fs://{file_path}' where {predicate}"); - let fixture = create_parquet2_test_fixture().await; + let fixture = create_parquet_test_fixture().await; let plan = get_data_source_plan(fixture.new_query_ctx().await.unwrap(), &sql) .await .unwrap(); diff --git a/src/query/service/tests/it/parquet_rs/utils.rs b/src/query/service/tests/it/parquet_rs/utils.rs index 07de8e8ba1da..d168734347eb 100644 --- a/src/query/service/tests/it/parquet_rs/utils.rs +++ b/src/query/service/tests/it/parquet_rs/utils.rs @@ -23,17 +23,10 @@ use databend_common_sql::Planner; use databend_query::test_kits::ConfigBuilder; use databend_query::test_kits::TestFixture; -pub async fn create_parquet2_test_fixture() -> TestFixture { +pub async fn create_parquet_test_fixture() -> TestFixture { let mut conf = ConfigBuilder::create().config(); conf.storage.allow_insecure = true; - let test_fixture = TestFixture::setup_with_config(&conf).await.unwrap(); - test_fixture - .default_session() - .get_settings() - .set_use_parquet2(false) - .unwrap(); - - test_fixture + TestFixture::setup_with_config(&conf).await.unwrap() } pub async fn get_data_source_plan(ctx: Arc, sql: &str) -> Result { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index ee1e957f1814..34a65cb03ced 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -517,7 +517,7 @@ impl DefaultSettings { }), ("use_parquet2", DefaultSettingValue { value: UserSettingValue::UInt64(0), - desc: "Use parquet2 instead of parquet_rs when infer_schema().", + desc: "This setting is deprecated", mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 0ea1050c90d3..65c7d409eada 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -72,7 +72,6 @@ use databend_common_meta_types::MetaId; use databend_common_storage::DataOperator; use databend_common_storage::StageFileInfo; use databend_common_storage::StageFilesInfo; -use databend_common_storages_parquet::Parquet2Table; use databend_common_storages_parquet::ParquetRSTable; use databend_common_storages_result_cache::ResultCacheMetaManager; use databend_common_storages_result_cache::ResultCacheReader; @@ -979,7 +978,6 @@ impl Binder { let table = match stage_info.file_format_params { FileFormatParams::Parquet(..) => { - let use_parquet2 = table_ctx.get_settings().get_use_parquet2()?; let mut read_options = ParquetReadOptions::default(); if !table_ctx.get_settings().get_enable_parquet_page_index()? { @@ -997,25 +995,14 @@ impl Binder { read_options = read_options.with_do_prewhere(false); } - if use_parquet2 { - Parquet2Table::create( - table_ctx.clone(), - stage_info.clone(), - files_info, - read_options, - files_to_copy, - ) - .await? - } else { - ParquetRSTable::create( - table_ctx.clone(), - stage_info.clone(), - files_info, - read_options, - files_to_copy, - ) - .await? - } + ParquetRSTable::create( + table_ctx.clone(), + stage_info.clone(), + files_info, + read_options, + files_to_copy, + ) + .await? } FileFormatParams::NdJson(..) 
=> { let schema = Arc::new(TableSchema::new(vec![TableField::new( diff --git a/src/query/storages/parquet/Cargo.toml b/src/query/storages/parquet/Cargo.toml index 8f7a85d3bb5a..596db581632e 100644 --- a/src/query/storages/parquet/Cargo.toml +++ b/src/query/storages/parquet/Cargo.toml @@ -21,7 +21,6 @@ databend-common-functions = { path = "../../functions" } databend-common-meta-app = { path = "../../../meta/app" } databend-common-metrics = { path = "../../../common/metrics" } databend-common-pipeline-core = { path = "../../pipeline/core" } -databend-common-pipeline-sources = { path = "../../pipeline/sources" } databend-common-storage = { path = "../../../common/storage" } databend-storages-common-pruner = { path = "../common/pruner" } databend-storages-common-table-meta = { path = "../common/table_meta" } diff --git a/src/query/storages/parquet/src/lib.rs b/src/query/storages/parquet/src/lib.rs index faf1e64ecc1e..3ab9d737fdb1 100644 --- a/src/query/storages/parquet/src/lib.rs +++ b/src/query/storages/parquet/src/lib.rs @@ -23,13 +23,11 @@ #![feature(int_roundings)] #![feature(box_patterns)] -mod parquet2; mod parquet_part; mod parquet_rs; mod read_settings; mod utils; -pub use parquet2::Parquet2Table; pub use parquet_part::ParquetFilesPart; pub use parquet_part::ParquetPart; pub use parquet_rs::InMemoryRowGroup; diff --git a/src/query/storages/parquet/src/parquet2/mod.rs b/src/query/storages/parquet/src/parquet2/mod.rs deleted file mode 100644 index 73f97c3dad33..000000000000 --- a/src/query/storages/parquet/src/parquet2/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod parquet_reader; -mod parquet_table; -mod partition; -mod processors; -mod projection; -mod pruning; -mod statistics; - -pub use parquet_table::Parquet2Table; -pub use partition::Parquet2RowGroupPart; diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/data.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/data.rs deleted file mode 100644 index 0cdb4e326d1a..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/data.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
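Note that the use_parquet2 setting itself is not removed above: only its description changes to mark it deprecated, and the binder stops consulting it, presumably so existing SET use_parquet2 statements keep working while stage Parquet reads always go through ParquetRSTable. A rough sketch of that accept-but-ignore pattern, with hypothetical names rather than Databend's actual Settings API:

use std::collections::HashMap;

// Rough sketch (hypothetical names, not Databend's Settings API) of the accept-but-ignore
// pattern: the deprecated key stays registered with its 0..=1 range so setting it still
// validates, but no read path consults the stored value any more.
struct Settings {
    values: HashMap<&'static str, u64>,
}

impl Settings {
    fn new() -> Self {
        let mut values = HashMap::new();
        values.insert("use_parquet2", 0); // still registered, described as deprecated
        Self { values }
    }

    fn set(&mut self, key: &str, value: u64) -> Result<(), String> {
        match self.values.get_mut(key) {
            Some(slot) if value <= 1 => {
                *slot = value; // accepted for compatibility; nothing branches on it now
                Ok(())
            }
            Some(_) => Err(format!("value {value} is out of range 0..=1 for {key}")),
            None => Err(format!("unknown setting {key}")),
        }
    }
}

fn main() {
    let mut settings = Settings::new();
    settings.set("use_parquet2", 1).unwrap();
    // The binder no longer reads this value: stage Parquet reads always build ParquetRSTable.
    println!("use_parquet2 accepted but ignored");
}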
- -use std::collections::HashMap; - -use bytes::Bytes; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::FieldIndex; - -pub trait BlockIterator: Iterator> + Send { - /// checking has_next() after next can avoid processor from entering SYNC for nothing. - fn has_next(&self) -> bool; -} - -pub struct OneBlock(pub Option); - -impl Iterator for OneBlock { - type Item = Result; - - fn next(&mut self) -> Option { - Ok(self.0.take()).transpose() - } -} - -impl BlockIterator for OneBlock { - fn has_next(&self) -> bool { - self.0.is_some() - } -} - -pub trait SeekRead: std::io::Read + std::io::Seek {} - -impl SeekRead for T where T: std::io::Read + std::io::Seek {} - -pub type IndexedChunk = (FieldIndex, Vec); -pub type IndexedChunks = HashMap; - -pub enum Parquet2PartData { - RowGroup(IndexedChunks), - SmallFiles(Vec>), -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/deserialize.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/deserialize.rs deleted file mode 100644 index 3d13f911c316..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/deserialize.rs +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::sync::Arc; -use std::sync::Mutex; - -use databend_common_arrow::arrow::bitmap::Bitmap; -use databend_common_arrow::arrow::datatypes::Field; -use databend_common_arrow::arrow::io::parquet::read::column_iter_to_arrays; -use databend_common_arrow::arrow::io::parquet::read::nested_column_iter_to_arrays; -use databend_common_arrow::arrow::io::parquet::read::ArrayIter; -use databend_common_arrow::arrow::io::parquet::read::InitNested; -use databend_common_arrow::arrow::io::parquet::read::RowGroupDeserializer; -use databend_common_arrow::parquet::metadata::ColumnDescriptor; -use databend_common_arrow::parquet::page::CompressedPage; -use databend_common_arrow::parquet::read::BasicDecompressor; -use databend_common_arrow::parquet::read::PageMetaData; -use databend_common_arrow::parquet::read::PageReader; -use databend_common_base::runtime::GLOBAL_MEM_STAT; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::DataSchema; -use databend_common_storage::ColumnNode; -use log::debug; - -use super::filter::FilterState; -use crate::parquet2::parquet_reader::Parquet2Reader; -use crate::parquet2::partition::ColumnMeta; - -impl Parquet2Reader { - /// The number of columns can be greater than 1 because the it may be a nested type. - /// Combine multiple columns into one arrow array. 
- pub(crate) fn to_array_iter( - metas: Vec<(&ColumnMeta, &ColumnDescriptor)>, - chunks: Vec>, - rows: usize, - field: Field, - init: Vec, - ) -> Result> { - let (columns, types) = metas - .iter() - .zip(chunks) - .map(|(&(meta, descriptor), chunk)| { - let pages = PageReader::new_with_page_meta( - std::io::Cursor::new(chunk), - PageMetaData { - column_start: meta.offset, - num_values: meta.num_values, - compression: meta.compression, - descriptor: descriptor.descriptor.clone(), - }, - Arc::new(|_, _| true), - vec![], - usize::MAX, - ); - ( - BasicDecompressor::new(pages, vec![]), - &descriptor.descriptor.primitive_type, - ) - }) - .unzip(); - - let array_iter = if init.is_empty() { - column_iter_to_arrays(columns, types, field, Some(rows), rows)? - } else { - nested_column_iter_to_arrays(columns, types, field, init, Some(rows), rows)? - }; - Ok(array_iter) - } - - /// Almost the same as `to_array_iter`, but with a filter. - pub(crate) fn to_array_iter_with_filter( - metas: Vec<(&ColumnMeta, &ColumnDescriptor)>, - chunks: Vec>, - rows: usize, - field: Field, - init: Vec, - filter: Bitmap, - ) -> Result> { - let (columns, types) = metas - .iter() - .zip(chunks) - .map(|(&(meta, descriptor), chunk)| { - let filter_state = Arc::new(Mutex::new(FilterState::new(filter.clone()))); - let iter_filter_state = filter_state.clone(); - - let pages = PageReader::new_with_page_meta( - std::io::Cursor::new(chunk), - PageMetaData { - column_start: meta.offset, - num_values: meta.num_values, - compression: meta.compression, - descriptor: descriptor.descriptor.clone(), - }, - Arc::new(move |_, header| { - // If the bitmap for current page is all unset, skip it. - let mut state = filter_state.lock().unwrap(); - let num_rows = header.num_values(); - let all_unset = state.range_all_unset(num_rows); - if all_unset { - // skip this page. - state.advance(num_rows); - } - !all_unset - }), - vec![], - usize::MAX, - ) - .map(move |page| { - page.map(|page| match page { - CompressedPage::Data(mut page) => { - let num_rows = page.num_values(); - let mut state = iter_filter_state.lock().unwrap(); - if state.range_all_unset(num_rows) { - page.select_rows(vec![]); - } else if !state.range_all_set(num_rows) { - page.select_rows(state.convert_to_intervals(num_rows)); - }; - state.advance(num_rows); - CompressedPage::Data(page) - } - CompressedPage::Dict(_) => page, // do nothing - }) - }); - ( - BasicDecompressor::new(pages, vec![]), - &descriptor.descriptor.primitive_type, - ) - }) - .unzip(); - - let array_iter = if init.is_empty() { - column_iter_to_arrays( - columns, - types, - field, - Some(rows - filter.unset_bits()), - rows, - )? - } else { - nested_column_iter_to_arrays( - columns, - types, - field, - init, - Some(rows - filter.unset_bits()), - rows, - )? 
- }; - Ok(array_iter) - } - - pub(crate) fn full_deserialize( - &self, - deserializer: &mut RowGroupDeserializer, - ) -> Result { - try_next_block(&self.output_schema, deserializer) - } - - // Build a map to record the count number of each leaf_id - pub(crate) fn build_projection_count_map(columns: &[ColumnNode]) -> HashMap { - let mut cnt_map = HashMap::with_capacity(columns.len()); - for column in columns { - for index in &column.leaf_indices { - if let Entry::Vacant(e) = cnt_map.entry(*index) { - e.insert(1); - } else { - let cnt = cnt_map.get_mut(index).unwrap(); - *cnt += 1; - } - } - } - cnt_map - } -} - -pub(crate) fn try_next_block( - schema: &DataSchema, - deserializer: &mut RowGroupDeserializer, -) -> Result { - match deserializer.next() { - None => Err(ErrorCode::Internal( - "deserializer from row group: fail to get a chunk", - )), - Some(Err(cause)) => Err(ErrorCode::from(cause)), - Some(Ok(chunk)) => { - debug!(mem = GLOBAL_MEM_STAT.get_memory_usage(), peak_mem = GLOBAL_MEM_STAT.get_peak_memory_usage(); "before load arrow chunk"); - let block = DataBlock::from_arrow_chunk(&chunk, schema); - debug!(mem = GLOBAL_MEM_STAT.get_memory_usage(), peak_mem = GLOBAL_MEM_STAT.get_peak_memory_usage(); "after load arrow chunk"); - - block - } - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/filter.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/filter.rs deleted file mode 100644 index 8f455d6511bc..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/filter.rs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use databend_common_arrow::arrow::bitmap::Bitmap; -use databend_common_arrow::parquet::indexes::Interval; - -/// A wrapper of [`Bitmap`] with a position mark. It is used to filter rows when reading a parquet file. -/// -/// If a whole page is filtered, there is no need to decompress the page. -/// If a row is filtered, there is no need to decode the row. -pub struct FilterState { - bitmap: Bitmap, - pos: usize, -} - -impl FilterState { - pub fn new(bitmap: Bitmap) -> Self { - Self { bitmap, pos: 0 } - } - - #[inline] - pub fn advance(&mut self, num: usize) { - self.pos += num; - } - - /// Return true if [`self.pos`, `self.pos + num`) are set. - #[inline] - pub fn range_all_set(&self, num: usize) -> bool { - self.bitmap.null_count_range(self.pos, num) == 0 - } - - /// Return true if [`self.pos`, `self.pos + num`) are unset. - #[inline] - pub fn range_all_unset(&self, num: usize) -> bool { - self.bitmap.null_count_range(self.pos, num) == num - } - - /// Convert the validity of [`self.pos`, `self.pos + num`) to [`Interval`]s. 
- pub fn convert_to_intervals(&self, num_rows: usize) -> Vec { - let mut res = vec![]; - let mut started = false; - let mut start = 0; - for (i, v) in self.bitmap.iter().skip(self.pos).take(num_rows).enumerate() { - if v { - if !started { - start = i; - started = true; - } - } else if started { - res.push(Interval::new(start, i - start)); - started = false; - } - } - - if started { - res.push(Interval::new(start, num_rows - start)); - } - - res - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/mod.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/mod.rs deleted file mode 100644 index 801d9d053aca..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/mod.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod data; -mod deserialize; -mod filter; -mod reader; -mod reader_merge_io; -mod reader_merge_io_async; -mod reader_merge_io_sync; - -pub use data::BlockIterator; -pub use data::IndexedChunk; -pub use data::IndexedChunks; -pub use data::Parquet2PartData; -pub use reader::Parquet2Reader; -pub use reader_merge_io::MergeIOReadResult; -pub use reader_merge_io::OwnerMemory; diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/reader.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/reader.rs deleted file mode 100644 index 4e0a8f02715f..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/reader.rs +++ /dev/null @@ -1,323 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
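The filter.rs file deleted above carried the page-level filtering trick: FilterState walks a selection bitmap alongside the pages, skips pages whose slice of the bitmap is entirely unset, and converts partially selected pages into row intervals. A self-contained sketch of that interval conversion, using a plain bool slice and a local Interval struct instead of arrow2's Bitmap and parquet2's Interval:

// Standalone sketch of the interval conversion done by the deleted FilterState: given
// the slice of the row selection that covers one page, emit (start, length) runs of
// selected rows. The real code works on arrow2's Bitmap and produces parquet2
// `Interval`s; a plain bool slice keeps this example self-contained.
struct Interval {
    start: usize,
    length: usize,
}

fn selection_to_intervals(selection: &[bool]) -> Vec<Interval> {
    let mut intervals = Vec::new();
    let mut run_start = None;
    for (i, selected) in selection.iter().copied().enumerate() {
        match (selected, run_start) {
            // A new run of selected rows begins.
            (true, None) => run_start = Some(i),
            // The run ends at the first unselected row.
            (false, Some(start)) => {
                intervals.push(Interval { start, length: i - start });
                run_start = None;
            }
            _ => {}
        }
    }
    if let Some(start) = run_start {
        intervals.push(Interval { start, length: selection.len() - start });
    }
    intervals
}

fn main() {
    // Rows 0..2, row 4 and rows 6..9 are selected -> three intervals.
    let page = [true, true, false, false, true, false, true, true, true];
    for interval in selection_to_intervals(&page) {
        println!("start={} length={}", interval.start, interval.length);
    }
}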
- -use std::collections::HashMap; -use std::collections::HashSet; -use std::sync::Arc; -use std::time::Instant; - -use databend_common_arrow::arrow::bitmap::Bitmap; -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::parquet::read::RowGroupDeserializer; -use databend_common_arrow::parquet::metadata::ColumnDescriptor; -use databend_common_arrow::parquet::metadata::SchemaDescriptor; -use databend_common_catalog::plan::PartInfoPtr; -use databend_common_catalog::plan::Projection; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::DataSchema; -use databend_common_expression::DataSchemaRef; -use databend_common_expression::FieldIndex; -use databend_common_metrics::storage::*; -use databend_common_storage::ColumnNodes; -use opendal::Operator; - -use super::data::OneBlock; -use super::BlockIterator; -use super::IndexedChunk; -use super::IndexedChunks; -use super::Parquet2PartData; -use crate::parquet2::parquet_reader::deserialize::try_next_block; -use crate::parquet2::parquet_table::arrow_to_table_schema; -use crate::parquet2::projection::project_parquet_schema; -use crate::parquet2::Parquet2RowGroupPart; -use crate::ParquetPart; -use crate::ReadSettings; - -/// The reader to parquet files with a projected schema. -/// -/// **ALERT**: dictionary type is not supported yet. -/// If there are dictionary pages in the parquet file, the reading process may fail. -#[derive(Clone)] -pub struct Parquet2Reader { - operator: Operator, - /// The indices of columns need to read by this reader. - /// - /// Use [`HashSet`] to avoid duplicate indices. - /// Duplicate indices will exist when there are nested types or - /// select a same field multiple times. - /// - /// For example: - /// - /// ```sql - /// select a, a.b, a.c from t; - /// select a, b, a from t; - /// ``` - columns_to_read: HashSet, - /// The schema of the [`common_expression::DataBlock`] this reader produces. - /// - /// ``` - /// output_schema = DataSchema::from(projected_arrow_schema) - /// ``` - pub output_schema: DataSchemaRef, - /// The actual schema used to read parquet. - /// - /// The reason of using [`ArrowSchema`] to read parquet is that - /// There are some types that Databend not support such as Timestamp of nanoseconds. - /// Such types will be convert to supported types after deserialization. - pub(crate) projected_arrow_schema: ArrowSchema, - /// [`ColumnNodes`] corresponding to the `projected_arrow_schema`. - pub(crate) projected_column_nodes: ColumnNodes, - /// [`ColumnDescriptor`]s corresponding to the `projected_arrow_schema`. - pub(crate) projected_column_descriptors: HashMap, -} - -/// Project the schema and get the needed column leaves. 
-/// Part -> IndexedReaders -> IndexedChunk -> DataBlock -impl Parquet2Reader { - pub fn create( - operator: Operator, - schema: &ArrowSchema, - schema_descr: &SchemaDescriptor, - projection: Projection, - ) -> Result> { - let ( - projected_arrow_schema, - projected_column_nodes, - projected_column_descriptors, - columns_to_read, - ) = project_parquet_schema(schema, schema_descr, &projection)?; - - let t_schema = arrow_to_table_schema(projected_arrow_schema.clone()); - let output_schema = DataSchema::from(&t_schema); - - Ok(Arc::new(Parquet2Reader { - operator, - columns_to_read, - output_schema: Arc::new(output_schema), - projected_arrow_schema, - projected_column_nodes, - projected_column_descriptors, - })) - } - - pub fn output_schema(&self) -> &DataSchema { - &self.output_schema - } - - pub fn columns_to_read(&self) -> &HashSet { - &self.columns_to_read - } - - pub fn operator(&self) -> &Operator { - &self.operator - } - - pub fn deserialize( - &self, - part: &Parquet2RowGroupPart, - chunks: Vec<(FieldIndex, Vec)>, - filter: Option, - ) -> Result { - if chunks.is_empty() { - return Ok(DataBlock::new(vec![], part.num_rows)); - } - - let mut chunk_map: HashMap> = chunks.into_iter().collect(); - let mut columns_array_iter = Vec::with_capacity(self.projected_arrow_schema.fields.len()); - let mut nested_columns_array_iter = - Vec::with_capacity(self.projected_arrow_schema.fields.len()); - let mut normal_fields = Vec::with_capacity(self.projected_arrow_schema.fields.len()); - let mut nested_fields = Vec::with_capacity(self.projected_arrow_schema.fields.len()); - - let column_nodes = &self.projected_column_nodes.column_nodes; - let mut cnt_map = Self::build_projection_count_map(column_nodes); - - for (idx, column_node) in column_nodes.iter().enumerate() { - let indices = &column_node.leaf_indices; - let mut metas = Vec::with_capacity(indices.len()); - let mut chunks = Vec::with_capacity(indices.len()); - for index in indices { - // in `read_parquet` function, there is no `TableSchema`, so index treated as column id - let column_meta = &part.column_metas[index]; - let cnt = cnt_map.get_mut(index).unwrap(); - *cnt -= 1; - let column_chunk = if cnt > &mut 0 { - chunk_map.get(index).unwrap().clone() - } else { - chunk_map.remove(index).unwrap() - }; - let descriptor = &self.projected_column_descriptors[index]; - metas.push((column_meta, descriptor)); - chunks.push(column_chunk); - } - if let Some(ref bitmap) = filter { - // Filter push down for nested type is not supported now. - // If the array is nested type, do not push down filter to it. - if chunks.len() > 1 { - nested_columns_array_iter.push(Self::to_array_iter( - metas, - chunks, - part.num_rows, - column_node.field.clone(), - column_node.init.clone(), - )?); - nested_fields.push(self.output_schema.field(idx).clone()); - } else { - columns_array_iter.push(Self::to_array_iter_with_filter( - metas, - chunks, - part.num_rows, - column_node.field.clone(), - column_node.init.clone(), - bitmap.clone(), - )?); - normal_fields.push(self.output_schema.field(idx).clone()); - } - } else { - columns_array_iter.push(Self::to_array_iter( - metas, - chunks, - part.num_rows, - column_node.field.clone(), - column_node.init.clone(), - )?) 
- } - } - - if nested_fields.is_empty() { - let mut deserializer = - RowGroupDeserializer::new(columns_array_iter, part.num_rows, None); - return self.full_deserialize(&mut deserializer); - } - - let bitmap = filter.unwrap(); - let normal_block = try_next_block( - &DataSchema::new(normal_fields.clone()), - &mut RowGroupDeserializer::new(columns_array_iter, part.num_rows, None), - )?; - let nested_block = try_next_block( - &DataSchema::new(nested_fields.clone()), - &mut RowGroupDeserializer::new(nested_columns_array_iter, part.num_rows, None), - )?; - // need to filter nested block - let nested_block = DataBlock::filter_with_bitmap(nested_block, &bitmap)?; - - // Construct the final output - let mut final_columns = Vec::with_capacity(self.output_schema.fields().len()); - final_columns.extend_from_slice(normal_block.columns()); - final_columns.extend_from_slice(nested_block.columns()); - let final_block = DataBlock::new(final_columns, bitmap.len() - bitmap.unset_bits()); - - normal_fields.extend_from_slice(&nested_fields); - let src_schema = DataSchema::new(normal_fields); - final_block.resort(&src_schema, &self.output_schema) - } - - pub fn get_deserializer( - &self, - part: &Parquet2RowGroupPart, - chunks: Vec<(FieldIndex, Vec)>, - filter: Option, - ) -> Result> { - let block = self.deserialize(part, chunks, filter)?; - Ok(Box::new(OneBlock(Some(block)))) - } - - pub fn read_from_merge_io( - &self, - column_chunks: &mut IndexedChunks, - ) -> Result> { - let mut chunks = Vec::with_capacity(self.columns_to_read().len()); - - for index in self.columns_to_read() { - let bytes = column_chunks.get_mut(index).unwrap(); - let data = bytes.to_vec(); - - chunks.push((*index, data)); - } - - Ok(chunks) - } - - pub fn readers_from_blocking_io( - &self, - ctx: Arc, - part: PartInfoPtr, - ) -> Result { - let part = ParquetPart::from_part(&part)?; - match part { - ParquetPart::Parquet2RowGroup(part) => Ok(Parquet2PartData::RowGroup( - self.sync_read_columns_data_by_merge_io( - &ReadSettings::from_ctx(&ctx)?, - part, - &self.operator().blocking(), - )? - .column_buffers()?, - )), - ParquetPart::ParquetFiles(part) => { - let op = self.operator().blocking(); - let mut buffers = Vec::with_capacity(part.files.len()); - for path in &part.files { - let buffer = op.read(path.0.as_str())?; - buffers.push(buffer); - } - metrics_inc_copy_read_size_bytes(part.compressed_size()); - Ok(Parquet2PartData::SmallFiles(buffers)) - } - ParquetPart::ParquetRSRowGroup(_) => unreachable!(), - } - } - - #[async_backtrace::framed] - pub async fn readers_from_non_blocking_io( - &self, - ctx: Arc, - part: &ParquetPart, - ) -> Result { - match part { - ParquetPart::Parquet2RowGroup(part) => { - let chunks = self - .read_columns_data_by_merge_io( - &ReadSettings::from_ctx(&ctx)?, - part, - self.operator(), - ) - .await? - .column_buffers()?; - Ok(Parquet2PartData::RowGroup(chunks)) - } - ParquetPart::ParquetFiles(part) => { - let mut join_handlers = Vec::with_capacity(part.files.len()); - for (path, _) in part.files.iter() { - let op = self.operator().clone(); - join_handlers.push(async move { op.read(path.as_str()).await }); - } - - let start = Instant::now(); - let buffers = futures::future::try_join_all(join_handlers).await?; - - // Perf. 
- { - metrics_inc_copy_read_size_bytes(part.compressed_size()); - metrics_inc_copy_read_part_cost_milliseconds(start.elapsed().as_millis() as u64); - } - - Ok(Parquet2PartData::SmallFiles(buffers)) - } - ParquetPart::ParquetRSRowGroup(_) => unreachable!(), - } - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io.rs deleted file mode 100644 index c74335e4c4b9..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io.rs +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::ops::Range; - -use bytes::Bytes; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::FieldIndex; - -pub struct OwnerMemory { - chunks: HashMap, -} - -impl OwnerMemory { - pub fn create(chunks: Vec<(usize, Vec)>) -> OwnerMemory { - let chunks = chunks - .into_iter() - .map(|(idx, chunk)| (idx, Bytes::from(chunk))) - .collect(); - OwnerMemory { chunks } - } - - pub fn get_chunk(&self, index: usize, path: &str) -> Result { - match self.chunks.get(&index) { - Some(chunk) => Ok(chunk.clone()), - None => Err(ErrorCode::Internal(format!( - "It's a terrible bug, not found range data, merged_range_idx:{}, path:{}", - index, path - ))), - } - } -} - -pub struct MergeIOReadResult { - block_path: String, - columns_chunk_offsets: HashMap)>, - owner_memory: OwnerMemory, -} - -impl MergeIOReadResult { - pub fn create(owner_memory: OwnerMemory, capacity: usize, path: String) -> MergeIOReadResult { - MergeIOReadResult { - block_path: path, - columns_chunk_offsets: HashMap::with_capacity(capacity), - owner_memory, - } - } - - pub fn column_buffers(&self) -> Result> { - let mut res = HashMap::with_capacity(self.columns_chunk_offsets.len()); - - // merge column data fetched from object storage - for (column_id, (chunk_idx, range)) in &self.columns_chunk_offsets { - let chunk = self.owner_memory.get_chunk(*chunk_idx, &self.block_path)?; - res.insert(*column_id, chunk.slice(range.clone())); - } - - Ok(res) - } - - pub fn add_column_chunk( - &mut self, - chunk_index: usize, - column_id: FieldIndex, - range: Range, - ) { - self.columns_chunk_offsets - .insert(column_id, (chunk_index, range)); - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io_async.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io_async.rs deleted file mode 100644 index 0c55aa663540..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io_async.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::Range; -use std::time::Instant; - -use databend_common_base::rangemap::RangeMerger; -use databend_common_base::runtime::UnlimitedFuture; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_metrics::storage::metrics_inc_copy_read_part_cost_milliseconds; -use databend_common_metrics::storage::metrics_inc_copy_read_size_bytes; -use futures::future::try_join_all; -use opendal::Operator; - -use crate::parquet2::parquet_reader::MergeIOReadResult; -use crate::parquet2::parquet_reader::OwnerMemory; -use crate::parquet2::parquet_reader::Parquet2Reader; -use crate::parquet2::Parquet2RowGroupPart; -use crate::ReadSettings; - -impl Parquet2Reader { - /// This is an optimized for data read, works like the Linux kernel io-scheduler IO merging. - /// If the distance between two IO request ranges to be read is less than storage_io_min_bytes_for_seek(Default is 48Bytes), - /// will read the range that contains both ranges, thus avoiding extra seek. - /// - /// It will *NOT* merge two requests: - /// if the last io request size is larger than storage_io_page_bytes_for_read(Default is 512KB). - #[async_backtrace::framed] - pub async fn merge_io_read( - read_settings: &ReadSettings, - op: Operator, - location: &str, - raw_ranges: Vec<(usize, Range)>, - ) -> Result { - if raw_ranges.is_empty() { - // shortcut - let read_res = MergeIOReadResult::create( - OwnerMemory::create(vec![]), - raw_ranges.len(), - location.to_string(), - ); - return Ok(read_res); - } - - // Build merged read ranges. - let ranges = raw_ranges - .iter() - .map(|(_, r)| r.clone()) - .collect::>(); - let range_merger = RangeMerger::from_iter( - ranges, - read_settings.max_gap_size, - read_settings.max_range_size, - ); - let merged_ranges = range_merger.ranges(); - - // Read merged range data. - let mut read_handlers = Vec::with_capacity(merged_ranges.len()); - for (idx, range) in merged_ranges.iter().enumerate() { - read_handlers.push(UnlimitedFuture::create(Self::read_range( - op.clone(), - location, - idx, - range.start, - range.end, - ))); - } - let start = Instant::now(); - let owner_memory = OwnerMemory::create(try_join_all(read_handlers).await?); - let mut read_res = - MergeIOReadResult::create(owner_memory, raw_ranges.len(), location.to_string()); - - // Perf. - { - metrics_inc_copy_read_part_cost_milliseconds(start.elapsed().as_millis() as u64); - } - - for (raw_idx, raw_range) in &raw_ranges { - let column_range = raw_range.start..raw_range.end; - - // Find the range index and Range from merged ranges. - let (merged_range_idx, merged_range) = range_merger.get(column_range.clone()).ok_or(ErrorCode::Internal(format!( - "It's a terrible bug, not found raw range:[{:?}], path:{} from merged ranges\n: {:?}", - column_range, location, merged_ranges - )))?; - - // Fetch the raw data for the raw range. 
- let start = (column_range.start - merged_range.start) as usize; - let end = (column_range.end - merged_range.start) as usize; - let column_id = *raw_idx; - read_res.add_column_chunk(merged_range_idx, column_id, start..end); - } - - Ok(read_res) - } - - #[async_backtrace::framed] - pub async fn read_columns_data_by_merge_io( - &self, - setting: &ReadSettings, - part: &Parquet2RowGroupPart, - operator: &Operator, - ) -> Result { - let mut ranges = vec![]; - for index in self.columns_to_read() { - let meta = &part.column_metas[index]; - ranges.push((*index, meta.offset..(meta.offset + meta.length))); - metrics_inc_copy_read_size_bytes(meta.length); - } - - Self::merge_io_read(setting, operator.clone(), &part.location, ranges).await - } - - #[inline] - #[async_backtrace::framed] - pub async fn read_range( - op: Operator, - path: &str, - index: usize, - start: u64, - end: u64, - ) -> Result<(usize, Vec)> { - let chunk = op.read_with(path).range(start..end).await?; - Ok((index, chunk)) - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io_sync.rs b/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io_sync.rs deleted file mode 100644 index 147efdfb2d01..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_reader/reader_merge_io_sync.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::Range; - -use databend_common_base::rangemap::RangeMerger; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_metrics::storage::metrics_inc_copy_read_size_bytes; -use opendal::BlockingOperator; - -use crate::parquet2::parquet_reader::MergeIOReadResult; -use crate::parquet2::parquet_reader::OwnerMemory; -use crate::parquet2::parquet_reader::Parquet2Reader; -use crate::parquet2::Parquet2RowGroupPart; -use crate::ReadSettings; - -impl Parquet2Reader { - pub fn sync_merge_io_read( - read_settings: &ReadSettings, - op: BlockingOperator, - location: &str, - raw_ranges: Vec<(usize, Range)>, - ) -> Result { - let path = location.to_string(); - - // Build merged read ranges. - let ranges = raw_ranges - .iter() - .map(|(_, r)| r.clone()) - .collect::>(); - let range_merger = RangeMerger::from_iter( - ranges, - read_settings.max_gap_size, - read_settings.max_range_size, - ); - let merged_ranges = range_merger.ranges(); - - // Read merged range data. - let mut io_res = Vec::with_capacity(merged_ranges.len()); - for (idx, range) in merged_ranges.iter().enumerate() { - io_res.push(Self::sync_read_range( - op.clone(), - location, - idx, - range.start, - range.end, - )?); - } - - let owner_memory = OwnerMemory::create(io_res); - - let mut read_res = MergeIOReadResult::create(owner_memory, raw_ranges.len(), path.clone()); - - for (raw_idx, raw_range) in &raw_ranges { - let column_id = *raw_idx; - let column_range = raw_range.start..raw_range.end; - - // Find the range index and Range from merged ranges. 
- let (merged_range_idx, merged_range) = range_merger.get(column_range.clone()).ok_or(ErrorCode::Internal(format!( - "It's a terrible bug, not found raw range:[{:?}], path:{} from merged ranges\n: {:?}", - column_range, path, merged_ranges - )))?; - - // Fetch the raw data for the raw range. - let start = (column_range.start - merged_range.start) as usize; - let end = (column_range.end - merged_range.start) as usize; - read_res.add_column_chunk(merged_range_idx, column_id, start..end); - } - - Ok(read_res) - } - - pub fn sync_read_columns_data_by_merge_io( - &self, - setting: &ReadSettings, - part: &Parquet2RowGroupPart, - operator: &BlockingOperator, - ) -> Result { - let mut ranges = vec![]; - for index in self.columns_to_read() { - let meta = &part.column_metas[index]; - ranges.push((*index, meta.offset..(meta.offset + meta.length))); - metrics_inc_copy_read_size_bytes(meta.length); - } - Self::sync_merge_io_read(setting, operator.clone(), &part.location, ranges) - } - - #[inline] - pub fn sync_read_range( - op: BlockingOperator, - path: &str, - index: usize, - start: u64, - end: u64, - ) -> Result<(usize, Vec)> { - let chunk = op.read_with(path).range(start..end).call()?; - Ok((index, chunk)) - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_table/create.rs b/src/query/storages/parquet/src/parquet2/parquet_table/create.rs deleted file mode 100644 index 7302ce402a33..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_table/create.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
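The reader_merge_io files removed above implemented the coalescing described in their doc comment: nearby column byte ranges are merged into one request when the gap between them is small enough, like the Linux IO scheduler, so a row group's columns are fetched with few reads. A standalone sketch of that merging under the same gap/size constraints (the real code uses databend_common_base's RangeMerger; the thresholds and names below are illustrative):

use std::ops::Range;

// Standalone sketch of the read coalescing that the deleted merge_io_read relied on:
// sort the requested ranges, then fold neighbours into one request when the gap is at
// most `max_gap` and the merged request stays under `max_size`. The real implementation
// is databend_common_base's RangeMerger; the logic below is only illustrative.
fn merge_ranges(mut ranges: Vec<Range<u64>>, max_gap: u64, max_size: u64) -> Vec<Range<u64>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for range in ranges {
        let can_merge = match merged.last() {
            Some(last) => {
                range.start <= last.end.saturating_add(max_gap)
                    && range.end.max(last.end) - last.start <= max_size
            }
            None => false,
        };
        if can_merge {
            // Close enough and still small enough: extend the previous request.
            let last = merged.last_mut().expect("just checked");
            last.end = last.end.max(range.end);
        } else {
            // Too far away or too large: issue a separate read.
            merged.push(range);
        }
    }
    merged
}

fn main() {
    // Three column chunks: the first two are 16 bytes apart and are merged into one
    // request; the third is far away and stays separate. 48 bytes / 512 KiB mirror the
    // defaults quoted in the deleted doc comment.
    let ranges = vec![0..100, 116..200, 10_000..10_500];
    println!("{:?}", merge_ranges(ranges, 48, 512 * 1024));
}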
- -use std::collections::HashMap; -use std::sync::Arc; - -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::arrow::io::parquet::read as pread; -use databend_common_arrow::parquet::metadata::FileMetaData; -use databend_common_arrow::parquet::metadata::SchemaDescriptor; -use databend_common_catalog::plan::ParquetReadOptions; -use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::table::Parquet2TableColumnStatisticsProvider; -use databend_common_catalog::table::Table; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_meta_app::principal::StageInfo; -use databend_common_storage::infer_schema_with_extension; -use databend_common_storage::init_stage_operator; -use databend_common_storage::read_parquet_metas_in_parallel; -use databend_common_storage::StageFileInfo; -use databend_common_storage::StageFilesInfo; -use opendal::Operator; - -use super::table::create_parquet_table_info; -use super::Parquet2Table; -use crate::parquet2::parquet_table::table::create_parquet2_statistics_provider; - -impl Parquet2Table { - #[async_backtrace::framed] - pub async fn create( - ctx: Arc, - stage_info: StageInfo, - files_info: StageFilesInfo, - read_options: ParquetReadOptions, - files_to_read: Option>, - ) -> Result> { - let operator = init_stage_operator(&stage_info)?; - let first_file = match &files_to_read { - Some(files) => files[0].path.clone(), - None => files_info.first_file(&operator).await?.path.clone(), - }; - - let (arrow_schema, schema_descr, compression_ratio) = - Self::prepare_metas(&first_file, operator.clone()).await?; - - // If the query is `COPY`, we don't need to collect column statistics. - // It's because the only transform could be contained in `COPY` command is projection. - let need_stats_provider = !matches!(ctx.get_query_kind(), QueryKind::CopyIntoTable); - let mut table_info = create_parquet_table_info(arrow_schema.clone(), &stage_info); - let column_statistics_provider = if need_stats_provider { - let file_metas = get_parquet2_file_meta( - ctx, - &files_to_read, - &files_info, - &operator, - &schema_descr, - &first_file, - ) - .await?; - let num_rows = file_metas.iter().map(|m| m.num_rows as u64).sum(); - table_info.meta.statistics.number_of_rows = num_rows; - create_parquet2_statistics_provider(file_metas, &arrow_schema)? - } else { - Parquet2TableColumnStatisticsProvider::new(HashMap::new(), 0) - }; - - Ok(Arc::new(Parquet2Table { - table_info, - arrow_schema, - operator, - read_options, - schema_descr, - stage_info, - files_info, - files_to_read, - compression_ratio, - schema_from: first_file, - column_statistics_provider, - })) - } - - #[async_backtrace::framed] - async fn prepare_metas( - path: &str, - operator: Operator, - ) -> Result<(ArrowSchema, SchemaDescriptor, f64)> { - // Infer schema from the first parquet file. - // Assume all parquet files have the same schema. - // If not, throw error during reading. 
- let mut reader = operator.reader(path).await?; - let first_meta = pread::read_metadata_async(&mut reader).await.map_err(|e| { - ErrorCode::Internal(format!("Read parquet file '{}''s meta error: {}", path, e)) - })?; - let arrow_schema = infer_schema_with_extension(&first_meta)?; - let compression_ratio = get_compression_ratio(&first_meta); - let schema_descr = first_meta.schema_descr; - Ok((arrow_schema, schema_descr, compression_ratio)) - } -} - -pub(super) async fn get_parquet2_file_meta( - ctx: Arc, - files_to_read: &Option>, - files_info: &StageFilesInfo, - operator: &Operator, - expect_schema: &SchemaDescriptor, - schema_from: &str, -) -> Result> { - let locations = match files_to_read { - Some(files) => files - .iter() - .map(|f| (f.path.clone(), f.size)) - .collect::>(), - None => files_info - .list(operator, false, None) - .await? - .into_iter() - .map(|f| (f.path, f.size)) - .collect::>(), - }; - - // Read parquet meta data, async reading. - let max_threads = ctx.get_settings().get_max_threads()? as usize; - let max_memory_usage = ctx.get_settings().get_max_memory_usage()?; - let file_metas = read_parquet_metas_in_parallel( - operator.clone(), - locations.clone(), - max_threads, - max_memory_usage, - ) - .await?; - for (idx, file_meta) in file_metas.iter().enumerate() { - check_parquet_schema( - expect_schema, - file_meta.schema(), - &locations[idx].0, - schema_from, - )?; - } - Ok(file_metas) -} - -pub fn check_parquet_schema( - expect: &SchemaDescriptor, - actual: &SchemaDescriptor, - path: &str, - schema_from: &str, -) -> Result<()> { - if expect.fields() != actual.fields() || expect.columns() != actual.columns() { - return Err(ErrorCode::BadBytes(format!( - "infer schema from '{}', but get diff schema in file '{}'. Expected schema: {:?}, actual: {:?}", - schema_from, path, expect, actual - ))); - } - Ok(()) -} - -pub fn get_compression_ratio(filemeta: &FileMetaData) -> f64 { - let compressed_size: usize = filemeta - .row_groups - .iter() - .map(|g| g.compressed_size()) - .sum(); - let uncompressed_size: usize = filemeta - .row_groups - .iter() - .map(|g| { - g.columns() - .iter() - .map(|c| c.uncompressed_size() as usize) - .sum::() - }) - .sum(); - if compressed_size == 0 { - 1.0 - } else { - (uncompressed_size as f64) / (compressed_size as f64) - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_table/mod.rs b/src/query/storages/parquet/src/parquet2/parquet_table/mod.rs deleted file mode 100644 index b4f41955f01c..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_table/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
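create.rs above also computed a per-file compression ratio (total uncompressed bytes over total compressed bytes across all row groups, defaulting to 1.0 for empty files) and stored it in the table info, presumably so later planning can estimate in-memory size from on-disk size. A minimal sketch of that computation, with plain structs standing in for the parquet metadata types:

// Standalone sketch of the deleted get_compression_ratio: total uncompressed bytes over
// total compressed bytes across all row groups, with 1.0 as the fallback for empty files.
// Plain structs stand in for the parquet FileMetaData types used by the real code.
struct ColumnChunkMeta {
    compressed_size: usize,
    uncompressed_size: usize,
}

struct RowGroupMeta {
    columns: Vec<ColumnChunkMeta>,
}

fn compression_ratio(row_groups: &[RowGroupMeta]) -> f64 {
    let compressed: usize = row_groups
        .iter()
        .flat_map(|rg| rg.columns.iter())
        .map(|c| c.compressed_size)
        .sum();
    let uncompressed: usize = row_groups
        .iter()
        .flat_map(|rg| rg.columns.iter())
        .map(|c| c.uncompressed_size)
        .sum();
    if compressed == 0 {
        1.0 // empty or metadata-only file: assume no expansion
    } else {
        uncompressed as f64 / compressed as f64
    }
}

fn main() {
    let row_groups = vec![RowGroupMeta {
        columns: vec![
            ColumnChunkMeta { compressed_size: 100, uncompressed_size: 400 },
            ColumnChunkMeta { compressed_size: 50, uncompressed_size: 100 },
        ],
    }];
    println!("compression ratio = {:.2}", compression_ratio(&row_groups)); // ~3.33
}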
- -mod create; -mod partition; -mod read; -mod table; - -pub use read::Parquet2PrewhereInfo; -pub(crate) use table::arrow_to_table_schema; -pub use table::Parquet2Table; diff --git a/src/query/storages/parquet/src/parquet2/parquet_table/partition.rs b/src/query/storages/parquet/src/parquet2/parquet_table/partition.rs deleted file mode 100644 index 26f4cd3a23d6..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_table/partition.rs +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use databend_common_catalog::plan::PartStatistics; -use databend_common_catalog::plan::Partitions; -use databend_common_catalog::plan::Projection; -use databend_common_catalog::plan::PushDownInfo; -use databend_common_catalog::query_kind::QueryKind; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::Result; -use databend_common_functions::BUILTIN_FUNCTIONS; -use databend_storages_common_pruner::RangePrunerCreator; - -use super::table::arrow_to_table_schema; -use super::Parquet2Table; -use crate::parquet2::projection::project_parquet_schema; -use crate::parquet2::pruning::build_column_page_pruners; -use crate::parquet2::pruning::PartitionPruner; - -impl Parquet2Table { - pub(crate) fn create_pruner( - &self, - ctx: Arc, - push_down: Option, - is_small_file: bool, - ) -> Result { - let settings = ctx.get_settings(); - let parquet_fast_read_bytes = if is_small_file { - 0_usize - } else { - settings.get_parquet_fast_read_bytes()? as usize - }; - // `plan.source_info.schema()` is the same as `TableSchema::from(&self.arrow_schema)` - let projection = if let Some(PushDownInfo { - projection: Some(prj), - .. - }) = &push_down - { - prj.clone() - } else { - let indices = (0..self.arrow_schema.fields.len()).collect::>(); - Projection::Columns(indices) - }; - - // Currently, arrow2 doesn't support reading stats of a inner column of a nested type. - // Therefore, if there is inner fields in projection, we skip the row group pruning. - let skip_pruning = matches!(projection, Projection::InnerColumns(_)); - - // Use `projected_column_nodes` to collect stats from row groups for pruning. - // `projected_column_nodes` contains the smallest column set that is needed for the query. - // Use `projected_arrow_schema` to create `row_group_pruner` (`RangePruner`). - // - // During pruning evaluation, - // `RangePruner` will use field name to find the offset in the schema, - // and use the offset to find the column stat from `StatisticsOfColumns` (HashMap). - // - // How the stats are collected can be found in `ParquetReader::collect_row_group_stats`. 
- let (projected_arrow_schema, projected_column_nodes, _, columns_to_read) = - project_parquet_schema(&self.arrow_schema, &self.schema_descr, &projection)?; - let schema = Arc::new(arrow_to_table_schema(projected_arrow_schema)); - - let filter = push_down.as_ref().and_then(|extra| { - extra - .filters - .as_ref() - .map(|f| f.filter.as_expr(&BUILTIN_FUNCTIONS)) - }); - - let inverted_filter = push_down.as_ref().and_then(|extra| { - extra - .filters - .as_ref() - .map(|f| f.inverted_filter.as_expr(&BUILTIN_FUNCTIONS)) - }); - - let func_ctx = ctx.get_function_context()?; - - let row_group_pruner = if self.read_options.prune_row_groups() { - let p1 = RangePrunerCreator::try_create(func_ctx.clone(), &schema, filter.as_ref())?; - let p2 = RangePrunerCreator::try_create( - func_ctx.clone(), - &schema, - inverted_filter.as_ref(), - )?; - Some((p1, p2)) - } else { - None - }; - - let page_pruners = if self.read_options.prune_pages() && filter.is_some() { - Some(build_column_page_pruners( - func_ctx, - &schema, - filter.as_ref().unwrap(), - )?) - } else { - None - }; - - Ok(PartitionPruner { - schema, - schema_descr: self.schema_descr.clone(), - schema_from: self.schema_from.clone(), - row_group_pruner, - page_pruners, - columns_to_read, - column_nodes: projected_column_nodes, - skip_pruning, - parquet_fast_read_bytes, - compression_ratio: self.compression_ratio, - max_memory_usage: settings.get_max_memory_usage()?, - }) - } - - #[inline] - #[async_backtrace::framed] - pub(super) async fn do_read_partitions( - &self, - ctx: Arc, - push_down: Option, - ) -> Result<(PartStatistics, Partitions)> { - let pruner = self.create_pruner(ctx.clone(), push_down.clone(), false)?; - - let file_locations = match &self.files_to_read { - Some(files) => files - .iter() - .map(|f| (f.path.clone(), f.size)) - .collect::>(), - None => if self.operator.info().native_capability().blocking { - self.files_info.blocking_list(&self.operator, false, None) - } else { - self.files_info.list(&self.operator, false, None).await - }? - .into_iter() - .map(|f| (f.path, f.size)) - .collect::>(), - }; - - pruner - .read_and_prune_partitions( - self.operator.clone(), - &file_locations, - ctx.get_settings().get_max_threads()? as usize, - &ctx.get_copy_status(), - matches!(ctx.get_query_kind(), QueryKind::CopyIntoTable), - ) - .await - } -} diff --git a/src/query/storages/parquet/src/parquet2/parquet_table/read.rs b/src/query/storages/parquet/src/parquet2/parquet_table/read.rs deleted file mode 100644 index 56a613660852..000000000000 --- a/src/query/storages/parquet/src/parquet2/parquet_table/read.rs +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::Arc; - -use databend_common_catalog::plan::DataSourcePlan; -use databend_common_catalog::plan::PushDownInfo; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::DataSchema; -use databend_common_expression::DataSchemaRefExt; -use databend_common_expression::Expr; -use databend_common_expression::FunctionContext; -use databend_common_expression::RemoteExpr; -use databend_common_expression::TableSchemaRef; -use databend_common_functions::BUILTIN_FUNCTIONS; -use databend_common_pipeline_core::Pipeline; - -use super::Parquet2Table; -use crate::parquet2::parquet_reader::Parquet2Reader; -use crate::parquet2::processors::AsyncParquet2Source; -use crate::parquet2::processors::Parquet2DeserializeTransform; -use crate::parquet2::processors::SyncParquet2Source; -use crate::utils::calc_parallelism; - -#[derive(Clone)] -pub struct Parquet2PrewhereInfo { - pub func_ctx: FunctionContext, - pub reader: Arc, - pub filter: Expr, -} - -impl Parquet2Table { - fn build_filter(filter: &RemoteExpr, schema: &DataSchema) -> Expr { - filter - .as_expr(&BUILTIN_FUNCTIONS) - .project_column_ref(|name| schema.index_of(name).unwrap()) - } - - #[inline] - pub(super) fn do_read_data( - &self, - ctx: Arc, - plan: &DataSourcePlan, - pipeline: &mut Pipeline, - ) -> Result<()> { - let table_schema: TableSchemaRef = self.table_info.schema(); - let source_projection = - PushDownInfo::projection_of_push_downs(&table_schema, plan.push_downs.as_ref()); - - // The front of the src_fields are prewhere columns (if exist). - // The back of the src_fields are remain columns. - let mut src_fields = Vec::with_capacity(source_projection.len()); - - // The schema of the data block `read_data` output. - let output_schema: Arc = Arc::new(plan.schema().into()); - - // Build the reader for parquet source. - let source_reader = Parquet2Reader::create( - self.operator.clone(), - &self.arrow_schema, - &self.schema_descr, - source_projection, - )?; - - // Build prewhere info. - let push_down_prewhere = PushDownInfo::prewhere_of_push_downs(plan.push_downs.as_ref()); - - // Build remain reader. - // If there is no prewhere filter, remain reader is the same as source reader (no prewhere phase, deserialize directly). - let remain_reader = if let Some(p) = &push_down_prewhere { - Parquet2Reader::create( - self.operator.clone(), - &self.arrow_schema, - &self.schema_descr, - p.remain_columns.clone(), - )? 
- } else { - source_reader.clone() - }; - - let prewhere_info = push_down_prewhere - .map(|p| { - let reader = Parquet2Reader::create( - self.operator.clone(), - &self.arrow_schema, - &self.schema_descr, - p.prewhere_columns, - )?; - src_fields.extend_from_slice(reader.output_schema.fields()); - let filter = Self::build_filter(&p.filter, &reader.output_schema); - let func_ctx = ctx.get_function_context()?; - Ok::<_, ErrorCode>(Parquet2PrewhereInfo { - func_ctx, - reader, - filter, - }) - }) - .transpose()?; - - src_fields.extend_from_slice(remain_reader.output_schema.fields()); - let src_schema = DataSchemaRefExt::create(src_fields); - let is_blocking = self.operator.info().native_capability().blocking; - - let num_deserializer = calc_parallelism(&ctx, plan)?; - if is_blocking { - pipeline.add_source( - |output| SyncParquet2Source::create(ctx.clone(), output, source_reader.clone()), - num_deserializer, - )?; - } else { - pipeline.add_source( - |output| AsyncParquet2Source::create(ctx.clone(), output, source_reader.clone()), - num_deserializer, - )?; - }; - - pipeline.add_transform(|input, output| { - Parquet2DeserializeTransform::create( - ctx.clone(), - input, - output, - src_schema.clone(), - output_schema.clone(), - prewhere_info.clone(), - source_reader.clone(), - remain_reader.clone(), - Arc::new(self.create_pruner(ctx.clone(), plan.push_downs.clone(), true)?), - ) - }) - } -} diff --git a/src/query/storages/parquet/src/parquet2/partition.rs b/src/query/storages/parquet/src/parquet2/partition.rs deleted file mode 100644 index 936315abf3c3..000000000000 --- a/src/query/storages/parquet/src/parquet2/partition.rs +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use databend_common_arrow::parquet::compression::Compression; -use databend_common_arrow::parquet::indexes::Interval; -use databend_common_expression::FieldIndex; -use databend_common_expression::Scalar; - -#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)] -pub struct ColumnMeta { - pub offset: u64, - pub length: u64, - pub num_values: i64, - pub compression: Compression, - pub uncompressed_size: u64, - pub min_max: Option<(Scalar, Scalar)>, - - // if has dictionary, we can not push down predicate to deserialization. 
- pub has_dictionary: bool, -} - -#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)] -pub struct Parquet2RowGroupPart { - pub location: String, - pub num_rows: usize, - pub column_metas: HashMap, - pub row_selection: Option>, - - pub sort_min_max: Option<(Scalar, Scalar)>, -} - -impl Parquet2RowGroupPart { - pub fn uncompressed_size(&self) -> u64 { - self.column_metas - .values() - .map(|c| c.uncompressed_size) - .sum() - } - - pub fn compressed_size(&self) -> u64 { - self.column_metas.values().map(|c| c.length).sum() - } -} diff --git a/src/query/storages/parquet/src/parquet2/processors/mod.rs b/src/query/storages/parquet/src/parquet2/processors/mod.rs deleted file mode 100644 index b1765f454517..000000000000 --- a/src/query/storages/parquet/src/parquet2/processors/mod.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod deserialize_transform; -mod source; - -pub use deserialize_transform::Parquet2DeserializeTransform; -pub use source::AsyncParquet2Source; -pub use source::SyncParquet2Source; diff --git a/src/query/storages/parquet/src/parquet2/processors/source.rs b/src/query/storages/parquet/src/parquet2/processors/source.rs deleted file mode 100644 index 3b6cb9296c75..000000000000 --- a/src/query/storages/parquet/src/parquet2/processors/source.rs +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::any::Any; -use std::fmt::Debug; -use std::fmt::Formatter; -use std::sync::Arc; - -use databend_common_catalog::plan::PartInfoPtr; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::Result; -use databend_common_expression::BlockMetaInfo; -use databend_common_expression::DataBlock; -use databend_common_metrics::storage::*; -use databend_common_pipeline_core::processors::Event; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_sources::SyncSource; -use databend_common_pipeline_sources::SyncSourcer; -use serde::Deserializer; -use serde::Serializer; - -use crate::parquet2::parquet_reader::Parquet2PartData; -use crate::parquet2::parquet_reader::Parquet2Reader; -use crate::ParquetPart; - -pub struct Parquet2SourceMeta { - pub parts: Vec<(PartInfoPtr, Parquet2PartData)>, -} - -impl Debug for Parquet2SourceMeta { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ParquetSourceMeta") - .field( - "part", - &self.parts.iter().map(|(p, _)| p).collect::>(), - ) - .finish() - } -} - -impl serde::Serialize for Parquet2SourceMeta { - fn serialize(&self, _: S) -> databend_common_exception::Result - where S: Serializer { - unimplemented!("Unimplemented serialize ParquetSourceMeta") - } -} - -impl<'de> serde::Deserialize<'de> for Parquet2SourceMeta { - fn deserialize(_: D) -> databend_common_exception::Result - where D: Deserializer<'de> { - unimplemented!("Unimplemented deserialize ParquetSourceMeta") - } -} - -#[typetag::serde(name = "parquet_source")] -impl BlockMetaInfo for Parquet2SourceMeta { - fn equals(&self, _: &Box) -> bool { - unimplemented!("Unimplemented equals ParquetSourceMeta") - } - - fn clone_self(&self) -> Box { - unimplemented!("Unimplemented clone ParquetSourceMeta") - } -} - -pub struct SyncParquet2Source { - ctx: Arc, - block_reader: Arc, -} - -impl SyncParquet2Source { - pub fn create( - ctx: Arc, - output: Arc, - block_reader: Arc, - ) -> Result { - SyncSourcer::create(ctx.clone(), output, SyncParquet2Source { - ctx, - block_reader, - }) - } -} - -impl SyncSource for SyncParquet2Source { - const NAME: &'static str = "SyncParquetSource"; - - fn generate(&mut self) -> Result> { - match self.ctx.get_partition() { - None => Ok(None), - Some(part) => { - let part_clone = part.clone(); - let data = self - .block_reader - .readers_from_blocking_io(self.ctx.clone(), part)?; - metrics_inc_copy_read_part_counter(); - Ok(Some(DataBlock::empty_with_meta(Box::new( - Parquet2SourceMeta { - parts: vec![(part_clone, data)], - }, - )))) - } - } - } -} - -pub struct AsyncParquet2Source { - finished: bool, - ctx: Arc, - block_reader: Arc, - - output: Arc, - output_data: Option>, -} - -impl AsyncParquet2Source { - pub fn create( - ctx: Arc, - output: Arc, - block_reader: Arc, - ) -> Result { - Ok(ProcessorPtr::create(Box::new(AsyncParquet2Source { - ctx, - output, - block_reader, - finished: false, - output_data: None, - }))) - } -} - -#[async_trait::async_trait] -impl Processor for AsyncParquet2Source { - fn name(&self) -> String { - String::from("AsyncParquetSource") - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.finished { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return 
Ok(Event::NeedConsume); - } - - if let Some(parts) = self.output_data.take() { - let output = DataBlock::empty_with_meta(Box::new(Parquet2SourceMeta { parts })); - self.output.push_data(Ok(output)); - } - - Ok(Event::Async) - } - - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - let parts = self.ctx.get_partitions(1); - - if !parts.is_empty() { - let part = parts[0].clone(); - let parquet_part = ParquetPart::from_part(&part)?; - let block_reader = self.block_reader.clone(); - let data = block_reader - .readers_from_non_blocking_io(self.ctx.clone(), parquet_part) - .await?; - metrics_inc_copy_read_part_counter(); - self.output_data = Some(vec![(part, data)]); - } else { - self.finished = true; - self.output_data = None; - } - Ok(()) - } -} diff --git a/src/query/storages/parquet/src/parquet2/projection.rs b/src/query/storages/parquet/src/parquet2/projection.rs deleted file mode 100644 index 434f58eebcac..000000000000 --- a/src/query/storages/parquet/src/parquet2/projection.rs +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::collections::HashSet; - -use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; -use databend_common_arrow::parquet::metadata::ColumnDescriptor; -use databend_common_arrow::parquet::metadata::SchemaDescriptor; -use databend_common_arrow::schema_projection as ap; -use databend_common_catalog::plan::Projection; -use databend_common_exception::Result; -use databend_common_expression::FieldIndex; -use databend_common_storage::ColumnNodes; - -#[allow(clippy::type_complexity)] -pub fn project_parquet_schema( - schema: &ArrowSchema, - schema_descr: &SchemaDescriptor, - projection: &Projection, -) -> Result<( - ArrowSchema, - ColumnNodes, - HashMap, - HashSet, -)> { - // Full schema and column leaves. - - let column_nodes = ColumnNodes::new_from_schema(schema, None); - // Project schema - let projected_arrow_schema = match projection { - Projection::Columns(indices) => ap::project(schema, indices), - Projection::InnerColumns(path_indices) => ap::inner_project(schema, path_indices), - }; - // Project column leaves - let projected_column_nodes = ColumnNodes { - column_nodes: projection - .project_column_nodes(&column_nodes)? 
- .iter() - .map(|&leaf| leaf.clone()) - .collect(), - }; - let column_nodes = &projected_column_nodes.column_nodes; - // Project column descriptors and collect columns to read - let mut projected_column_descriptors = HashMap::with_capacity(column_nodes.len()); - let mut columns_to_read = HashSet::with_capacity( - column_nodes - .iter() - .map(|leaf| leaf.leaf_indices.len()) - .sum(), - ); - for column_node in column_nodes { - for index in &column_node.leaf_indices { - columns_to_read.insert(*index); - projected_column_descriptors.insert(*index, schema_descr.columns()[*index].clone()); - } - } - Ok(( - projected_arrow_schema, - projected_column_nodes, - projected_column_descriptors, - columns_to_read, - )) -} diff --git a/src/query/storages/parquet/src/parquet2/pruning.rs b/src/query/storages/parquet/src/parquet2/pruning.rs deleted file mode 100644 index 687a71dd91a4..000000000000 --- a/src/query/storages/parquet/src/parquet2/pruning.rs +++ /dev/null @@ -1,914 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::collections::HashSet; -use std::io::Read; -use std::io::Seek; -use std::sync::Arc; - -use databend_common_arrow::arrow::datatypes::Field as ArrowField; -use databend_common_arrow::arrow::io::parquet::read::get_field_pages; -use databend_common_arrow::arrow::io::parquet::read::indexes::compute_page_row_intervals; -use databend_common_arrow::arrow::io::parquet::read::indexes::read_columns_indexes; -use databend_common_arrow::arrow::io::parquet::read::indexes::FieldPageStatistics; -use databend_common_arrow::parquet::indexes::Interval; -use databend_common_arrow::parquet::metadata::FileMetaData; -use databend_common_arrow::parquet::metadata::RowGroupMetaData; -use databend_common_arrow::parquet::metadata::SchemaDescriptor; -use databend_common_arrow::parquet::read::read_metadata_with_size; -use databend_common_arrow::parquet::read::read_pages_locations; -use databend_common_catalog::plan::PartInfo; -use databend_common_catalog::plan::PartStatistics; -use databend_common_catalog::plan::Partitions; -use databend_common_catalog::plan::PartitionsShuffleKind; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::Expr; -use databend_common_expression::FieldIndex; -use databend_common_expression::FunctionContext; -use databend_common_expression::TableSchemaRef; -use databend_common_storage::read_parquet_metas_in_parallel; -use databend_common_storage::ColumnNodes; -use databend_common_storage::CopyStatus; -use databend_common_storage::FileStatus; -use databend_storages_common_pruner::RangePruner; -use databend_storages_common_pruner::RangePrunerCreator; -use log::info; -use opendal::Operator; - -use super::partition::ColumnMeta; -use super::Parquet2RowGroupPart; -use crate::parquet2::statistics::collect_row_group_stats; -use crate::parquet2::statistics::BatchStatistics; -use crate::parquet_part::collect_small_file_parts; -use 
crate::parquet_part::ParquetPart; - -/// Prune parquet row groups and pages. -pub struct PartitionPruner { - /// Table schema. - pub schema: TableSchemaRef, - pub schema_descr: SchemaDescriptor, - pub schema_from: String, - /// Pruner to prune row groups. (filter & inverted filter) - pub row_group_pruner: Option<( - Arc, - Arc, - )>, - /// Pruners to prune pages. - pub page_pruners: Option, - /// The projected column indices. - pub columns_to_read: HashSet, - /// The projected column nodes. - pub column_nodes: ColumnNodes, - /// Whether to skip pruning. - pub skip_pruning: bool, - // TODO: use limit information for pruning - // /// Limit of this query. If there is order by and filter, it will not be used (assign to `usize::MAX`). - // pub limit: usize, - pub parquet_fast_read_bytes: usize, - pub compression_ratio: f64, - pub max_memory_usage: u64, -} - -impl PartitionPruner { - pub fn prune_one_file( - &self, - path: &str, - op: &Operator, - file_size: u64, - ) -> Result> { - let blocking_op = op.blocking(); - let mut reader = blocking_op.reader(path)?; - let file_meta = read_metadata_with_size(&mut reader, file_size).map_err(|e| { - ErrorCode::Internal(format!("Read parquet file '{}''s meta error: {}", path, e)) - })?; - let (_, parts) = self.read_and_prune_file_meta(path, file_meta, op.clone())?; - Ok(parts) - } - - #[async_backtrace::framed] - pub fn read_and_prune_file_meta( - &self, - path: &str, - file_meta: FileMetaData, - operator: Operator, - ) -> Result<(PartStatistics, Vec)> { - let mut stats = PartStatistics::default(); - let mut partitions = vec![]; - - let is_blocking_io = operator.info().native_capability().blocking; - let mut row_group_pruned = vec![false; file_meta.row_groups.len()]; - - let no_stats = file_meta.row_groups.iter().any(|r| { - r.columns() - .iter() - .any(|c| c.metadata().statistics.is_none()) - }); - - let _row_group_stats = if no_stats { - None - } else if self.row_group_pruner.is_some() && !self.skip_pruning { - let (pruner, _) = self.row_group_pruner.as_ref().unwrap(); - // If collecting stats fails or `should_keep` is true, we still read the row group. - // Otherwise, the row group will be pruned. - if let Ok(row_group_stats) = - collect_row_group_stats(&self.column_nodes, &file_meta.row_groups) - { - for (idx, (stats, _rg)) in row_group_stats - .iter() - .zip(file_meta.row_groups.iter()) - .enumerate() - { - row_group_pruned[idx] = !pruner.should_keep(stats, None); - } - Some(row_group_stats) - } else { - None - } - } else { - None - }; - - for (rg_idx, rg) in file_meta.row_groups.iter().enumerate() { - if row_group_pruned[rg_idx] { - continue; - } - - stats.read_rows += rg.num_rows(); - stats.read_bytes += rg.total_byte_size(); - stats.partitions_scanned += 1; - - // Currently, only blocking io is allowed to prune pages. 
- let row_selection = if self.page_pruners.is_some() - && is_blocking_io - && rg.columns().iter().all(|c| { - c.column_chunk().column_index_offset.is_some() - && c.column_chunk().column_index_length.is_some() - }) { - let mut reader = operator.blocking().reader(path)?; - self.page_pruners - .as_ref() - .map(|pruners| filter_pages(&mut reader, &self.schema, rg, pruners)) - .transpose() - .unwrap_or(None) - } else { - None - }; - - let mut column_metas = HashMap::with_capacity(self.columns_to_read.len()); - for index in self.columns_to_read.iter() { - let c = &rg.columns()[*index]; - let (offset, length) = c.byte_range(); - - column_metas.insert(*index, ColumnMeta { - offset, - length, - num_values: c.num_values(), - compression: c.compression(), - uncompressed_size: c.uncompressed_size() as u64, - min_max: None, - has_dictionary: c.dictionary_page_offset().is_some(), - }); - } - - partitions.push(Parquet2RowGroupPart { - location: path.to_string(), - num_rows: rg.num_rows(), - column_metas, - row_selection, - sort_min_max: None, - }) - } - Ok((stats, partitions)) - } - - /// Try to read parquet meta to generate row-group-wise partitions. - /// And prune row groups an pages to generate the final row group partitions. - #[async_backtrace::framed] - pub async fn read_and_prune_partitions( - &self, - operator: Operator, - locations: &Vec<(String, u64)>, - num_threads: usize, - copy_status: &Arc, - is_copy: bool, - ) -> Result<(PartStatistics, Partitions)> { - // part stats - let mut stats = PartStatistics::default(); - - let mut large_files = vec![]; - let mut small_files = vec![]; - for (location, size) in locations { - if *size > self.parquet_fast_read_bytes as u64 { - large_files.push((location.clone(), *size)); - } else { - small_files.push((location.clone(), *size)); - } - } - - let mut partitions = Vec::with_capacity(locations.len()); - - // 1. Read parquet meta data. Distinguish between sync and async reading. - let file_metas = read_parquet_metas_in_parallel( - operator.clone(), - large_files.clone(), - num_threads, - self.max_memory_usage, - ) - .await?; - - // 2. Use file meta to prune row groups or pages. - let mut max_compression_ratio = self.compression_ratio; - let mut max_compressed_size = 0u64; - - for (file_id, file_meta) in file_metas.into_iter().enumerate() { - let path = &large_files[file_id].0; - if is_copy { - copy_status.add_chunk(path, FileStatus { - num_rows_loaded: file_meta.num_rows, - error: None, - }); - } - stats.partitions_total += file_meta.row_groups.len(); - let (sub_stats, parts) = - self.read_and_prune_file_meta(path, file_meta, operator.clone())?; - - for p in parts { - max_compression_ratio = max_compression_ratio - .max(p.uncompressed_size() as f64 / p.compressed_size() as f64); - max_compressed_size = max_compressed_size.max(p.compressed_size()); - partitions.push(Arc::new( - Box::new(ParquetPart::Parquet2RowGroup(p)) as Box - )); - } - stats.merge(&sub_stats); - } - - let num_large_partitions = partitions.len(); - let mut partitions = Partitions::create_nolazy(PartitionsShuffleKind::Mod, partitions); - - // If there are only row group parts, the `stats` is exact. - // It will be changed to `false` if there are small files parts. 
- if small_files.is_empty() { - stats.is_exact = true; - } else { - stats.is_exact = false; - collect_small_file_parts( - small_files, - max_compression_ratio, - max_compressed_size, - &mut partitions, - &mut stats, - self.columns_to_read.len(), - ); - } - - info!( - "copy {num_large_partitions} large partitions and {} small partitions.", - partitions.len() - num_large_partitions - ); - - Ok((stats, partitions)) - } -} - -/// [`RangePruner`]s for each column -type ColumnRangePruners = Vec<(usize, Arc)>; - -/// Build page pruner of each column. -/// Only one column expression can be used to build the page pruner. -pub fn build_column_page_pruners( - func_ctx: FunctionContext, - schema: &TableSchemaRef, - filter: &Expr, -) -> Result { - let mut results = vec![]; - for (column, _) in filter.column_refs() { - let col_idx = schema.index_of(&column)?; - let range_pruner = RangePrunerCreator::try_create(func_ctx.clone(), schema, Some(filter))?; - results.push((col_idx, range_pruner)); - } - Ok(results) -} - -/// Filter pages by filter expression. -/// -/// Returns the final selection of rows. -fn filter_pages( - reader: &mut R, - schema: &TableSchemaRef, - row_group: &RowGroupMetaData, - pruners: &ColumnRangePruners, -) -> Result> { - let mut fields = Vec::with_capacity(pruners.len()); - for (col_idx, _) in pruners { - let field: ArrowField = schema.field(*col_idx).into(); - fields.push(field); - } - - let num_rows = row_group.num_rows(); - - // one vec per column - let locations = read_pages_locations(reader, row_group.columns())?; - // one Vec> per field (non-nested contain a single entry on the first column) - let locations = fields - .iter() - .map(|field| get_field_pages(row_group.columns(), &locations, &field.name)) - .collect::>(); - - // one ColumnPageStatistics per field - let page_stats = read_columns_indexes(reader, row_group.columns(), &fields)?; - - let intervals = locations - .iter() - .map(|locations| { - locations - .iter() - .map(|locations| Ok(compute_page_row_intervals(locations, num_rows)?)) - .collect::>>() - }) - .collect::>>()?; - - // Currently, only non-nested types are supported. - let mut row_selections = Vec::with_capacity(pruners.len()); - for (i, (col_offset, pruner)) in pruners.iter().enumerate() { - let stat = &page_stats[i]; - let page_intervals = &intervals[i][0]; - let data_type = schema.field(*col_offset).data_type(); - - let mut row_selection = vec![]; - match stat { - FieldPageStatistics::Single(stats) => { - let stats = BatchStatistics::from_column_statistics(stats, &data_type.into())?; - for (page_num, intv) in page_intervals.iter().enumerate() { - let stat = stats.get(page_num); - if pruner.should_keep(&HashMap::from([(*col_offset as u32, stat)]), None) { - row_selection.push(*intv); - } - } - } - _ => { - return Err(ErrorCode::Internal( - "Only non-nested types are supported in page filter.", - )); - } - } - row_selections.push(row_selection); - } - - Ok(combine_intervals(row_selections)) -} - -/// Combine row selection of each column into a final selection of the whole row group. 
-fn combine_intervals(row_selections: Vec>) -> Vec { - if row_selections.is_empty() { - return vec![]; - } - let mut selection = row_selections[0].clone(); - for sel in row_selections.iter().skip(1) { - selection = and_intervals(&selection, sel); - } - - // Merge intervals if they are consecutive - let mut res = vec![]; - for sel in selection { - if res.is_empty() { - res.push(sel); - continue; - } - let back = res.last_mut().unwrap(); - if back.start + back.length == sel.start { - back.length += sel.length; - } else { - res.push(sel); - } - } - - res -} - -/// Do "and" operation on two row selections. -/// Select the rows which both `sel1` and `sel2` select. -fn and_intervals(sel1: &[Interval], sel2: &[Interval]) -> Vec { - let mut res = vec![]; - - for sel in sel1 { - res.extend(is_in(*sel, sel2)); - } - res -} - -/// If `probe` overlaps with `intervals`, -/// return the overlapping part of `probe` in `intervals`. -/// Otherwise, return an empty vector. -fn is_in(probe: Interval, intervals: &[Interval]) -> Vec { - intervals - .iter() - .filter_map(|interval| { - let interval_end = interval.start + interval.length; - let probe_end = probe.start + probe.length; - let overlaps = (probe.start < interval_end) && (probe_end > interval.start); - if overlaps { - let start = interval.start.max(probe.start); - let end = interval_end.min(probe_end); - Some(Interval::new(start, end - start)) - } else { - None - } - }) - .collect() -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use databend_common_arrow::parquet::compression::CompressionOptions; - use databend_common_arrow::parquet::encoding::hybrid_rle::encode_bool; - use databend_common_arrow::parquet::encoding::Encoding; - use databend_common_arrow::parquet::indexes::Interval; - use databend_common_arrow::parquet::metadata::Descriptor; - use databend_common_arrow::parquet::metadata::SchemaDescriptor; - use databend_common_arrow::parquet::page::DataPage; - use databend_common_arrow::parquet::page::DataPageHeader; - use databend_common_arrow::parquet::page::DataPageHeaderV1; - use databend_common_arrow::parquet::page::Page; - use databend_common_arrow::parquet::read::read_metadata; - use databend_common_arrow::parquet::schema::types::ParquetType; - use databend_common_arrow::parquet::schema::types::PhysicalType; - use databend_common_arrow::parquet::statistics::serialize_statistics; - use databend_common_arrow::parquet::statistics::PrimitiveStatistics; - use databend_common_arrow::parquet::statistics::Statistics; - use databend_common_arrow::parquet::types::NativeType; - use databend_common_arrow::parquet::write::Compressor; - use databend_common_arrow::parquet::write::DynIter; - use databend_common_arrow::parquet::write::DynStreamingIterator; - use databend_common_arrow::parquet::write::FileWriter; - use databend_common_arrow::parquet::write::Version; - use databend_common_arrow::parquet::write::WriteOptions; - use databend_common_exception::Result; - use databend_common_expression::types::DataType; - use databend_common_expression::types::NumberDataType; - use databend_common_expression::types::NumberScalar; - use databend_common_expression::FunctionContext; - use databend_common_expression::Scalar; - use databend_common_expression::TableDataType; - use databend_common_expression::TableField; - use databend_common_expression::TableSchemaRef; - use databend_common_expression::TableSchemaRefExt; - use databend_common_sql::plans::BoundColumnRef; - use databend_common_sql::plans::ConstantExpr; - use 
databend_common_sql::plans::FunctionCall; - use databend_common_sql::plans::ScalarExpr; - use databend_common_sql::ColumnBindingBuilder; - use databend_common_sql::Visibility; - use databend_common_storage::ColumnNodes; - use databend_storages_common_pruner::RangePrunerCreator; - - use crate::parquet2::pruning::and_intervals; - use crate::parquet2::pruning::build_column_page_pruners; - use crate::parquet2::pruning::combine_intervals; - use crate::parquet2::pruning::filter_pages; - use crate::parquet2::statistics::collect_row_group_stats; - - #[test] - fn test_and_intervals() { - // [12, 35), [38, 43) - let sel1 = vec![Interval::new(12, 23), Interval::new(38, 5)]; - // [0, 5), [9, 24), [30, 40) - let sel2 = vec![ - Interval::new(0, 5), - Interval::new(9, 15), - Interval::new(30, 10), - ]; - - // [12, 24), [30, 35), [38, 40) - let expected = vec![ - Interval::new(12, 12), - Interval::new(30, 5), - Interval::new(38, 2), - ]; - let actual = and_intervals(&sel1, &sel2); - - assert_eq!(expected, actual); - } - - #[test] - fn test_combine_intervals() { - { - // sel1: [12, 35), [38, 43) - // sel2: [0, 5), [9, 24), [30, 40) - // sel3: [1,2), [4, 31), [30, 41) - let intervals = vec![ - vec![Interval::new(12, 23), Interval::new(38, 5)], - vec![ - Interval::new(0, 5), - Interval::new(9, 15), - Interval::new(30, 10), - ], - vec![ - Interval::new(1, 1), - Interval::new(4, 27), - Interval::new(39, 2), - ], - ]; - - // [12, 24), [30, 31), [39, 40) - let expected = vec![ - Interval::new(12, 12), - Interval::new(30, 1), - Interval::new(39, 1), - ]; - - let actual = combine_intervals(intervals); - - assert_eq!(expected, actual); - } - - { - // sel1: [1,2), [2, 4), [4, 7) - let intervals = vec![vec![ - Interval::new(1, 1), - Interval::new(2, 2), - Interval::new(4, 3), - ]]; - - // [12, 24), [30, 31), [39, 40) - let expected = vec![Interval::new(1, 6)]; - - let actual = combine_intervals(intervals); - - assert_eq!(expected, actual); - } - } - - fn unzip_option( - array: &[Option], - ) -> databend_common_arrow::parquet::error::Result<(Vec, Vec)> { - // leave the first 4 bytes announcing the length of the def level - // this will be overwritten at the end, once the length is known. - // This is unknown at this point because of the uleb128 encoding, - // whose length is variable. - let mut validity = std::io::Cursor::new(vec![0; 4]); - validity.set_position(4); - - let mut values = vec![]; - let iter = array.iter().map(|value| { - if let Some(item) = value { - values.extend_from_slice(item.to_le_bytes().as_ref()); - true - } else { - false - } - }); - encode_bool(&mut validity, iter)?; - - // write the length, now that it is known - let mut validity = validity.into_inner(); - let length = validity.len() - 4; - // todo: pay this small debt (loop?) 
- let length = length.to_le_bytes(); - validity[0] = length[0]; - validity[1] = length[1]; - validity[2] = length[2]; - validity[3] = length[3]; - - Ok((values, validity)) - } - - pub fn array_to_page_v1( - array: &[Option], - options: &WriteOptions, - descriptor: &Descriptor, - ) -> databend_common_arrow::parquet::error::Result { - let (values, mut buffer) = unzip_option(array)?; - - buffer.extend_from_slice(&values); - - let statistics = if options.write_statistics { - let statistics = &PrimitiveStatistics { - primitive_type: descriptor.primitive_type.clone(), - null_count: Some((array.len() - array.iter().flatten().count()) as i64), - distinct_count: None, - max_value: array.iter().flatten().max_by(|x, y| x.ord(y)).copied(), - min_value: array.iter().flatten().min_by(|x, y| x.ord(y)).copied(), - } as &dyn Statistics; - Some(serialize_statistics(statistics)) - } else { - None - }; - - let header = DataPageHeaderV1 { - num_values: array.len() as i32, - encoding: Encoding::Plain.into(), - definition_level_encoding: Encoding::Rle.into(), - repetition_level_encoding: Encoding::Rle.into(), - statistics, - }; - - Ok(Page::Data(DataPage::new( - DataPageHeader::V1(header), - buffer, - descriptor.clone(), - Some(array.len()), - ))) - } - - fn write_test_parquet() -> Result<(TableSchemaRef, Vec)> { - let page1 = vec![Some(0), Some(1), None, Some(3), Some(4), Some(5), Some(6)]; - let page2 = vec![Some(10), Some(11)]; - - let options = WriteOptions { - write_statistics: true, - version: Version::V1, - }; - - let schema = SchemaDescriptor::new("schema".to_string(), vec![ParquetType::from_physical( - "col1".to_string(), - PhysicalType::Int32, - )]); - - let pages = vec![ - array_to_page_v1::(&page1, &options, &schema.columns()[0].descriptor), - array_to_page_v1::(&page2, &options, &schema.columns()[0].descriptor), - ]; - - let pages = DynStreamingIterator::new(Compressor::new( - DynIter::new(pages.into_iter()), - CompressionOptions::Uncompressed, - vec![], - )); - let columns = std::iter::once(Ok(pages)); - - let writer = Cursor::new(vec![]); - let mut writer = FileWriter::new(writer, schema, options, None); - - writer.write(DynIter::new(columns))?; - writer.end(None)?; - - Ok(( - TableSchemaRefExt::create(vec![TableField::new( - "col1", - TableDataType::Number(NumberDataType::Int32), - )]), - writer.into_inner().into_inner(), - )) - } - - #[test] - fn test_prune_row_group() -> Result<()> { - let (schema, data) = write_test_parquet()?; - let mut reader = Cursor::new(data); - let metadata = read_metadata(&mut reader)?; - let rgs = metadata.row_groups; - let arrow_schema = schema.as_ref().into(); - let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, None); - - let row_group_stats = collect_row_group_stats(&column_nodes, &rgs)?; - - // col1 > 12 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "gt".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(12)), - }), - ], - }); - let filter = filter - .as_expr()? 
- .project_column_ref(|col| col.column_name.clone()); - let pruner = - RangePrunerCreator::try_create(FunctionContext::default(), &schema, Some(&filter))?; - assert!(!pruner.should_keep(&row_group_stats[0], None)); - } - - // col1 < 0 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "lt".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(0)), - }), - ], - }); - let filter = filter - .as_expr()? - .project_column_ref(|col| col.column_name.clone()); - let pruner = - RangePrunerCreator::try_create(FunctionContext::default(), &schema, Some(&filter))?; - assert!(!pruner.should_keep(&row_group_stats[0], None)); - } - - // col1 <= 5 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "lte".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(5)), - }), - ], - }); - let filter = filter - .as_expr()? - .project_column_ref(|col| col.column_name.clone()); - let pruner = - RangePrunerCreator::try_create(FunctionContext::default(), &schema, Some(&filter))?; - assert!(pruner.should_keep(&row_group_stats[0], None)); - } - - Ok(()) - } - - #[test] - fn test_filter_pages() -> Result<()> { - let (schema, data) = write_test_parquet()?; - let mut reader = Cursor::new(data); - let metadata = read_metadata(&mut reader)?; - let rg = &metadata.row_groups[0]; - - // col1 > 12 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "gt".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(12)), - }), - ], - }); - let filter = filter - .as_expr()? - .project_column_ref(|col| col.column_name.clone()); - let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filter)?; - let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; - - assert_eq!(Vec::::new(), row_selection); - } - - // col1 <= 5 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "lte".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(5)), - }), - ], - }); - let filter = filter - .as_expr()? 
- .project_column_ref(|col| col.column_name.clone()); - let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filter)?; - let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; - - assert_eq!(vec![Interval::new(0, 7)], row_selection); - } - - // col1 > 10 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "gt".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(10)), - }), - ], - }); - let filter = filter - .as_expr()? - .project_column_ref(|col| col.column_name.clone()); - let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filter)?; - let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; - - assert_eq!(vec![Interval::new(7, 2)], row_selection); - } - - // col1 <= 10 - { - let filter = ScalarExpr::FunctionCall(FunctionCall { - span: None, - func_name: "lte".to_string(), - params: vec![], - arguments: vec![ - ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: ColumnBindingBuilder::new( - "col1".to_string(), - 0, - Box::new(DataType::Number(NumberDataType::Int32)), - Visibility::Visible, - ) - .build(), - }), - ScalarExpr::ConstantExpr(ConstantExpr { - span: None, - value: Scalar::Number(NumberScalar::Int32(10)), - }), - ], - }); - let filter = filter - .as_expr()? - .project_column_ref(|col| col.column_name.clone()); - let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filter)?; - let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; - - assert_eq!(vec![Interval::new(0, 9)], row_selection); - } - - Ok(()) - } -} diff --git a/src/query/storages/parquet/src/parquet2/statistics.rs b/src/query/storages/parquet/src/parquet2/statistics.rs deleted file mode 100644 index 93a37d443ab0..000000000000 --- a/src/query/storages/parquet/src/parquet2/statistics.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; - -use databend_common_arrow::arrow::array::UInt64Array; -use databend_common_arrow::arrow::buffer::Buffer; -use databend_common_arrow::arrow::io::parquet::read as pread; -use databend_common_arrow::parquet::metadata::RowGroupMetaData; -use databend_common_catalog::statistics::BasicColumnStatistics; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::types::DataType; -use databend_common_expression::Column; -use databend_common_expression::DataField; -use databend_common_storage::ColumnNodes; -use databend_storages_common_table_meta::meta::ColumnStatistics; -use databend_storages_common_table_meta::meta::StatisticsOfColumns; - -/// Collect statistics of a batch of row groups of the specified columns. -/// -/// The returned vector's length is the same as `rgs`. -pub fn collect_row_group_stats( - column_nodes: &ColumnNodes, - rgs: &[RowGroupMetaData], -) -> Result> { - let mut stats = Vec::with_capacity(rgs.len()); - let mut stats_of_row_groups = HashMap::with_capacity(rgs.len()); - - // Each row_group_stat is a `HashMap` holding key-value pairs. - // The first element of the pair is the offset in the schema, - // and the second element is the statistics of the column (according to the offset) - // `column_nodes` is parallel to the schema, so we can iterate `column_nodes` directly. - for (index, column_node) in column_nodes.column_nodes.iter().enumerate() { - let field = &column_node.field; - let data_field = DataField::try_from(field).unwrap(); - let column_stats = pread::statistics::deserialize(field, rgs)?; - stats_of_row_groups.insert( - index, - BatchStatistics::from_statistics(&column_stats, data_field.data_type())?, - ); - } - - for (rg_idx, _) in rgs.iter().enumerate() { - let mut cols_stats = HashMap::with_capacity(stats.capacity()); - for index in 0..column_nodes.column_nodes.len() { - let col_stats = stats_of_row_groups[&index].get(rg_idx); - cols_stats.insert(index as u32, col_stats); - } - stats.push(cols_stats); - } - - Ok(stats) -} - -/// Collect basic column statistics of a batch of row groups of the specified columns. -pub fn collect_basic_column_stats( - column_nodes: &ColumnNodes, - rgs: &[RowGroupMetaData], -) -> Result> { - debug_assert!(!rgs.is_empty()); - let num_columns = column_nodes.column_nodes.len(); - let mut columns_stats: Vec = Vec::with_capacity(num_columns); - // `column_nodes` is parallel to the schema, so we can iterate `column_nodes` directly. - for column_node in column_nodes.column_nodes.iter() { - let field = &column_node.field; - let data_field = DataField::try_from(field).unwrap(); - let column_stats = pread::statistics::deserialize(field, rgs)?; - let batch_stats = BatchStatistics::from_statistics(&column_stats, data_field.data_type())?; - let mut col_stats: BasicColumnStatistics = batch_stats.get(0).into(); - for rg_idx in 1..rgs.len() { - col_stats.merge(batch_stats.get(rg_idx).into()); - } - columns_stats.push(col_stats); - } - Ok(columns_stats) -} - -/// A temporary struct to present [`pread::statistics::Statistics`]. -/// -/// Convert the inner fields into Databend data structures. 
-pub struct BatchStatistics { - pub null_count: Buffer, - pub distinct_count: Option>, - pub min_values: Column, - pub max_values: Column, -} - -impl BatchStatistics { - pub fn get(&self, index: usize) -> ColumnStatistics { - ColumnStatistics::new( - unsafe { self.min_values.index_unchecked(index).to_owned() }, - unsafe { self.max_values.index_unchecked(index).to_owned() }, - self.null_count[index], - 0, // this field is not used. - self.distinct_count.as_ref().map(|d| d[index]), - ) - } - - pub fn from_statistics( - stats: &pread::statistics::Statistics, - data_type: &DataType, - ) -> Result { - let null_count = stats - .null_count - .as_any() - .downcast_ref::() - .ok_or_else(|| { - ErrorCode::Internal(format!( - "null_count should be UInt64Array, but is {:?}", - stats.null_count.data_type() - )) - })? - .values() - .clone(); - let distinct_count = stats - .distinct_count - .as_any() - .downcast_ref::() - .map(|d| d.values()) - .cloned(); - let min_values = Column::from_arrow(&*stats.min_value, data_type)?; - let max_values = Column::from_arrow(&*stats.max_value, data_type)?; - Ok(Self { - null_count, - distinct_count, - min_values, - max_values, - }) - } - - pub fn from_column_statistics( - stats: &pread::indexes::ColumnPageStatistics, - data_type: &DataType, - ) -> Result { - let null_count = stats.null_count.values().clone(); - let min_values = Column::from_arrow(&*stats.min, data_type)?; - let max_values = Column::from_arrow(&*stats.max, data_type)?; - Ok(Self { - null_count, - distinct_count: None, - min_values, - max_values, - }) - } -} diff --git a/src/query/storages/parquet/src/parquet_part.rs b/src/query/storages/parquet/src/parquet_part.rs index a8618596fc84..fe2180f5b935 100644 --- a/src/query/storages/parquet/src/parquet_part.rs +++ b/src/query/storages/parquet/src/parquet_part.rs @@ -26,12 +26,10 @@ use databend_common_catalog::plan::Partitions; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use crate::parquet2::Parquet2RowGroupPart; use crate::parquet_rs::ParquetRSRowGroupPart; #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)] pub enum ParquetPart { - Parquet2RowGroup(Parquet2RowGroupPart), ParquetFiles(ParquetFilesPart), ParquetRSRowGroup(ParquetRSRowGroupPart), } @@ -39,7 +37,6 @@ pub enum ParquetPart { impl ParquetPart { pub fn uncompressed_size(&self) -> u64 { match self { - ParquetPart::Parquet2RowGroup(r) => r.uncompressed_size(), ParquetPart::ParquetFiles(p) => p.uncompressed_size(), ParquetPart::ParquetRSRowGroup(p) => p.uncompressed_size(), } @@ -47,7 +44,6 @@ impl ParquetPart { pub fn compressed_size(&self) -> u64 { match self { - ParquetPart::Parquet2RowGroup(r) => r.compressed_size(), ParquetPart::ParquetFiles(p) => p.compressed_size(), ParquetPart::ParquetRSRowGroup(p) => p.compressed_size(), } @@ -83,7 +79,6 @@ impl PartInfo for ParquetPart { fn hash(&self) -> u64 { let path = match self { - ParquetPart::Parquet2RowGroup(r) => &r.location, ParquetPart::ParquetFiles(p) => &p.files[0].0, ParquetPart::ParquetRSRowGroup(p) => &p.location, }; diff --git a/src/query/storages/parquet/src/parquet_rs/source.rs b/src/query/storages/parquet/src/parquet_rs/source.rs index 171fb5e6ed68..2a51ff7d429a 100644 --- a/src/query/storages/parquet/src/parquet_rs/source.rs +++ b/src/query/storages/parquet/src/parquet_rs/source.rs @@ -228,7 +228,6 @@ impl Processor for ParquetSource { let buffers = futures::future::try_join_all(handlers).await?; self.state = State::ReadFiles(buffers); } - _ => unreachable!(), } } else { 
self.is_finished = true; diff --git a/tests/sqllogictests/scripts/prepare_stage.sh b/tests/sqllogictests/scripts/prepare_stage.sh index 7b5f90e42484..d52da455aa1a 100644 --- a/tests/sqllogictests/scripts/prepare_stage.sh +++ b/tests/sqllogictests/scripts/prepare_stage.sh @@ -20,9 +20,3 @@ fi echo "drop table if exists ontime" | $BENDSQL_CLIENT_CONNECT cat tests/data/ddl/ontime.sql | $BENDSQL_CLIENT_CONNECT - -if [ "$TEST_STAGE_PARQUET_LIB" == "parquet_rs" ]; then - echo "set global use_parquet2=0;" | $BENDSQL_CLIENT_CONNECT -else - echo "set global use_parquet2=1;" | $BENDSQL_CLIENT_CONNECT -fi diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result index 94127486f1cf..7ee9414c1c00 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result @@ -1,4 +1,3 @@ -USE_PARQUET2=0 --- named internal stage 2 45 789 1 2 3 diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh index d7357a7e1fbf..42ef18e72a51 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh @@ -3,10 +3,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . "$CURDIR"/../../../../shell_env.sh -for USE_PARQUET2 in 0; do - -echo "USE_PARQUET2=${USE_PARQUET2}" - echo "drop table if exists t1;" | $BENDSQL_CLIENT_CONNECT echo "CREATE TABLE t1 (id INT, name VARCHAR, age INT);" | $BENDSQL_CLIENT_CONNECT echo "insert into t1 (id,name,age) values(1,'2',3), (4, '5', 6);" | $BENDSQL_CLIENT_CONNECT @@ -15,7 +11,7 @@ echo '--- named internal stage' echo "drop stage if exists s1;" | $BENDSQL_CLIENT_CONNECT echo "create stage s1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT echo "copy into @s1 from t1;" | $BENDSQL_CLIENT_CONNECT -echo "set use_parquet2 = ${USE_PARQUET2} ; select * from @s1;" | $BENDSQL_CLIENT_CONNECT +echo "select * from @s1;" | $BENDSQL_CLIENT_CONNECT DATADIR_PATH="/tmp/08_00_00" rm -rf ${DATADIR_PATH} @@ -28,12 +24,12 @@ echo "copy into '${DATADIR}' from t1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL echo '--- external stage' echo "drop stage if exists s2;" | $BENDSQL_CLIENT_CONNECT echo "create stage s2 url = '${DATADIR}' FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT -echo "set use_parquet2 = ${USE_PARQUET2} ; select * from @s2;" | $BENDSQL_CLIENT_CONNECT +echo "select * from @s2;" | $BENDSQL_CLIENT_CONNECT echo '--- file_format' echo "drop stage if exists s3;" | $BENDSQL_CLIENT_CONNECT echo "create stage s3 url = '${DATADIR}' FILE_FORMAT = (type = CSV);" | $BENDSQL_CLIENT_CONNECT -echo "set use_parquet2 = ${USE_PARQUET2} ; select * from @s3 (FILE_FORMAT => 'PARQUET');" | $BENDSQL_CLIENT_CONNECT +echo "select * from @s3 (FILE_FORMAT => 'PARQUET');" | $BENDSQL_CLIENT_CONNECT echo "drop table if exists t2;" | $BENDSQL_CLIENT_CONNECT echo "CREATE TABLE t2 (id INT, data VARIANT);" | $BENDSQL_CLIENT_CONNECT @@ -43,8 +39,6 @@ echo '--- variant named internal stage' echo "drop stage if exists s4;" | $BENDSQL_CLIENT_CONNECT echo "create stage s4 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT echo "copy into @s4 from t2;" | $BENDSQL_CLIENT_CONNECT -echo "set use_parquet2 = ${USE_PARQUET2} ; select * from @s4;" | $BENDSQL_CLIENT_CONNECT +echo "select * from @s4;" | 
$BENDSQL_CLIENT_CONNECT
 rm -rf ${DATADIR_PATH}
-
-done
\ No newline at end of file
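
Note: the deleted `src/query/storages/parquet/src/parquet2/pruning.rs` above carried the page-pruning interval arithmetic (`and_intervals` and `combine_intervals`) together with its unit tests. For readers who want that behaviour in isolation, the following is a minimal, self-contained sketch of the same logic; the `Interval` struct here is an illustrative stand-in for `databend_common_arrow::parquet::indexes::Interval` (a half-open row range `[start, start + length)`), and the assertion in `main` mirrors the removed `test_combine_intervals` case.

// Simplified stand-in for the arrow2/parquet2 Interval type used by the removed pruner.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Interval {
    start: usize,
    length: usize,
}

impl Interval {
    fn new(start: usize, length: usize) -> Self {
        Self { start, length }
    }
}

// Intersect two row selections: keep only the rows selected by both sides.
fn and_intervals(sel1: &[Interval], sel2: &[Interval]) -> Vec<Interval> {
    let mut res = vec![];
    for probe in sel1 {
        let probe_end = probe.start + probe.length;
        for interval in sel2 {
            let interval_end = interval.start + interval.length;
            // Overlapping ranges contribute their common part to the result.
            if probe.start < interval_end && probe_end > interval.start {
                let start = interval.start.max(probe.start);
                let end = interval_end.min(probe_end);
                res.push(Interval::new(start, end - start));
            }
        }
    }
    res
}

// Fold the per-column selections into one selection for the whole row group,
// then merge consecutive intervals into a single larger interval.
fn combine_intervals(row_selections: Vec<Vec<Interval>>) -> Vec<Interval> {
    if row_selections.is_empty() {
        return vec![];
    }
    let mut selection = row_selections[0].clone();
    for sel in row_selections.iter().skip(1) {
        selection = and_intervals(&selection, sel);
    }

    let mut res: Vec<Interval> = vec![];
    for sel in selection {
        if res.is_empty() {
            res.push(sel);
            continue;
        }
        let back = res.last_mut().unwrap();
        if back.start + back.length == sel.start {
            back.length += sel.length;
        } else {
            res.push(sel);
        }
    }
    res
}

fn main() {
    // Three per-column page selections, as in the removed unit test:
    // [12, 35) + [38, 43); [0, 5) + [9, 24) + [30, 40); [1, 2) + [4, 31) + [39, 41).
    let selections = vec![
        vec![Interval::new(12, 23), Interval::new(38, 5)],
        vec![Interval::new(0, 5), Interval::new(9, 15), Interval::new(30, 10)],
        vec![Interval::new(1, 1), Interval::new(4, 27), Interval::new(39, 2)],
    ];
    // Rows kept by every column: [12, 24), [30, 31), [39, 40).
    assert_eq!(
        combine_intervals(selections),
        vec![Interval::new(12, 12), Interval::new(30, 1), Interval::new(39, 1)]
    );
}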
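
Similarly, the deleted `create.rs` estimated a per-file compression ratio (total uncompressed bytes over total compressed bytes across the row groups, falling back to 1.0 when the compressed size is zero) and the removed planner later fed that ratio into small-file partition sizing. A minimal sketch of the estimate follows; the `RowGroup` struct is an assumed, illustrative stand-in for the parquet2 `RowGroupMetaData` the real helper consumed.

// Illustrative stand-in for the per-row-group metadata read by the removed helper;
// the real code summed the compressed and uncompressed sizes of each column chunk.
struct RowGroup {
    compressed_size: usize,
    uncompressed_size: usize,
}

// Ratio of uncompressed to compressed bytes across a file's row groups,
// defaulting to 1.0 when there is nothing compressed to measure.
fn get_compression_ratio(row_groups: &[RowGroup]) -> f64 {
    let compressed: usize = row_groups.iter().map(|g| g.compressed_size).sum();
    let uncompressed: usize = row_groups.iter().map(|g| g.uncompressed_size).sum();
    if compressed == 0 {
        1.0
    } else {
        uncompressed as f64 / compressed as f64
    }
}

fn main() {
    let row_groups = vec![
        RowGroup { compressed_size: 40, uncompressed_size: 100 },
        RowGroup { compressed_size: 60, uncompressed_size: 150 },
    ];
    // (100 + 150) / (40 + 60) = 2.5, an exact value, so assert_eq! is safe here.
    assert_eq!(get_compression_ratio(&row_groups), 2.5);
}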