From b50db10c6200a22fdd9af01a9ae574cd33815bd7 Mon Sep 17 00:00:00 2001
From: Georgy Shelkovy
Date: Tue, 17 Dec 2024 09:18:35 +0500
Subject: [PATCH] test

---
 src/diskquota.h         |   4 +
 src/diskquota_utility.c |  15 +++
 src/gp_activetable.c    | 208 +++++++++++++------------------------
 src/gp_activetable.h    |  10 +-
 src/quotamodel.c        | 225 ++++++++++++++++++++++------------------
 5 files changed, 224 insertions(+), 238 deletions(-)

diff --git a/src/diskquota.h b/src/diskquota.h
index 9c47acb5..b437b877 100644
--- a/src/diskquota.h
+++ b/src/diskquota.h
@@ -17,6 +17,7 @@
 #include "postgres.h"
 
 #include "port/atomics.h"
+#include "access/htup.h"
 #include "catalog/pg_class.h"
 #include "lib/ilist.h"
 #include "lib/stringinfo.h"
@@ -46,6 +47,8 @@
 /* max number of QuotaInfoEntry in quota_info_map */
 #define MAX_QUOTA_MAP_ENTRIES (AVG_QUOTA_MAP_ENTRIES < 1024 ? 1024 : AVG_QUOTA_MAP_ENTRIES)
 
+#define DatumGetArrayTypePwrapper(X) ((X) ? DatumGetArrayTypeP(X) : NULL)
+
 typedef enum
 {
 	DISKQUOTA_TAG_HASH = 0,
@@ -321,4 +324,5 @@ extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *war
                                       TimestampTz *last_overflow_report);
 bool  SPI_connect_if_not_yet(void);
 void  SPI_finish_if(bool connected_in_this_function);
+Datum SPI_getbinval_wrapper(HeapTuple tuple, TupleDesc tupdesc, const char *fname, bool allow_null, Oid typeid);
 #endif
diff --git a/src/diskquota_utility.c b/src/diskquota_utility.c
index 535393bd..5856ed3f 100644
--- a/src/diskquota_utility.c
+++ b/src/diskquota_utility.c
@@ -1714,3 +1714,18 @@ SPI_finish_if(bool connected_in_calling_function)
 		        (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"),
 		         errdetail("%s", SPI_result_code_string(rc))));
 }
+
+Datum
+SPI_getbinval_wrapper(HeapTuple tuple, TupleDesc tupdesc, const char *fname, bool allow_null, Oid typeid)
+{
+	bool  isnull;
+	Datum datum;
+	int   fnumber = SPI_fnumber(tupdesc, fname);
+	if (SPI_gettypeid(tupdesc, fnumber) != typeid)
+		ereport(ERROR, (errcode(ERRCODE_MOST_SPECIFIC_TYPE_MISMATCH),
+		                errmsg("type of column \"%s\" must have OID %u", fname, typeid)));
+	datum = SPI_getbinval(tuple, tupdesc, fnumber, &isnull);
+	if (isnull && !allow_null)
+		ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("column \"%s\" must not be null", fname)));
+	return datum;
+}
diff --git a/src/gp_activetable.c b/src/gp_activetable.c
index b216a14f..192e5785 100644
--- a/src/gp_activetable.c
+++ b/src/gp_activetable.c
@@ -35,6 +35,7 @@
 #include "storage/smgr.h"
 #include "utils/faultinjector.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/inval.h"
 
@@ -85,22 +86,22 @@ static void active_table_hook_smgrtruncate(RelFileNodeBackend rnode);
 static void active_table_hook_smgrunlink(RelFileNodeBackend rnode);
 static void object_access_hook_QuotaStmt(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg);
 
