Skip to content

Commit

Permalink
refactor MutationSource as transform
Browse files: browse the repository at this point in the history
  • Loading branch information
SkyFan2002 committed Dec 20, 2023
1 parent 41b8ff3 commit 21d26fd
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 82 deletions.
32 changes: 15 additions & 17 deletions src/query/storages/fuse/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,26 +219,24 @@ impl FuseTable {
projection.sort_by_key(|&i| source_col_indices[i]);
let ops = vec![BlockOperator::Project { projection }];

let max_threads = (ctx.get_settings().get_max_threads()? as usize)
let _max_threads = (ctx.get_settings().get_max_threads()? as usize)
.min(ctx.partition_num())
.max(1);
// Add source pipe.
pipeline.add_source(
|output| {
MutationSource::try_create(
ctx.clone(),
MutationAction::Deletion,
output,
filter_expr.clone(),
block_reader.clone(),
remain_reader.clone(),
ops.clone(),
self.storage_format,
query_row_id_col,
)
},
max_threads,
)?;
pipeline.add_transform(|input, output| {
MutationSource::try_create(
ctx.clone(),
MutationAction::Deletion,
input,
output,
filter_expr.clone(),
block_reader.clone(),
remain_reader.clone(),
ops.clone(),
self.storage_format,
query_row_id_col,
)
})?;
Ok(())
}

Expand Down
95 changes: 47 additions & 48 deletions src/query/storages/fuse/src/operations/mutation/mutation_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_exception::Result;
use databend_common_expression::types::BooleanType;
use databend_common_expression::types::DataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::DataBlock;
use databend_common_expression::Evaluator;
Expand All @@ -36,6 +37,7 @@ use databend_common_expression::Value;
use databend_common_expression::ROW_ID_COL_NAME;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
Expand All @@ -57,8 +59,11 @@ pub enum MutationAction {
Update,
}

