Skip to content

Commit

Permalink
Don't load from catalog every time
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Dec 14, 2024
1 parent 439da3a commit fee2883
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 54 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,10 @@ http = "1"
humantime = "2.1.0"
hyper = "1"
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" }
iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" }
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" }
iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "05c22cb1" }
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" }
iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" }
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" }
iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "2076f80" }
indexmap = "2.0.0"
indicatif = "0.17.5"
itertools = "0.13.0"
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ordered-float = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
typetag = { workspace = true }
serde_json = { workspace = true }

[lints]
workspace = true
Expand Down
138 changes: 93 additions & 45 deletions src/query/storages/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

use std::any::Any;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::Schema as ArrowSchema;
Expand Down Expand Up @@ -40,7 +42,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_pipeline_core::Pipeline;
use databend_storages_common_table_meta::table::ChangeType;
use futures::TryStreamExt;
use tokio::sync::OnceCell;
use iceberg::io::FileIOBuilder;

use crate::partition::IcebergPartInfo;
use crate::predicate::PredicateBuilder;
Expand All @@ -53,28 +55,15 @@ pub const ICEBERG_ENGINE: &str = "ICEBERG";
#[derive(Clone)]
pub struct IcebergTable {
info: TableInfo,
ctl: IcebergCatalog,
database_name: String,
table_name: String,

table: OnceCell<iceberg::table::Table>,
pub table: iceberg::table::Table,
}