-static HTAB *get_active_tables_stats(ArrayType *array);
-static HTAB *get_active_tables_oid(void);
-static HTAB *pull_active_list_from_seg(void);
-static void pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array);
-static StringInfoData convert_map_to_string(HTAB *active_list);
-static void load_table_size(HTAB *local_table_stats_map);
-static void report_active_table_helper(const RelFileNodeBackend *relFileNode);
-static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode);
-static void report_relation_cache_helper(Oid relid);
-static void report_altered_reloid(Oid reloid);
-static Oid 
get_dbid(ArrayType *array); - -void init_active_table_hook(void); -void init_shm_worker_active_tables(void); -void init_lock_active_tables(void); -HTAB *gp_fetch_active_tables(bool is_init); +static HTAB *get_active_tables_stats(ArrayType *array); +static HTAB *get_active_tables_oid(void); + +static StringInfoData pull_active_list_from_seg(void); +static StringInfoData pull_active_table_size_from_seg(HTAB *local_table_stats_map); +static StringInfoData load_table_size(void); + +static void report_active_table_helper(const RelFileNodeBackend *relFileNode); +static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode); +static void report_relation_cache_helper(Oid relid); +static void report_altered_reloid(Oid reloid); +static Oid get_dbid(ArrayType *array); + +void init_active_table_hook(void); +void init_shm_worker_active_tables(void); +void init_lock_active_tables(void); /* * Init active_tables_map shared memory @@ -363,13 +364,10 @@ remove_from_active_table_map(const RelFileNodeBackend *relFileNode) * And aggregate the table size on each segment * to get the real table size at cluster level. */ -HTAB * -gp_fetch_active_tables(bool is_init) +StringInfoData +gp_fetch_active_tables(HTAB *local_active_table_stat_map) { - HTAB *local_table_stats_map = NULL; - HASHCTL ctl; - HTAB *local_active_table_oid_maps; - StringInfoData active_oid_list; + HASHCTL ctl; Assert(Gp_role == GP_ROLE_DISPATCH); @@ -378,29 +376,12 @@ gp_fetch_active_tables(bool is_init) ctl.entrysize = sizeof(ActiveTableEntryCombined) + SEGCOUNT * sizeof(Size); ctl.hcxt = CurrentMemoryContext; - local_table_stats_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl, - HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH); - - if (is_init) + if (local_active_table_stat_map == NULL) { - load_table_size(local_table_stats_map); + return load_table_size(); } - else - { - /* step 1: fetch active oids from all the segments */ - local_active_table_oid_maps = pull_active_list_from_seg(); - active_oid_list = convert_map_to_string(local_active_table_oid_maps); - - ereport(DEBUG1, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] active_old_list = %s", active_oid_list.data))); - - /* step 2: fetch active table sizes based on active oids */ - pull_active_table_size_from_seg(local_table_stats_map, active_oid_list.data); - hash_destroy(local_active_table_oid_maps); - pfree(active_oid_list.data); - } - return local_table_stats_map; + return pull_active_table_size_from_seg(local_active_table_stat_map); } /* @@ -936,93 +917,57 @@ get_active_tables_oid(void) * This is called when system startup, disk quota rejectmap * and other shared memory will be warmed up by table_size table. 
*/ -static void -load_table_size(HTAB *local_table_stats_map) +static StringInfoData +load_table_size(void) { - TupleDesc tupdesc; - int i; - bool found; - ActiveTableEntryCombined *quota_entry; - SPIPlanPtr plan; - Portal portal; - char *sql = "select tableid, size, segid from diskquota.table_size"; - bool connected_in_this_function = SPI_connect_if_not_yet(); + SPIPlanPtr plan; + Portal portal; + int16 typlen; + bool typbyval; + char typalign; + + StringInfoData active_oids; + static const char *sql = "select tableid, array_agg(size order by segid) size from diskquota.table_size group by 1"; + bool connected_in_this_function = SPI_connect_if_not_yet(); + MemoryContext oldContext = MemoryContextSwitchTo(CurTransactionContext); + + initStringInfo(&active_oids); + MemoryContextSwitchTo(oldContext); + get_typlenbyvalalign(INT8OID, &typlen, &typbyval, &typalign); if ((plan = SPI_prepare(sql, 0, NULL)) == NULL) ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql))); if ((portal = SPI_cursor_open(NULL, plan, NULL, NULL, true)) == NULL) ereport(ERROR, (errmsg("[diskquota] SPI_cursor_open(\"%s\") failed", sql))); - SPI_cursor_fetch(portal, true, 10000); - - if (SPI_tuptable == NULL) - { - ereport(ERROR, (errmsg("[diskquota] load_table_size SPI_cursor_fetch failed"))); - } - - tupdesc = SPI_tuptable->tupdesc; -#if GP_VERSION_NUM < 70000 - if (tupdesc->natts != 3 || ((tupdesc)->attrs[0])->atttypid != OIDOID || - ((tupdesc)->attrs[1])->atttypid != INT8OID || ((tupdesc)->attrs[2])->atttypid != INT2OID) -#else - if (tupdesc->natts != 3 || ((tupdesc)->attrs[0]).atttypid != OIDOID || ((tupdesc)->attrs[1]).atttypid != INT8OID || - ((tupdesc)->attrs[2]).atttypid != INT2OID) -#endif /* GP_VERSION_NUM */ - { - if (tupdesc->natts != 3) - { - ereport(WARNING, (errmsg("[diskquota] tupdesc->natts: %d", tupdesc->natts))); - } - else - { -#if GP_VERSION_NUM < 70000 - ereport(WARNING, (errmsg("[diskquota] attrs: %d, %d, %d", tupdesc->attrs[0]->atttypid, - tupdesc->attrs[1]->atttypid, tupdesc->attrs[2]->atttypid))); -#else - ereport(WARNING, (errmsg("[diskquota] attrs: %d, %d, %d", tupdesc->attrs[0].atttypid, - tupdesc->attrs[1].atttypid, tupdesc->attrs[2].atttypid))); -#endif /* GP_VERSION_NUM */ - } - ereport(ERROR, (errmsg("[diskquota] table \"table_size\" is corrupted in database \"%s\"," - " please recreate diskquota extension", - get_database_name(MyDatabaseId)))); - } - - while (SPI_processed > 0) + do { - /* push the table oid and size into local_table_stats_map */ - for (i = 0; i < SPI_processed; i++) + SPI_cursor_fetch(portal, true, 10000); + for (uint64 row = 0; row < SPI_processed; row++) { - HeapTuple tup = SPI_tuptable->vals[i]; - Datum dat; - Oid reloid; - int64 size; - int16 segid; - bool isnull; - - dat = SPI_getbinval(tup, tupdesc, 1, &isnull); - if (isnull) continue; - reloid = DatumGetObjectId(dat); - - dat = SPI_getbinval(tup, tupdesc, 2, &isnull); - if (isnull) continue; - size = DatumGetInt64(dat); - dat = SPI_getbinval(tup, tupdesc, 3, &isnull); - if (isnull) continue; - segid = DatumGetInt16(dat); - - quota_entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found); - quota_entry->reloid = reloid; - quota_entry->tablesize[segid + 1] = size; + HeapTuple val = SPI_tuptable->vals[row]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + Oid tableid = DatumGetObjectId(SPI_getbinval_wrapper(val, tupdesc, "tableid", false, OIDOID)); + ArrayType *array = + DatumGetArrayTypePwrapper(SPI_getbinval_wrapper(val, tupdesc, "size", false, 
INT8ARRAYOID)); + Datum *sizes; + int nelems; + if (active_oids.len > 0) appendStringInfoString(&active_oids, ","); + appendStringInfo(&active_oids, "%i", tableid); + deconstruct_array(array, ARR_ELEMTYPE(array), typlen, typbyval, typalign, &sizes, NULL, &nelems); + Assert(nelems == SEGCOUNT + 1); + for (int16 segid = -1; segid < SEGCOUNT; segid++) + update_active_table_size(tableid, DatumGetInt64(sizes[segid + 1]), segid, NULL); + pfree(sizes); } SPI_freetuptable(SPI_tuptable); - SPI_cursor_fetch(portal, true, 10000); - } + } while (SPI_processed); - SPI_freetuptable(SPI_tuptable); SPI_cursor_close(portal); SPI_freeplan(plan); SPI_finish_if(connected_in_this_function); + + return active_oids; } /* @@ -1036,27 +981,16 @@ convert_map_to_string(HTAB *local_active_table_oid_maps) HASH_SEQ_STATUS iter; StringInfoData buffer; DiskQuotaActiveTableEntry *entry; - uint32 count = 0; - uint32 nitems = hash_get_num_entries(local_active_table_oid_maps); initStringInfo(&buffer); - appendStringInfo(&buffer, "{"); hash_seq_init(&iter, local_active_table_oid_maps); while ((entry = (DiskQuotaActiveTableEntry *)hash_seq_search(&iter)) != NULL) { - count++; - if (count != nitems) - { - appendStringInfo(&buffer, "%d,", entry->reloid); - } - else - { - appendStringInfo(&buffer, "%d", entry->reloid); - } + if (buffer.len > 0) appendStringInfoString(&buffer, ","); + appendStringInfo(&buffer, "%d", entry->reloid); } - appendStringInfo(&buffer, "}"); return buffer; } @@ -1067,7 +1001,7 @@ convert_map_to_string(HTAB *local_active_table_oid_maps) * Function diskquota_fetch_table_stat is called to calculate * the table size on the fly. */ -static HTAB * +static StringInfoData pull_active_list_from_seg(void) { CdbPgResults cdb_pgresults = {NULL, 0}; @@ -1120,7 +1054,10 @@ pull_active_list_from_seg(void) } cdbdisp_clearCdbPgResults(&cdb_pgresults); - return local_active_table_oid_map; + StringInfoData active_oids = convert_map_to_string(local_active_table_oid_map); + hash_destroy(local_active_table_oid_map); + + return active_oids; } /* @@ -1132,17 +1069,20 @@ pull_active_list_from_seg(void) * memory), so when re-calculate the table size, we need to sum the * table size on all of the segments. 
*/ -static void -pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array) +static StringInfoData +pull_active_table_size_from_seg(HTAB *local_table_stats_map) { + /* step 1: fetch active oids from all the segments */ + StringInfoData active_oids = pull_active_list_from_seg(); + /* step 2: fetch active table sizes based on active oids */ CdbPgResults cdb_pgresults = {NULL, 0}; StringInfoData sql_command; int i; int j; initStringInfo(&sql_command); - appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '%s'::oid[])", - active_oid_array); + appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '{%s}'::oid[])", + active_oids.data); CdbDispatchCommand(sql_command.data, DF_NONE, &cdb_pgresults); pfree(sql_command.data); @@ -1189,5 +1129,5 @@ pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_ar } } cdbdisp_clearCdbPgResults(&cdb_pgresults); - return; + return active_oids; } diff --git a/src/gp_activetable.h b/src/gp_activetable.h index 633e3307..a6c82798 100644 --- a/src/gp_activetable.h +++ b/src/gp_activetable.h @@ -47,10 +47,12 @@ typedef struct ActiveTableEntryCombined Size tablesize[1]; } ActiveTableEntryCombined; -extern HTAB *gp_fetch_active_tables(bool force); -extern void init_active_table_hook(void); -extern void init_shm_worker_active_tables(void); -extern void init_lock_active_tables(void); +extern StringInfoData gp_fetch_active_tables(HTAB *local_active_table_stat_map); + +extern void init_active_table_hook(void); +extern void init_shm_worker_active_tables(void); +extern void init_lock_active_tables(void); +extern void update_active_table_size(Oid tableid, int64 size, int16 segid, void *arg); extern HTAB *monitored_dbid_cache; diff --git a/src/quotamodel.c b/src/quotamodel.c index 7f4004ed..5912f191 100644 --- a/src/quotamodel.c +++ b/src/quotamodel.c @@ -220,10 +220,10 @@ static void transfer_table_for_quota(int64 totalsize, QuotaType type, Oid *old_k /* functions to refresh disk quota model*/ static void refresh_disk_quota_usage(bool is_init); -static void calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map); +static void calculate_table_disk_usage(HTAB *local_active_table_stat_map); static void flush_to_table_size(void); static bool flush_local_reject_map(void); -static void dispatch_rejectmap(HTAB *local_active_table_stat_map); +static void dispatch_rejectmap(StringInfoData active_oids); static bool load_quotas(void); static void do_load_quotas(void); @@ -812,6 +812,7 @@ refresh_disk_quota_usage(bool is_init) volatile bool pushed_active_snap = false; volatile bool ret = true; HTAB *volatile local_active_table_stat_map = NULL; + StringInfoData volatile active_oids = {0}; StartTransactionCommand(); @@ -831,11 +832,20 @@ refresh_disk_quota_usage(bool is_init) * local_active_table_stat_map only contains the active tables which belong * to the current database. 
 		 */
