Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
RekGRpth committed Dec 17, 2024
1 parent 0d2375a commit b50db10
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 238 deletions.
4 changes: 4 additions & 0 deletions src/diskquota.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "postgres.h"
#include "port/atomics.h"

#include "access/htup.h"
#include "catalog/pg_class.h"
#include "lib/ilist.h"
#include "lib/stringinfo.h"
Expand Down Expand Up @@ -46,6 +47,8 @@
/* max number of QuotaInfoEntry in quota_info_map */
#define MAX_QUOTA_MAP_ENTRIES (AVG_QUOTA_MAP_ENTRIES < 1024 ? 1024 : AVG_QUOTA_MAP_ENTRIES)

#define DatumGetArrayTypePwrapper(X) ((X) ? DatumGetArrayTypeP(X) : NULL)

typedef enum
{
DISKQUOTA_TAG_HASH = 0,
Expand Down Expand Up @@ -321,4 +324,5 @@ extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *war
TimestampTz *last_overflow_report);
bool SPI_connect_if_not_yet(void);
void SPI_finish_if(bool connected_in_this_function);
Datum SPI_getbinval_wrapper(HeapTuple tuple, TupleDesc tupdesc, const char *fname, bool allow_null, Oid typeid);
#endif
15 changes: 15 additions & 0 deletions src/diskquota_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1714,3 +1714,18 @@ SPI_finish_if(bool connected_in_calling_function)
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"),
errdetail("%s", SPI_result_code_string(rc))));
}

Datum
SPI_getbinval_wrapper(HeapTuple tuple, TupleDesc tupdesc, const char *fname, bool allow_null, Oid typeid)
{
bool isnull;
Datum datum;
int fnumber = SPI_fnumber(tupdesc, fname);
if (SPI_gettypeid(tupdesc, fnumber) != typeid)
ereport(ERROR, (errcode(ERRCODE_MOST_SPECIFIC_TYPE_MISMATCH),
errmsg("type of column \"%s\" must be \"%i\"", fname, typeid)));
datum = SPI_getbinval(tuple, tupdesc, fnumber, &isnull);
if (isnull && !allow_null)
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("column \"%s\" must not be null", fname)));
return datum;
}
208 changes: 74 additions & 134 deletions src/gp_activetable.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "storage/smgr.h"
#include "utils/faultinjector.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/inval.h"

Expand Down Expand Up @@ -85,22 +86,22 @@ static void active_table_hook_smgrtruncate(RelFileNodeBackend rnode);
static void active_table_hook_smgrunlink(RelFileNodeBackend rnode);
static void object_access_hook_QuotaStmt(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg);

static HTAB *get_active_tables_stats(ArrayType *array);
static HTAB *get_active_tables_oid(void);
static HTAB *pull_active_list_from_seg(void);
static void pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array);
static StringInfoData convert_map_to_string(HTAB *active_list);
static void load_table_size(HTAB *local_table_stats_map);
static void report_active_table_helper(const RelFileNodeBackend *relFileNode);
static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode);
static void report_relation_cache_helper(Oid relid);
static void report_altered_reloid(Oid reloid);
static Oid get_dbid(ArrayType *array);

void init_active_table_hook(void);
void init_shm_worker_active_tables(void);
void init_lock_active_tables(void);
HTAB *gp_fetch_active_tables(bool is_init);
static HTAB *get_active_tables_stats(ArrayType *array);
static HTAB *get_active_tables_oid(void);

static StringInfoData pull_active_list_from_seg(void);
static StringInfoData pull_active_table_size_from_seg(HTAB *local_table_stats_map);
static StringInfoData load_table_size(void);

static void report_active_table_helper(const RelFileNodeBackend *relFileNode);
static void remove_from_active_table_map(const RelFileNodeBackend *relFileNode);
static void report_relation_cache_helper(Oid relid);
static void report_altered_reloid(Oid reloid);
static Oid get_dbid(ArrayType *array);

void init_active_table_hook(void);
void init_shm_worker_active_tables(void);
void init_lock_active_tables(void);

/*
* Init active_tables_map shared memory
Expand Down Expand Up @@ -363,13 +364,10 @@ remove_from_active_table_map(const RelFileNodeBackend *relFileNode)
* And aggregate the table size on each segment
* to get the real table size at cluster level.
*/
HTAB *
gp_fetch_active_tables(bool is_init)
StringInfoData
gp_fetch_active_tables(HTAB *local_active_table_stat_map)
{
HTAB *local_table_stats_map = NULL;
HASHCTL ctl;
HTAB *local_active_table_oid_maps;
StringInfoData active_oid_list;
HASHCTL ctl;

Assert(Gp_role == GP_ROLE_DISPATCH);

Expand All @@ -378,29 +376,12 @@ gp_fetch_active_tables(bool is_init)
ctl.entrysize = sizeof(ActiveTableEntryCombined) + SEGCOUNT * sizeof(Size);
ctl.hcxt = CurrentMemoryContext;

local_table_stats_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl,
HASH_ELEM | HASH_CONTEXT, DISKQUOTA_OID_HASH);

