Skip to content

Commit

Permalink
refactor: change blocking pruning process to non-blocking channel (#1…
Browse files Browse the repository at this point in the history
…6718)

* save

* fix: fix skip empty block

* fix: ci test

* fix: performance degrade

* chore: change channel to a 1 sender - n receivers pattern

* chore: catch up main

* chore: remove useless path prefix

* chore: apply review suggestion to avoid using globalIORuntime

* chore: apply review suggestion to spawn in pipeline init stage

* try to find problem

* clippy
  • Loading branch information
dqhl76 authored Nov 4, 2024
1 parent 67c5cce commit dad2e41
Show file tree
Hide file tree
Showing 19 changed files with 999 additions and 796 deletions.
36 changes: 5 additions & 31 deletions src/query/catalog/src/plan/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,37 +301,10 @@ impl StealablePartitions {
self.disable_steal = true;
}

pub fn steal_one(&self, idx: usize) -> Option<PartInfoPtr> {
pub fn steal(&self, idx: usize, max_size: usize) -> Option<Vec<PartInfoPtr>> {
let mut partitions = self.partitions.write();
if partitions.is_empty() {
return self.ctx.get_partition();
}

let idx = if idx >= partitions.len() {
idx % partitions.len()
} else {
idx
};

for step in 0..partitions.len() {
let index = (idx + step) % partitions.len();
if !partitions[index].is_empty() {
return partitions[index].pop_front();
}

if self.disable_steal {
break;
}
}

drop(partitions);
self.ctx.get_partition()
}

pub fn steal(&self, idx: usize, max_size: usize) -> Vec<PartInfoPtr> {
let mut partitions = self.partitions.write();
if partitions.is_empty() {
return self.ctx.get_partitions(max_size);
return None;
}

let idx = if idx >= partitions.len() {
Expand All @@ -346,7 +319,7 @@ impl StealablePartitions {
if !partitions[index].is_empty() {
let ps = &mut partitions[index];
let size = ps.len().min(max_size);
return ps.drain(..size).collect();
return Some(ps.drain(..size).collect());
}

if self.disable_steal {
Expand All @@ -355,7 +328,8 @@ impl StealablePartitions {
}

drop(partitions);
self.ctx.get_partitions(max_size)

None
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::SystemTime;
use dashmap::DashMap;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ResultExt;
Expand Down Expand Up @@ -378,6 +379,8 @@ pub trait TableContext: Send + Sync {

fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool;
fn get_shared_settings(&self) -> Arc<Settings>;

/// Returns a shared handle to a [`Runtime`], or an error if the implementor cannot provide one.
// NOTE(review): which runtime is returned is up to each implementor — confirm against the implementations.
fn get_runtime(&self) -> Result<Arc<Runtime>>;
}

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_base::JoinHandle;
use databend_common_catalog::catalog::CATALOG_DEFAULT;
Expand Down Expand Up @@ -1448,6 +1449,10 @@ impl TableContext for QueryContext {
.lock()
.is_temp_table(database_name, table_name)
}

/// Returns the runtime by delegating to `self.shared.try_get_runtime()`.
fn get_runtime(&self) -> Result<Arc<Runtime>> {
self.shared.try_get_runtime()
}
}

impl TrySpawn for QueryContext {
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use dashmap::DashMap;
use databend_common_base::base::tokio;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::cluster_info::Cluster;
use databend_common_catalog::database::Database;
Expand Down Expand Up @@ -1000,6 +1001,10 @@ impl TableContext for CtxDelegation {
/// Test delegate stub: ignores all arguments and always returns `false`.
fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool {
false
}

/// Not implemented for this test delegate; calling it panics via `todo!()`.
fn get_runtime(&self) -> Result<Arc<Runtime>> {
todo!()
}
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use dashmap::DashMap;
use databend_common_base::base::tokio;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::cluster_info::Cluster;
use databend_common_catalog::database::Database;
Expand Down Expand Up @@ -886,6 +887,10 @@ impl TableContext for CtxDelegation {
/// Test delegate stub: ignores all arguments and always returns `false`.
fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool {
false
}

/// Not implemented for this test delegate; calling it panics via `todo!()`.
fn get_runtime(&self) -> Result<Arc<Runtime>> {
todo!()
}
}

#[derive(Clone, Debug)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::fmt::Formatter;

use databend_common_catalog::plan::PartInfoPtr;
use databend_common_expression::local_block_meta_serde;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::BlockMetaInfoPtr;

/// Block meta holding a list of partition handles ([`PartInfoPtr`]).
pub struct BlockPartitionMeta {
/// The partitions carried by this meta.
pub part_ptr: Vec<PartInfoPtr>,
}

impl BlockPartitionMeta {
pub fn create(part_ptr: Vec<PartInfoPtr>) -> BlockMetaInfoPtr {
Box::new(BlockPartitionMeta { part_ptr })
}
}

impl Debug for BlockPartitionMeta {
    /// Renders the meta as a debug struct named `BlockPartitionMeta` with a
    /// single `part_ptr` field.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BlockPartitionMeta");
        builder.field("part_ptr", &self.part_ptr);
        builder.finish()
    }
}

// NOTE(review): presumably generates the serde impls for a meta that only lives inside the local
// process — confirm against the `local_block_meta_serde!` definition.
local_block_meta_serde!(BlockPartitionMeta);

// Registered with `typetag` under the name "block_partition_meta"; no trait methods are overridden here.
#[typetag::serde(name = "block_partition_meta")]
impl BlockMetaInfo for BlockPartitionMeta {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_channel::Receiver;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sources::AsyncSource;
use databend_common_pipeline_sources::AsyncSourcer;

use crate::operations::read::block_partition_meta::BlockPartitionMeta;

/// Async source that turns partitions received from a channel into meta-only data blocks.
pub struct BlockPartitionReceiverSource {
/// Receiving end of the channel; each message is either a partition or an error.
pub meta_receiver: Receiver<Result<PartInfoPtr>>,
}

impl BlockPartitionReceiverSource {
    /// Builds a processor that reads from `receiver` and writes to `output_port`.
    ///
    /// The source is wrapped with [`AsyncSourcer::create`], whose result is
    /// returned unchanged.
    pub fn create(
        ctx: Arc<dyn TableContext>,
        receiver: Receiver<Result<PartInfoPtr>>,
        output_port: Arc<OutputPort>,
    ) -> Result<ProcessorPtr> {
        let source = Self {
            meta_receiver: receiver,
        };
        AsyncSourcer::create(ctx, output_port, source)
    }
}

#[async_trait::async_trait]
impl AsyncSource for BlockPartitionReceiverSource {
    const NAME: &'static str = "BlockPartitionReceiverSource";
    // NOTE(review): presumably needed because every block emitted below is built with
    // `DataBlock::empty_with_meta` — confirm against `AsyncSourcer`.
    const SKIP_EMPTY_DATA_BLOCK: bool = false;

    /// Waits for the next channel message and converts it into a data block.
    ///
    /// Returns `Ok(Some(block))` carrying a single-partition [`BlockPartitionMeta`]
    /// for a received partition, `Err` for a received error, and `Ok(None)`
    /// once `recv` fails.
    #[async_backtrace::framed]
    async fn generate(&mut self) -> Result<Option<DataBlock>> {
        let part = match self.meta_receiver.recv().await {
            // The channel is closed, so return `None` to stop generating.
            Err(_) => return Ok(None),
            // The error occurred in the pruning process; hand it to the caller as-is.
            Ok(Err(cause)) => return Err(cause),
            Ok(Ok(part)) => part,
        };

        let meta = BlockPartitionMeta::create(vec![part]);
        Ok(Some(DataBlock::empty_with_meta(meta)))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::plan::StealablePartitions;
use databend_common_catalog::table_context::TableContext;
use databend_common_expression::DataBlock;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sources::SyncSource;
use databend_common_pipeline_sources::SyncSourcer;

use crate::operations::read::block_partition_meta::BlockPartitionMeta;

/// Sync source that emits batches of partitions taken from a [`StealablePartitions`].
pub struct BlockPartitionSource {
// Index passed as `idx` to `StealablePartitions::steal` on every call.
id: usize,
// Partition queues this source takes from.
partitions: StealablePartitions,
// Upper bound passed as `max_size` to `StealablePartitions::steal`.
max_batch_size: usize,
}

impl BlockPartitionSource {
    /// Builds a processor that takes partitions from `partitions` and writes
    /// them to `output_port`.
    ///
    /// `id` and `max_batch_size` are stored and later forwarded to
    /// `StealablePartitions::steal`. The source is wrapped with
    /// [`SyncSourcer::create`], whose result is returned unchanged.
    pub fn create(
        id: usize,
        partitions: StealablePartitions,
        max_batch_size: usize,
        ctx: Arc<dyn TableContext>,
        output_port: Arc<OutputPort>,
    ) -> databend_common_exception::Result<ProcessorPtr> {
        let source = Self {
            id,
            partitions,
            max_batch_size,
        };
        SyncSourcer::create(ctx, output_port, source)
    }
}

impl SyncSource for BlockPartitionSource {
    const NAME: &'static str = "BlockPartitionSource";

    /// Takes up to `max_batch_size` partitions and wraps them in a meta-only
    /// data block.
    ///
    /// Returns `Ok(None)` when `steal` yields nothing; this method never
    /// returns `Err` itself.
    fn generate(&mut self) -> databend_common_exception::Result<Option<DataBlock>> {
        let stolen = self.partitions.steal(self.id, self.max_batch_size);
        Ok(stolen.map(|parts| DataBlock::empty_with_meta(BlockPartitionMeta::create(parts))))
    }
}
Loading

0 comments on commit dad2e41

Please sign in to comment.