From c6f3fd1a432aeb668c2303cec1d3a1b202019877 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 14:58:42 +0800 Subject: [PATCH] chore: apply review suggestion to spawn in pipeline init stage --- .../storages/fuse/src/operations/read_data.rs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 2077e667b8b38..c384849ac011e 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -224,23 +224,26 @@ impl FuseTable { let table = self.clone(); let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - self.runtime.spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - for part in partitions.partitions { - // ignore the error, the sql may be killed or early stop - let _ = sender.send(Ok(part)).await; + pipeline.set_on_init(move || { + table.runtime.clone().spawn(async move { + match table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + } + Err(err) => { + let _ = sender.send(Err(err)).await; } } - Err(err) => { - let _ = sender.send(Err(err)).await; - } - } - Ok::<_, ErrorCode>(()) - }); + Ok::<_, ErrorCode>(()) + }); + Ok(()) + }) } Ok(())