impl IcebergTable {
/// create a new table on the table directory
#[async_backtrace::framed]
pub fn try_create(info: TableInfo) -> Result<Box<dyn Table>> {
let ctl = IcebergCatalog::try_create(info.catalog_info.clone())?;
let (db_name, table_name) = info.desc.as_str().rsplit_once('.').ok_or_else(|| {
ErrorCode::BadArguments(format!("Iceberg table desc {} is invalid", &info.desc))
})?;
Ok(Box::new(Self {
info: info.clone(),
ctl,
database_name: db_name.to_string(),
table_name: table_name.to_string(),
table: OnceCell::new(),
}))
let table = Self::parse_engine_options(&info.meta.engine_options)?;
Ok(Box::new(Self { info, table }))
}

pub fn description() -> StorageDescription {
Expand Down Expand Up @@ -111,6 +100,88 @@ impl IcebergTable {
TableSchema::try_from(&arrow_schema)
}

/// Serialize an [`iceberg::table::Table`] into `engine_options` so the table
/// can be distributed across nodes and rebuilt there without loading from the
/// catalog again.
///
/// These options are never persisted to storage, so the format is free to
/// change as long as [`build_engine_options`] and [`parse_engine_options`]
/// are kept in sync with each other.
pub fn build_engine_options(table: &iceberg::table::Table) -> Result<BTreeMap<String, String>> {
    let (file_io_scheme, file_io_props) = table.file_io().clone().into_props();

    let mut options = BTreeMap::new();
    options.insert("iceberg.file_io.scheme".to_string(), file_io_scheme);
    options.insert(
        "iceberg.file_io.props".to_string(),
        serde_json::to_string(&file_io_props)?,
    );
    // A table loaded from a catalog may have no metadata location; encode that
    // as an empty string so the key is always present.
    options.insert(
        "iceberg.metadata_location".to_string(),
        table
            .metadata_location()
            .map(|v| v.to_string())
            .unwrap_or_default(),
    );
    options.insert(
        "iceberg.metadata".to_string(),
        serde_json::to_string(table.metadata())?,
    );
    options.insert(
        "iceberg.identifier".to_string(),
        serde_json::to_string(table.identifier())?,
    );
    Ok(options)
}

/// parse_engine_options rebuilds an [`iceberg::table::Table`] from the
/// `engine_options` map produced by [`build_engine_options`], so a table can
/// be reconstructed on any node without another catalog round-trip.
///
/// # Errors
///
/// Returns [`ErrorCode::ReadTableDataError`] when a required key is missing,
/// when a value fails to deserialize, or when the file IO / table builders
/// reject the reconstructed pieces.
///
/// See [`build_engine_options`] for the exact set of keys.
pub fn parse_engine_options(
    options: &BTreeMap<String, String>,
) -> Result<iceberg::table::Table> {
    // All required keys share the same "missing key" error shape; keep the
    // message text identical to the previous per-key errors.
    let fetch = |key: &str| -> Result<&String> {
        options.get(key).ok_or_else(|| {
            ErrorCode::ReadTableDataError(format!(
                "Rebuild iceberg table failed: Missing {key}"
            ))
        })
    };

    let file_io_scheme = fetch("iceberg.file_io.scheme")?;

    let file_io_props: HashMap<String, String> =
        serde_json::from_str(fetch("iceberg.file_io.props")?)?;

    // Optional: a table may legitimately have no metadata location
    // (build_engine_options encodes that as an empty string).
    let metadata_location = options
        .get("iceberg.metadata_location")
        .map(|s| s.to_string())
        .unwrap_or_default();

    let metadata: iceberg::spec::TableMetadata =
        serde_json::from_str(fetch("iceberg.metadata")?)?;

    let identifier: iceberg::TableIdent =
        serde_json::from_str(fetch("iceberg.identifier")?)?;

    let file_io = FileIOBuilder::new(file_io_scheme)
        .with_props(file_io_props)
        .build()
        .map_err(|err| {
            ErrorCode::ReadTableDataError(format!(
                "Rebuild iceberg table file io failed: {err:?}"
            ))
        })?;

    // Return the builder result directly instead of `Ok(expr?)`
    // (clippy::needless_question_mark).
    iceberg::table::Table::builder()
        .identifier(identifier)
        .metadata(metadata)
        .metadata_location(metadata_location)
        .file_io(file_io)
        .build()
        .map_err(|err| {
            ErrorCode::ReadTableDataError(format!("Rebuild iceberg table failed: {err:?}"))
        })
}

/// create a new table on the table directory
#[async_backtrace::framed]
pub async fn try_create_from_iceberg_catalog(
Expand All @@ -121,6 +192,8 @@ impl IcebergTable {
let table = Self::load_iceberg_table(&ctl, database_name, table_name).await?;
let table_schema = Self::get_schema(&table)?;

let engine_options = Self::build_engine_options(&table)?;

// construct table info
let info = TableInfo {
ident: TableIdent::new(0, 0),
Expand All @@ -129,38 +202,15 @@ impl IcebergTable {
meta: TableMeta {
schema: Arc::new(table_schema),
engine: "iceberg".to_string(),
engine_options,
created_on: Utc::now(),
..Default::default()
},
catalog_info: ctl.info(),
..Default::default()
};

Ok(Self {
info,
ctl,
database_name: database_name.to_string(),
table_name: table_name.to_string(),
table: OnceCell::new_with(Some(table)),
})
}

/// Fetch or init the iceberg table
pub async fn table(&self) -> Result<&iceberg::table::Table> {
self.table
.get_or_try_init(|| async {
let table =
Self::load_iceberg_table(&self.ctl, &self.database_name, &self.table_name)
.await
.map_err(|err| {
ErrorCode::ReadTableDataError(format!(
"Iceberg catalog load failed: {err:?}"
))
})?;

Ok(table)
})
.await
Ok(Self { info, table })
}

pub fn do_read_data(
Expand Down Expand Up @@ -189,9 +239,7 @@ impl IcebergTable {
_: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
) -> Result<(PartStatistics, Partitions)> {
let table = self.table().await?;

let mut scan = table.scan();
let mut scan = self.table.scan();

if let Some(push_downs) = &push_downs {
if let Some(projection) = &push_downs.projection {
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/iceberg/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Processor for IcebergTableSource {
} else if let Some(part) = self.ctx.get_partition() {
let part = IcebergPartInfo::from_part(&part)?;
// TODO: enable row filter?
let reader = self.table.table().await?.reader_builder().build();
let reader = self.table.table.reader_builder().build();
// TODO: don't use stream here.
let stream = reader
.read(Box::pin(stream::iter([Ok(part.to_task())])))
Expand Down

0 comments on commit fee2883

Please sign in to comment.