From f648756b2f7b6bc01eebb5ec410f284413c9fb72 Mon Sep 17 00:00:00 2001 From: Song Jiacheng Date: Thu, 21 Nov 2024 18:15:53 +0800 Subject: [PATCH] [Enhancement] Kudu scanner should be allowed to do non-share scan. Signed-off-by: Song Jiacheng --- be/src/connector/connector.h | 2 +- be/src/connector/hive_connector.cpp | 7 +++++++ be/src/connector/hive_connector.h | 1 + be/src/connector/lake_connector.h | 2 +- be/src/exec/connector_scan_node.cpp | 2 +- be/test/storage/lake/lake_data_source_test.cpp | 3 ++- 6 files changed, 13 insertions(+), 4 deletions(-) diff --git a/be/src/connector/connector.h b/be/src/connector/connector.h index 4cebb339d70421..cb20715fe0d089 100644 --- a/be/src/connector/connector.h +++ b/be/src/connector/connector.h @@ -159,7 +159,7 @@ class DataSourceProvider { virtual const TupleDescriptor* tuple_descriptor(RuntimeState* state) const = 0; - virtual bool always_shared_scan() const { return true; } + virtual bool always_shared_scan(const TScanRangeParams& scan_range) const { return true; } virtual void peek_scan_ranges(const std::vector& scan_ranges) {} diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp index 117ae3be18aeda..2f7bb60af83aae 100644 --- a/be/src/connector/hive_connector.cpp +++ b/be/src/connector/hive_connector.cpp @@ -49,6 +49,13 @@ DataSourcePtr HiveDataSourceProvider::create_data_source(const TScanRange& scan_ return std::make_unique(this, scan_range); } +// Returns false (shared scan not forced) only when the given scan range has use_kudu_jni_reader both set and true. +// An unset flag, or a flag explicitly set to false, keeps the always-shared behaviour that the base provider defaults to. +bool HiveDataSourceProvider::always_shared_scan(const TScanRangeParams& scan_range) const { + const auto& hdfs_scan_range = scan_range.scan_range.hdfs_scan_range; + return !(hdfs_scan_range.__isset.use_kudu_jni_reader && hdfs_scan_range.use_kudu_jni_reader); +} + const TupleDescriptor* HiveDataSourceProvider::tuple_descriptor(RuntimeState* state) const { return state->desc_tbl().get_tuple_descriptor(_hdfs_scan_node.tuple_id); } diff --git a/be/src/connector/hive_connector.h b/be/src/connector/hive_connector.h index 79b41b9d39e24c..5c547673325214 100644 --- 
a/be/src/connector/hive_connector.h +++ b/be/src/connector/hive_connector.h @@ -44,6 +44,7 @@ class HiveDataSourceProvider final : public DataSourceProvider { HiveDataSourceProvider(ConnectorScanNode* scan_node, const TPlanNode& plan_node); DataSourcePtr create_data_source(const TScanRange& scan_range) override; const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override; + bool always_shared_scan(const TScanRangeParams& scan_range) const override; void peek_scan_ranges(const std::vector& scan_ranges) override; void default_data_source_mem_bytes(int64_t* min_value, int64_t* max_value) override; diff --git a/be/src/connector/lake_connector.h b/be/src/connector/lake_connector.h index e6fefe0b3ffe09..8a04ca368436c9 100644 --- a/be/src/connector/lake_connector.h +++ b/be/src/connector/lake_connector.h @@ -199,7 +199,7 @@ class LakeDataSourceProvider final : public DataSourceProvider { const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override; // always enable shared scan for cloud native table - bool always_shared_scan() const override { return true; } + bool always_shared_scan(const TScanRangeParams& scan_range) const override { return true; } StatusOr convert_scan_range_to_morsel_queue( const std::vector& scan_ranges, int node_id, int32_t pipeline_dop, diff --git a/be/src/exec/connector_scan_node.cpp b/be/src/exec/connector_scan_node.cpp index 9b8720aca38410..b720b614c48e52 100644 --- a/be/src/exec/connector_scan_node.cpp +++ b/be/src/exec/connector_scan_node.cpp @@ -680,7 +680,7 @@ void ConnectorScanNode::_init_counter() { } bool ConnectorScanNode::always_shared_scan() const { - return _data_source_provider->always_shared_scan(); + return _scan_ranges.size() > 0 && _data_source_provider->always_shared_scan(_scan_ranges[0]); } StatusOr ConnectorScanNode::convert_scan_range_to_morsel_queue( diff --git a/be/test/storage/lake/lake_data_source_test.cpp b/be/test/storage/lake/lake_data_source_test.cpp index 2c5d6f73712cad..5d0ebc92073197 100644 --- 
a/be/test/storage/lake/lake_data_source_test.cpp +++ b/be/test/storage/lake/lake_data_source_test.cpp @@ -163,7 +163,8 @@ TEST_F(LakeDataSourceTest, test_convert_scan_range_to_morsel_queue) { auto data_source_provider = dynamic_cast(scan_node->data_source_provider()); data_source_provider->set_lake_tablet_manager(_tablet_mgr.get()); - ASSERT_TRUE(data_source_provider->always_shared_scan()); + TScanRangeParams scan_range; + ASSERT_TRUE(data_source_provider->always_shared_scan(scan_range)); config::tablet_internal_parallel_max_splitted_scan_bytes = 32; config::tablet_internal_parallel_min_splitted_scan_rows = 4;