-		local_active_table_stat_map = gp_fetch_active_tables(is_init);
-		bool hasActiveTable         = (hash_get_num_entries(local_active_table_stat_map) != 0);
+		if (!is_init)
+		{
+			HASHCTL ctl = {
+			        .keysize   = sizeof(Oid),
+			        .entrysize = sizeof(ActiveTableEntryCombined) + SEGCOUNT * sizeof(Size),
+			        .hcxt      = CurrentMemoryContext,
+			};
+			local_active_table_stat_map = diskquota_hash_create("local active table map with relfilenode info", 1024,
+			                                                    &ctl, HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH);
+		}
+		active_oids = gp_fetch_active_tables(local_active_table_stat_map);
 		/* TODO: if we can skip the following steps when there is no active table */
 		/* recalculate the disk usage of table, schema and role */
-		calculate_table_disk_usage(is_init, local_active_table_stat_map);
+		calculate_table_disk_usage(local_active_table_stat_map);
 		/* refresh quota_info_map */
 		refresh_quota_info_map();
 		/* flush local table_size_map to user table table_size */
@@ -848,8 +858,8 @@ refresh_disk_quota_usage(bool is_init)
 		 * Otherwise, only when the rejectmap is changed or the active_table_list is
 		 * not empty the rejectmap should be dispatched to segments.
 		 */
