feat: add basic wal implementation for Edge (#24570)

* feat: add basic wal implementation for Edge

This WAL implementation borrows some code from the wal crate, but departs from it significantly in many ways. For now it uses simple JSON encoding for the serialized ops; we may want to switch that to Protobuf at some point in the future. This version of the WAL doesn't do its own buffering. That will be implemented higher up in the BufferImpl, which will use the WAL and SegmentWriter to make data in the buffer durable.
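As a rough illustration of that JSON encoding (not the actual wire format), here is how a single op could serialize, using the WalOp/LpWriteOp types from the lib.rs diff below; the default_time field is an assumption, since the struct is truncated in this view:

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
enum WalOp {
    LpWrite(LpWriteOp),
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
struct LpWriteOp {
    db_name: String,
    lp: String,
    default_time: i64, // assumed field: server-assigned default timestamp
}

fn main() {
    let op = WalOp::LpWrite(LpWriteOp {
        db_name: "mydb".to_string(),
        lp: "cpu,host=a usage=0.5".to_string(),
        default_time: 123,
    });
    // serde's externally tagged enum representation yields:
    // {"LpWrite":{"db_name":"mydb","lp":"cpu,host=a usage=0.5","default_time":123}}
    println!("{}", serde_json::to_string(&op).unwrap());
}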

The write flow: writes come into the buffer and are validated against (and update) an in-memory Catalog. Once validated, writes are buffered in memory and periodically flushed to the WAL (likely every 10-20ms). After a batch is flushed to the WAL, the entire batch of writes is put into the in-memory queryable buffer, and only then are responses sent back to the clients. This should reduce the write-lock pressure on the in-memory buffer considerably.
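A minimal sketch of that flow, assuming a tokio task that drains a channel and flushes on a timer; all names are illustrative and this is not the actual BufferImpl:

use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval, Duration};

// One queued write: the raw line protocol plus a channel used to
// acknowledge the client only after the batch is durable in the WAL.
struct QueuedWrite {
    lp: String,
    done: oneshot::Sender<()>,
}

async fn flush_loop(mut rx: mpsc::Receiver<QueuedWrite>) {
    let mut tick = interval(Duration::from_millis(10));
    let mut pending: Vec<QueuedWrite> = Vec::new();
    loop {
        tokio::select! {
            Some(write) = rx.recv() => pending.push(write),
            _ = tick.tick() => {
                if pending.is_empty() {
                    continue;
                }
                // 1. encode all pending ops into one batch and flush it to the WAL
                let total_lines: usize = pending.iter().map(|w| w.lp.lines().count()).sum();
                println!("flushing {} writes ({} lp lines)", pending.len(), total_lines);
                // 2. move the batch into the in-memory queryable buffer
                // 3. only then acknowledge every client in the batch
                for write in pending.drain(..) {
                    let _ = write.done.send(());
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1024);
    tokio::spawn(flush_loop(rx));

    let (done_tx, done_rx) = oneshot::channel();
    let write = QueuedWrite {
        lp: "cpu,host=a usage=0.5".to_string(),
        done: done_tx,
    };
    tx.send(write).await.unwrap();
    done_rx.await.unwrap(); // returns once the write has been flushed and buffered
}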

In this PR:
- Update the Wal, WalSegmentWriter, and WalSegmentReader traits to line up with the new design/understanding
- Implement wal (mainly just a way to identify segment files in a directory)
- Implement WalSegmentWriter (write the header, write op batches with a CRC, track the sequence number within the segment, and re-open existing files; see the framing sketch after this list)
- Implement WalSegmentReader
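
The exact segment-file layout lives in the new wal.rs and isn't reproduced in this summary, but a hypothetical framing for one op batch, in the spirit of the bullets above and using the byteorder and crc32fast crates this PR adds, might look like:

use byteorder::{BigEndian, WriteBytesExt};
use crc32fast::Hasher;
use std::io::Write;

// Frame one serialized op batch: a big-endian length prefix, a CRC32 of
// the payload, then the payload itself (e.g. a JSON-encoded WalOpBatch).
fn write_framed_batch<W: Write>(out: &mut W, payload: &[u8]) -> std::io::Result<()> {
    let mut hasher = Hasher::new();
    hasher.update(payload);
    let crc = hasher.finalize();

    out.write_u32::<BigEndian>(payload.len() as u32)?; // payload length
    out.write_u32::<BigEndian>(crc)?;                  // checksum, verified on read
    out.write_all(payload)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let mut segment: Vec<u8> = Vec::new();
    write_framed_batch(&mut segment, br#"{"sequence_number":1,"ops":[]}"#)?;
    assert_eq!(segment.len(), 8 + 30); // two u32 header fields + payload bytes
    Ok(())
}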

* refactor: make Wal return impl reader/writer

* refactor: clean up wal segment open

* fix: WriteBuffer and Wal usage

Turn wal and write buffer references into a concrete type, rather than dyn.
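This trades dynamic dispatch for static dispatch, which is also what permits the impl Trait return types in the lib.rs traits below. A small self-contained illustration of the difference, using a stand-in trait rather than this PR's actual Wal trait:

use std::sync::Arc;

trait Wal {
    fn segment_count(&self) -> usize;
}

struct WalImpl;
impl Wal for WalImpl {
    fn segment_count(&self) -> usize {
        0
    }
}

// Before: dynamic dispatch through a trait object.
fn count_dyn(wal: Arc<dyn Wal>) -> usize {
    wal.segment_count()
}

// After: the caller names a concrete type, so dispatch is static, and the
// trait itself can return impl Trait, which trait objects cannot.
fn count_generic<W: Wal>(wal: Arc<W>) -> usize {
    wal.segment_count()
}

fn main() {
    let wal = Arc::new(WalImpl);
    assert_eq!(count_dyn(wal.clone()), 0);
    assert_eq!(count_generic(wal), 0);
}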

* fix: have wal loading ignore invalid files
pauldix authored Jan 12, 2024
1 parent 028a05f commit 02b4d28
Showing 8 changed files with 732 additions and 66 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock


9 changes: 6 additions & 3 deletions influxdb3/src/commands/serve.rs
@@ -11,7 +11,6 @@ use influxdb3_server::{query_executor::QueryExecutorImpl, serve, CommonServerSta
 use influxdb3_write::persister::PersisterImpl;
 use influxdb3_write::wal::WalImpl;
 use influxdb3_write::write_buffer::WriteBufferImpl;
-use influxdb3_write::Wal;
 use iox_query::exec::{Executor, ExecutorConfig};
 use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
 use object_store::DynObjectStore;
@@ -46,6 +45,9 @@ pub enum Error {

#[error("Server error: {0}")]
Server(#[from] influxdb3_server::Error),

#[error("Wal error: {0}")]
Wal(#[from] influxdb3_write::wal::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -231,9 +233,10 @@ pub async fn command(config: Config) -> Result<()> {
         *config.http_bind_address,
     );
     let catalog = Arc::new(influxdb3_write::catalog::Catalog::new());
-    let wal: Option<Arc<dyn Wal>> = config
+    let wal: Option<Arc<WalImpl>> = config
         .wal_directory
-        .map(|dir| Arc::new(WalImpl::new(dir)) as _);
+        .map(|dir| WalImpl::new(dir).map(Arc::new))
+        .transpose()?;
     let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&catalog), wal));
     let query_executor = QueryExecutorImpl::new(
         catalog,
2 changes: 1 addition & 1 deletion influxdb3_server/src/lib.rs
@@ -199,7 +199,7 @@ mod tests {

         let write_buffer = Arc::new(influxdb3_write::write_buffer::WriteBufferImpl::new(
             Arc::clone(&catalog),
-            None,
+            None::<Arc<influxdb3_write::wal::WalImpl>>,
         ));
         let query_executor = crate::query_executor::QueryExecutorImpl::new(
             catalog,
44 changes: 29 additions & 15 deletions influxdb3_server/src/query_executor.rs
@@ -107,7 +107,7 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
 // This implementation is for the Flight service
 #[async_trait]
 impl<W: WriteBuffer> QueryNamespaceProvider for QueryExecutorImpl<W> {
-    type Db = QueryDatabase;
+    type Db = QueryDatabase<W>;
 
     async fn db(
         &self,
@@ -135,18 +135,18 @@ impl<W: WriteBuffer> QueryNamespaceProvider for QueryExecutorImpl<W> {
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct QueryDatabase {
+#[derive(Debug)]
+pub struct QueryDatabase<B> {
     db_schema: Arc<DatabaseSchema>,
-    write_buffer: Arc<dyn WriteBuffer>,
+    write_buffer: Arc<B>,
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
 }
 
-impl QueryDatabase {
+impl<B: WriteBuffer> QueryDatabase<B> {
     pub fn new(
         db_schema: Arc<DatabaseSchema>,
-        write_buffer: Arc<dyn WriteBuffer>,
+        write_buffer: Arc<B>,
         exec: Arc<Executor>,
         datafusion_config: Arc<HashMap<String, String>>,
     ) -> Self {
@@ -160,7 +160,7 @@ impl QueryDatabase {
 }
 
 #[async_trait]
-impl QueryNamespace for QueryDatabase {
+impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
     async fn chunks(
         &self,
         _table_name: &str,
@@ -189,10 +189,17 @@ impl QueryNamespace for QueryDatabase {
     }
 
     fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
+        let qdb = Self::new(
+            Arc::clone(&self.db_schema),
+            Arc::clone(&self.write_buffer),
+            Arc::clone(&self.exec),
+            Arc::clone(&self.datafusion_config),
+        );
+
         let mut cfg = self
             .exec
             .new_execution_config(ExecutorType::Query)
-            .with_default_catalog(Arc::new(self.clone()))
+            .with_default_catalog(Arc::new(qdb))
             .with_span_context(span_ctx);
 
         for (k, v) in self.datafusion_config.as_ref() {
@@ -203,7 +210,7 @@ impl QueryNamespace for QueryDatabase {
     }
 }
 
-impl CatalogProvider for QueryDatabase {
+impl<B: WriteBuffer> CatalogProvider for QueryDatabase<B> {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
@@ -215,15 +222,22 @@ impl CatalogProvider for QueryDatabase {

     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         info!("CatalogProvider schema {}", name);
+        let qdb = Self::new(
+            Arc::clone(&self.db_schema),
+            Arc::clone(&self.write_buffer),
+            Arc::clone(&self.exec),
+            Arc::clone(&self.datafusion_config),
+        );
+
         match name {
-            DEFAULT_SCHEMA => Some(Arc::new(self.clone())),
+            DEFAULT_SCHEMA => Some(Arc::new(qdb)),
             _ => None,
         }
     }
 }

 #[async_trait]
-impl SchemaProvider for QueryDatabase {
+impl<B: WriteBuffer> SchemaProvider for QueryDatabase<B> {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
@@ -253,14 +267,14 @@ impl SchemaProvider for QueryDatabase {
 }
 
 #[derive(Debug)]
-pub struct QueryTable {
+pub struct QueryTable<B> {
     db_schema: Arc<DatabaseSchema>,
     name: Arc<str>,
     schema: Schema,
-    write_buffer: Arc<dyn WriteBuffer>,
+    write_buffer: Arc<B>,
 }
 
-impl QueryTable {
+impl<B: WriteBuffer> QueryTable<B> {
     fn chunks(
         &self,
         ctx: &SessionState,
@@ -279,7 +293,7 @@ impl QueryTable {
 }
 
 #[async_trait]
-impl TableProvider for QueryTable {
+impl<B: WriteBuffer> TableProvider for QueryTable<B> {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
8 changes: 8 additions & 0 deletions influxdb3_write/Cargo.toml
@@ -18,9 +18,17 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }

 arrow = { workspace = true }
 async-trait = "0.1"
+byteorder = "1.3.4"
+chrono = "0.4"
+crc32fast = "1.2.0"
 datafusion = { workspace = true }
+parking_lot = "0.11.1"
 thiserror = "1.0"
+tokio = { version = "1.35", features = ["macros", "fs", "io-util", "parking_lot", "rt-multi-thread", "sync", "time"] }
+serde = { version = "1.0.188", features = ["derive"] }
+serde_json = "1.0.107"
+snap = "1.0.0"
 
 [dev-dependencies]
 test_helpers = { path = "../test_helpers" }
59 changes: 42 additions & 17 deletions influxdb3_write/src/lib.rs
@@ -75,7 +75,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
     ) -> Result<Vec<Arc<dyn BufferSegment>>>;
 
     /// Returns the configured WAL, if there is one.
-    fn wal(&self) -> Option<Arc<dyn Wal>>;
+    fn wal(&self) -> Option<Arc<impl Wal>>;
 }
 
 /// A segment in the buffer that corresponds to a single WAL segment file. It contains a catalog with any updates
@@ -109,11 +109,40 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {

 /// The segment identifier, which will be monotonically increasing.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct SegmentId(u64);
+pub struct SegmentId(u32);
+pub type SegmentIdBytes = [u8; 4];
 
+impl SegmentId {
+    pub fn new(id: u32) -> Self {
+        Self(id)
+    }
+
+    pub fn as_bytes(&self) -> SegmentIdBytes {
+        self.0.to_be_bytes()
+    }
+
+    pub fn from_bytes(bytes: SegmentIdBytes) -> Self {
+        Self(u32::from_be_bytes(bytes))
+    }
+
+    pub fn next(&self) -> Self {
+        Self(self.0 + 1)
+    }
+}
+
 /// The sequence number of a batch of WAL operations.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct SequenceNumber(u64);
+pub struct SequenceNumber(u32);
+
+impl SequenceNumber {
+    pub fn new(id: u32) -> Self {
+        Self(id)
+    }
+
+    pub fn next(&self) -> Self {
+        Self(self.0 + 1)
+    }
+}
 
 #[async_trait]
 pub trait Persister: Debug + Send + Sync + 'static {
@@ -136,21 +165,18 @@ pub trait Persister: Debug + Send + Sync + 'static {
     fn object_store(&self) -> Arc<dyn object_store::ObjectStore>;
 }
 
-#[async_trait]
 pub trait Wal: Debug + Send + Sync + 'static {
     /// Opens a writer to a segment, either creating a new file or appending to an existing file.
-    async fn open_segment_writer(&self, segment_id: SegmentId)
-        -> Result<Arc<dyn WalSegmentWriter>>;
+    fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentWriter>;
 
     /// Opens a reader to a segment file.
-    async fn open_segment_reader(&self, segment_id: SegmentId)
-        -> Result<Arc<dyn WalSegmentReader>>;
+    fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentReader>;
 
     /// Checks the WAL directory for any segment files and returns them.
-    async fn segment_files(&self) -> Result<Vec<SegmentFile>>;
+    fn segment_files(&self) -> wal::Result<Vec<SegmentFile>>;
 
-    /// Drops the WAL segment file from disk.
-    async fn drop_wal_segment(&self, segment_id: SegmentId) -> Result<()>;
+    /// Deletes the WAL segment file from disk.
+    fn delete_wal_segment(&self, segment_id: SegmentId) -> wal::Result<()>;
 }
 
 #[derive(Debug)]
@@ -161,22 +187,21 @@ pub struct SegmentFile {
     pub segment_id: SegmentId,
 }
 
-#[async_trait]
 pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
     fn id(&self) -> SegmentId;
 
-    async fn write(&self, op: WalOp) -> Result<SequenceNumber>;
+    fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;
 }
 
 pub trait WalSegmentReader: Debug + Send + Sync + 'static {
     fn id(&self) -> SegmentId;
 
-    fn next_batch(&mut self) -> Result<Option<WalBatch>>;
+    fn next_batch(&mut self) -> wal::Result<Option<WalOpBatch>>;
 }
 
 /// Individual WalOps get batched into the WAL asynchronously. The batch is then written to the segment file.
 #[derive(Debug, Serialize, Deserialize)]
-pub struct WalBatch {
+pub struct WalOpBatch {
     pub sequence_number: SequenceNumber,
     pub ops: Vec<WalOp>,
 }
@@ -185,14 +210,14 @@ pub struct WalBatch {
 /// lands in object storage. Things in the WAL are buffered until they are persisted to object storage. The write
 /// is called an `LpWrite` because it is a write of line protocol and we intend to have a new write protocol for
 /// 3.0 that supports a different kind of schema.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub enum WalOp {
     LpWrite(LpWriteOp),
 }
 
 /// A write of 1 or more lines of line protocol to a single database. The default time is set by the server at the
 /// time the write comes in.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub struct LpWriteOp {
     pub db_name: String,
     pub lp: String,