feat: Add segment persist of closed buffer
pauldix committed Feb 9, 2024
1 parent 2e88ad0 commit 6541713
Showing 9 changed files with 391 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default, so the Cargo.lock diff is not shown.

1 change: 1 addition & 0 deletions influxdb3_write/Cargo.toml
@@ -22,6 +22,7 @@ chrono = "0.4"
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
datafusion = { workspace = true }
datafusion_util = { path = "../datafusion_util" }
parking_lot = "0.11.1"
parquet = { workspace = true }
thiserror = "1.0"
29 changes: 24 additions & 5 deletions influxdb3_write/src/catalog.rs
@@ -40,6 +40,12 @@ impl Catalog {
}
}

pub fn from_inner(inner: InnerCatalog) -> Self {
Self {
inner: RwLock::new(inner),
}
}

pub(crate) fn replace_database(
&self,
sequence: SequenceNumber,
@@ -94,9 +100,13 @@ impl Catalog {
pub fn sequence_number(&self) -> SequenceNumber {
self.inner.read().sequence
}

pub fn clone_inner(&self) -> InnerCatalog {
self.inner.read().clone()
}
}

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct InnerCatalog {
/// The catalog is a map of databases with their table schemas
databases: HashMap<String, Arc<DatabaseSchema>>,
@@ -135,7 +145,11 @@ impl DatabaseSchema {
pub fn get_table_schema(&self, table_name: &str) -> Option<Schema> {
self.tables
.get(table_name)
.and_then(|table| table.schema.clone())
.map(|table| table.schema.clone())
}

pub fn get_table(&self, table_name: &str) -> Option<&TableDefinition> {
self.tables.get(table_name)
}

pub fn table_names(&self) -> Vec<String> {
@@ -151,7 +165,7 @@
pub struct TableDefinition {
pub name: String,
#[serde(skip_serializing, skip_deserializing)]
pub schema: Option<Schema>,
pub schema: Schema,
columns: BTreeMap<String, i16>,
}

@@ -218,7 +232,7 @@ impl TableDefinition {

Self {
name: name.into(),
schema: Some(schema),
schema,
columns,
}
}
@@ -241,7 +255,12 @@ impl TableDefinition {
for (name, column_type) in columns.into_iter() {
self.columns.insert(name, column_type);
}
self.schema = Some(schema);
self.schema = schema;
}

#[allow(dead_code)]
pub(crate) fn schema(&self) -> &Schema {
&self.schema
}

pub(crate) fn columns(&self) -> &BTreeMap<String, i16> {
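
The clone_inner/from_inner pair added above gives the write buffer a way to snapshot the catalog when a segment is persisted and to rebuild a Catalog from a stored snapshot later. A minimal sketch of that round trip, assuming serde_json is available (InnerCatalog derives Serialize and Deserialize) and that the snapshot is simply held as a JSON string; the helper itself is hypothetical, not part of this commit:

    use influxdb3_write::catalog::{Catalog, InnerCatalog};

    // Hypothetical round trip built on the new APIs in this commit.
    fn snapshot_and_restore(catalog: &Catalog) -> Result<Catalog, serde_json::Error> {
        // Take a point-in-time copy of the catalog contents.
        let inner: InnerCatalog = catalog.clone_inner();
        // Persist the snapshot (here just as a JSON string for illustration).
        let json = serde_json::to_string(&inner)?;
        // Later, rebuild a Catalog from the stored snapshot.
        let restored: InnerCatalog = serde_json::from_str(&json)?;
        Ok(Catalog::from_inner(restored))
    }
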
20 changes: 15 additions & 5 deletions influxdb3_write/src/lib.rs
@@ -24,6 +24,7 @@ use datafusion::prelude::Expr;
use iox_query::QueryChunk;
use parquet::format::FileMetaData;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
@@ -187,16 +188,18 @@ pub trait Persister: Debug + Send + Sync + 'static {
/// for this segment.
async fn persist_segment(&self, persisted_segment: PersistedSegment) -> Result<()>;

// Writes a SendableRecorgBatchStream to the Parquet format and perists it
// to Object Store at the given path
// Writes a SendableRecorgBatchStream to the Parquet format and persists it
// to Object Store at the given path. Returns the number of bytes written and the file metadata.
async fn persist_parquet_file(
&self,
path: ParquetFilePath,
record_batch: SendableRecordBatchStream,
) -> crate::Result<FileMetaData>;
) -> crate::Result<(u64, FileMetaData)>;

/// Returns the configured `ObjectStore` that data is loaded from and persisted to.
fn object_store(&self) -> Arc<dyn object_store::ObjectStore>;

fn as_any(&self) -> &dyn Any;
}
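
persist_parquet_file now reports how many bytes were written alongside the parquet FileMetaData, which is exactly what a segment persister needs to fill in ParquetFile::size_bytes and row_count, and as_any lets callers recover the concrete persister type behind a trait object. A rough sketch of a caller; the helper name, the placeholder timestamps, and the module paths in the use statements are assumptions, not part of this commit:

    // Module paths below are assumptions about where these types live.
    use datafusion::physical_plan::SendableRecordBatchStream;
    use influxdb3_write::paths::ParquetFilePath;
    use influxdb3_write::{ParquetFile, Persister};

    // Hypothetical helper; a real caller would track min/max timestamps from
    // the buffered data rather than using placeholders.
    async fn persist_table_file(
        persister: &dyn Persister,
        path: ParquetFilePath,
        batches: SendableRecordBatchStream,
    ) -> influxdb3_write::Result<ParquetFile> {
        let (size_bytes, meta) = persister.persist_parquet_file(path.clone(), batches).await?;
        Ok(ParquetFile {
            path: path.to_string(),
            size_bytes,
            row_count: meta.num_rows as u64,
            min_time: 0, // placeholder
            max_time: 0, // placeholder
        })
    }
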

pub trait Wal: Debug + Send + Sync + 'static {
@@ -224,6 +227,8 @@ pub struct SegmentFile {
pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
fn id(&self) -> SegmentId;

fn bytes_written(&self) -> u64;

fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;

fn last_sequence_number(&self) -> SequenceNumber;
@@ -309,7 +314,12 @@ pub struct PersistedSegment {
pub segment_max_time: i64,
/// The collection of databases that had tables persisted in this segment. The tables will then have their
/// name and the parquet files.
pub databases: HashMap<String, TableParquetFiles>,
pub databases: HashMap<String, DatabaseTables>,
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct DatabaseTables {
pub tables: HashMap<String, TableParquetFiles>,
}

/// A collection of parquet files persisted in a segment for a specific table.
@@ -328,7 +338,7 @@ pub struct TableParquetFiles {
pub struct ParquetFile {
pub path: String,
pub size_bytes: u64,
pub row_count: u32,
pub row_count: u64,
pub min_time: i64,
pub max_time: i64,
}
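
DatabaseTables adds a level of nesting to PersistedSegment: databases map to tables, and each table maps to its TableParquetFiles. A minimal sketch of filling that map, assuming a TableParquetFiles value has already been built elsewhere; the helper and the names are illustrative:

    use std::collections::HashMap;

    // Hypothetical helper showing the two-level map: database -> table -> parquet files.
    fn add_table_files(
        databases: &mut HashMap<String, DatabaseTables>,
        db_name: &str,
        table_name: &str,
        files: TableParquetFiles,
    ) {
        databases
            .entry(db_name.to_string())
            // DatabaseTables derives Default, so an unseen database starts with an empty table map.
            .or_default()
            .tables
            .insert(table_name.to_string(), files);
    }
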
15 changes: 15 additions & 0 deletions influxdb3_write/src/paths.rs
@@ -67,6 +67,21 @@ impl ParquetFilePath {
));
Self(path)
}

pub fn new_with_parititon_key(
db_name: &str,
table_name: &str,
partition_key: &str,
file_number: u32,
) -> Self {
let path = ObjPath::from(format!(
"dbs/{db_name}/{table_name}/{}/{:010}.{}",
partition_key,
object_store_file_stem(file_number),
PARQUET_FILE_EXTENSION
));
Self(path)
}
}

impl Deref for ParquetFilePath {
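
The new constructor keys parquet files by partition within a table's directory. A sketch of its use, with illustrative names and assuming object_store_file_stem keeps its zero-padded numbering:

    // Hypothetical usage; produces a path shaped roughly like
    // "dbs/my_db/cpu/2024-02-09/0000000001.parquet".
    fn example_partitioned_path() -> ParquetFilePath {
        ParquetFilePath::new_with_parititon_key("my_db", "cpu", "2024-02-09", 1)
    }
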
15 changes: 11 additions & 4 deletions influxdb3_write/src/persister.rs
@@ -27,6 +27,7 @@ use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use std::any::Any;
use std::io::Write;
use std::sync::Arc;

@@ -214,16 +215,21 @@ impl Persister for PersisterImpl {
&self,
path: ParquetFilePath,
record_batch: SendableRecordBatchStream,
) -> Result<FileMetaData> {
) -> Result<(u64, FileMetaData)> {
let parquet = self.serialize_to_parquet(record_batch).await?;
let bytes_written = parquet.bytes.len() as u64;
self.object_store.put(path.as_ref(), parquet.bytes).await?;

Ok(parquet.meta_data)
Ok((bytes_written, parquet.meta_data))
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.object_store.clone()
}

fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}

pub struct ParquetBytes {
@@ -482,7 +488,7 @@ async fn persist_and_load_parquet_bytes() {
stream_builder.tx().send(Ok(batch2)).await.unwrap();

let path = ParquetFilePath::new("db_one", "table_one", Utc::now(), 1);
let meta = persister
let (bytes_written, meta) = persister
.persist_parquet_file(path.clone(), stream_builder.build())
.await
.unwrap();
@@ -493,5 +499,6 @@
let bytes = persister.load_parquet_file(path).await.unwrap();

// Assert that we have a file of bytes > 0
assert!(!bytes.is_empty())
assert!(!bytes.is_empty());
assert_eq!(bytes.len() as u64, bytes_written);
}
18 changes: 17 additions & 1 deletion influxdb3_write/src/wal.rs
@@ -318,6 +318,10 @@ impl WalSegmentWriter for WalSegmentWriterImpl {
self.segment_id
}

fn bytes_written(&self) -> u64 {
self.bytes_written as u64
}

fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
self.write_batch(ops)
}
@@ -331,13 +335,15 @@
pub struct WalSegmentWriterNoopImpl {
segment_id: SegmentId,
sequence_number: SequenceNumber,
wal_ops_written: usize,
}

impl WalSegmentWriterNoopImpl {
pub fn new(segment_id: SegmentId) -> Self {
Self {
segment_id,
sequence_number: SequenceNumber::new(0),
wal_ops_written: 0,
}
}
}
@@ -347,9 +353,19 @@ impl WalSegmentWriter for WalSegmentWriterNoopImpl {
self.segment_id
}

fn write_batch(&mut self, _ops: Vec<WalOp>) -> Result<SequenceNumber> {
fn bytes_written(&self) -> u64 {
println!("getting bytes_written");
self.wal_ops_written as u64
}

fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
let sequence_number = self.sequence_number.next();
self.sequence_number = sequence_number;
self.wal_ops_written += ops.len();
println!(
"write_batch called: wal_ops_written: {}",
self.wal_ops_written
);
Ok(sequence_number)
}

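
bytes_written gives the write buffer a cheap way to decide when an open segment has grown large enough to be closed and handed to the persister, which is the subject of this commit. A sketch of such a check; the threshold value and the function are assumptions for illustration, not anything defined here:

    // Hypothetical size check run after each WAL write.
    const SEGMENT_SIZE_THRESHOLD: u64 = 100 * 1024 * 1024; // 100 MiB, assumed

    fn segment_should_close(writer: &dyn WalSegmentWriter) -> bool {
        writer.bytes_written() >= SEGMENT_SIZE_THRESHOLD
    }
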
(The remaining changed files were not rendered on the page.)