-		if (is_init || (diskquota_hardlimit && (reject_map_changed || hasActiveTable)))
-			dispatch_rejectmap(local_active_table_stat_map);
+		if (is_init || (diskquota_hardlimit && (reject_map_changed || active_oids.len > 0)))
+			dispatch_rejectmap(active_oids);
 	}
 	PG_CATCH();
 	{
@@ -862,6 +872,7 @@ refresh_disk_quota_usage(bool is_init)
 		RESUME_INTERRUPTS();
 	}
 	PG_END_TRY();
+	if (active_oids.data) pfree(active_oids.data);
 	if (local_active_table_stat_map) hash_destroy(local_active_table_stat_map);
 	if (pushed_active_snap) PopActiveSnapshot();
 	if (ret)
@@ -900,6 +911,81 @@ merge_uncommitted_table_to_oidlist(List *oidlist)
 	return oidlist;
 }
 
+static TableSizeEntry *
+get_tsentry(Oid tableid, int16 segid)
+{
+	bool              table_size_map_found;
+	TableSizeEntryKey key = {
+	        .reloid = tableid,
+	        .id     = TableSizeEntryId(segid),
+	};
+	HASHACTION action = check_hash_fullness(table_size_map, MAX_NUM_TABLE_SIZE_ENTRIES, table_size_map_warning,
+	                                        table_size_map_last_overflow_report);
+	TableSizeEntry *tsentry = hash_search(table_size_map, &key, action, &table_size_map_found);
+
+	if (!table_size_map_found && tsentry != NULL)
+	{
+		Assert(TableSizeEntrySegidStart(tsentry) == segid);
+		memset(tsentry->totalsize, 0, sizeof(tsentry->totalsize));
+		tsentry->owneroid      = InvalidOid;
+		tsentry->namespaceoid  = InvalidOid;
+		tsentry->tablespaceoid = InvalidOid;
+		tsentry->flag          = 0;
+
+		int seg_st = TableSizeEntrySegidStart(tsentry);
+		int seg_ed = TableSizeEntrySegidEnd(tsentry);
+		for (int j = seg_st; j < seg_ed; j++) TableSizeEntrySetFlushFlag(tsentry, j);
+	}
+
+	/* mark tsentry is_exist */
+	if (tsentry) set_table_size_entry_flag(tsentry, TABLE_EXIST);
+
+	return tsentry;
+}
+
+void
+update_active_table_size(Oid tableid, int64 size, int16 segid, void *arg)
+{
+	TableSizeEntry *tsentry = arg != NULL ? arg : get_tsentry(tableid, segid);
+
+	if (tsentry == NULL)
+	{
+		/*
+		 * Too many tables have been added to the table_size_map, to avoid diskquota using
+		 * too much shared memory, just return from the function. The diskquota won't work correctly
+		 * anymore.
+		 */
+		return;
+	}
+
+	if (segid == -1)
+	{
+		/* pretend process as utility mode, and append the table size on master */
+		Gp_role = GP_ROLE_UTILITY;
+
+		/* when segid is -1, the size is the sum of the sizes on the master and all segments */
+		size += calculate_table_size(tableid);
+
+		Gp_role = GP_ROLE_DISPATCH;
+	}
+
+	/* first calculate the updated total size of a table */
+	int64 updated_total_size = size - TableSizeEntryGetSize(tsentry, segid);
+
+	/* update the table_size entry */
+	TableSizeEntrySetSize(tsentry, segid, size);
+	TableSizeEntrySetFlushFlag(tsentry, segid);
+
+	/* update the disk usage, there may be entries in the map whose keys are InvalidOid as the tsentry does
+	 * not exist in the table_size_map */
+	update_size_for_quota(updated_total_size, NAMESPACE_QUOTA, (Oid[]){tsentry->namespaceoid}, segid);
+	update_size_for_quota(updated_total_size, ROLE_QUOTA, (Oid[]){tsentry->owneroid}, segid);
+	update_size_for_quota(updated_total_size, ROLE_TABLESPACE_QUOTA, (Oid[]){tsentry->owneroid, tsentry->tablespaceoid},
+	                      segid);
+	update_size_for_quota(updated_total_size, NAMESPACE_TABLESPACE_QUOTA,
+	                      (Oid[]){tsentry->namespaceoid, tsentry->tablespaceoid}, segid);
+}
+
 /*
  * Incremental way to update the disk quota of every database objects
  * Recalculate the table's disk usage when it's a new table or active table.
@@ -911,18 +997,13 @@ merge_uncommitted_table_to_oidlist(List *oidlist)
  */
 static void
-calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)
+calculate_table_disk_usage(HTAB *local_active_table_stat_map)
 {
-	bool                      table_size_map_found;
-	bool                      active_tbl_found;
-	int64                     updated_total_size;
-	TableSizeEntry           *tsentry = NULL;
-	Oid                       relOid;
-	HASH_SEQ_STATUS           iter;
-	ActiveTableEntryCombined *active_table_entry;
-	TableSizeEntryKey         key;
-	List                     *oidlist;
-	ListCell                 *l;
+	TableSizeEntry *tsentry = NULL;
+	Oid             relOid;
+	HASH_SEQ_STATUS iter;
+	List           *oidlist;
+	ListCell       *l;
 	DeleteArrays delete = {0};
 
 	/*
@@ -940,7 +1021,7 @@ calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)
 	 * calculate the file size for active table and update namespace_size_map
 	 * and role_size_map
 	 */
-	oidlist = get_rel_oid_list(is_init);
+	oidlist = get_rel_oid_list(local_active_table_stat_map == NULL);
 
 	oidlist = merge_uncommitted_table_to_oidlist(oidlist);
 
@@ -975,7 +1056,7 @@
 			elog(WARNING, "cache lookup failed for relation %u", relOid);
 			LWLockRelease(diskquota_locks.relation_cache_lock);
 
-			if (!is_init) continue;
+			if (local_active_table_stat_map != NULL) continue;
 
 			for (int i = -1; i < SEGCOUNT; i++)
 			{
@@ -1005,74 +1086,32 @@
 		 */
 		for (int cur_segid = -1; cur_segid < SEGCOUNT; cur_segid++)
 		{
-			key.reloid = relOid;
-			key.id     = TableSizeEntryId(cur_segid);
+			tsentry = get_tsentry(relOid, cur_segid);
 
-			HASHACTION action = check_hash_fullness(table_size_map, MAX_NUM_TABLE_SIZE_ENTRIES, table_size_map_warning,
-			                                        table_size_map_last_overflow_report);
-			tsentry           = hash_search(table_size_map, &key, action, &table_size_map_found);
-
-			if (!table_size_map_found)
+			if (tsentry == NULL)
 			{
-				if (tsentry == NULL)
-				{
-					/* Too many tables have been added to the table_size_map, to avoid diskquota using
-					   too much share memory, just quit the loop. The diskquota won't work correctly
-					   anymore. */
-					break;
-				}
-
-				tsentry->key.reloid = relOid;
-				tsentry->key.id     = key.id;
-				Assert(TableSizeEntrySegidStart(tsentry) == cur_segid);
-				memset(tsentry->totalsize, 0, sizeof(tsentry->totalsize));
-				tsentry->owneroid      = InvalidOid;
-				tsentry->namespaceoid  = InvalidOid;
-				tsentry->tablespaceoid = InvalidOid;
-				tsentry->flag          = 0;
-
-				int seg_st = TableSizeEntrySegidStart(tsentry);
-				int seg_ed = TableSizeEntrySegidEnd(tsentry);
-				for (int j = seg_st; j < seg_ed; j++) TableSizeEntrySetFlushFlag(tsentry, j);
+				/*
+				 * Too many tables have been added to the table_size_map, to avoid diskquota using
+				 * too much shared memory, just quit the loop. The diskquota won't work correctly
+				 * anymore.
+				 */
+				break;
 			}
-			/* mark tsentry is_exist */
-			if (tsentry) set_table_size_entry_flag(tsentry, TABLE_EXIST);
-			active_table_entry = (ActiveTableEntryCombined *)hash_search(local_active_table_stat_map, &relOid,
-			                                                             HASH_FIND, &active_tbl_found);
-
-			/* skip to recalculate the tables which are not in active list */
-			if (active_tbl_found)
+			if (local_active_table_stat_map != NULL)
 			{
-				if (cur_segid == -1)
+				bool                      active_tbl_found;
+				ActiveTableEntryCombined *active_table_entry = (ActiveTableEntryCombined *)hash_search(
+				        local_active_table_stat_map, &relOid, HASH_FIND, &active_tbl_found);
+				/* skip recalculating tables that are not in the active list */
+				if (active_tbl_found && active_table_entry != NULL)
 				{
-					/* pretend process as utility mode, and append the table size on master */
-					Gp_role = GP_ROLE_UTILITY;
-
-					/* when cur_segid is -1, the tablesize is the sum of tablesize of master and all segments */
-					active_table_entry->tablesize[0] += calculate_table_size(relOid);
-
-					Gp_role = GP_ROLE_DISPATCH;
+					update_active_table_size(relOid, active_table_entry->tablesize[cur_segid + 1], cur_segid, tsentry);
 				}
-				/* firstly calculate the updated total size of a table */
-				updated_total_size =
-				        active_table_entry->tablesize[cur_segid + 1] - TableSizeEntryGetSize(tsentry, cur_segid);
-
-				/* update the table_size entry */
-				TableSizeEntrySetSize(tsentry, cur_segid, active_table_entry->tablesize[cur_segid + 1]);
-				TableSizeEntrySetFlushFlag(tsentry, cur_segid);
-
-				/* update the disk usage, there may be entries in the map whose keys are InvlidOid as the tsentry does
-				 * not exist in the table_size_map */
-				update_size_for_quota(updated_total_size, NAMESPACE_QUOTA, (Oid[]){tsentry->namespaceoid}, cur_segid);
-				update_size_for_quota(updated_total_size, ROLE_QUOTA, (Oid[]){tsentry->owneroid}, cur_segid);
-				update_size_for_quota(updated_total_size, ROLE_TABLESPACE_QUOTA,
-				                      (Oid[]){tsentry->owneroid, tsentry->tablespaceoid}, cur_segid);
-				update_size_for_quota(updated_total_size, NAMESPACE_TABLESPACE_QUOTA,
-				                      (Oid[]){tsentry->namespaceoid, tsentry->tablespaceoid}, cur_segid);
 			}
+
 			/* table size info doesn't need to flush at init quota model stage */
-			if (is_init)
+			if (local_active_table_stat_map == NULL)
 			{
 				TableSizeEntryResetFlushFlag(tsentry, cur_segid);
 			}
@@ -1344,19 +1383,16 @@ flush_local_reject_map(void)
  * Dispatch rejectmap to segment servers.
*/ static void -dispatch_rejectmap(HTAB *local_active_table_stat_map) +dispatch_rejectmap(StringInfoData active_oids) { - HASH_SEQ_STATUS hash_seq; - GlobalRejectMapEntry *rejectmap_entry; - ActiveTableEntryCombined *active_table_entry; - int num_entries, count = 0; - CdbPgResults cdb_pgresults = {NULL, 0}; - StringInfoData rows; - StringInfoData active_oids; - StringInfoData sql; + HASH_SEQ_STATUS hash_seq; + GlobalRejectMapEntry *rejectmap_entry; + int num_entries, count = 0; + CdbPgResults cdb_pgresults = {NULL, 0}; + StringInfoData rows; + StringInfoData sql; initStringInfo(&rows); - initStringInfo(&active_oids); initStringInfo(&sql); LWLockAcquire(diskquota_locks.reject_map_lock, LW_SHARED); @@ -1372,16 +1408,6 @@ dispatch_rejectmap(HTAB *local_active_table_stat_map) } LWLockRelease(diskquota_locks.reject_map_lock); - count = 0; - num_entries = hash_get_num_entries(local_active_table_stat_map); - hash_seq_init(&hash_seq, local_active_table_stat_map); - while ((active_table_entry = hash_seq_search(&hash_seq)) != NULL) - { - appendStringInfo(&active_oids, "%d", active_table_entry->reloid); - - if (++count != num_entries) appendStringInfo(&active_oids, ","); - } - appendStringInfo(&sql, "select diskquota.refresh_rejectmap(" "ARRAY[%s]::diskquota.rejectmap_entry[], " @@ -1390,7 +1416,6 @@ dispatch_rejectmap(HTAB *local_active_table_stat_map) CdbDispatchCommand(sql.data, DF_NONE, &cdb_pgresults); pfree(rows.data); - pfree(active_oids.data); pfree(sql.data); cdbdisp_clearCdbPgResults(&cdb_pgresults); }