Commit 97963d8: update
zhyass committed Dec 9, 2024 (parent 4888591)
Showing 11 changed files with 57 additions and 27 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 15 additions & 3 deletions src/query/catalog/src/table_context.rs
@@ -280,11 +280,23 @@ pub trait TableContext: Send + Sync {
         max_files: Option<usize>,
     ) -> Result<FilteredCopyFiles>;

-    fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
+    fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()>;

-    fn clear_segment_locations(&self) -> Result<()>;
+    fn clear_inserted_segment_locations(&self) -> Result<()>;

-    fn get_segment_locations(&self) -> Result<Vec<Location>>;
+    fn get_inserted_segment_locations(&self) -> Result<Vec<Location>>;

+    fn add_target_segment_location(&self, _index: usize, _location: Location) -> Result<()> {
+        unimplemented!()
+    }
+
+    fn get_target_segment_locations(&self) -> Result<Vec<(usize, Location)>> {
+        unimplemented!()
+    }
+
+    fn clear_target_segment_locations(&self) -> Result<()> {
+        unimplemented!()
+    }
+
     fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>;
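Note: the three new target-segment methods are added with `unimplemented!()` default bodies, so every existing `TableContext` implementor (including the test delegations further down) keeps compiling, and only `QueryContext` opts in with a real override. A minimal sketch of that default-method pattern, using a simplified `Loc` alias in place of Databend's `Location`:

    // Sketch only: `Loc` stands in for the real `Location` type.
    type Loc = (String, u64);

    trait SegmentRegistry {
        // Defaults panic if called, so implementors that never deal with
        // target segments can simply leave them out.
        fn add_target_segment_location(&self, _index: usize, _loc: Loc) {
            unimplemented!()
        }

        fn get_target_segment_locations(&self) -> Vec<(usize, Loc)> {
            unimplemented!()
        }
    }

    struct NoopCtx;
    impl SegmentRegistry for NoopCtx {} // compiles without overriding anything

    fn main() {
        let _ctx = NoopCtx; // fine, as long as the defaults are never invoked
    }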
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/hook/compact_hook.rs
@@ -182,7 +182,7 @@ async fn compact_table(
             PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;

         // Clears previously generated segment locations to avoid duplicate data in the refresh phase
-        ctx.clear_segment_locations()?;
+        ctx.clear_inserted_segment_locations()?;
         ctx.set_executor(complete_executor.get_inner())?;
         complete_executor.execute()?;
         drop(complete_executor);
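Note: the ordering here matters. `inserted_segment_locs` is query-scoped state on the context, so the compact hook empties it before running its own pipeline; otherwise the refresh phase would also pick up segment locations recorded by the statement that triggered the hook. A minimal sketch of that lifecycle, with a hypothetical `Ctx` standing in for the real query context:

    use std::collections::HashSet;
    use std::sync::RwLock;

    // Sketch only: a hypothetical query-scoped context.
    #[derive(Default)]
    struct Ctx {
        inserted_segment_locs: RwLock<HashSet<String>>,
    }

    impl Ctx {
        fn add_inserted_segment_location(&self, loc: String) {
            self.inserted_segment_locs.write().unwrap().insert(loc);
        }
        fn clear_inserted_segment_locations(&self) {
            self.inserted_segment_locs.write().unwrap().clear();
        }
        fn get_inserted_segment_locations(&self) -> Vec<String> {
            self.inserted_segment_locs.read().unwrap().iter().cloned().collect()
        }
    }

    fn main() {
        let ctx = Ctx::default();
        ctx.add_inserted_segment_location("1/seg/from_insert.json".into()); // triggering statement

        // Compact hook: clear first, then record only what compaction writes.
        ctx.clear_inserted_segment_locations();
        ctx.add_inserted_segment_location("1/seg/compacted.json".into());

        // The refresh phase now sees only the compaction output.
        assert_eq!(
            ctx.get_inserted_segment_locations(),
            vec!["1/seg/compacted.json".to_string()]
        );
    }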
6 changes: 3 additions & 3 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
@@ -209,7 +209,7 @@ async fn generate_refresh_index_plan(
     catalog: &str,
     table_id: MetaId,
 ) -> Result<Vec<Plan>> {
-    let segment_locs = ctx.get_segment_locations()?;
+    let segment_locs = ctx.get_inserted_segment_locations()?;
     let catalog = ctx.get_catalog(catalog).await?;
     let mut plans = vec![];
     let indexes = catalog
@@ -272,7 +272,7 @@ async fn generate_refresh_inverted_index_plan(
     desc: &RefreshDesc,
     table: Arc<dyn Table>,
 ) -> Result<Vec<Plan>> {
-    let segment_locs = ctx.get_segment_locations()?;
+    let segment_locs = ctx.get_inserted_segment_locations()?;
     let mut plans = vec![];

     let table_meta = &table.get_table_info().meta;
@@ -296,7 +296,7 @@ async fn generate_refresh_virtual_column_plan(
     ctx: Arc<QueryContext>,
     desc: &RefreshDesc,
 ) -> Result<Option<Plan>> {
-    let segment_locs = ctx.get_segment_locations()?;
+    let segment_locs = ctx.get_inserted_segment_locations()?;

     let table_info = ctx
         .get_table(&desc.catalog, &desc.database, &desc.table)
@@ -176,7 +176,7 @@ impl ReclusterTableInterpreter {

         let complete_executor =
             PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
-        self.ctx.clear_segment_locations()?;
+        self.ctx.clear_inserted_segment_locations()?;
         self.ctx.set_executor(complete_executor.get_inner())?;
         complete_executor.execute()?;
         // make sure the executor is dropped before the next loop.
31 changes: 24 additions & 7 deletions src/query/service/src/sessions/query_ctx.rs
@@ -99,6 +99,7 @@ use databend_common_storage::StageFileInfo;
 use databend_common_storage::StageFilesInfo;
 use databend_common_storage::StorageMetrics;
 use databend_common_storages_delta::DeltaTable;
+use databend_common_storages_fuse::operations::SegmentIndex;
 use databend_common_storages_fuse::TableContext;
 use databend_common_storages_iceberg::IcebergTable;
 use databend_common_storages_orc::OrcTable;
@@ -152,9 +153,7 @@ pub struct QueryContext {
     fragment_id: Arc<AtomicUsize>,
     // Used by synchronized generate aggregating indexes when new data written.
     inserted_segment_locs: Arc<RwLock<HashSet<Location>>>,
-    // Temp table for materialized CTE, first string is the database_name, second string is the table_name
-    // All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it.
-    m_cte_temp_table: Arc<RwLock<Vec<(String, String)>>>,
+    target_segment_locs: Arc<DashMap<SegmentIndex, Location>>,
 }

 impl QueryContext {
@@ -180,7 +179,7 @@ impl QueryContext {
             fragment_id: Arc::new(AtomicUsize::new(0)),
             inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())),
             block_threshold: Arc::new(RwLock::new(BlockThresholds::default())),
-            m_cte_temp_table: Arc::new(Default::default()),
+            target_segment_locs: Arc::new(DashMap::new()),
         })
     }

@@ -1068,19 +1067,19 @@ impl TableContext for QueryContext {
         })
     }

-    fn add_segment_location(&self, segment_loc: Location) -> Result<()> {
+    fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()> {
         let mut segment_locations = self.inserted_segment_locs.write();
         segment_locations.insert(segment_loc);
         Ok(())
     }

-    fn clear_segment_locations(&self) -> Result<()> {
+    fn clear_inserted_segment_locations(&self) -> Result<()> {
         let mut segment_locations = self.inserted_segment_locs.write();
         segment_locations.clear();
         Ok(())
     }

-    fn get_segment_locations(&self) -> Result<Vec<Location>> {
+    fn get_inserted_segment_locations(&self) -> Result<Vec<Location>> {
         Ok(self
             .inserted_segment_locs
             .read()
@@ -1089,6 +1088,24 @@
             .collect::<Vec<_>>())
     }

+    fn add_target_segment_location(&self, index: usize, location: Location) -> Result<()> {
+        self.target_segment_locs.insert(index, location);
+        Ok(())
+    }
+
+    fn get_target_segment_locations(&self) -> Result<Vec<(usize, Location)>> {
+        Ok(self
+            .target_segment_locs
+            .iter()
+            .map(|entry| (*entry.key(), entry.value().clone()))
+            .collect())
+    }
+
+    fn clear_target_segment_locations(&self) -> Result<()> {
+        self.target_segment_locs.clear();
+        Ok(())
+    }
+
     fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()> {
         if matches!(self.get_query_kind(), QueryKind::CopyIntoTable) {
             self.shared.copy_status.add_chunk(file_path, file_status);
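Note: unlike the inserted-segment set, which sits behind an `RwLock<HashSet<Location>>`, the new target-segment map is a `DashMap` keyed by `SegmentIndex` (a `usize`, judging by the trait signature), so concurrent pipeline workers can record `(index, location)` pairs without serializing on one write lock. A standalone sketch of that access pattern, assuming the `dashmap` crate and a simplified `Location` tuple:

    use std::sync::Arc;
    use std::thread;

    use dashmap::DashMap; // assumes dashmap as a dependency

    // Sketch only: Databend's Location is roughly (path, format_version).
    type Location = (String, u64);

    fn main() {
        let target_segment_locs: Arc<DashMap<usize, Location>> = Arc::new(DashMap::new());

        // Several workers insert under their own segment index, sharded internally.
        let handles: Vec<_> = (0..4)
            .map(|i| {
                let map = Arc::clone(&target_segment_locs);
                thread::spawn(move || {
                    map.insert(i, (format!("1/segments/{i}.json"), 1));
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }

        // Snapshot as Vec<(index, location)>, mirroring get_target_segment_locations().
        let mut locs: Vec<(usize, Location)> = target_segment_locs
            .iter()
            .map(|entry| (*entry.key(), entry.value().clone()))
            .collect();
        locs.sort_by_key(|(i, _)| *i);
        assert_eq!(locs.len(), 4);
    }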
6 changes: 3 additions & 3 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -843,15 +843,15 @@ impl TableContext for CtxDelegation {
         todo!()
     }

-    fn add_segment_location(&self, _segment_loc: Location) -> Result<()> {
+    fn add_inserted_segment_location(&self, _segment_loc: Location) -> Result<()> {
         todo!()
     }

-    fn clear_segment_locations(&self) -> Result<()> {
+    fn clear_inserted_segment_locations(&self) -> Result<()> {
         todo!()
     }

-    fn get_segment_locations(&self) -> Result<Vec<Location>> {
+    fn get_inserted_segment_locations(&self) -> Result<Vec<Location>> {
         todo!()
     }
6 changes: 3 additions & 3 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -732,15 +732,15 @@ impl TableContext for CtxDelegation {
         HashMap::new()
     }

-    fn add_segment_location(&self, _segment_loc: Location) -> Result<()> {
+    fn add_inserted_segment_location(&self, _segment_loc: Location) -> Result<()> {
         todo!()
     }

-    fn clear_segment_locations(&self) -> Result<()> {
+    fn clear_inserted_segment_locations(&self) -> Result<()> {
         todo!()
     }

-    fn get_segment_locations(&self) -> Result<Vec<Location>> {
+    fn get_inserted_segment_locations(&self) -> Result<Vec<Location>> {
         todo!()
     }
@@ -28,7 +28,8 @@ use databend_common_catalog::table_with_options::get_with_opt_max_batch_size;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_storages_view::view_table::QUERY;
-use databend_storages_common_table_meta::table::{get_change_type, is_hilbert_recluster};
+use databend_storages_common_table_meta::table::get_change_type;
+use databend_storages_common_table_meta::table::is_hilbert_recluster;

 use crate::binder::util::TableIdentifier;
 use crate::binder::Binder;
@@ -90,7 +90,7 @@ pub fn is_hilbert_recluster(table_alias_name: &Option<String>) -> bool {
     if let Ok(suffix) = i64::from_str_radix(alias_param[1], 16) {
         // 2023-01-01 00:00:00.
         let base_timestamp = 1672502400;
-        if suffix > base_timestamp && alias_param[0] == "_compact"{
+        if suffix > base_timestamp && alias_param[0] == "_compact" {
             return true;
         }
     }
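Note: besides the spacing fix, this is the guard that decides whether a table alias looks like a compaction-generated one: the suffix must parse as a hexadecimal Unix timestamp newer than 2023-01-01 00:00:00 UTC (1672502400, i.e. 0x63B05C80). A standalone replay of that check, with a hypothetical pre-split alias:

    // Sketch only: replays the suffix check with the alias already split
    // into its parts; the real splitting logic lives in the caller.
    fn looks_like_compact_alias(alias_param: &[&str]) -> bool {
        if alias_param.len() != 2 {
            return false;
        }
        if let Ok(suffix) = i64::from_str_radix(alias_param[1], 16) {
            // 2023-01-01 00:00:00 UTC.
            let base_timestamp = 1672502400;
            if suffix > base_timestamp && alias_param[0] == "_compact" {
                return true;
            }
        }
        false
    }

    fn main() {
        // 0x674e5f00 == 1733189376, a December 2024 timestamp, so this passes.
        assert!(looks_like_compact_alias(&["_compact", "674e5f00"]));
        // Non-hex or pre-2023 suffixes fail.
        assert!(!looks_like_compact_alias(&["_compact", "xyz"]));
        assert!(!looks_like_compact_alias(&["_compact", "1000"]));
    }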
@@ -443,7 +443,7 @@ where F: SnapshotGenerator + Send + 'static
             metrics_inc_commit_copied_files(files.file_info.len() as u64);
         }
         for segment_loc in std::mem::take(&mut self.new_segment_locs).into_iter() {
-            self.ctx.add_segment_location(segment_loc)?;
+            self.ctx.add_inserted_segment_location(segment_loc)?;
         }

         let target_descriptions = {
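Note: the commit sink drains `new_segment_locs` with `std::mem::take`, which swaps the field for an empty collection and yields ownership of the old contents, so each location can be moved into the context by value from inside a method that only has `&mut self`. A minimal sketch of that idiom, with hypothetical `Registry` and `Sink` types standing in for the context and the commit sink:

    // Sketch only: hypothetical stand-ins for the query context and commit sink.
    #[derive(Default)]
    struct Registry {
        inserted: Vec<(String, u64)>,
    }

    impl Registry {
        fn add_inserted_segment_location(&mut self, loc: (String, u64)) {
            self.inserted.push(loc);
        }
    }

    struct Sink {
        new_segment_locs: Vec<(String, u64)>,
    }

    impl Sink {
        fn flush(&mut self, registry: &mut Registry) {
            // take() leaves an empty Vec behind and hands back the filled one,
            // so the locations are consumed by value without cloning.
            for loc in std::mem::take(&mut self.new_segment_locs) {
                registry.add_inserted_segment_location(loc);
            }
            assert!(self.new_segment_locs.is_empty());
        }
    }

    fn main() {
        let mut sink = Sink {
            new_segment_locs: vec![("1/seg/a.json".into(), 1), ("1/seg/b.json".into(), 1)],
        };
        let mut registry = Registry::default();
        sink.flush(&mut registry);
        assert_eq!(registry.inserted.len(), 2);
    }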
