Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: insert and mutation progress #17014

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions src/common/storage/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,6 @@ pub struct MutationStatus {
}

impl MutationStatus {
/// Accumulate `count` newly inserted rows into the running total.
pub fn add_insert_rows(&mut self, count: u64) {
    self.insert_rows += count;
}

/// Accumulate `count` newly deleted rows into the running total.
pub fn add_deleted_rows(&mut self, count: u64) {
    self.deleted_rows += count;
}

/// Accumulate `count` newly updated rows into the running total.
pub fn add_update_rows(&mut self, count: u64) {
    self.update_rows += count;
}

pub fn merge_mutation_status(&mut self, mutation_status: MutationStatus) {
self.insert_rows += mutation_status.insert_rows;
self.deleted_rows += mutation_status.deleted_rows;
Expand Down
9 changes: 6 additions & 3 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ async fn do_hook_compact(
};

// keep the original progress value
let progress = ctx.get_write_progress();
let progress_value = progress.as_ref().get_values();
let write_progress = ctx.get_write_progress();
let write_progress_value = write_progress.as_ref().get_values();
let scan_progress = ctx.get_scan_progress();
let scan_progress_value = scan_progress.as_ref().get_values();

match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, compaction_limits, lock_opt)
Expand All @@ -119,7 +121,8 @@ async fn do_hook_compact(
}

// reset the progress value
progress.set(&progress_value);
write_progress.set(&write_progress_value);
scan_progress.set(&scan_progress_value);
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
}

Expand Down
14 changes: 14 additions & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::UInt64Type;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchema;
use databend_common_expression::FromData;
use databend_common_expression::SendableDataBlockStream;
use databend_common_pipeline_sources::AsyncSourcer;
use databend_common_sql::executor::physical_plans::DistributedInsertSelect;
use databend_common_sql::executor::physical_plans::MutationKind;
Expand All @@ -43,6 +47,7 @@ use crate::pipelines::ValueSource;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::stream::DataBlockStream;

pub struct InsertInterpreter {
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -259,4 +264,13 @@ impl Interpreter for InsertInterpreter {

Ok(build_res)
}

/// Surface the INSERT's affected-row count to the client as a one-row,
/// single-UInt64-column result stream (read from the context's mutation status).
fn inject_result(&self) -> Result<SendableDataBlockStream> {
    let status_handle = self.ctx.get_mutation_status();
    let mutation_status = status_handle.read();
    let rows_column = UInt64Type::from_data(vec![mutation_status.insert_rows]);
    let result_blocks = vec![DataBlock::new_from_columns(vec![rows_column])];
    Ok(Box::pin(DataBlockStream::create(None, result_blocks)))
}
}
45 changes: 26 additions & 19 deletions src/query/service/src/interpreters/interpreter_mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@

use std::sync::Arc;

