Skip to content

Commit

Permalink
chore: when opening a region, set Version.flued_entry_id = RegionMani…
Browse files Browse the repository at this point in the history
…fest.flushed_entry_id
  • Loading branch information
DevilExileSu committed Sep 11, 2023
1 parent 3649fb0 commit 7144883
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
17 changes: 9 additions & 8 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use object_store::util::join_path;
use smallvec::SmallVec;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
};
use store_api::storage::RegionId;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn test_engine_put_data_after_truncate() {
};
put_rows(&engine, region_id, rows).await;

// Scan the region.
// Scan the region.mut
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -234,19 +234,17 @@ async fn test_engine_truncate_reopen() {
};
put_rows(&engine, region_id, rows).await;

let region = engine.get_region(region_id).unwrap();
let last_entry_id = region.version_control.current().last_entry_id;

// Truncate the region
engine
.handle_request(region_id, RegionRequest::Truncate(RegionTruncateRequest {}))
.await
.unwrap();

// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Reopen the region again.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
Expand All @@ -259,6 +257,9 @@ async fn test_engine_truncate_reopen() {
.await
.unwrap();

let region = engine.get_region(region_id).unwrap();
assert_eq!(last_entry_id, region.version().flushed_entry_id);

// Scan the region.
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ impl RegionOpener {

let region_id = metadata.region_id;
let mutable = self.memtable_builder.build(&metadata);
let version = VersionBuilder::new(metadata, mutable).build();
let version = VersionBuilder::new(metadata, mutable)
.flushed_entry_id(manifest.flushed_entry_id)
.build();
let flushed_sequence = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
replay_memtable(wal, region_id, flushed_sequence, &version_control).await?;
Expand All @@ -173,7 +175,7 @@ async fn replay_memtable<S: LogStore>(
version_control: &VersionControlRef,
) -> Result<()> {
let mut rows_replayed = 0;
let mut last_entry_id = EntryId::MIN;
let mut last_entry_id = flushed_entry_id;
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control);
let mut wal_stream = wal.scan(region_id, flushed_entry_id)?;
while let Some(res) = wal_stream.next().await {
Expand Down

0 comments on commit 7144883

Please sign in to comment.