feat: Add segment persist of closed buffer segment #24659
Conversation
Just a couple of comments to add.
let row_count = self.rows.len();
let mut columns = BTreeMap::new();
for (name, column_type) in column_types {
    match ColumnType::try_from(*column_type).unwrap() {
Along with this one, there are a few `unwrap`s (and a `panic`) in this function. It looks like the function that calls this is fallible (called on line 192):
influxdb/influxdb3_write/src/write_buffer/mod.rs (lines 174 to 192 in 4d9095e)
fn get_table_chunks(
    &self,
    database_name: &str,
    table_name: &str,
    _filters: &[Expr],
    _projection: Option<&Vec<usize>>,
    _ctx: &SessionState,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
    let db_schema = self.catalog.db_schema(database_name).unwrap();
    let table = db_schema.tables.get(table_name).unwrap();
    let schema = table.schema.as_ref().cloned().unwrap();
    let table_buffer = self.clone_table_buffer(database_name, table_name).unwrap();
    let mut chunks = Vec::with_capacity(table_buffer.partition_buffers.len());
    for (partition_key, partition_buffer) in table_buffer.partition_buffers {
        let partition_key: PartitionKey = partition_key.into();
        let batch = partition_buffer.rows_to_record_batch(&schema, table.columns());
So, `rows_to_record_batch` could also be fallible. Any opposition to using `anyhow` to simplify errors for functions like this?
Yeah, that function was just a temporary placeholder to wire up the happy path. The underlying buffer implementation (and thus this function) is going to get rewritten after the basic end-to-end persistence is all wired up.
No objections to using `anyhow` from me. Want me to pick that up in this PR or in a follow-on?
Might be better for a follow-on. I think it is more or less switching from
fn rows_to_record_batch(/* ... */) -> RecordBatch {
    /* ... */
}
to
fn rows_to_record_batch(/* ... */) -> Result<RecordBatch, anyhow::Error> {
    /* ... */
}
and using `anyhow::Context` in place of `unwrap`/`panic`/`expect` here. However, this "poisons" the calling function with `anyhow`, which then needs to handle it explicitly or propagate it up, etc.
Once you've wired it up, the error handling could be addressed more holistically in a follow-on.
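For illustration, a rough sketch of that shape using `anyhow::Context` (the elided pieces, and the assumption that the `try_from` error type satisfies anyhow's `Context` bound, are not from the PR):

use anyhow::{Context, Result};

fn rows_to_record_batch(&self, /* ... */) -> Result<RecordBatch> {
    let mut columns = BTreeMap::new();
    for (name, column_type) in column_types {
        // with_context replaces the unwrap and records which column failed
        let column_type = ColumnType::try_from(*column_type)
            .with_context(|| format!("invalid column type for column '{name}'"))?;
        match column_type {
            /* ... */
        }
    }
    /* ... */
}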
I fixed up the `get_table_chunks` to return a `DataFusionError` rather than unwrap. I'll address `rows_to_record_batch` later when I rework the buffer.
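For reference, a minimal sketch of what mapping those lookups to a `DataFusionError` could look like (assuming the catalog lookups return Option; not necessarily the exact code in the PR):

// Convert the Option lookups into DataFusionError instead of unwrapping.
let db_schema = self.catalog.db_schema(database_name).ok_or_else(|| {
    DataFusionError::Execution(format!("database {database_name} not found"))
})?;
let table = db_schema.tables.get(table_name).ok_or_else(|| {
    DataFusionError::Execution(format!("table {table_name} not found"))
})?;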
Force-pushed from 6541713 to 324027f.
LGTM - but noted a couple of minor things.
Force-pushed from 050d4b8 to b44003e.
This builds on #24624, which should get merged in first.
Fixes #24573. This adds persist to `ClosedBufferSegment`, which will persist the Catalog (if it has been modified), persist all tables and partitions in the buffer as parquet files, and then write the segment file. It doesn't retry if errors are encountered during persistence, and it doesn't parallelize the persist operations; we should do both later. Logged #24657 and #24658 to track those.
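For readers following the description above, a rough pseudocode-style sketch of that persist flow (the trait, method names, and fields here are illustrative assumptions, not the PR's actual API):

// Hypothetical outline of the persist flow described in the PR description.
impl ClosedBufferSegment {
    async fn persist(&self, persister: &impl Persister) -> Result<(), PersistError> {
        // 1. Persist the catalog only if it was modified while this segment was open.
        if self.catalog_was_modified {
            persister.persist_catalog(&self.catalog).await?;
        }

        // 2. Persist every table/partition buffer as a parquet file.
        //    Sequential and without retries for now; #24657 and #24658 track
        //    parallelizing and retrying these writes.
        for (table_name, partitions) in &self.buffer {
            for (partition_key, rows) in partitions {
                persister.persist_parquet(table_name, partition_key, rows).await?;
            }
        }

        // 3. Finally, write the segment file recording what was persisted.
        persister.persist_segment_file(&self.segment_info).await?;
        Ok(())
    }
}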