diff --git a/Cargo.lock b/Cargo.lock index e5a70d2868af..56f4509f1699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4281,6 +4281,7 @@ dependencies = [ "match-template", "ordered-float 4.5.0", "serde", + "serde_json", "tokio", "typetag", ] @@ -8355,7 +8356,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=05c22cb1#05c22cb1afd6093d20aba9dd97891d402ce45217" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=2076f80#2076f8060d7e57e4da853f41262ced4e7d4c0a87" dependencies = [ "anyhow", "apache-avro", @@ -8403,7 +8404,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=05c22cb1#05c22cb1afd6093d20aba9dd97891d402ce45217" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=2076f80#2076f8060d7e57e4da853f41262ced4e7d4c0a87" dependencies = [ "anyhow", "async-trait", @@ -8420,7 +8421,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-hms" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=05c22cb1#05c22cb1afd6093d20aba9dd97891d402ce45217" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=2076f80#2076f8060d7e57e4da853f41262ced4e7d4c0a87" dependencies = [ "anyhow", "async-trait", @@ -8439,7 +8440,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.3.0" -source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=05c22cb1#05c22cb1afd6093d20aba9dd97891d402ce45217" +source = "git+https://github.com/Xuanwo/iceberg-rust/?rev=2076f80#2076f8060d7e57e4da853f41262ced4e7d4c0a87" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 3c380288c6f5..465d183d95ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -307,10 +307,10 @@ http = "1" humantime = "2.1.0" hyper = "1" hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] } -iceberg = { version = "0.3.0", git = 
"https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" } -iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" } -iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" } -iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" } +iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" } +iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" } +iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" } +iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" } indexmap = "2.0.0" indicatif = "0.17.5" itertools = "0.13.0" diff --git a/src/query/storages/iceberg/Cargo.toml b/src/query/storages/iceberg/Cargo.toml index add1c755faf9..31ffa8aafb3c 100644 --- a/src/query/storages/iceberg/Cargo.toml +++ b/src/query/storages/iceberg/Cargo.toml @@ -33,6 +33,7 @@ ordered-float = { workspace = true } serde = { workspace = true } tokio = { workspace = true } typetag = { workspace = true } +serde_json = { workspace = true } [lints] workspace = true diff --git a/src/query/storages/iceberg/src/table.rs b/src/query/storages/iceberg/src/table.rs index eba5034fc5b7..9e50891f56ed 100644 --- a/src/query/storages/iceberg/src/table.rs +++ b/src/query/storages/iceberg/src/table.rs @@ -13,6 +13,8 @@ // limitations under the License. 
use std::any::Any; +use std::collections::BTreeMap; +use std::collections::HashMap; use std::sync::Arc; use arrow_schema::Schema as ArrowSchema; @@ -40,7 +42,7 @@ use databend_common_meta_app::schema::TableMeta; use databend_common_pipeline_core::Pipeline; use databend_storages_common_table_meta::table::ChangeType; use futures::TryStreamExt; -use tokio::sync::OnceCell; +use iceberg::io::FileIOBuilder; use crate::partition::IcebergPartInfo; use crate::predicate::PredicateBuilder; @@ -53,28 +55,15 @@ pub const ICEBERG_ENGINE: &str = "ICEBERG"; #[derive(Clone)] pub struct IcebergTable { info: TableInfo, - ctl: IcebergCatalog, - database_name: String, - table_name: String, - table: OnceCell, + pub table: iceberg::table::Table, } impl IcebergTable { /// create a new table on the table directory - #[async_backtrace::framed] pub fn try_create(info: TableInfo) -> Result> { - let ctl = IcebergCatalog::try_create(info.catalog_info.clone())?; - let (db_name, table_name) = info.desc.as_str().rsplit_once('.').ok_or_else(|| { - ErrorCode::BadArguments(format!("Iceberg table desc {} is invalid", &info.desc)) - })?; - Ok(Box::new(Self { - info: info.clone(), - ctl, - database_name: db_name.to_string(), - table_name: table_name.to_string(), - table: OnceCell::new(), - })) + let table = Self::parse_engine_options(&info.meta.engine_options)?; + Ok(Box::new(Self { info, table })) } pub fn description() -> StorageDescription { @@ -111,6 +100,88 @@ impl IcebergTable { TableSchema::try_from(&arrow_schema) } + /// build_engine_options will generate `engine_options` from [`iceberg::table::Table`] so that + /// we can distribute it across nodes and rebuild this table without loading from catalog again. + /// + /// We will never persist the `engine_options` to storage, so it's safe to change the implementation. + /// As long as you make sure both [`build_engine_options`] and [`parse_engine_options`] are updated.
+ pub fn build_engine_options(table: &iceberg::table::Table) -> Result> { + let (file_io_scheme, file_io_props) = table.file_io().clone().into_props(); + let file_io_props = serde_json::to_string(&file_io_props)?; + let metadata_location = table + .metadata_location() + .map(|v| v.to_string()) + .unwrap_or_default(); + let metadata = serde_json::to_string(table.metadata())?; + let identifier = serde_json::to_string(table.identifier())?; + + Ok(BTreeMap::from_iter([ + ("iceberg.file_io.scheme".to_string(), file_io_scheme), + ("iceberg.file_io.props".to_string(), file_io_props), + ("iceberg.metadata_location".to_string(), metadata_location), + ("iceberg.metadata".to_string(), metadata), + ("iceberg.identifier".to_string(), identifier), + ])) + } + + /// parse_engine_options will parse the `engine_options` [`BTreeMap`] produced by [`build_engine_options`] so that we can rebuild the table. + /// + /// See [`build_engine_options`] for more information. + pub fn parse_engine_options( + options: &BTreeMap, + ) -> Result { + let file_io_scheme = options.get("iceberg.file_io.scheme").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.file_io.scheme", + ) + })?; + + let file_io_props: HashMap = + serde_json::from_str(options.get("iceberg.file_io.props").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.file_io.props", + ) + })?)?; + + let metadata_location = options + .get("iceberg.metadata_location") + .map(|s| s.to_string()) + .unwrap_or_default(); + + let metadata: iceberg::spec::TableMetadata = + serde_json::from_str(options.get("iceberg.metadata").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.metadata", + ) + })?)?; + + let identifier: iceberg::TableIdent = + serde_json::from_str(options.get("iceberg.identifier").ok_or_else(|| { + ErrorCode::ReadTableDataError( + "Rebuild iceberg table failed: Missing iceberg.identifier", + ) + })?)?; + + let file_io = 
FileIOBuilder::new(file_io_scheme) + .with_props(file_io_props) + .build() + .map_err(|err| { + ErrorCode::ReadTableDataError(format!( + "Rebuild iceberg table file io failed: {err:?}" + )) + })?; + + Ok(iceberg::table::Table::builder() + .identifier(identifier) + .metadata(metadata) + .metadata_location(metadata_location) + .file_io(file_io) + .build() + .map_err(|err| { + ErrorCode::ReadTableDataError(format!("Rebuild iceberg table failed: {err:?}")) + })?) + } + /// create a new table on the table directory #[async_backtrace::framed] pub async fn try_create_from_iceberg_catalog( @@ -121,6 +192,8 @@ impl IcebergTable { let table = Self::load_iceberg_table(&ctl, database_name, table_name).await?; let table_schema = Self::get_schema(&table)?; + let engine_options = Self::build_engine_options(&table)?; + // construct table info let info = TableInfo { ident: TableIdent::new(0, 0), @@ -129,6 +202,7 @@ impl IcebergTable { meta: TableMeta { schema: Arc::new(table_schema), engine: "iceberg".to_string(), + engine_options, created_on: Utc::now(), ..Default::default() }, @@ -136,31 +210,7 @@ impl IcebergTable { ..Default::default() }; - Ok(Self { - info, - ctl, - database_name: database_name.to_string(), - table_name: table_name.to_string(), - table: OnceCell::new_with(Some(table)), - }) - } - - /// Fetch or init the iceberg table - pub async fn table(&self) -> Result<&iceberg::table::Table> { - self.table - .get_or_try_init(|| async { - let table = - Self::load_iceberg_table(&self.ctl, &self.database_name, &self.table_name) - .await - .map_err(|err| { - ErrorCode::ReadTableDataError(format!( - "Iceberg catalog load failed: {err:?}" - )) - })?; - - Ok(table) - }) - .await + Ok(Self { info, table }) } pub fn do_read_data( @@ -189,9 +239,7 @@ impl IcebergTable { _: Arc, push_downs: Option, ) -> Result<(PartStatistics, Partitions)> { - let table = self.table().await?; - - let mut scan = table.scan(); + let mut scan = self.table.scan(); if let Some(push_downs) = &push_downs { 
if let Some(projection) = &push_downs.projection { diff --git a/src/query/storages/iceberg/src/table_source.rs b/src/query/storages/iceberg/src/table_source.rs index ef3d55a48f4e..79bfcb6f2e99 100644 --- a/src/query/storages/iceberg/src/table_source.rs +++ b/src/query/storages/iceberg/src/table_source.rs @@ -136,7 +136,7 @@ impl Processor for IcebergTableSource { } else if let Some(part) = self.ctx.get_partition() { let part = IcebergPartInfo::from_part(&part)?; // TODO: enable row filter? - let reader = self.table.table().await?.reader_builder().build(); + let reader = self.table.table.reader_builder().build(); // TODO: don't use stream here. let stream = reader .read(Box::pin(stream::iter([Ok(part.to_task())])))