Skip to content

Commit

Permalink
[Enhancement] Kudu scanner should be allowed to do non-share scan.
Browse files Browse the repository at this point in the history
Signed-off-by: Song Jiacheng <[email protected]>
  • Loading branch information
Jcnessss committed Nov 27, 2024
1 parent 5411897 commit f724591
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 9 deletions.
2 changes: 1 addition & 1 deletion be/src/connector/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class DataSourceProvider {

virtual const TupleDescriptor* tuple_descriptor(RuntimeState* state) const = 0;

// Whether shared scan is always enabled for this provider.
// The overload taking `scan_range` lets an implementation base the answer on a
// scan range; both default implementations here ignore any input and return true.
virtual bool always_shared_scan() const { return true; }
virtual bool always_shared_scan(TScanRangeParams* scan_range) const { return true; }

// Hook that is handed the scan ranges; the default implementation does nothing.
virtual void peek_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {}

Expand Down
5 changes: 5 additions & 0 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ DataSourcePtr HiveDataSourceProvider::create_data_source(const TScanRange& scan_
return std::make_unique<HiveDataSource>(this, scan_range);
}

// Decides whether the scan described by `scan_range` should always use shared scan.
//
// Returns false when `scan_range` is null, or when its HDFS scan range explicitly
// requests the Kudu JNI reader (field set AND true). Returns true otherwise.
bool HiveDataSourceProvider::always_shared_scan(TScanRangeParams* scan_range) const {
    if (scan_range == nullptr) {
        return false;
    }
    const auto& hdfs_scan_range = scan_range->scan_range.hdfs_scan_range;
    // `use_kudu_jni_reader` is an optional field: its value is only meaningful when
    // the field is set. A field that is set to false must not disable shared scan,
    // so the opt-out requires both "set" and "true".
    const bool use_kudu_jni_reader =
            hdfs_scan_range.__isset.use_kudu_jni_reader && hdfs_scan_range.use_kudu_jni_reader;
    return !use_kudu_jni_reader;
}

// Looks up, in the descriptor table of `state`, the tuple descriptor that
// corresponds to the tuple id of this provider's HDFS scan node.
const TupleDescriptor* HiveDataSourceProvider::tuple_descriptor(RuntimeState* state) const {
    const auto tuple_id = _hdfs_scan_node.tuple_id;
    const auto& descriptor_table = state->desc_tbl();
    return descriptor_table.get_tuple_descriptor(tuple_id);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/connector/hive_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HiveDataSourceProvider final : public DataSourceProvider {
HiveDataSourceProvider(ConnectorScanNode* scan_node, const TPlanNode& plan_node);
DataSourcePtr create_data_source(const TScanRange& scan_range) override;
const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override;
bool always_shared_scan(TScanRangeParams* scan_range) const override;

void peek_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
void default_data_source_mem_bytes(int64_t* min_value, int64_t* max_value) override;
Expand Down
2 changes: 1 addition & 1 deletion be/src/connector/lake_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class LakeDataSourceProvider final : public DataSourceProvider {
const TupleDescriptor* tuple_descriptor(RuntimeState* state) const override;

// Always enable shared scan for cloud native tables; any scan range passed in
// is ignored and the answer is unconditionally true.
bool always_shared_scan() const override { return true; }
bool always_shared_scan(TScanRangeParams* scan_range) const override { return true; }

StatusOr<pipeline::MorselQueuePtr> convert_scan_range_to_morsel_queue(
const std::vector<TScanRangeParams>& scan_ranges, int node_id, int32_t pipeline_dop,
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,11 @@ void ConnectorScanNode::_init_counter() {
}

// Asks the data source provider whether shared scan should always be used.
//
// For Hive connectors the first scan range (when there is one) is handed to the
// provider so it can inspect it; in every other case the provider receives a
// default-constructed TScanRangeParams.
bool ConnectorScanNode::always_shared_scan() const {
    // The provider is given a pointer to this local copy, so `_scan_ranges`
    // itself is never exposed for modification.
    TScanRangeParams scan_range;
    if (_connector_type == connector::HIVE && !_scan_ranges.empty()) {
        scan_range = _scan_ranges[0];
    }
    return _data_source_provider->always_shared_scan(&scan_range);
}

StatusOr<pipeline::MorselQueuePtr> ConnectorScanNode::convert_scan_range_to_morsel_queue(
Expand Down
54 changes: 49 additions & 5 deletions be/test/exec/connector_scan_node_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ConnectorScanNodeTest : public ::testing::Test {
std::vector<TScanRangeParams> create_scan_ranges_cloud(size_t num);

std::shared_ptr<TPlanNode> create_tplan_node_hive();
std::vector<TScanRangeParams> create_scan_ranges_hive(size_t num);
std::vector<TScanRangeParams> create_scan_ranges_hive(size_t num, std::string scanner_type);

std::shared_ptr<TPlanNode> create_tplan_node_stream_load();
std::vector<TScanRangeParams> create_scan_ranges_stream_load(RuntimeState* runtime_state,
Expand Down Expand Up @@ -188,14 +188,17 @@ std::shared_ptr<TPlanNode> ConnectorScanNodeTest::create_tplan_node_hive() {
return tnode;
}

std::vector<TScanRangeParams> ConnectorScanNodeTest::create_scan_ranges_hive(size_t num) {
std::vector<TScanRangeParams> ConnectorScanNodeTest::create_scan_ranges_hive(size_t num, std::string scanner_type) {
std::vector<TScanRangeParams> scan_ranges;

for (int i = 0; i < num; i++) {
THdfsScanRange hdfs_scan_range;
hdfs_scan_range.__set_full_path("file");
hdfs_scan_range.__set_offset(i);
hdfs_scan_range.__set_length(1);
if (scanner_type == "kudu") {
hdfs_scan_range.__set_use_kudu_jni_reader(true);
}

TScanRange scan_range;
scan_range.__set_hdfs_scan_range(hdfs_scan_range);
Expand Down Expand Up @@ -223,7 +226,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi

// dop is 1 and not so much morsels
int pipeline_dop = 1;
auto scan_ranges = create_scan_ranges_hive(1);
auto scan_ranges = create_scan_ranges_hive(1, "hive");
ASSIGN_OR_ABORT(auto morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
Expand All @@ -232,7 +235,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi

// dop is 2 and not so much morsels
pipeline_dop = 2;
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator());
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator(), "hive");
ASSIGN_OR_ABORT(morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
Expand All @@ -241,7 +244,7 @@ TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_hi

// dop is 2 and so much morsels
pipeline_dop = 2;
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1);
scan_ranges = create_scan_ranges_hive(pipeline_dop * scan_node->io_tasks_per_scan_operator() + 1, "hive");
ASSIGN_OR_ABORT(morsel_queue_factory,
scan_node->convert_scan_range_to_morsel_queue_factory(
scan_ranges, no_scan_ranges_per_driver_seq, scan_node->id(), pipeline_dop, false,
Expand Down Expand Up @@ -351,4 +354,45 @@ TEST_F(ConnectorScanNodeTest, test_stream_load_thread_pool) {
ASSERT_TRUE(scan_node->use_stream_load_thread_pool());
}

// Checks whether the morsel queue factory built from Kudu scan ranges (Hive scan
// ranges created with the "kudu" scanner type) is shared, for three combinations
// of pipeline dop and number of scan ranges.
TEST_F(ConnectorScanNodeTest, test_convert_scan_range_to_morsel_queue_factory_kudu) {
    std::shared_ptr<RuntimeState> state = create_runtime_state();
    std::vector<TypeDescriptor> column_types;
    column_types.emplace_back(TYPE_INT);
    auto* desc_tbl = create_table_desc(state.get(), column_types);
    auto plan_node = create_tplan_node_hive();
    auto node = std::make_shared<starrocks::ConnectorScanNode>(state->obj_pool(), *plan_node, *desc_tbl);
    ASSERT_OK(node->init(*plan_node, state.get()));

    // Arguments shared by every conversion below.
    std::map<int32_t, std::vector<TScanRangeParams>> ranges_per_driver_seq;
    bool internal_parallel = false;
    auto internal_parallel_mode = TTabletInternalParallelMode::type::AUTO;

    // Case 1: dop = 1 with a single scan range -> shared.
    int dop = 1;
    auto ranges = create_scan_ranges_hive(1, "kudu");
    ASSIGN_OR_ABORT(auto factory,
                    node->convert_scan_range_to_morsel_queue_factory(ranges, ranges_per_driver_seq, node->id(), dop,
                                                                     false, internal_parallel,
                                                                     internal_parallel_mode));
    ASSERT_TRUE(factory->is_shared());

    // Case 2: dop = 2 with exactly dop * io_tasks_per_scan_operator scan ranges -> not shared.
    dop = 2;
    ranges = create_scan_ranges_hive(dop * node->io_tasks_per_scan_operator(), "kudu");
    ASSIGN_OR_ABORT(factory,
                    node->convert_scan_range_to_morsel_queue_factory(ranges, ranges_per_driver_seq, node->id(), dop,
                                                                     false, internal_parallel,
                                                                     internal_parallel_mode));
    ASSERT_FALSE(factory->is_shared());

    // Case 3: dop = 2 with one more scan range than in case 2 -> shared.
    ranges = create_scan_ranges_hive(dop * node->io_tasks_per_scan_operator() + 1, "kudu");
    ASSIGN_OR_ABORT(factory,
                    node->convert_scan_range_to_morsel_queue_factory(ranges, ranges_per_driver_seq, node->id(), dop,
                                                                     false, internal_parallel,
                                                                     internal_parallel_mode));
    ASSERT_TRUE(factory->is_shared());
}

} // namespace starrocks
3 changes: 2 additions & 1 deletion be/test/storage/lake/lake_data_source_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ TEST_F(LakeDataSourceTest, test_convert_scan_range_to_morsel_queue) {
auto data_source_provider = dynamic_cast<connector::LakeDataSourceProvider*>(scan_node->data_source_provider());
data_source_provider->set_lake_tablet_manager(_tablet_mgr.get());

ASSERT_TRUE(data_source_provider->always_shared_scan());
TScanRangeParams scan_range;
ASSERT_TRUE(data_source_provider->always_shared_scan(&scan_range));

config::tablet_internal_parallel_max_splitted_scan_bytes = 32;
config::tablet_internal_parallel_min_splitted_scan_rows = 4;
Expand Down

0 comments on commit f724591

Please sign in to comment.