From b3778e223936dda30cef9d2673a12dc4b2c3e1e4 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Wed, 25 Dec 2024 20:44:51 +0000 Subject: [PATCH 1/6] Write bloom filter for AO --- src/backend/access/appendonly/Makefile | 2 +- src/backend/access/appendonly/aobloomfilter.c | 48 ++++++++++++++++ src/backend/access/appendonly/appendonlyam.c | 57 +++++++++++++++++++ src/include/access/aobloomfilter.h | 37 ++++++++++++ 4 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 src/backend/access/appendonly/aobloomfilter.c create mode 100644 src/include/access/aobloomfilter.h diff --git a/src/backend/access/appendonly/Makefile b/src/backend/access/appendonly/Makefile index ba30f34fd90..94d74c16f4b 100755 --- a/src/backend/access/appendonly/Makefile +++ b/src/backend/access/appendonly/Makefile @@ -10,7 +10,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) -OBJS = appendonlyam.o aosegfiles.o aomd.o appendonlywriter.o appendonlytid.o \ +OBJS = aobloomfilter.o appendonlyam.o aosegfiles.o aomd.o appendonlywriter.o appendonlytid.o \ appendonlyblockdirectory.o appendonly_visimap.o \ appendonly_visimap_entry.o appendonly_visimap_store.o \ appendonly_compaction.o appendonly_visimap_udf.o \ diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c new file mode 100644 index 00000000000..089b316825b --- /dev/null +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -0,0 +1,48 @@ + +#include "postgres.h" + +#include "utils/rel.h" +#include "storage/buf_internals.h" +#include "storage/bufpage.h" + +#include "access/aobloomfilter.h" + +void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int aoblknum) +{ + Buffer buffer; + Page page; + BlockNumber blkno; + AOBFPage metapage; + BloomFilterFixed *fixed_filter; + char * pointer; + + fixed_filter = ao_bloom_filter_serealize(filter); + + blkno = AOBLF_CALC_PAGE(aoblknum); + + buffer = 
ReadBufferExtended(aorel, VISIBILITYMAP_FORKNUM, blkno, RBM_NORMAL, NULL); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = BufferGetPage(buffer); + + metapage = PageGetContents(page); + + pointer = PageGetSpecialPointer(page) + AOBLF_CALC_OFFSET(aoblknum) * sizeof(BloomFilterFixed); + + memcpy(pointer, fixed_filter, sizeof(BloomFilterFixed)); + + MarkBufferDirtyHint(buffer, false); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); +} + +bloom_filter * +ao_bloom_filter_deserealize(BloomFilterFixed *bf) +{ + +} + +BloomFilterFixed * +ao_bloom_filter_serealize(bloom_filter *bf) +{ + +} \ No newline at end of file diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index d926b22f7ff..e8f69c38046 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -34,6 +34,7 @@ #include "access/appendonlytid.h" #include "access/appendonlywriter.h" #include "access/aomd.h" +#include "access/aobloomfilter.h" #include "access/transam.h" #include "access/tupdesc.h" #include "access/tuptoaster.h" @@ -1436,12 +1437,63 @@ setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->bufferCount++; } +static void +ao_fsm_extend(Relation rel, BlockNumber fsm_nblocks) +{ + BlockNumber fsm_nblocks_now; + PGAlignedBlock pg; + + PageInit((Page) pg.data, BLCKSZ, 0); + + /* + * We use the relation extension lock to lock out other backends trying to + * extend the FSM at the same time. It also locks out extension of the + * main fork, unnecessarily, but extending the FSM happens seldom enough + * that it doesn't seem worthwhile to have a separate lock tag type for + * it. + * + * Note that another backend might have extended or created the relation + * by the time we get the lock. + */ + LockRelationForExtension(rel, ExclusiveLock); + + /* Might have to re-open if a cache flush happened */ + RelationOpenSmgr(rel); + + /* + * Create the FSM file first if it doesn't exist. 
If smgr_fsm_nblocks is + * positive then it must exist, no need for an smgrexists call. + */ + if ((rel->rd_smgr->smgr_fsm_nblocks == 0 || + rel->rd_smgr->smgr_fsm_nblocks == InvalidBlockNumber) && + !smgrexists(rel->rd_smgr, FSM_FORKNUM)) + smgrcreate(rel->rd_smgr, FSM_FORKNUM, false); + + fsm_nblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM); + + while (fsm_nblocks_now < fsm_nblocks) + { + PageSetChecksumInplace((Page) pg.data, fsm_nblocks_now); + + smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now, + pg.data, false); + fsm_nblocks_now++; + } + + /* Update local cache with the up-to-date size */ + rel->rd_smgr->smgr_fsm_nblocks = fsm_nblocks_now; + + UnlockRelationForExtension(rel, ExclusiveLock); +} + + static void finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) { int executorBlockKind; int itemCount; int32 dataLen; + BlockNumber aoblknum; executorBlockKind = AoExecutorBlockKind_VarBlock; /* Assume. */ @@ -1460,6 +1512,11 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->varblockCount++; + aoblknum = 1 + AOBLF_CALC_PAGE(aoInsertDesc->varblockCount); + + if (aoblknum < smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) + ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); + if (!aoInsertDesc->shouldCompress) { if (itemCount == 1) diff --git a/src/include/access/aobloomfilter.h b/src/include/access/aobloomfilter.h new file mode 100644 index 00000000000..ca2d2117dd4 --- /dev/null +++ b/src/include/access/aobloomfilter.h @@ -0,0 +1,37 @@ +#ifndef AOBLOOMFILTER_H +#define AOBLOOMFILTER_H + +#include "lib/bloomfilter.h" + +#include "access/hash.h" +#include "lib/bloomfilter.h" + +#define AOBLF_SIZE 128 + +typedef struct BloomFilterFixed { + /* K hash functions are used, seeded by caller's seed */ + int k_hash_funcs; + uint64 seed; + /* m is bitset size, in bits. Must be a power of two <= 2^32. 
*/ + unsigned char bitset[AOBLF_SIZE]; +} BloomFilterFixed; + +#define AO_BLF_SIZE (4 + 8 + AOBLF_SIZE) +#define AOBLF_N_FILTERS (8000 / AOBLF_SIZE) + +typedef struct AOBFPageData { + BloomFilterFixed filters[AOBLF_N_FILTERS]; +} AOBFPageData; + +typedef AOBFPageData *AOBFPage; + +#define AOBLF_CALC_PAGE(x) (x / AOBLF_N_FILTERS) +#define AOBLF_CALC_OFFSET(x) (x % AOBLF_N_FILTERS) + +extern bloom_filter* ao_bloom_filter_deserealize(BloomFilterFixed *bf); +extern BloomFilterFixed* ao_bloom_filter_serealize(bloom_filter *bf); + + +extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int aoblknum); + +#endif /* AOBLOOMFILTER_H */ \ No newline at end of file From 354f9143ffcc8064fcdf6b4b810141b8228f8017 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Thu, 26 Dec 2024 09:24:19 +0000 Subject: [PATCH 2/6] WIP --- src/backend/access/appendonly/aobloomfilter.c | 24 +++++++++----- src/backend/access/appendonly/appendonlyam.c | 31 +++++++++++++++++-- src/backend/cdb/cdbvarblock.c | 7 ++++- src/backend/lib/bloomfilter.c | 27 +++++++++------- src/include/access/aobloomfilter.h | 8 ++--- src/include/cdb/cdbvarblock.h | 7 ++++- src/include/lib/bloomfilter.h | 14 +++++++++ 7 files changed, 90 insertions(+), 28 deletions(-) diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c index 089b316825b..da70a4845a5 100644 --- a/src/backend/access/appendonly/aobloomfilter.c +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -4,28 +4,27 @@ #include "utils/rel.h" #include "storage/buf_internals.h" #include "storage/bufpage.h" +#include "lib/bloomfilter.h" #include "access/aobloomfilter.h" -void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int aoblknum) +void +SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int aoblknum) { Buffer buffer; Page page; BlockNumber blkno; - AOBFPage metapage; BloomFilterFixed *fixed_filter; char * pointer; - fixed_filter = 
ao_bloom_filter_serealize(filter); + fixed_filter = ao_bloom_filter_serealize(filter, offset); blkno = AOBLF_CALC_PAGE(aoblknum); - buffer = ReadBufferExtended(aorel, VISIBILITYMAP_FORKNUM, blkno, RBM_NORMAL, NULL); + buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buffer); - metapage = PageGetContents(page); - pointer = PageGetSpecialPointer(page) + AOBLF_CALC_OFFSET(aoblknum) * sizeof(BloomFilterFixed); memcpy(pointer, fixed_filter, sizeof(BloomFilterFixed)); @@ -33,6 +32,7 @@ void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int aoblknum) MarkBufferDirtyHint(buffer, false); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); } bloom_filter * @@ -42,7 +42,17 @@ ao_bloom_filter_deserealize(BloomFilterFixed *bf) } BloomFilterFixed * -ao_bloom_filter_serealize(bloom_filter *bf) +ao_bloom_filter_serealize(bloom_filter *bf, int64 off) { + BloomFilterFixed *blff; + + blff = palloc0(sizeof(BloomFilterFixed)); + + blff->offset = off; + blff->k_hash_funcs = bf->k_hash_funcs; + blff->seed = bf->seed; + + memcpy(blff->bitset, bf->bitset, sizeof(char) * AOBLF_SIZE); + return blff; } \ No newline at end of file diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index e8f69c38046..d7b6dbee63c 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -1417,7 +1417,8 @@ setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->nonCompressedData, aoInsertDesc->maxDataLen, aoInsertDesc->tempSpace, - aoInsertDesc->tempSpaceLen); + aoInsertDesc->tempSpaceLen, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition); } else @@ -1431,7 +1432,8 @@ setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->uncompressedBuffer, aoInsertDesc->maxDataLen, aoInsertDesc->tempSpace, - aoInsertDesc->tempSpaceLen); + aoInsertDesc->tempSpaceLen, 
+ aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition); } aoInsertDesc->bufferCount++; @@ -1514,9 +1516,18 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoblknum = 1 + AOBLF_CALC_PAGE(aoInsertDesc->varblockCount); - if (aoblknum < smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) + RelationOpenSmgr(aoInsertDesc->aoi_rel); + + smgrcreate(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM, true); + + if (aoblknum > smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); + + SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, + aoInsertDesc->varBlockMaker.blf, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, aoblknum); + if (!aoInsertDesc->shouldCompress) { if (itemCount == 1) @@ -2958,6 +2969,9 @@ appendonly_insert(AppendOnlyInsertDesc aoInsertDesc, MemTuple tup = NULL; bool need_toast; bool isLargeContent; + bool isnull; + Datum bloom_datum; + Form_pg_attribute att; Assert(aoInsertDesc->usableBlockSize > 0 && aoInsertDesc->tempSpaceLen > 0); Assert(aoInsertDesc->toast_tuple_threshold > 0 && aoInsertDesc->toast_tuple_target > 0); @@ -3121,6 +3135,17 @@ appendonly_insert(AppendOnlyInsertDesc aoInsertDesc, } } + bloom_datum = memtuple_getattr(tup, aoInsertDesc->mt_bind, 1, &isnull); + + if (isnull) + elog(ERROR, "FAILED TO BLOOM NULL TUP"); + + + att = TupleDescAttr(RelationGetDescr(aoInsertDesc->aoi_rel), 0); + if (!att->attbyval) + elog(ERROR, "FAILED TO BLOOM BY REF TUP"); + bloom_add_element(aoInsertDesc->varBlockMaker.blf, &bloom_datum, att->attlen); + if (!isLargeContent) { /* diff --git a/src/backend/cdb/cdbvarblock.c b/src/backend/cdb/cdbvarblock.c index 53c329819fa..a04f50cf6c4 100644 --- a/src/backend/cdb/cdbvarblock.c +++ b/src/backend/cdb/cdbvarblock.c @@ -19,6 +19,7 @@ #include "postgres.h" #include "cdb/cdbvarblock.h" +#include "access/aobloomfilter.h" static VarBlockByteLen VarBlockGetItemLen( VarBlockReader *varBlockReader, @@ -53,7 +54,8 @@ VarBlockMakerInit( uint8 *buffer, 
VarBlockByteLen maxBufferLen, uint8 *tempScratchSpace, - int tempScratchSpaceLen) + int tempScratchSpaceLen, + int64 largeWritePosition) { Assert(varBlockMaker != NULL); Assert(buffer != NULL); @@ -83,6 +85,9 @@ VarBlockMakerInit( memset(buffer, 0, VARBLOCK_HEADER_LEN); VarBlockSet_version(varBlockMaker->header, InitialVersion); VarBlockSet_offsetsAreSmall(varBlockMaker->header, true); + + varBlockMaker->blf = bloom_create_nbytes(AOBLF_SIZE, random() % PG_INT32_MAX); + varBlockMaker->largeWritePosition = largeWritePosition; } /* diff --git a/src/backend/lib/bloomfilter.c b/src/backend/lib/bloomfilter.c index fc48ee09ffd..9dfcc0e6ec3 100644 --- a/src/backend/lib/bloomfilter.c +++ b/src/backend/lib/bloomfilter.c @@ -38,18 +38,6 @@ #include "access/hash.h" #include "lib/bloomfilter.h" -#define MAX_HASH_FUNCS 10 - -struct bloom_filter -{ - /* K hash functions are used, seeded by caller's seed */ - int k_hash_funcs; - uint64 seed; - /* m is bitset size, in bits. Must be a power of two <= 2^32. 
*/ - uint64 m; - unsigned char bitset[FLEXIBLE_ARRAY_MEMBER]; -}; - static int my_bloom_power(uint64 target_bitset_bits); static int optimal_k(uint64 bitset_bits, int64 total_elems); static void k_hashes(bloom_filter *filter, uint32 *hashes, unsigned char *elem, @@ -119,6 +107,21 @@ bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed) return filter; } +bloom_filter * +bloom_create_nbytes(int64 nbytes, uint64 seed) +{ + bloom_filter *filter; + + /* Allocate bloom filter with unset bitset */ + filter = palloc0(offsetof(bloom_filter, bitset) + + sizeof(unsigned char) * nbytes); + filter->k_hash_funcs = MAX_HASH_FUNCS; + filter->seed = seed; + filter->m = nbytes; + + return filter; +} + /* * Free Bloom filter */ diff --git a/src/include/access/aobloomfilter.h b/src/include/access/aobloomfilter.h index ca2d2117dd4..e9d4089aa1c 100644 --- a/src/include/access/aobloomfilter.h +++ b/src/include/access/aobloomfilter.h @@ -10,6 +10,7 @@ typedef struct BloomFilterFixed { /* K hash functions are used, seeded by caller's seed */ + int64 offset; int k_hash_funcs; uint64 seed; /* m is bitset size, in bits. Must be a power of two <= 2^32. 
*/ @@ -17,7 +18,7 @@ typedef struct BloomFilterFixed { } BloomFilterFixed; #define AO_BLF_SIZE (4 + 8 + AOBLF_SIZE) -#define AOBLF_N_FILTERS (8000 / AOBLF_SIZE) +#define AOBLF_N_FILTERS (32500 / AOBLF_SIZE) typedef struct AOBFPageData { BloomFilterFixed filters[AOBLF_N_FILTERS]; @@ -29,9 +30,8 @@ typedef AOBFPageData *AOBFPage; #define AOBLF_CALC_OFFSET(x) (x % AOBLF_N_FILTERS) extern bloom_filter* ao_bloom_filter_deserealize(BloomFilterFixed *bf); -extern BloomFilterFixed* ao_bloom_filter_serealize(bloom_filter *bf); +extern BloomFilterFixed* ao_bloom_filter_serealize(bloom_filter *bf, int64 off); - -extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int aoblknum); +extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int aoblknum); #endif /* AOBLOOMFILTER_H */ \ No newline at end of file diff --git a/src/include/cdb/cdbvarblock.h b/src/include/cdb/cdbvarblock.h index bcc19a299f8..4231a82bc1d 100644 --- a/src/include/cdb/cdbvarblock.h +++ b/src/include/cdb/cdbvarblock.h @@ -90,6 +90,8 @@ #ifndef CDBVARBLOCK_H #define CDBVARBLOCK_H +#include "lib/bloomfilter.h" + typedef int32 VarBlockByteLen; typedef int32 VarBlockByteOffset; @@ -224,6 +226,8 @@ typedef struct VarBlockMaker /* The maximum number of items for the VarBlock. * Based on the length of the scratch area. 
*/ + bloom_filter * blf; + int64 largeWritePosition; } VarBlockMaker; // ---------------------------------------------------------- @@ -282,7 +286,8 @@ extern void VarBlockMakerInit( uint8 *buffer, VarBlockByteLen maxBufferLen, uint8 *tempScratchSpace, - int tempScratchSpaceLen); + int tempScratchSpaceLen, + int64 largeWritePos); /* * Get a pointer to the next variable-length item so it can diff --git a/src/include/lib/bloomfilter.h b/src/include/lib/bloomfilter.h index 6cbdd9bfd99..54875a5b6a2 100644 --- a/src/include/lib/bloomfilter.h +++ b/src/include/lib/bloomfilter.h @@ -13,10 +13,24 @@ #ifndef BLOOMFILTER_H #define BLOOMFILTER_H +#define MAX_HASH_FUNCS 10 + +struct bloom_filter +{ + /* K hash functions are used, seeded by caller's seed */ + int k_hash_funcs; + uint64 seed; + /* m is bitset size, in bits. Must be a power of two <= 2^32. */ + uint64 m; + unsigned char bitset[FLEXIBLE_ARRAY_MEMBER]; +}; + typedef struct bloom_filter bloom_filter; extern bloom_filter *bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed); +extern bloom_filter *bloom_create_nbytes(int64 nbytes, uint64 seed); + extern void bloom_free(bloom_filter *filter); extern void bloom_add_element(bloom_filter *filter, unsigned char *elem, size_t len); From 593e640ac9124ada9e49db6d23e7316b5be669ac Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Thu, 26 Dec 2024 09:33:08 +0000 Subject: [PATCH 3/6] fix --- src/backend/access/appendonly/aobloomfilter.c | 6 +++--- src/backend/access/appendonly/appendonlyam.c | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c index da70a4845a5..dc94acb27f8 100644 --- a/src/backend/access/appendonly/aobloomfilter.c +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -9,7 +9,7 @@ #include "access/aobloomfilter.h" void -SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int aoblknum) 
+SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int varblocknum) { Buffer buffer; Page page; @@ -19,13 +19,13 @@ SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int fixed_filter = ao_bloom_filter_serealize(filter, offset); - blkno = AOBLF_CALC_PAGE(aoblknum); + blkno = AOBLF_CALC_PAGE(varblocknum); buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buffer); - pointer = PageGetSpecialPointer(page) + AOBLF_CALC_OFFSET(aoblknum) * sizeof(BloomFilterFixed); + pointer = PageGetSpecialPointer(page) + AOBLF_CALC_OFFSET(varblocknum) * sizeof(BloomFilterFixed); memcpy(pointer, fixed_filter, sizeof(BloomFilterFixed)); diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index d7b6dbee63c..4898999d39e 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -1512,8 +1512,6 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) dataLen = VarBlockMakerFinish(&aoInsertDesc->varBlockMaker); - aoInsertDesc->varblockCount++; - aoblknum = 1 + AOBLF_CALC_PAGE(aoInsertDesc->varblockCount); RelationOpenSmgr(aoInsertDesc->aoi_rel); @@ -1526,7 +1524,9 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, aoInsertDesc->varBlockMaker.blf, - aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, aoblknum); + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, aoInsertDesc->varblockCount); + + aoInsertDesc->varblockCount++; if (!aoInsertDesc->shouldCompress) { From 6c7c1b6d9abdbcfd5c40d34a891db6091a865fe2 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Thu, 26 Dec 2024 16:50:39 +0000 Subject: [PATCH 4/6] Hacking --- contrib/pageinspect/aofuncs.c | 41 ++++++++++++++++ contrib/pageinspect/pageinspect--1.8.sql | 11 +++++ src/backend/access/aocs/aocsam.c | 17 +++++-- 
src/backend/access/appendonly/aobloomfilter.c | 47 +++++++++++++++++-- src/backend/access/appendonly/appendonlyam.c | 23 +++++---- src/backend/cdb/cdbappendonlystoragewrite.c | 4 +- src/backend/cdb/cdbbufferedappend.c | 4 +- src/backend/utils/datumstream/datumstream.c | 13 ++++- src/include/access/aobloomfilter.h | 7 +-- src/include/cdb/cdbappendonlystoragewrite.h | 1 + src/include/cdb/cdbbufferedappend.h | 4 +- src/include/utils/datumstream.h | 1 + 12 files changed, 150 insertions(+), 23 deletions(-) diff --git a/contrib/pageinspect/aofuncs.c b/contrib/pageinspect/aofuncs.c index b27fa6a58bd..ecf5a64316c 100644 --- a/contrib/pageinspect/aofuncs.c +++ b/contrib/pageinspect/aofuncs.c @@ -12,11 +12,14 @@ #include "catalog/pg_namespace.h" #include "utils/syscache.h" +#include "access/aobloomfilter.h" + #include "cdb/cdbaocsam.h" #include "cdb/cdbappendonlyam.h" PG_FUNCTION_INFO_V1(get_ao_headers_info); PG_FUNCTION_INFO_V1(get_aocs_headers_info); +PG_FUNCTION_INFO_V1(check_bloom_filter); typedef struct AOHeadersInfoCxt { AppendOnlyScanDesc scan; @@ -427,4 +430,42 @@ get_aocs_headers_info(PG_FUNCTION_ARGS) relation_close(r, AccessShareLock); SRF_RETURN_DONE(funcctx); +} + + +/* aobloomfilter.c */ +Datum +check_bloom_filter(PG_FUNCTION_ARGS) +{ + bool res; + bloom_filter * blf; + Relation r; + Form_pg_attribute att; + Oid oid = PG_GETARG_OID(0); + int64 varblocknum = PG_GETARG_INT64(1); + Datum d = PG_GETARG_DATUM(2); + + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to use raw page functions")))); + + r = relation_open(oid, AccessShareLock); + + if (!RelationIsAoCols(r)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + (errmsg("must be an AOCS relation to use raw headerfunctions")))); + + + att = TupleDescAttr(RelationGetDescr(r), 0); + if (!att->attbyval) + elog(ERROR, "FAILED TO BLOOM BY REF TUP"); + + blf = FetchBloomFilterForVarblock(r, varblocknum); + + res = !bloom_lacks_element(blf, d, 
att->attlen); + + PG_RETURN_BOOL(res); } \ No newline at end of file diff --git a/contrib/pageinspect/pageinspect--1.8.sql b/contrib/pageinspect/pageinspect--1.8.sql index b01a3d9898e..a00d0688760 100644 --- a/contrib/pageinspect/pageinspect--1.8.sql +++ b/contrib/pageinspect/pageinspect--1.8.sql @@ -205,3 +205,14 @@ RETURNS TABLE ( AS 'MODULE_PATHNAME', 'get_aocs_headers_info' LANGUAGE C STRICT EXECUTE ON ALL SEGMENTS; + + +CREATE FUNCTION check_bloom_filter( + reloid OID, + varblocknum BIGINT, + d INT +) +RETURNS BOOLEAN +AS 'MODULE_PATHNAME', 'check_bloom_filter' + LANGUAGE C STRICT +EXECUTE ON ALL SEGMENTS; diff --git a/src/backend/access/aocs/aocsam.c b/src/backend/access/aocs/aocsam.c index 0f02f8e714e..0ef381c17a6 100644 --- a/src/backend/access/aocs/aocsam.c +++ b/src/backend/access/aocs/aocsam.c @@ -875,9 +875,15 @@ OpenAOCSDatumStreams(AOCSInsertDesc desc) FormatAOSegmentFileName(basepath, seginfo->segno, i, &fileSegNo, fn); Assert(strlen(fn) + 1 <= MAXPGPATH); - datumstreamwrite_open_file(desc->ds[i], fn, e->eof, e->eof_uncompressed, seginfo->modcount, - &rnode, - fileSegNo, seginfo->formatversion); + datumstreamwrite_open_file(desc->ds[i], + fn, + e->eof, + e->eof_uncompressed, + 0, + seginfo->modcount, + &rnode, + fileSegNo, + seginfo->formatversion); } pfree(basepath); @@ -2041,7 +2047,10 @@ aocs_addcol_newsegfile(AOCSAddColumnDesc desc, &fileSegNo, fn); Assert(strlen(fn) + 1 <= MAXPGPATH); datumstreamwrite_open_file(desc->dsw[i], fn, - 0 /* eof */ , 0 /* eof_uncompressed */ , 0, /*modcount*/ + 0 /* eof */ , + 0 /* eof_uncompressed */ , + 0 /* varblockcount */, + 0, /*modcount*/ &relfilenode, fileSegNo, version); desc->dsw[i]->blockFirstRowNum = 1; diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c index dc94acb27f8..5135818c7fa 100644 --- a/src/backend/access/appendonly/aobloomfilter.c +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -6,39 +6,80 @@ #include "storage/bufpage.h" #include 
"lib/bloomfilter.h" +#include "access/heapam_xlog.h" + #include "access/aobloomfilter.h" void -SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int varblocknum) +SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int64 varblocknum) { Buffer buffer; Page page; BlockNumber blkno; + OffsetNumber page_offset; BloomFilterFixed *fixed_filter; char * pointer; fixed_filter = ao_bloom_filter_serealize(filter, offset); blkno = AOBLF_CALC_PAGE(varblocknum); + page_offset = AOBLF_CALC_OFFSET(varblocknum); buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = BufferGetPage(buffer); - pointer = PageGetSpecialPointer(page) + AOBLF_CALC_OFFSET(varblocknum) * sizeof(BloomFilterFixed); + pointer = PageGetContents(page) + page_offset * sizeof(BloomFilterFixed); memcpy(pointer, fixed_filter, sizeof(BloomFilterFixed)); - MarkBufferDirtyHint(buffer, false); + MarkBufferDirty(buffer); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); +} + +bloom_filter * +FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum) +{ + Buffer buffer; + Page page; + BlockNumber blkno; + OffsetNumber page_offset; + BloomFilterFixed *fixed_filter; + char * pointer; + + blkno = AOBLF_CALC_PAGE(varblocknum); + page_offset = AOBLF_CALC_OFFSET(varblocknum); + + fixed_filter = palloc0(sizeof(BloomFilterFixed)); + + buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + + + pointer = PageGetContents(page) + page_offset * sizeof(BloomFilterFixed); + + memcpy(fixed_filter, pointer, sizeof(BloomFilterFixed)); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); + + return ao_bloom_filter_deserealize(fixed_filter); } bloom_filter * ao_bloom_filter_deserealize(BloomFilterFixed *bf) { + bloom_filter * blf; + blf = bloom_create_nbytes(AOBLF_SIZE, bf->seed); + + 
blf->k_hash_funcs = bf->k_hash_funcs; + blf->m = AOBLF_SIZE; + memcpy(blf->bitset, bf->bitset, sizeof(char) * AOBLF_SIZE); + return blf; } BloomFilterFixed * diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index 4898999d39e..ad1735ef508 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -511,6 +511,7 @@ SetCurrentFileSegForWrite(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->fsInfo->formatversion, eof, eof_uncompressed, + fsinfo->varblockcount, aoInsertDesc->fsInfo->modcount, &rnode, aoInsertDesc->cur_segno); @@ -1495,6 +1496,7 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) int executorBlockKind; int itemCount; int32 dataLen; + int64 varblkcnt; BlockNumber aoblknum; executorBlockKind = AoExecutorBlockKind_VarBlock; @@ -1512,19 +1514,24 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) dataLen = VarBlockMakerFinish(&aoInsertDesc->varBlockMaker); - aoblknum = 1 + AOBLF_CALC_PAGE(aoInsertDesc->varblockCount); + if (RelationIsAoRows(aoInsertDesc->aoi_rel)) + { + varblkcnt = aoInsertDesc->storageWrite.bufferedAppend.largeWriteVarBlock + aoInsertDesc->varblockCount; + + aoblknum = 1 + AOBLF_CALC_PAGE(varblkcnt); - RelationOpenSmgr(aoInsertDesc->aoi_rel); + RelationOpenSmgr(aoInsertDesc->aoi_rel); - smgrcreate(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM, true); + smgrcreate(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM, true); - if (aoblknum > smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) - ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); + if (aoblknum > smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) + ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); - SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, - aoInsertDesc->varBlockMaker.blf, - aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, aoInsertDesc->varblockCount); + SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, + aoInsertDesc->varBlockMaker.blf, + 
aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, varblkcnt); + } aoInsertDesc->varblockCount++; diff --git a/src/backend/cdb/cdbappendonlystoragewrite.c b/src/backend/cdb/cdbappendonlystoragewrite.c index b4cb5c902c5..b3d5e6e7b45 100755 --- a/src/backend/cdb/cdbappendonlystoragewrite.c +++ b/src/backend/cdb/cdbappendonlystoragewrite.c @@ -315,6 +315,7 @@ AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite, int version, int64 logicalEof, int64 fileLen_uncompressed, + int64 varblockcount, int64 modcount, RelFileNodeBackend *relFileNode, int32 segmentFileNum) @@ -407,7 +408,8 @@ AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite, storageWrite->segmentFileNum, storageWrite->segmentFileName, logicalEof, - fileLen_uncompressed); + fileLen_uncompressed, + varblockcount); } /* diff --git a/src/backend/cdb/cdbbufferedappend.c b/src/backend/cdb/cdbbufferedappend.c index 49006e7a084..864bf268cca 100644 --- a/src/backend/cdb/cdbbufferedappend.c +++ b/src/backend/cdb/cdbbufferedappend.c @@ -114,7 +114,8 @@ BufferedAppendSetFile(BufferedAppend *bufferedAppend, int32 segmentFileNum, char *filePathName, int64 eof, - int64 eof_uncompressed) + int64 eof_uncompressed, + int64 varblock) { Assert(bufferedAppend != NULL); Assert(bufferedAppend->largeWritePosition == 0); @@ -128,6 +129,7 @@ BufferedAppendSetFile(BufferedAppend *bufferedAppend, Assert(eof >= 0); bufferedAppend->largeWritePosition = eof; + bufferedAppend->largeWriteVarBlock = varblock; bufferedAppend->file = file; bufferedAppend->relFileNode = relFileNode; bufferedAppend->segmentFileNum = segmentFileNum; diff --git a/src/backend/utils/datumstream/datumstream.c b/src/backend/utils/datumstream/datumstream.c index af5a1248fa3..72027a10a2c 100644 --- a/src/backend/utils/datumstream/datumstream.c +++ b/src/backend/utils/datumstream/datumstream.c @@ -818,8 +818,16 @@ destroy_datumstreamread(DatumStreamRead * ds) void -datumstreamwrite_open_file(DatumStreamWrite *ds, char *fn, int64 
eof, int64 eofUncompressed, int64 modcount, - RelFileNodeBackend *relFileNode, int32 segmentFileNum, int version) +datumstreamwrite_open_file( + DatumStreamWrite *ds, + char *fn, + int64 eof, + int64 eofUncompressed, + int64 varblockcount, + int64 modcount, + RelFileNodeBackend *relFileNode, + int32 segmentFileNum, + int version) { ds->eof = eof; ds->eofUncompress = eofUncompressed; @@ -847,6 +855,7 @@ datumstreamwrite_open_file(DatumStreamWrite *ds, char *fn, int64 eof, int64 eofU version, eof, eofUncompressed, + varblockcount, modcount, relFileNode, segmentFileNum); diff --git a/src/include/access/aobloomfilter.h b/src/include/access/aobloomfilter.h index e9d4089aa1c..5f129233dc1 100644 --- a/src/include/access/aobloomfilter.h +++ b/src/include/access/aobloomfilter.h @@ -29,9 +29,10 @@ typedef AOBFPageData *AOBFPage; #define AOBLF_CALC_PAGE(x) (x / AOBLF_N_FILTERS) #define AOBLF_CALC_OFFSET(x) (x % AOBLF_N_FILTERS) -extern bloom_filter* ao_bloom_filter_deserealize(BloomFilterFixed *bf); -extern BloomFilterFixed* ao_bloom_filter_serealize(bloom_filter *bf, int64 off); +extern bloom_filter *ao_bloom_filter_deserealize(BloomFilterFixed *bf); +extern BloomFilterFixed *ao_bloom_filter_serealize(bloom_filter *bf, int64 off); -extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int aoblknum); +extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int64 aoblknum); +extern bloom_filter *FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum); #endif /* AOBLOOMFILTER_H */ \ No newline at end of file diff --git a/src/include/cdb/cdbappendonlystoragewrite.h b/src/include/cdb/cdbappendonlystoragewrite.h index bc2e5ac9aa7..5c56047d6d2 100755 --- a/src/include/cdb/cdbappendonlystoragewrite.h +++ b/src/include/cdb/cdbappendonlystoragewrite.h @@ -199,6 +199,7 @@ extern void AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite int version, int64 logicalEof, int64 fileLen_uncompressed, + 
int64 varblockcount, int64 modcount, RelFileNodeBackend *relFileNode, int32 segmentFileNum); diff --git a/src/include/cdb/cdbbufferedappend.h b/src/include/cdb/cdbbufferedappend.h index a315a727fc5..d558190e368 100644 --- a/src/include/cdb/cdbbufferedappend.h +++ b/src/include/cdb/cdbbufferedappend.h @@ -47,6 +47,7 @@ typedef struct BufferedAppend */ int64 largeWritePosition; + int64 largeWriteVarBlock; int32 largeWriteLen; /* * The position within the current file for the next write @@ -113,7 +114,8 @@ extern void BufferedAppendSetFile( int32 segmentFileNum, char *filePathName, int64 eof, - int64 eof_uncompressed); + int64 eof_uncompressed, + int64 varblockcount); /* * Return the position of the current write buffer in bytes. diff --git a/src/include/utils/datumstream.h b/src/include/utils/datumstream.h index 85154cb22c6..2a2ebbd8aed 100644 --- a/src/include/utils/datumstream.h +++ b/src/include/utils/datumstream.h @@ -282,6 +282,7 @@ extern void datumstreamwrite_open_file( char *fn, int64 eof, int64 eofUncompressed, + int64 varblockcount, int64 modcount, RelFileNodeBackend *relFileNode, int32 segmentFileNum, From 39d6349dd5d9df7555d6b5bc72757ef40e0636ee Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Fri, 27 Dec 2024 20:10:00 +0000 Subject: [PATCH 5/6] fixes --- contrib/pageinspect/aofuncs.c | 56 +++++++----- contrib/pageinspect/pageinspect--1.8.sql | 1 + src/backend/access/appendonly/aobloomfilter.c | 8 +- src/backend/access/appendonly/appendonlyam.c | 88 ++++++++++++------- src/backend/cdb/cdbbufferedread.c | 3 +- src/backend/cdb/cdbvarblock.c | 2 +- src/backend/lib/bloomfilter.c | 4 +- src/include/access/aobloomfilter.h | 5 +- src/include/cdb/cdbappendonlyam.h | 1 + src/include/lib/bloomfilter.h | 2 +- 10 files changed, 109 insertions(+), 61 deletions(-) diff --git a/contrib/pageinspect/aofuncs.c b/contrib/pageinspect/aofuncs.c index ecf5a64316c..84c5111c527 100644 --- a/contrib/pageinspect/aofuncs.c +++ b/contrib/pageinspect/aofuncs.c @@ -26,7 +26,7 @@ 
typedef struct AOHeadersInfoCxt { TupleTableSlot *slot; } AOHeadersInfoCxt; -#define NUM_GET_AO_HEADERS_INFO 9 +#define NUM_GET_AO_HEADERS_INFO 10 #define NUM_GET_AOCS_HEADERS_INFO 8 Datum @@ -85,17 +85,19 @@ get_ao_headers_info(PG_FUNCTION_ARGS) -1 /* typmod */, 0 /* attdim */); TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)3, "buffer offset", INT4OID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "block kind", TEXTOID, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "total Varblock count", INT8OID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "header kind", TEXTOID, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "block kind", TEXTOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "current item count", INT4OID, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "header kind", TEXTOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "isCompressed", + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "current item count", INT4OID, + -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "isCompressed", BOOLOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "isLarge", + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, "isLarge", BOOLOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)10, "dataLen", INT4OID, -1 /* typmod */, 0 /* attdim */); @@ -148,43 +150,45 @@ get_ao_headers_info(PG_FUNCTION_ARGS) values[0] = Int64GetDatum(scan->executorReadBlock.blockFirstRowNum); values[1] = Int64GetDatum(scan->storageRead.bufferedRead.largeReadPosition); values[2] = Int32GetDatum(scan->storageRead.bufferedRead.bufferOffset); + values[3] = 
Int64GetDatum(scan->executorReadBlock.totalVarblockCount); + switch (scan->executorReadBlock.executorBlockKind) { case AoExecutorBlockKind_VarBlock: - values[3] = CStringGetTextDatum("varblock"); + values[4] = CStringGetTextDatum("varblock"); break; case AoExecutorBlockKind_SingleRow: - values[3] = CStringGetTextDatum("single row"); + values[4] = CStringGetTextDatum("single row"); break; default: - values[3] = CStringGetTextDatum("unknown"); + values[4] = CStringGetTextDatum("unknown"); break; } switch (scan->storageRead.current.headerKind) { case AoHeaderKind_SmallContent: - values[4] = CStringGetTextDatum("small content"); + values[5] = CStringGetTextDatum("small content"); break; case AoHeaderKind_LargeContent: - values[4] = CStringGetTextDatum("large content"); + values[5] = CStringGetTextDatum("large content"); break; case AoHeaderKind_NonBulkDenseContent: - values[4] = CStringGetTextDatum("non bulk dense content"); + values[5] = CStringGetTextDatum("non bulk dense content"); break; case AoHeaderKind_BulkDenseContent: - values[4] = CStringGetTextDatum("bulk dense content"); + values[5] = CStringGetTextDatum("bulk dense content"); break; default: - values[4] = CStringGetTextDatum("unknown"); + values[5] = CStringGetTextDatum("unknown"); break; } - values[5] = Int32GetDatum(scan->executorReadBlock.currentItemCount); - values[6] = BoolGetDatum(scan->executorReadBlock.isCompressed); - values[7] = BoolGetDatum(scan->executorReadBlock.isLarge); - values[8] = Int32GetDatum(scan->executorReadBlock.dataLen); + values[6] = Int32GetDatum(scan->executorReadBlock.currentItemCount); + values[7] = BoolGetDatum(scan->executorReadBlock.isCompressed); + values[8] = BoolGetDatum(scan->executorReadBlock.isLarge); + values[9] = Int32GetDatum(scan->executorReadBlock.dataLen); AppendOnlyExecutorReadBlock_GetContents( &scan->executorReadBlock); @@ -209,6 +213,8 @@ get_ao_headers_info(PG_FUNCTION_ARGS) break; } + elog(DEBUG3, "large read pos after block content " INT64_FORMAT ", offset %d", /* largeReadPosition is 64-bit (see Int64GetDatum above); %d would mismatch the vararg */ 
scan->storageRead.bufferedRead.largeReadPosition, scan->storageRead.bufferedRead.bufferOffset); + relation_close(r, AccessShareLock); SRF_RETURN_NEXT(funcctx, result); } @@ -370,7 +376,6 @@ get_aocs_headers_info(PG_FUNCTION_ARGS) values[1] = Int64GetDatum(scan->ds[i]->ao_read.bufferedRead.largeReadPosition); values[2] = Int32GetDatum(scan->ds[i]->ao_read.bufferedRead.bufferOffset); - switch (scan->ds[i]->ao_read.current.headerKind) { case AoHeaderKind_SmallContent: @@ -453,10 +458,10 @@ check_bloom_filter(PG_FUNCTION_ARGS) r = relation_open(oid, AccessShareLock); - if (!RelationIsAoCols(r)) + if (!RelationIsAoRows(r)) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - (errmsg("must be an AOCS relation to use raw headerfunctions")))); + (errmsg("must be an Append-Optimized relation to use raw headerfunctions")))); att = TupleDescAttr(RelationGetDescr(r), 0); @@ -465,7 +470,12 @@ check_bloom_filter(PG_FUNCTION_ARGS) blf = FetchBloomFilterForVarblock(r, varblocknum); - res = !bloom_lacks_element(blf, d, att->attlen); + if (blf != NULL) + res = !bloom_lacks_element(blf, &d, att->attlen); + else + res = true; + + relation_close(r, AccessShareLock); PG_RETURN_BOOL(res); } \ No newline at end of file diff --git a/contrib/pageinspect/pageinspect--1.8.sql b/contrib/pageinspect/pageinspect--1.8.sql index a00d0688760..f08b32db22d 100644 --- a/contrib/pageinspect/pageinspect--1.8.sql +++ b/contrib/pageinspect/pageinspect--1.8.sql @@ -178,6 +178,7 @@ RETURNS TABLE ( "first row number" BIGINT, "large read position" BIGINT, "buffer offset" INTEGER, + "total varblock count" BIGINT, "block kind" TEXT, "header kind" TEXT, "current item count" INTEGER, diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c index 5135818c7fa..14da75f7bb2 100644 --- a/src/backend/access/appendonly/aobloomfilter.c +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -21,6 +21,7 @@ SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 
offset, int6 char * pointer; fixed_filter = ao_bloom_filter_serealize(filter, offset); + fixed_filter->magic = BLOOM_F_MAGIC; blkno = AOBLF_CALC_PAGE(varblocknum); page_offset = AOBLF_CALC_OFFSET(varblocknum); @@ -66,6 +67,9 @@ FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum) LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); + if (fixed_filter->magic != BLOOM_F_MAGIC) + return NULL; + return ao_bloom_filter_deserealize(fixed_filter); } @@ -73,10 +77,10 @@ bloom_filter * ao_bloom_filter_deserealize(BloomFilterFixed *bf) { bloom_filter * blf; - blf = bloom_create_nbytes(AOBLF_SIZE, bf->seed); + blf = bloom_create_nbytes(AOBLF_SIZE, bf->k_hash_funcs, bf->seed); - blf->k_hash_funcs = bf->k_hash_funcs; blf->m = AOBLF_SIZE; + blf->seed = bf->seed; memcpy(blf->bitset, bf->bitset, sizeof(char) * AOBLF_SIZE); return blf; diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index ad1735ef508..fee3d1184d3 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -596,11 +596,12 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read small non-compressed block for table '%s' " - "(length = %d, segment file '%s', block offset in file = " INT64_FORMAT ")", + "(length = %d, segment file '%s', block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->dataLen, AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); } else { @@ -657,11 +658,12 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea 
elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read large row for table '%s' " "(length = %d, segment file '%s', " - "block offset in file = " INT64_FORMAT ")", + "block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->dataLen, AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); } } else @@ -687,12 +689,13 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read decompressed block for table '%s' " "(compressed length %d, length = %d, segment file '%s', " - "block offset in file = " INT64_FORMAT ")", + "block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), compressedLen, executorReadBlock->dataLen, AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); } /* @@ -737,10 +740,11 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea } elogif(Debug_appendonly_print_scan, LOG, - "append-only scan read VarBlock for table '%s' with %d items (block offset in file = " INT64_FORMAT ")", + "append-only scan read VarBlock for table '%s' with %d items (block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->readerItemCount, - executorReadBlock->headerOffsetInFile); + 
executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); break; case AoExecutorBlockKind_SingleRow: @@ -756,10 +760,11 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea executorReadBlock->singleRow = executorReadBlock->dataBuffer; executorReadBlock->singleRowLen = executorReadBlock->dataLen; - elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read single row for table '%s' with length %d (block offset in file = " INT64_FORMAT ")", + elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read single row for table '%s' with length %d (block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->singleRowLen, - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); break; @@ -825,6 +830,7 @@ AppendOnlyExecutionReadBlock_SetPositionInfo(AppendOnlyExecutorReadBlock *execut static void AppendOnlyExecutionReadBlock_FinishedScanBlock(AppendOnlyExecutorReadBlock *executorReadBlock) { + ++executorReadBlock->totalVarblockCount; executorReadBlock->blockFirstRowNum += executorReadBlock->rowCount; } @@ -874,6 +880,7 @@ static void AppendOnlyExecutorReadBlock_ResetCounts(AppendOnlyExecutorReadBlock *executorReadBlock) { executorReadBlock->totalRowsScannned = 0; + executorReadBlock->totalVarblockCount = 0; } /* @@ -1049,12 +1056,13 @@ AppendOnlyExecutorReadBlock_ProcessTuple(AppendOnlyExecutorReadBlock *executorRe elogif(Debug_appendonly_print_scan_tuple && valid, LOG, "Append-only scan tuple for table '%s' " - "(AOTupleId %s, tuple length %d, memtuple length %d, block offset in file " INT64_FORMAT ")", + "(AOTupleId %s, tuple length %d, memtuple length %d, block offset in file 
" INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), AOTupleIdToString(aoTupleId), tupleLen, memtuple_get_size(tuple), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); return valid; } @@ -1260,8 +1268,11 @@ AppendOnlyExecutorReadBlock_FetchTuple(AppendOnlyExecutorReadBlock *executorRead * You can think of this scan routine as get next "executor" AO block. */ static bool -getNextBlock(AppendOnlyScanDesc scan) +getNextBlock(AppendOnlyScanDesc scan, int nkeys, ScanKey key) { + int64 offset; + bloom_filter *blf; + if (scan->aos_need_new_segfile) { /* @@ -1271,6 +1282,16 @@ getNextBlock(AppendOnlyScanDesc scan) return false; } + offset = scan->executorReadBlock.headerOffsetInFile; + + while (1) + { + // FetchBloomFilterForVarblock(scan->aos_rd, ); + break; + } + + + if (!AppendOnlyExecutorReadBlock_GetBlockInfo( &scan->storageRead, &scan->executorReadBlock)) @@ -1337,7 +1358,7 @@ appendonlygettup(AppendOnlyScanDesc scan, * get a block to process, or finished reading all the data (all * 'segment' files) for this relation. */ - while (!getNextBlock(scan)) + while (!getNextBlock(scan, nkeys, key)) { /* have we read all this relation's data. done! 
*/ if (scan->aos_done_all_segfiles) @@ -1515,24 +1536,8 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) dataLen = VarBlockMakerFinish(&aoInsertDesc->varBlockMaker); if (RelationIsAoRows(aoInsertDesc->aoi_rel)) - { varblkcnt = aoInsertDesc->storageWrite.bufferedAppend.largeWriteVarBlock + aoInsertDesc->varblockCount; - aoblknum = 1 + AOBLF_CALC_PAGE(varblkcnt); - - RelationOpenSmgr(aoInsertDesc->aoi_rel); - - smgrcreate(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM, true); - - if (aoblknum > smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) - ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); - - - SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, - aoInsertDesc->varBlockMaker.blf, - aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, varblkcnt); - } - aoInsertDesc->varblockCount++; if (!aoInsertDesc->shouldCompress) @@ -1608,6 +1613,29 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) itemCount); } + + if (RelationIsAoRows(aoInsertDesc->aoi_rel)) + { + aoblknum = 1 + AOBLF_CALC_PAGE(varblkcnt); + + RelationOpenSmgr(aoInsertDesc->aoi_rel); + + smgrcreate(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM, true); + + if (aoblknum > smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) + ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); + + elog(LOG, + "writing hint bloom for varblock " INT64_FORMAT " in offset " INT64_FORMAT" - "INT64_FORMAT"", + varblkcnt, aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition + aoInsertDesc->storageWrite.bufferedAppend.largeWriteLen); + + SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, + aoInsertDesc->varBlockMaker.blf, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition + aoInsertDesc->storageWrite.bufferedAppend.largeWriteLen, + varblkcnt); + } + /* Insert an entry to the block directory */ AppendOnlyBlockDirectory_InsertEntry( &aoInsertDesc->blockDirectory, diff --git a/src/backend/cdb/cdbbufferedread.c 
b/src/backend/cdb/cdbbufferedread.c index 3deb9e6d56e..a1550657a2f 100644 --- a/src/backend/cdb/cdbbufferedread.c +++ b/src/backend/cdb/cdbbufferedread.c @@ -223,11 +223,12 @@ BufferedReadIo( bufferedRead->largeReadLen))); elogif(Debug_appendonly_print_read_block, LOG, - "Append-Only storage read: table \"%s\", segment file \"%s\", read position " INT64_FORMAT " (small offset %d), " + "Append-Only storage read: table \"%s\", segment file \"%s\", read position " INT64_FORMAT ", buffer offset (%d) (small offset %d), " "actual read length %d (equals large read length %d is %s)", bufferedRead->relationName, bufferedRead->filePathName, bufferedRead->largeReadPosition, + bufferedRead->bufferOffset, offset, actualLen, bufferedRead->largeReadLen, diff --git a/src/backend/cdb/cdbvarblock.c b/src/backend/cdb/cdbvarblock.c index a04f50cf6c4..816cbfee739 100644 --- a/src/backend/cdb/cdbvarblock.c +++ b/src/backend/cdb/cdbvarblock.c @@ -86,7 +86,7 @@ VarBlockMakerInit( VarBlockSet_version(varBlockMaker->header, InitialVersion); VarBlockSet_offsetsAreSmall(varBlockMaker->header, true); - varBlockMaker->blf = bloom_create_nbytes(AOBLF_SIZE, random() % PG_INT32_MAX); + varBlockMaker->blf = bloom_create_nbytes(AOBLF_SIZE, 4 /* default */, random() % PG_INT32_MAX); varBlockMaker->largeWritePosition = largeWritePosition; } diff --git a/src/backend/lib/bloomfilter.c b/src/backend/lib/bloomfilter.c index 9dfcc0e6ec3..7b4426917b3 100644 --- a/src/backend/lib/bloomfilter.c +++ b/src/backend/lib/bloomfilter.c @@ -108,14 +108,14 @@ bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed) } bloom_filter * -bloom_create_nbytes(int64 nbytes, uint64 seed) +bloom_create_nbytes(int64 nbytes, int k_hash_funcs, uint64 seed) { bloom_filter *filter; /* Allocate bloom filter with unset bitset */ filter = palloc0(offsetof(bloom_filter, bitset) + sizeof(unsigned char) * nbytes); - filter->k_hash_funcs = MAX_HASH_FUNCS; + filter->k_hash_funcs = k_hash_funcs; filter->seed = seed; filter->m = 
nbytes; diff --git a/src/include/access/aobloomfilter.h b/src/include/access/aobloomfilter.h index 5f129233dc1..7b63d2dca3d 100644 --- a/src/include/access/aobloomfilter.h +++ b/src/include/access/aobloomfilter.h @@ -8,9 +8,12 @@ #define AOBLF_SIZE 128 +#define BLOOM_F_MAGIC 0xD0D0CACA + typedef struct BloomFilterFixed { + uint32 magic; /* K hash functions are used, seeded by caller's seed */ - int64 offset; + int64 offset; int k_hash_funcs; uint64 seed; /* m is bitset size, in bits. Must be a power of two <= 2^32. */ diff --git a/src/include/cdb/cdbappendonlyam.h b/src/include/cdb/cdbappendonlyam.h index 33e089477ad..43da9cbb17a 100644 --- a/src/include/cdb/cdbappendonlyam.h +++ b/src/include/cdb/cdbappendonlyam.h @@ -141,6 +141,7 @@ typedef struct AppendOnlyExecutorReadBlock int segmentFileNum; int64 totalRowsScannned; + int64 totalVarblockCount; int64 blockFirstRowNum; int64 headerOffsetInFile; diff --git a/src/include/lib/bloomfilter.h b/src/include/lib/bloomfilter.h index 54875a5b6a2..95250d670f9 100644 --- a/src/include/lib/bloomfilter.h +++ b/src/include/lib/bloomfilter.h @@ -29,7 +29,7 @@ typedef struct bloom_filter bloom_filter; extern bloom_filter *bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed); -extern bloom_filter *bloom_create_nbytes(int64 nbytes, uint64 seed); +extern bloom_filter *bloom_create_nbytes(int64 nbytes, int k_hash_funcs, uint64 seed); extern void bloom_free(bloom_filter *filter); extern void bloom_add_element(bloom_filter *filter, unsigned char *elem, From bbd73ce50cc4abf70ecfc4d6f316a8a06be81c4f Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 28 Dec 2024 19:46:14 +0000 Subject: [PATCH 6/6] Fix --- contrib/pageinspect/aofuncs.c | 3 ++- src/backend/access/appendonly/aobloomfilter.c | 4 +++- src/backend/access/appendonly/appendonlyam.c | 19 +++++++++++++++++-- src/include/access/aobloomfilter.h | 2 +- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/contrib/pageinspect/aofuncs.c 
b/contrib/pageinspect/aofuncs.c index 84c5111c527..1c4494a5d98 100644 --- a/contrib/pageinspect/aofuncs.c +++ b/contrib/pageinspect/aofuncs.c @@ -445,6 +445,7 @@ check_bloom_filter(PG_FUNCTION_ARGS) bool res; bloom_filter * blf; Relation r; + int64 offset; Form_pg_attribute att; Oid oid = PG_GETARG_OID(0); int64 varblocknum = PG_GETARG_INT64(1); @@ -468,7 +469,7 @@ check_bloom_filter(PG_FUNCTION_ARGS) if (!att->attbyval) elog(ERROR, "FAILED TO BLOOM BY REF TUP"); - blf = FetchBloomFilterForVarblock(r, varblocknum); + blf = FetchBloomFilterForVarblock(r, varblocknum, &offset); if (blf != NULL) res = !bloom_lacks_element(blf, &d, att->attlen); diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c index 14da75f7bb2..532d8b778ff 100644 --- a/src/backend/access/appendonly/aobloomfilter.c +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -41,7 +41,7 @@ SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int6 } bloom_filter * -FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum) +FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum, int64 *next_offset) { Buffer buffer; Page page; @@ -70,6 +70,8 @@ FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum) if (fixed_filter->magic != BLOOM_F_MAGIC) return NULL; + *next_offset = fixed_filter->offset; + return ao_bloom_filter_deserealize(fixed_filter); } diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index fee3d1184d3..f5562d21c4b 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -1271,6 +1271,7 @@ static bool getNextBlock(AppendOnlyScanDesc scan, int nkeys, ScanKey key) { int64 offset; + int64 tmp_offset; bloom_filter *blf; if (scan->aos_need_new_segfile) @@ -1286,11 +1287,25 @@ getNextBlock(AppendOnlyScanDesc scan, int nkeys, ScanKey key) while (1) { - // FetchBloomFilterForVarblock(scan->aos_rd, ); + 
for (int i = 0; i < nkeys; ++i) + { + if (key[i].sk_attno == 1 && key[i].sk_strategy == BTEqualStrategyNumber) + { + blf = FetchBloomFilterForVarblock(scan->aos_rd, scan->executorReadBlock.totalVarblockCount, &tmp_offset); + + if (blf != NULL && bloom_lacks_element(blf, (unsigned char *) &key[i].sk_argument, sizeof(int))) /* sk_argument is a by-value Datum: probe with its bytes (as check_bloom_filter does with &d), never use it as an address */ + { + elog(LOG, "skip bytes " INT64_FORMAT " - " INT64_FORMAT "", offset, tmp_offset); + offset = tmp_offset; + } + + break; + } + } break; } - + scan->executorReadBlock.headerOffsetInFile = offset; if (!AppendOnlyExecutorReadBlock_GetBlockInfo( &scan->storageRead, diff --git a/src/include/access/aobloomfilter.h b/src/include/access/aobloomfilter.h index 7b63d2dca3d..c413824b631 100644 --- a/src/include/access/aobloomfilter.h +++ b/src/include/access/aobloomfilter.h @@ -36,6 +36,6 @@ extern bloom_filter *ao_bloom_filter_deserealize(BloomFilterFixed *bf); extern BloomFilterFixed *ao_bloom_filter_serealize(bloom_filter *bf, int64 off); extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int64 aoblknum); -extern bloom_filter *FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum); +extern bloom_filter *FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum, int64 *next_offset); #endif /* AOBLOOMFILTER_H */ \ No newline at end of file