if (is_init)
if (local_active_table_stat_map == NULL)
{
load_table_size(local_table_stats_map);
return load_table_size();
}
else
{
/* step 1: fetch active oids from all the segments */
local_active_table_oid_maps = pull_active_list_from_seg();
active_oid_list = convert_map_to_string(local_active_table_oid_maps);

ereport(DEBUG1,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] active_old_list = %s", active_oid_list.data)));

/* step 2: fetch active table sizes based on active oids */
pull_active_table_size_from_seg(local_table_stats_map, active_oid_list.data);

hash_destroy(local_active_table_oid_maps);
pfree(active_oid_list.data);
}
return local_table_stats_map;
return pull_active_table_size_from_seg(local_active_table_stat_map);
}

/*
Expand Down Expand Up @@ -936,93 +917,57 @@ get_active_tables_oid(void)
* This is called when system startup, disk quota rejectmap
* and other shared memory will be warmed up by table_size table.
*/
static void
load_table_size(HTAB *local_table_stats_map)
static StringInfoData
load_table_size(void)
{
TupleDesc tupdesc;
int i;
bool found;
ActiveTableEntryCombined *quota_entry;
SPIPlanPtr plan;
Portal portal;
char *sql = "select tableid, size, segid from diskquota.table_size";
bool connected_in_this_function = SPI_connect_if_not_yet();
SPIPlanPtr plan;
Portal portal;
int16 typlen;
bool typbyval;
char typalign;

StringInfoData active_oids;
static const char *sql = "select tableid, array_agg(size order by segid) size from diskquota.table_size group by 1";
bool connected_in_this_function = SPI_connect_if_not_yet();
MemoryContext oldContext = MemoryContextSwitchTo(CurTransactionContext);

initStringInfo(&active_oids);
MemoryContextSwitchTo(oldContext);
get_typlenbyvalalign(INT8OID, &typlen, &typbyval, &typalign);

if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql)));
if ((portal = SPI_cursor_open(NULL, plan, NULL, NULL, true)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_cursor_open(\"%s\") failed", sql)));

SPI_cursor_fetch(portal, true, 10000);

if (SPI_tuptable == NULL)
{
ereport(ERROR, (errmsg("[diskquota] load_table_size SPI_cursor_fetch failed")));
}

tupdesc = SPI_tuptable->tupdesc;
#if GP_VERSION_NUM < 70000
if (tupdesc->natts != 3 || ((tupdesc)->attrs[0])->atttypid != OIDOID ||
((tupdesc)->attrs[1])->atttypid != INT8OID || ((tupdesc)->attrs[2])->atttypid != INT2OID)
#else
if (tupdesc->natts != 3 || ((tupdesc)->attrs[0]).atttypid != OIDOID || ((tupdesc)->attrs[1]).atttypid != INT8OID ||
((tupdesc)->attrs[2]).atttypid != INT2OID)
#endif /* GP_VERSION_NUM */
{
if (tupdesc->natts != 3)
{
ereport(WARNING, (errmsg("[diskquota] tupdesc->natts: %d", tupdesc->natts)));
}
else
{
#if GP_VERSION_NUM < 70000
ereport(WARNING, (errmsg("[diskquota] attrs: %d, %d, %d", tupdesc->attrs[0]->atttypid,
tupdesc->attrs[1]->atttypid, tupdesc->attrs[2]->atttypid)));
#else
ereport(WARNING, (errmsg("[diskquota] attrs: %d, %d, %d", tupdesc->attrs[0].atttypid,
tupdesc->attrs[1].atttypid, tupdesc->attrs[2].atttypid)));
#endif /* GP_VERSION_NUM */
}
ereport(ERROR, (errmsg("[diskquota] table \"table_size\" is corrupted in database \"%s\","
" please recreate diskquota extension",
get_database_name(MyDatabaseId))));
}

