Skip to content

Commit

Permalink
feat(mito): Implement mito2 Wal (#2103)
Browse files Browse the repository at this point in the history
* feat: define wal struct

* feat: Implement Wal read/write

* feat: obsolete wal

* test: test wal

* refactor: use try_stream and remove async from scan
  • Loading branch information
evenyag authored Aug 4, 2023
1 parent 9139962 commit cb4dd89
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 13 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged.
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec4b84931378004db60d168e2604bc3fb9735e9c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "10c349c033dded29097d0dc933fbc2f89f658032" }
lazy_static = "1.4"
log-store = { path = "../log-store" }
metrics.workspace = true
object-store = { path = "../object-store" }
parquet = { workspace = true, features = ["async"] }
prost.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ impl MitoEngine {
}

/// Stop the engine.
///
/// Stopping the engine doesn't stop the underlying log store as other components might
/// still use it.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
Expand Down
75 changes: 68 additions & 7 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
use std::any::Any;

use common_datasource::compression::CompressionType;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use datatypes::arrow::error::ArrowError;
use prost::{DecodeError, EncodeError};
use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -205,6 +206,60 @@ pub enum Error {
column: String,
source: datatypes::Error,
},

#[snafu(display(
"Failed to encode WAL entry, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
EncodeWal {
region_id: RegionId,
location: Location,
source: EncodeError,
},

#[snafu(display("Failed to write WAL, location: {}, source: {}", location, source))]
WriteWal {
location: Location,
source: BoxedError,
},

#[snafu(display(
"Failed to read WAL, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
ReadWal {
region_id: RegionId,
location: Location,
source: BoxedError,
},

#[snafu(display(
"Failed to decode WAL entry, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
DecodeWal {
region_id: RegionId,
location: Location,
source: DecodeError,
},

#[snafu(display(
"Failed to delete WAL, region_id: {}, location: {}, source: {}",
region_id,
location,
source
))]
DeleteWal {
region_id: RegionId,
location: Location,
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -214,9 +269,12 @@ impl ErrorExt for Error {
use Error::*;

match self {
OpenDal { .. } | WriteParquet { .. } | ReadParquet { .. } => {
StatusCode::StorageUnavailable
}
OpenDal { .. }
| WriteParquet { .. }
| ReadParquet { .. }
| WriteWal { .. }
| ReadWal { .. }
| DeleteWal { .. } => StatusCode::StorageUnavailable,
CompressObject { .. }
| DecompressObject { .. }
| SerdeJson { .. }
Expand All @@ -231,9 +289,12 @@ impl ErrorExt for Error {
| InvalidSchema { .. }
| InvalidRequest { .. }
| FillDefault { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}
RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| EncodeWal { .. }
| DecodeWal { .. } => StatusCode::Internal,
WriteBuffer { source, .. } => source.status_code(),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod region;
pub mod request;
#[allow(dead_code)]
pub mod sst;
pub mod wal;
#[allow(dead_code)]
mod worker;

Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::worker::WorkerGroup;
pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
// TODO(yingwen): Maybe provide a way to close the log store.
}

impl Default for TestEnv {
Expand Down
Loading

0 comments on commit cb4dd89

Please sign in to comment.