#[derive(Default)]
enum State {
ReadData(Option<PartInfoPtr>),
#[default]
Init,
ReadData(PartInfoPtr),
FilterData(PartInfoPtr, MergeIOReadResult),
ReadRemain {
part: PartInfoPtr,
Expand All @@ -72,12 +77,12 @@ enum State {
filter: Option<Value<BooleanType>>,
},
PerformOperator(DataBlock, String),
Output(Option<PartInfoPtr>, DataBlock),
Finish,
Output(DataBlock),
}

pub struct MutationSource {
state: State,
input: Arc<InputPort>,
output: Arc<OutputPort>,

ctx: Arc<dyn TableContext>,
Expand All @@ -98,6 +103,7 @@ impl MutationSource {
pub fn try_create(
ctx: Arc<dyn TableContext>,
action: MutationAction,
input: Arc<InputPort>,
output: Arc<OutputPort>,
filter: Arc<Option<Expr>>,
block_reader: Arc<BlockReader>,
Expand All @@ -107,7 +113,8 @@ impl MutationSource {
query_row_id_col: bool,
) -> Result<ProcessorPtr> {
Ok(ProcessorPtr::create(Box::new(MutationSource {
state: State::ReadData(None),
state: State::Init,
input,
output,
ctx: ctx.clone(),
filter,
Expand All @@ -134,46 +141,47 @@ impl Processor for MutationSource {
}

fn event(&mut self) -> Result<Event> {
if matches!(self.state, State::ReadData(None)) {
self.state = self
.ctx
.get_partition()
.map_or(State::Finish, |part| State::ReadData(Some(part)));
}

if matches!(self.state, State::Finish) {
self.output.finish();
return Ok(Event::Finished);
}

if self.output.is_finished() {
self.input.finish();
return Ok(Event::Finished);
}

if !self.output.can_push() {
self.input.set_not_need_data();
return Ok(Event::NeedConsume);
}

if matches!(self.state, State::Output(_, _)) {
if let State::Output(part, data_block) =
std::mem::replace(&mut self.state, State::Finish)
{
self.state = part.map_or(State::Finish, |part| State::ReadData(Some(part)));

match std::mem::take(&mut self.state) {
State::Init if self.input.has_data() => {
let mut input_block = self.input.pull_data().unwrap()?;
let part: PartInfoPtr = Arc::new(Box::new(
FusePartInfo::downcast_from(input_block.take_meta().unwrap()).unwrap(),
));
self.state = State::ReadData(part);
Ok(Event::Async)
}
State::Init if self.input.is_finished() => {
self.output.finish();
Ok(Event::Finished)
}
State::Init => {
self.input.set_need_data();
Ok(Event::NeedData)
}
State::ReadData(_) | State::ReadRemain { .. } => Ok(Event::Async),
State::FilterData(_, _) | State::MergeRemain { .. } | State::PerformOperator(..) => {
Ok(Event::Sync)
}
State::Output(data_block) => {
self.output.push_data(Ok(data_block));
return Ok(Event::NeedConsume);
self.state = State::Init;
Ok(Event::NeedConsume)
}
}

if matches!(self.state, State::ReadData(_) | State::ReadRemain { .. }) {
Ok(Event::Async)
} else {
Ok(Event::Sync)
}
}

fn process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, State::Finish) {
match std::mem::replace(&mut self.state, State::Init) {
State::FilterData(part, read_res) => {
let chunks = read_res.columns_chunks()?;
let mut data_block = self.block_reader.deserialize_chunks_with_part_info(
Expand Down Expand Up @@ -249,10 +257,7 @@ impl Processor for MutationSource {
self.stats_type.clone(),
),
));
self.state = State::Output(
self.ctx.get_partition(),
DataBlock::empty_with_meta(meta),
);
self.state = State::Output(DataBlock::empty_with_meta(meta));
} else {
let predicate_col = predicates.into_column().unwrap();
let filter = predicate_col.not();
Expand Down Expand Up @@ -293,7 +298,7 @@ impl Processor for MutationSource {
}
} else {
// Do nothing.
self.state = State::Output(self.ctx.get_partition(), DataBlock::empty());
self.state = State::Output(DataBlock::empty());
}
} else {
let progress_values = ProgressValues {
Expand Down Expand Up @@ -350,7 +355,7 @@ impl Processor for MutationSource {
} else {
inner_meta
};
self.state = State::Output(self.ctx.get_partition(), block.add_meta(Some(meta))?);
self.state = State::Output(block.add_meta(Some(meta))?);
}
_ => return Err(ErrorCode::Internal("It's a bug.")),
}
Expand All @@ -359,8 +364,8 @@ impl Processor for MutationSource {

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, State::Finish) {
State::ReadData(Some(part)) => {
match std::mem::replace(&mut self.state, State::Init) {
State::ReadData(part) => {
let settings = ReadSettings::from_ctx(&self.ctx)?;
match Mutation::from_part(&part)? {
Mutation::MutationDeletedSegment(deleted_segment) => {
Expand All @@ -369,12 +374,9 @@ impl Processor for MutationSource {
bytes: 0,
};
self.ctx.get_write_progress().incr(&progress_values);
self.state = State::Output(
self.ctx.get_partition(),
DataBlock::empty_with_meta(Box::new(
SerializeDataMeta::DeletedSegment(deleted_segment.clone()),
)),
)
self.state = State::Output(DataBlock::empty_with_meta(Box::new(
SerializeDataMeta::DeletedSegment(deleted_segment.clone()),
)))
}
Mutation::MutationPartInfo(part) => {
self.index = BlockMetaIndex {
Expand All @@ -401,10 +403,7 @@ impl Processor for MutationSource {
let meta = Box::new(SerializeDataMeta::SerializeBlock(
SerializeBlock::create(self.index.clone(), self.stats_type.clone()),
));
self.state = State::Output(
self.ctx.get_partition(),
DataBlock::empty_with_meta(meta),
);
self.state = State::Output(DataBlock::empty_with_meta(meta));
} else {
let read_res = self
.block_reader
Expand Down
32 changes: 15 additions & 17 deletions src/query/storages/fuse/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,23 @@ impl FuseTable {
.project_column_ref(|name| schema.index_of(name).unwrap())
}));

let max_threads = (ctx.get_settings().get_max_threads()? as usize)
let _max_threads = (ctx.get_settings().get_max_threads()? as usize)
.min(ctx.partition_num())
.max(1);
// Add source pipe.
pipeline.add_source(
|output| {
MutationSource::try_create(
ctx.clone(),
MutationAction::Update,
output,
filter_expr.clone(),
block_reader.clone(),
remain_reader.clone(),
ops.clone(),
self.storage_format,
true,
)
},
max_threads,
)
pipeline.add_transform(|input, output| {
MutationSource::try_create(
ctx.clone(),
MutationAction::Update,
input,
output,
filter_expr.clone(),
block_reader.clone(),
remain_reader.clone(),
ops.clone(),
self.storage_format,
true,
)
})
}
}

0 comments on commit 21d26fd

Please sign in to comment.