Skip to content

Commit

Permalink
try to find problem
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 committed Nov 1, 2024
1 parent c6f3fd1 commit 99f6a9f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 25 deletions.
3 changes: 3 additions & 0 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::SystemTime;
use dashmap::DashMap;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ResultExt;
Expand Down Expand Up @@ -384,4 +385,6 @@ pub trait TableContext: Send + Sync {

fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool;
fn get_shared_settings(&self) -> Arc<Settings>;

fn get_runtime(&self) -> Result<Arc<Runtime>>;
}
5 changes: 5 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_base::JoinHandle;
use databend_common_catalog::catalog::CATALOG_DEFAULT;
Expand Down Expand Up @@ -1448,6 +1449,10 @@ impl TableContext for QueryContext {
.lock()
.is_temp_table(database_name, table_name)
}

fn get_runtime(&self) -> Result<Arc<Runtime>> {
self.shared.try_get_runtime()
}
}

impl TrySpawn for QueryContext {
Expand Down
9 changes: 0 additions & 9 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::Arc;

use chrono::Duration;
use chrono::TimeDelta;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::StorageDescription;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartStatistics;
Expand Down Expand Up @@ -133,8 +132,6 @@ pub struct FuseTable {

// If this is set, reading from fuse_table should only returns the increment blocks
pub(crate) changes_desc: Option<ChangesDesc>,

pub(crate) runtime: Arc<Runtime>,
}

impl FuseTable {
Expand Down Expand Up @@ -227,11 +224,6 @@ impl FuseTable {
let meta_location_generator =
TableMetaLocationGenerator::with_prefix(storage_prefix).with_part_prefix(part_prefix);

let runtime = Arc::new(Runtime::with_worker_threads(
2,
Some(String::from("PruneSnapshot")),
)?);

Ok(Box::new(FuseTable {
table_info,
meta_location_generator,
Expand All @@ -243,7 +235,6 @@ impl FuseTable {
table_compression: table_compression.as_str().try_into()?,
table_type,
changes_desc: None,
runtime,
}))
}

Expand Down
42 changes: 26 additions & 16 deletions src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,25 +225,35 @@ impl FuseTable {
let table_schema = self.schema_with_stream();
let push_downs = plan.push_downs.clone();
pipeline.set_on_init(move || {
table.runtime.clone().spawn(async move {
match table
.prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0)
.await
{
Ok((_, partitions)) => {
for part in partitions.partitions {
// ignore the error, the sql may be killed or early stop
let _ = sender.send(Ok(part)).await;
ctx.get_runtime()?.try_spawn(
async move {
match table
.prune_snapshot_blocks(
ctx,
push_downs,
table_schema,
lazy_init_segments,
0,
)
.await
{
Ok((_, partitions)) => {
for part in partitions.partitions {
// ignore the error, the sql may be killed or early stop
let _ = sender.send(Ok(part)).await;
}
}
Err(err) => {
let _ = sender.send(Err(err)).await;
}
}
Err(err) => {
let _ = sender.send(Err(err)).await;
}
}
Ok::<_, ErrorCode>(())
});
Ok::<_, ErrorCode>(())
},
None,
)?;

Ok(())
})
});
}

Ok(())
Expand Down

0 comments on commit 99f6a9f

Please sign in to comment.