feat(iceberg): add support for creating new iceberg tables with partition keys #20340
base: main
@@ -1446,6 +1502,7 @@ mod test {
r#type: "upsert".to_owned(),
force_append_only: false,
primary_key: Some(vec!["v1".to_owned()]),
partition_by: Some(vec!["v1".to_owned(), "identity(v2)".to_owned()]),
I think we should test all partition cases here, e.g.:
v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1)
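One possible way to feed these cases into the test config is a small helper that returns the full list. This is only a sketch: `all_partition_cases` is a hypothetical name, the columns v1..v6 are the placeholders from the comment above, and this thread does not confirm whether a single partition spec accepts all of these expressions together (several reuse v1), so they may need to be split across separate test cases.

```rust
/// Partition expressions from the review comment, one per transform shape.
/// Hypothetical helper; the column names v1..v6 are placeholders.
fn all_partition_cases() -> Vec<String> {
    [
        "v1",             // bare column, no explicit transform
        "identity(v1)",   // transform(column)
        "truncate(4,v2)", // transform(n,column)
        "bucket(5,v1)",
        "year(v3)",
        "month(v4)",
        "day(v5)",
        "hour(v6)",
        "void(v1)",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect()
}
```

In the test above, this could be used as `partition_by: Some(all_partition_cases())`, assuming the test table has columns v1 through v6 with suitable types.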
let mut partition_fields =
    Vec::<UnboundPartitionField>::with_capacity(partition_by.len());
for partition_field in partition_by {
    let re = Regex::new(r"\w+(\(\w+\))?").unwrap();
This Regex can be created once, outside the for loop. Also, given the cases v1, identity(v1), truncate(4,v2), bucket(5,v1), year(v3), month(v4), day(v5), hour(v6), void(v1), maybe we can check the prefix first and then capture the parameters according to the transform type.
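A rough sketch of the "check the prefix first" idea follows. It swaps the regex for plain string splitting so that it compiles with the standard library alone; if the regex is kept, building it once before the loop covers the first point. `PartitionTransform` and `parse_partition_expr` are hypothetical names for illustration, not the types used in the PR.

```rust
/// Hypothetical stand-in for the transform type used by the sink.
#[derive(Debug, PartialEq)]
enum PartitionTransform {
    Identity,
    Truncate(u32),
    Bucket(u32),
    Year,
    Month,
    Day,
    Hour,
    Void,
}

/// Parses `column`, `transform(column)`, or `transform(n,column)` and
/// returns the transform together with the source column name.
fn parse_partition_expr(expr: &str) -> Result<(PartitionTransform, String), String> {
    let expr = expr.trim();
    // A bare column name is treated as identity partitioning.
    let Some(open) = expr.find('(') else {
        return Ok((PartitionTransform::Identity, expr.to_owned()));
    };
    let Some(args) = expr[open + 1..].strip_suffix(')') else {
        return Err(format!("missing closing parenthesis in '{expr}'"));
    };
    let func = expr[..open].trim();

    // Decide from the prefix whether a numeric parameter is expected.
    let (param, column) = match func {
        "bucket" | "truncate" => {
            let (n, column) = args
                .split_once(',')
                .ok_or_else(|| format!("'{func}' expects two arguments: n, column"))?;
            let n: u32 = n
                .trim()
                .parse()
                .map_err(|e| format!("invalid parameter in '{expr}': {e}"))?;
            (Some(n), column.trim())
        }
        _ => (None, args.trim()),
    };

    let transform = match (func, param) {
        ("identity", None) => PartitionTransform::Identity,
        ("truncate", Some(n)) => PartitionTransform::Truncate(n),
        ("bucket", Some(n)) => PartitionTransform::Bucket(n),
        ("year", None) => PartitionTransform::Year,
        ("month", None) => PartitionTransform::Month,
        ("day", None) => PartitionTransform::Day,
        ("hour", None) => PartitionTransform::Hour,
        ("void", None) => PartitionTransform::Void,
        _ => return Err(format!("unknown transform '{func}'")),
    };
    Ok((transform, column.to_owned()))
}
```

Matching on the prefix keeps the parameter handling for bucket and truncate in one place, and unknown transform names turn into an error rather than a silent mismatch. The sketch does not validate the column name itself.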
1f0f0c5 to 65e01f0
.partition_spec(
    UnboundPartitionSpec::builder()
        .add_partition_fields(partition_fields)
        .unwrap()
We should return the error here instead of calling unwrap().
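As an illustration of the suggestion above, the following sketch propagates a builder failure with `?` rather than panicking. Everything in it is a stand-in: `SinkError`, `add_partition_fields`, and `build_partition_spec` are hypothetical and only mimic the shape of a fallible builder step, and the duplicate-name check is an invented failure condition, not the library's actual validation.

```rust
use std::collections::HashSet;

/// Hypothetical error type standing in for the sink's own error type.
#[derive(Debug, PartialEq)]
struct SinkError(String);

/// Hypothetical stand-in for a fallible builder step.
/// Here it fails when the same field name appears twice.
fn add_partition_fields(fields: Vec<String>) -> Result<Vec<String>, String> {
    {
        let mut seen = HashSet::new();
        for field in &fields {
            if !seen.insert(field.as_str()) {
                return Err(format!("duplicate partition field '{field}'"));
            }
        }
    }
    Ok(fields)
}

/// Converts the builder error into the caller's error type and returns it.
/// Calling `.unwrap()` here would panic on invalid user input instead.
fn build_partition_spec(fields: Vec<String>) -> Result<Vec<String>, SinkError> {
    let spec = add_partition_fields(fields).map_err(SinkError)?;
    Ok(spec)
}
```

Since partition_by comes from user configuration, surfacing the failure as an error lets the user see what was wrong with the statement.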
let partition_fields = match &self.config.partition_by {
    Some(partition_field) => {
        let mut partition_fields = Vec::<UnboundPartitionField>::new();
        // captures column, transform(column), transform(n,column), transform(n, column)
Should this be: captures column, transform(column), transform(n,column)?
if col.name == column {
    partition_fields.push(
        UnboundPartitionField::builder()
            .source_id(pos as i32)
I think we should enumerate iceberg_schema here and use field.id instead of pos.
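To make the difference concrete, here is a minimal sketch of looking up the source id by field name. `Field` and `source_id_for` are hypothetical, simplified stand-ins and not the iceberg crate's schema types; the point is only that a field's id is assigned by the schema and need not match the column's position in the list.

```rust
/// Hypothetical, simplified schema field: an id assigned by the schema
/// plus the column name.
struct Field {
    id: i32,
    name: String,
}

/// Returns the schema-assigned id for `column`, or None if no field
/// with that name exists.
fn source_id_for(schema: &[Field], column: &str) -> Option<i32> {
    schema.iter().find(|f| f.name == column).map(|f| f.id)
}
```

With a schema whose ids are 1 and 2 for v1 and v2, `source_id_for(&schema, "v2")` yields `Some(2)`, whereas the positional index of v2 would be 1. Returning an Option also gives the caller a natural place to report a partition column that is missing from the schema.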
    (&mat["func"], Transform::Identity)
} else {
    let mut func = mat["func"].to_owned();
    let n = &mat["n"];
nit: it seems n is only needed inside the if branch:
if func == "bucket" || func == "truncate" {
    let n = &mat["n"];
    func = format!("{func}({n})");
}
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.