use databend_common_base::base::ProgressValues;
use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::UInt32Type;
use databend_common_expression::types::UInt64Type;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::FromData;
use databend_common_expression::SendableDataBlockStream;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sinks::EmptySink;
use databend_common_pipeline_sources::EmptySource;
use databend_common_sql::binder::MutationStrategy;
use databend_common_sql::binder::MutationType;
use databend_common_sql::executor::physical_plans::create_push_down_filters;
Expand All @@ -36,6 +38,7 @@ use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::optimizer::SExpr;
use databend_common_sql::plans;
use databend_common_sql::plans::Mutation;
use databend_common_storage::MutationStatus;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::TruncateMode;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -249,14 +252,10 @@ impl MutationInterpreter {
let mut columns = Vec::new();
for field in self.schema.as_ref().fields() {
match field.name().as_str() {
plans::INSERT_NAME => {
columns.push(UInt32Type::from_data(vec![status.insert_rows as u32]))
}
plans::UPDATE_NAME => {
columns.push(UInt32Type::from_data(vec![status.update_rows as u32]))
}
plans::INSERT_NAME => columns.push(UInt64Type::from_data(vec![status.insert_rows])),
plans::UPDATE_NAME => columns.push(UInt64Type::from_data(vec![status.update_rows])),
plans::DELETE_NAME => {
columns.push(UInt32Type::from_data(vec![status.deleted_rows as u32]))
columns.push(UInt64Type::from_data(vec![status.deleted_rows]))
}
_ => unreachable!(),
}
Expand All @@ -270,8 +269,6 @@ impl MutationInterpreter {
fuse_table: &FuseTable,
snapshot: &Option<Arc<TableSnapshot>>,
) -> Result<Option<PipelineBuildResult>> {
let mut build_res = PipelineBuildResult::create();

// Check if the filter is a constant.
let mut truncate_table = mutation.truncate_table;
if let Some(filter) = &mutation.direct_filter
Expand All @@ -288,7 +285,7 @@ impl MutationInterpreter {
truncate_table = true;
} else if !filter_result {
// The update/delete condition is always false, do nothing.
return Ok(Some(build_res));
return self.no_effect_mutation();
}
}

Expand All @@ -299,20 +296,21 @@ impl MutationInterpreter {
// Check if table is empty.
let Some(snapshot) = snapshot else {
// No snapshot, no mutation.
return Ok(Some(build_res));
return self.no_effect_mutation();
};
if snapshot.summary.row_count == 0 {
// Empty snapshot, no mutation.
return Ok(Some(build_res));
return self.no_effect_mutation();
}

if mutation.mutation_type == MutationType::Delete {
if truncate_table {
let progress_values = ProgressValues {
rows: snapshot.summary.row_count as usize,
bytes: snapshot.summary.uncompressed_byte_size as usize,
};
self.ctx.get_write_progress().incr(&progress_values);
let mut build_res = PipelineBuildResult::create();
self.ctx.add_mutation_status(MutationStatus {
insert_rows: 0,
deleted_rows: snapshot.summary.row_count,
update_rows: 0,
});
// deleting the whole table... just a truncate
fuse_table
.do_truncate(
Expand All @@ -330,6 +328,15 @@ impl MutationInterpreter {
}
}

/// Build the pipeline for a mutation that is statically known to touch no
/// rows: an empty source wired straight into an empty sink, so downstream
/// result handling still observes a well-formed (zero-row) pipeline.
fn no_effect_mutation(&self) -> Result<Option<PipelineBuildResult>> {
    let mut result = PipelineBuildResult::create();
    let pipeline = &mut result.main_pipeline;
    pipeline.add_source(EmptySource::create, 1)?;
    pipeline.add_sink(|input| Ok(ProcessorPtr::create(EmptySink::create(input))))?;
    Ok(Some(result))
}

async fn mutation_source_partitions(
&self,
mutation: &Mutation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ create table test1(a int, b string)
---------- Input ----------
insert into table test1(a, b) values (1, 'x'), (2, 'y')
---------- Output ---------
2
+-------------------------+
| number of rows inserted |
+-------------------------+
| 2 |
+-------------------------+
---------- Input ----------
select * from test1
---------- Output ---------
Expand Down
12 changes: 6 additions & 6 deletions src/query/service/tests/it/servers/http/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ async fn test_insert() -> Result<()> {

let sqls = vec![
("create table t(a int) engine=fuse", 0, 0),
("insert into t(a) values (1),(2)", 0, 2),
("insert into t(a) values (1),(2)", 1, 2),
("select * from t", 2, 0),
];

Expand Down Expand Up @@ -1323,7 +1323,7 @@ async fn test_func_object_keys() -> Result<()> {
),
(
"INSERT INTO objects_test1 VALUES (1, parse_json('{\"a\": 1, \"b\": [1,2,3]}'), parse_json('{\"1\": 2}'));",
0,
1,
),
(
"SELECT id, object_keys(obj), object_keys(var) FROM objects_test1;",
Expand All @@ -1349,9 +1349,9 @@ async fn test_multi_partition() -> Result<()> {

let sqls = vec![
("create table tb2(id int, c1 varchar) Engine=Fuse;", 0),
("insert into tb2 values(1, 'mysql'),(1, 'databend')", 0),
("insert into tb2 values(2, 'mysql'),(2, 'databend')", 0),
("insert into tb2 values(3, 'mysql'),(3, 'databend')", 0),
("insert into tb2 values(1, 'mysql'),(1, 'databend')", 1),
("insert into tb2 values(2, 'mysql'),(2, 'databend')", 1),
("insert into tb2 values(3, 'mysql'),(3, 'databend')", 1),
("select * from tb2;", 6),
];

Expand Down Expand Up @@ -1682,7 +1682,7 @@ async fn test_has_result_set() -> Result<()> {

let sqls = vec![
("create table tb2(id int, c1 varchar) Engine=Fuse;", false),
("insert into tb2 values(1, 'mysql'),(1, 'databend')", false),
("insert into tb2 values(1, 'mysql'),(1, 'databend')", true),
("select * from tb2;", true),
];

Expand Down
12 changes: 12 additions & 0 deletions src/query/sql/src/planner/plans/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
use std::sync::Arc;

use databend_common_ast::ast::FormatTreeNode;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::StringType;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::DataSchemaRefExt;
use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchemaRef;
Expand All @@ -28,6 +32,7 @@ use serde::Serialize;

use super::Plan;
use crate::plans::CopyIntoTablePlan;
use crate::INSERT_NAME;

#[derive(Clone, Debug, EnumAsInner)]
pub enum InsertInputSource {
Expand Down Expand Up @@ -116,6 +121,13 @@ impl Insert {
result.push(DataBlock::new_from_columns(vec![formatted_plan]));
Ok(vec![DataBlock::concat(&result)?])
}

/// Result schema of a plain INSERT: a single UInt64 column (named by
/// `INSERT_NAME`) carrying the number of rows inserted.
pub fn schema(&self) -> DataSchemaRef {
    let inserted_rows_field =
        DataField::new(INSERT_NAME, DataType::Number(NumberDataType::UInt64));
    DataSchemaRefExt::create(vec![inserted_rows_field])
}
}

pub(crate) fn format_insert_source(
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/src/planner/plans/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ impl Plan {
Plan::CallProcedure(plan) => plan.schema(),
Plan::InsertMultiTable(plan) => plan.schema(),
Plan::DescUser(plan) => plan.schema(),
Plan::Insert(plan) => plan.schema(),

_ => Arc::new(DataSchema::empty()),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_pipeline_core::processors::Processor;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_core::PipeItem;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_storage::MutationStatus;
use databend_storages_common_index::BloomIndex;
use opendal::Operator;

Expand Down Expand Up @@ -304,6 +305,14 @@ impl Processor for TransformSerializeBlock {
match std::mem::replace(&mut self.state, State::Consume) {
State::Serialized { serialized, index } => {
let block_meta = BlockWriter::write_down(&self.dal, serialized).await?;
let progress_values = ProgressValues {
rows: block_meta.row_count as usize,
bytes: block_meta.block_size as usize,
};
self.block_builder
.ctx
.get_write_progress()
.incr(&progress_values);

let mutation_log_data_block = if let Some(index) = index {
// we are replacing the block represented by the `index`
Expand All @@ -313,19 +322,18 @@ impl Processor for TransformSerializeBlock {
})
} else {
// appending new data block
let progress_values = ProgressValues {
rows: block_meta.row_count as usize,
bytes: block_meta.block_size as usize,
};
self.block_builder
.ctx
.get_write_progress()
.incr(&progress_values);

if let Some(tid) = self.table_id {
self.block_builder
.ctx
.update_multi_table_insert_status(tid, block_meta.row_count);
if matches!(self.kind, MutationKind::Insert) {
if let Some(tid) = self.table_id {
self.block_builder
.ctx
.update_multi_table_insert_status(tid, block_meta.row_count);
} else {
self.block_builder.ctx.add_mutation_status(MutationStatus {
insert_rows: block_meta.row_count,
update_rows: 0,
deleted_rows: 0,
});
}
}

if matches!(self.kind, MutationKind::Recluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::time::Instant;

use ahash::AHashMap;
use databend_common_base::base::tokio::sync::Semaphore;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::plan::build_origin_block_row_num;
Expand Down Expand Up @@ -66,7 +65,6 @@ use crate::operations::BlockMetaIndex;
use crate::FuseTable;

struct AggregationContext {
ctx: Arc<dyn TableContext>,
data_accessor: Operator,
write_settings: WriteSettings,
read_settings: ReadSettings,
Expand Down Expand Up @@ -135,7 +133,6 @@ impl MatchedAggregator {

Ok(Self {
aggregation_ctx: Arc::new(AggregationContext {
ctx: ctx.clone(),
write_settings,
read_settings,
data_accessor,
Expand All @@ -147,7 +144,7 @@ impl MatchedAggregator {
segment_reader,
block_mutation_row_offset: HashMap::new(),
segment_locations: AHashMap::from_iter(segment_locations),
ctx: ctx.clone(),
ctx,
target_build_optimization,
meta_indexes: HashSet::new(),
})
Expand Down Expand Up @@ -393,11 +390,6 @@ impl AggregationContext {
"apply update and delete to segment idx {}, block idx {}",
segment_idx, block_idx,
);
let progress_values = ProgressValues {
rows: modified_offsets.len(),
bytes: 0,
};
self.ctx.get_write_progress().incr(&progress_values);
let mut origin_data_block = read_block(
self.write_settings.storage_format,
&self.block_reader,
Expand Down
Loading
Loading