while (SPI_processed > 0)
do
{
/* push the table oid and size into local_table_stats_map */
for (i = 0; i < SPI_processed; i++)
SPI_cursor_fetch(portal, true, 10000);
for (uint64 row = 0; row < SPI_processed; row++)
{
HeapTuple tup = SPI_tuptable->vals[i];
Datum dat;
Oid reloid;
int64 size;
int16 segid;
bool isnull;

dat = SPI_getbinval(tup, tupdesc, 1, &isnull);
if (isnull) continue;
reloid = DatumGetObjectId(dat);

dat = SPI_getbinval(tup, tupdesc, 2, &isnull);
if (isnull) continue;
size = DatumGetInt64(dat);
dat = SPI_getbinval(tup, tupdesc, 3, &isnull);
if (isnull) continue;
segid = DatumGetInt16(dat);

quota_entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);
quota_entry->reloid = reloid;
quota_entry->tablesize[segid + 1] = size;
HeapTuple val = SPI_tuptable->vals[row];
TupleDesc tupdesc = SPI_tuptable->tupdesc;
Oid tableid = DatumGetObjectId(SPI_getbinval_wrapper(val, tupdesc, "tableid", false, OIDOID));
ArrayType *array =
DatumGetArrayTypePwrapper(SPI_getbinval_wrapper(val, tupdesc, "size", false, INT8ARRAYOID));
Datum *sizes;
int nelems;
if (active_oids.len > 0) appendStringInfoString(&active_oids, ",");
appendStringInfo(&active_oids, "%i", tableid);
deconstruct_array(array, ARR_ELEMTYPE(array), typlen, typbyval, typalign, &sizes, NULL, &nelems);
Assert(nelems == SEGCOUNT + 1);
for (int16 segid = -1; segid < SEGCOUNT; segid++)
update_active_table_size(tableid, DatumGetInt64(sizes[segid + 1]), segid, NULL);
pfree(sizes);
}
SPI_freetuptable(SPI_tuptable);
SPI_cursor_fetch(portal, true, 10000);
}
} while (SPI_processed);

SPI_freetuptable(SPI_tuptable);
SPI_cursor_close(portal);
SPI_freeplan(plan);
SPI_finish_if(connected_in_this_function);

return active_oids;
}

/*
Expand All @@ -1036,27 +981,16 @@ convert_map_to_string(HTAB *local_active_table_oid_maps)
HASH_SEQ_STATUS iter;
StringInfoData buffer;
DiskQuotaActiveTableEntry *entry;
uint32 count = 0;
uint32 nitems = hash_get_num_entries(local_active_table_oid_maps);

initStringInfo(&buffer);
appendStringInfo(&buffer, "{");

hash_seq_init(&iter, local_active_table_oid_maps);

while ((entry = (DiskQuotaActiveTableEntry *)hash_seq_search(&iter)) != NULL)
{
count++;
if (count != nitems)
{
appendStringInfo(&buffer, "%d,", entry->reloid);
}
else
{
appendStringInfo(&buffer, "%d", entry->reloid);
}
if (buffer.len > 0) appendStringInfoString(&buffer, ",");
appendStringInfo(&buffer, "%d", entry->reloid);
}
appendStringInfo(&buffer, "}");

return buffer;
}
Expand All @@ -1067,7 +1001,7 @@ convert_map_to_string(HTAB *local_active_table_oid_maps)
* Function diskquota_fetch_table_stat is called to calculate
* the table size on the fly.
*/
static HTAB *
static StringInfoData
pull_active_list_from_seg(void)
{
CdbPgResults cdb_pgresults = {NULL, 0};
Expand Down Expand Up @@ -1120,7 +1054,10 @@ pull_active_list_from_seg(void)
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);

return local_active_table_oid_map;
StringInfoData active_oids = convert_map_to_string(local_active_table_oid_map);
hash_destroy(local_active_table_oid_map);

return active_oids;
}

/*
Expand All @@ -1132,17 +1069,20 @@ pull_active_list_from_seg(void)
* memory), so when re-calculate the table size, we need to sum the
* table size on all of the segments.
*/
static void
pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_array)
static StringInfoData
pull_active_table_size_from_seg(HTAB *local_table_stats_map)
{
/* step 1: fetch active oids from all the segments */
StringInfoData active_oids = pull_active_list_from_seg();
/* step 2: fetch active table sizes based on active oids */
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData sql_command;
int i;
int j;

initStringInfo(&sql_command);
appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '%s'::oid[])",
active_oid_array);
appendStringInfo(&sql_command, "select * from diskquota.diskquota_fetch_table_stat(1, '{%s}'::oid[])",
active_oids.data);
CdbDispatchCommand(sql_command.data, DF_NONE, &cdb_pgresults);
pfree(sql_command.data);

Expand Down Expand Up @@ -1189,5 +1129,5 @@ pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_ar
}
}
cdbdisp_clearCdbPgResults(&cdb_pgresults);
return;
return active_oids;
}
10 changes: 6 additions & 4 deletions src/gp_activetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ typedef struct ActiveTableEntryCombined
Size tablesize[1];
} ActiveTableEntryCombined;

extern HTAB *gp_fetch_active_tables(bool force);
extern void init_active_table_hook(void);
extern void init_shm_worker_active_tables(void);
extern void init_lock_active_tables(void);
extern StringInfoData gp_fetch_active_tables(HTAB *local_active_table_stat_map);

extern void init_active_table_hook(void);
extern void init_shm_worker_active_tables(void);
extern void init_lock_active_tables(void);
extern void update_active_table_size(Oid tableid, int64 size, int16 segid, void *arg);

extern HTAB *monitored_dbid_cache;

Expand Down
Loading

0 comments on commit b50db10

Please sign in to comment.