Skip to content

Commit

Permalink
refactor: mget db names and table names remove duplicate ids to avoid…
Browse files Browse the repository at this point in the history
… too many parameters (#17013)

* refactor: mget db names and table names remove duplicate ids to avoid too many parameters

* fix

* fix tests
  • Loading branch information
b41sh authored Dec 9, 2024
1 parent 70c7768 commit 9700a3b
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 112 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 30 additions & 53 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,44 +289,28 @@ impl Catalog for DatabaseCatalog {
tenant: &Tenant,
table_ids: &[MetaId],
) -> Result<Vec<Option<String>>> {
// Fetching system database names
let sys_dbs = self.immutable_catalog.list_databases(tenant).await?;

// Collecting system table names from all system databases
let mut sys_table_ids = Vec::new();
for sys_db in sys_dbs {
let sys_tables = self
.immutable_catalog
.list_tables(tenant, sys_db.name())
.await?;
for sys_table in sys_tables {
sys_table_ids.push(sys_table.get_id());
}
}

// Filtering table IDs that are not in the system table IDs
let mut_table_ids: Vec<MetaId> = table_ids
.iter()
.copied()
.filter(|table_id| !sys_table_ids.contains(table_id))
.collect();

// Fetching table names for mutable table IDs
let mut tables = self
let sys_table_names = self
.immutable_catalog
.mget_table_names_by_ids(tenant, table_ids)
.await?;

// Fetching table names for remaining system table IDs
let other = self
let mut_table_names = self
.mutable_catalog
.mget_table_names_by_ids(tenant, &mut_table_ids)
.mget_table_names_by_ids(tenant, table_ids)
.await?;

// Appending the results from the mutable catalog to tables
tables.extend(other);

Ok(tables)
let mut table_names = Vec::with_capacity(table_ids.len());
for (mut_table_name, sys_table_name) in
mut_table_names.into_iter().zip(sys_table_names.into_iter())
{
if mut_table_name.is_some() {
table_names.push(mut_table_name);
} else if sys_table_name.is_some() {
table_names.push(sys_table_name);
} else {
table_names.push(None);
}
}
Ok(table_names)
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -388,33 +372,26 @@ impl Catalog for DatabaseCatalog {
tenant: &Tenant,
db_ids: &[MetaId],
) -> Result<Vec<Option<String>>> {
let sys_db_ids: Vec<_> = self
.immutable_catalog
.list_databases(tenant)
.await?
.iter()
.map(|sys_db| sys_db.get_db_info().database_id.db_id)
.collect();

let mut_db_ids: Vec<MetaId> = db_ids
.iter()
.filter(|db_id| !sys_db_ids.contains(db_id))
.copied()
.collect();

let mut dbs = self
let sys_db_names = self
.immutable_catalog
.mget_database_names_by_ids(tenant, db_ids)
.await?;

let other = self
let mut_db_names = self
.mutable_catalog
.mget_database_names_by_ids(tenant, &mut_db_ids)
.mget_database_names_by_ids(tenant, db_ids)
.await?;

dbs.extend(other);

Ok(dbs)
let mut db_names = Vec::with_capacity(db_ids.len());
for (mut_db_name, sys_db_name) in mut_db_names.into_iter().zip(sys_db_names.into_iter()) {
if mut_db_name.is_some() {
db_names.push(mut_db_name);
} else if sys_db_name.is_some() {
db_names.push(sys_db_name);
} else {
db_names.push(None);
}
}
Ok(db_names)
}

#[async_backtrace::framed]
Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/catalogs/default/immutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ impl Catalog for ImmutableCatalog {
for id in table_ids {
if let Some(table) = self.sys_db_meta.get_by_id(id) {
table_name.push(Some(table.name().to_string()));
} else {
table_name.push(None);
}
}
Ok(table_name)
Expand Down Expand Up @@ -270,6 +272,8 @@ impl Catalog for ImmutableCatalog {
res.push(Some("system".to_string()));
} else if self.info_schema_db.get_db_info().database_id.db_id == *id {
res.push(Some("information_schema".to_string()));
} else {
res.push(None);
}
}
Ok(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_catalog::plan::DataSourcePlan;
Expand Down Expand Up @@ -496,49 +497,76 @@ async fn show_account_grants(
}
}

for (catalog_name, dbs_priv_id) in catalog_db_ids {
for (catalog_name, dbs_priv_id) in catalog_db_ids.into_iter() {
let catalog = ctx.get_catalog(&catalog_name).await?;
let db_ids = dbs_priv_id.iter().map(|res| res.0).collect::<Vec<u64>>();
let privileges_strs = dbs_priv_id
let db_id_set = dbs_priv_id
.iter()
.map(|res| res.1.clone())
.collect::<Vec<String>>();
let dbs_name = catalog.mget_database_names_by_ids(&tenant, &db_ids).await?;

for (i, db_name) in dbs_name.iter().enumerate() {
if let Some(db_name) = db_name {
object_name.push(db_name.to_string());
object_id.push(Some(db_ids[i]));
privileges.push(privileges_strs[i].to_string());
grant_list.push(format!(
.map(|res| res.0)
.collect::<HashSet<u64>>();
let mut db_ids = db_id_set.into_iter().collect::<Vec<u64>>();
db_ids.sort();
let db_names = catalog.mget_database_names_by_ids(&tenant, &db_ids).await?;
let db_map = db_ids
.into_iter()
.zip(db_names.into_iter())
.filter(|(_, db_name)| db_name.is_some())
.map(|(db_id, db_name)| (db_id, db_name.unwrap()))
.collect::<HashMap<_, _>>();
for (db_id, privilege_str) in dbs_priv_id.into_iter() {
if let Some(db_name) = db_map.get(&db_id) {
let grant_str = format!(
"GRANT {} ON '{}'.'{}'.* TO {}",
&privileges_strs[i], catalog_name, db_name, identity
));
privilege_str, catalog_name, db_name, identity
);
object_name.push(db_name.to_string());
object_id.push(Some(db_id));
privileges.push(privilege_str);
grant_list.push(grant_str);
}
}
}

for (catalog_name, tables_priv_id) in catalog_table_ids {
for (catalog_name, tables_priv_id) in catalog_table_ids.into_iter() {
let catalog = ctx.get_catalog(&catalog_name).await?;
let db_ids = tables_priv_id.iter().map(|res| res.0).collect::<Vec<u64>>();
let table_ids = tables_priv_id.iter().map(|res| res.1).collect::<Vec<u64>>();
let privileges_strs = tables_priv_id
let db_id_set = tables_priv_id
.iter()
.map(|res| res.2.clone())
.collect::<Vec<String>>();
let dbs_name = catalog.mget_database_names_by_ids(&tenant, &db_ids).await?;
let tables_name = catalog.mget_table_names_by_ids(&tenant, &table_ids).await?;

for (i, table_name) in tables_name.iter().enumerate() {
if let Some(table_name) = table_name {
if let Some(db_name) = &dbs_name[i] {
object_name.push(format!("{}.{}.{}", catalog_name, db_name, table_name));
object_id.push(Some(table_ids[i]));
privileges.push(privileges_strs[i].to_string());
grant_list.push(format!(
.map(|res| res.0)
.collect::<HashSet<u64>>();
let mut db_ids = db_id_set.into_iter().collect::<Vec<u64>>();
db_ids.sort();
let db_names = catalog.mget_database_names_by_ids(&tenant, &db_ids).await?;
let db_map = db_ids
.into_iter()
.zip(db_names.into_iter())
.filter(|(_, db_name)| db_name.is_some())
.map(|(db_id, db_name)| (db_id, db_name.unwrap()))
.collect::<HashMap<_, _>>();

let table_id_set = tables_priv_id
.iter()
.map(|res| res.1)
.collect::<HashSet<u64>>();
let mut table_ids = table_id_set.into_iter().collect::<Vec<u64>>();
table_ids.sort();
let table_names = catalog.mget_table_names_by_ids(&tenant, &table_ids).await?;
let table_map = table_ids
.into_iter()
.zip(table_names.into_iter())
.filter(|(_, table_name)| table_name.is_some())
.map(|(table_id, table_name)| (table_id, table_name.unwrap()))
.collect::<HashMap<_, _>>();

for (db_id, table_id, privilege_str) in tables_priv_id.into_iter() {
if let Some(db_name) = db_map.get(&db_id) {
if let Some(table_name) = table_map.get(&table_id) {
let grant_str = format!(
"GRANT {} ON '{}'.'{}'.'{}' TO {}",
&privileges_strs[i], catalog_name, db_name, table_name, identity
));
&privilege_str, catalog_name, db_name, table_name, identity
);
object_name.push(format!("{}.{}.{}", catalog_name, db_name, table_name));
object_id.push(Some(table_id));
privileges.push(privilege_str);
grant_list.push(grant_str);
}
}
}
Expand Down
66 changes: 47 additions & 19 deletions src/query/storages/system/src/streams_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use databend_common_base::base::tokio::sync::Semaphore;
Expand Down Expand Up @@ -161,8 +162,9 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
HashMap::new()
};

let mut source_db_ids = vec![];
let mut source_tb_ids = vec![];
let mut source_db_id_set = HashSet::new();
let mut source_tb_id_set = HashSet::new();
let mut source_db_tb_ids = vec![];
for db in final_dbs {
let db_id = db.get_db_info().database_id.db_id;
let db_name = db.name();
Expand Down Expand Up @@ -197,15 +199,22 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
let stream_info = table.get_table_info();
let stream_table = StreamTable::try_from_table(table.as_ref())?;

source_db_ids.push(
stream_table
.source_database_id(ctl.as_ref())
.await
.unwrap_or(0),
);
let source_db_id = stream_table.source_database_id(ctl.as_ref()).await.ok();
if let Some(source_db_id) = source_db_id {
source_db_id_set.insert(source_db_id);
}
let source_tb_id = stream_table.source_table_id().ok();
source_tb_ids.push(source_tb_id.unwrap_or(0));

if let Some(source_tb_id) = source_tb_id {
source_tb_id_set.insert(source_tb_id);
}
match (source_db_id, source_tb_id) {
(Some(source_db_id), Some(source_tb_id)) => {
source_db_tb_ids.push(Some((source_db_id, source_tb_id)));
}
(_, _) => {
source_db_tb_ids.push(None);
}
}
catalogs.push(ctl_name.as_str());
databases.push(db_name.to_owned());
names.push(stream_table.name().to_string());
Expand Down Expand Up @@ -274,19 +283,38 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
invalid_reason.append(&mut joint);
}

let mut source_db_ids = source_db_id_set.into_iter().collect::<Vec<u64>>();
source_db_ids.sort();
let source_db_names = ctl
.mget_database_names_by_ids(&tenant, &source_db_ids)
.await?;
let source_table_names = ctl.mget_table_names_by_ids(&tenant, &source_tb_ids).await?;
for (db, tb) in source_db_names
let source_db_map = source_db_ids
.into_iter()
.zip(source_table_names.into_iter())
{
let desc = match (db, tb) {
(Some(db), Some(tb)) => Some(format!("{db}.{tb}")),
_ => None,
};
table_name.push(desc);
.zip(source_db_names.into_iter())
.filter(|(_, db_name)| db_name.is_some())
.map(|(db_id, db_name)| (db_id, db_name.unwrap()))
.collect::<HashMap<_, _>>();

let mut source_tb_ids = source_tb_id_set.into_iter().collect::<Vec<u64>>();
source_tb_ids.sort();
let source_tb_names = ctl.mget_table_names_by_ids(&tenant, &source_tb_ids).await?;
let source_tb_map = source_tb_ids
.into_iter()
.zip(source_tb_names.into_iter())
.filter(|(_, tb_name)| tb_name.is_some())
.map(|(tb_id, tb_name)| (tb_id, tb_name.unwrap()))
.collect::<HashMap<_, _>>();

for source_db_tb_id in source_db_tb_ids.into_iter() {
if let Some((db_id, tb_id)) = source_db_tb_id {
if let Some(db) = source_db_map.get(&db_id) {
if let Some(tb) = source_tb_map.get(&tb_id) {
table_name.push(Some(format!("{db}.{tb}")));
continue;
}
}
}
table_name.push(None);
}
}

Expand Down
12 changes: 6 additions & 6 deletions tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,25 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is req
a b/data_UUID_0000_00000000.parquet 1 0 NULL NULL
=== check db/table_id ===
Read s3 USER b GRANT Read ON STAGE s3 TO 'b'@'%'
CREATE system USER b GRANT CREATE ON 'default'.'system'.* TO 'b'@'%'
SELECT default USER b GRANT SELECT ON 'default'.'default'.* TO 'b'@'%'
CREATE default USER b GRANT CREATE ON 'default'.'default'.* TO 'b'@'%'
SELECT system USER b GRANT SELECT ON 'default'.'system'.* TO 'b'@'%'
SELECT,INSERT,DELETE default.default.t USER b GRANT SELECT,INSERT,DELETE ON 'default'.'default'.'t' TO 'b'@'%'
SELECT default.default.t1 USER b GRANT SELECT ON 'default'.'default'.'t1' TO 'b'@'%'
SELECT,INSERT default.c.t USER b GRANT SELECT,INSERT ON 'default'.'c'.'t' TO 'b'@'%'
OWNERSHIP default.default.t2 USER b GRANT OWNERSHIP ON 'default'.'default'.'t2' TO 'b'@'%'
1
Read s3 USER b GRANT Read ON STAGE s3 TO 'b'@'%'
CREATE system USER b GRANT CREATE ON 'default'.'system'.* TO 'b'@'%'
SELECT default USER b GRANT SELECT ON 'default'.'default'.* TO 'b'@'%'
CREATE default USER b GRANT CREATE ON 'default'.'default'.* TO 'b'@'%'
SELECT system USER b GRANT SELECT ON 'default'.'system'.* TO 'b'@'%'
SELECT,INSERT,DELETE default.default.t USER b GRANT SELECT,INSERT,DELETE ON 'default'.'default'.'t' TO 'b'@'%'
SELECT default.default.t1 USER b GRANT SELECT ON 'default'.'default'.'t1' TO 'b'@'%'
SELECT,INSERT default.c.t1 USER b GRANT SELECT,INSERT ON 'default'.'c'.'t1' TO 'b'@'%'
OWNERSHIP default.default.t2 USER b GRANT OWNERSHIP ON 'default'.'default'.'t2' TO 'b'@'%'
1
2
Read s3 USER b GRANT Read ON STAGE s3 TO 'b'@'%'
CREATE system USER b GRANT CREATE ON 'default'.'system'.* TO 'b'@'%'
SELECT default USER b GRANT SELECT ON 'default'.'default'.* TO 'b'@'%'
CREATE default USER b GRANT CREATE ON 'default'.'default'.* TO 'b'@'%'
SELECT system USER b GRANT SELECT ON 'default'.'system'.* TO 'b'@'%'
SELECT,INSERT,DELETE default.default.t USER b GRANT SELECT,INSERT,DELETE ON 'default'.'default'.'t' TO 'b'@'%'
SELECT default.default.t1 USER b GRANT SELECT ON 'default'.'default'.'t1' TO 'b'@'%'
SELECT,INSERT default.d.t1 USER b GRANT SELECT,INSERT ON 'default'.'d'.'t1' TO 'b'@'%'
Expand Down

0 comments on commit 9700a3b

Please sign in to comment.