diff --git a/contrib/pageinspect/aofuncs.c b/contrib/pageinspect/aofuncs.c index b27fa6a58b..1c4494a5d9 100644 --- a/contrib/pageinspect/aofuncs.c +++ b/contrib/pageinspect/aofuncs.c @@ -12,18 +12,21 @@ #include "catalog/pg_namespace.h" #include "utils/syscache.h" +#include "access/aobloomfilter.h" + #include "cdb/cdbaocsam.h" #include "cdb/cdbappendonlyam.h" PG_FUNCTION_INFO_V1(get_ao_headers_info); PG_FUNCTION_INFO_V1(get_aocs_headers_info); +PG_FUNCTION_INFO_V1(check_bloom_filter); typedef struct AOHeadersInfoCxt { AppendOnlyScanDesc scan; TupleTableSlot *slot; } AOHeadersInfoCxt; -#define NUM_GET_AO_HEADERS_INFO 9 +#define NUM_GET_AO_HEADERS_INFO 10 #define NUM_GET_AOCS_HEADERS_INFO 8 Datum @@ -82,17 +85,19 @@ get_ao_headers_info(PG_FUNCTION_ARGS) -1 /* typmod */, 0 /* attdim */); TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)3, "buffer offset", INT4OID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "block kind", TEXTOID, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "total Varblock count", INT8OID, + -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "block kind", TEXTOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "header kind", TEXTOID, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "header kind", TEXTOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "current item count", INT4OID, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "current item count", INT4OID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "isCompressed", + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "isCompressed", BOOLOID, -1 /* typmod */, 0 /* attdim */); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "isLarge", + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, "isLarge", BOOLOID, -1 /* typmod */, 0 /* attdim 
*/); - TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, + TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)10, "dataLen", INT4OID, -1 /* typmod */, 0 /* attdim */); @@ -145,43 +150,45 @@ get_ao_headers_info(PG_FUNCTION_ARGS) values[0] = Int64GetDatum(scan->executorReadBlock.blockFirstRowNum); values[1] = Int64GetDatum(scan->storageRead.bufferedRead.largeReadPosition); values[2] = Int32GetDatum(scan->storageRead.bufferedRead.bufferOffset); + values[3] = Int64GetDatum(scan->executorReadBlock.totalVarblockCount); + switch (scan->executorReadBlock.executorBlockKind) { case AoExecutorBlockKind_VarBlock: - values[3] = CStringGetTextDatum("varblock"); + values[4] = CStringGetTextDatum("varblock"); break; case AoExecutorBlockKind_SingleRow: - values[3] = CStringGetTextDatum("single row"); + values[4] = CStringGetTextDatum("single row"); break; default: - values[3] = CStringGetTextDatum("unknown"); + values[4] = CStringGetTextDatum("unknown"); break; } switch (scan->storageRead.current.headerKind) { case AoHeaderKind_SmallContent: - values[4] = CStringGetTextDatum("small content"); + values[5] = CStringGetTextDatum("small content"); break; case AoHeaderKind_LargeContent: - values[4] = CStringGetTextDatum("large content"); + values[5] = CStringGetTextDatum("large content"); break; case AoHeaderKind_NonBulkDenseContent: - values[4] = CStringGetTextDatum("non bulk dense content"); + values[5] = CStringGetTextDatum("non bulk dense content"); break; case AoHeaderKind_BulkDenseContent: - values[4] = CStringGetTextDatum("bulk dense content"); + values[5] = CStringGetTextDatum("bulk dense content"); break; default: - values[4] = CStringGetTextDatum("unknown"); + values[5] = CStringGetTextDatum("unknown"); break; } - values[5] = Int32GetDatum(scan->executorReadBlock.currentItemCount); - values[6] = BoolGetDatum(scan->executorReadBlock.isCompressed); - values[7] = BoolGetDatum(scan->executorReadBlock.isLarge); - values[8] = Int32GetDatum(scan->executorReadBlock.dataLen); + 
values[6] = Int32GetDatum(scan->executorReadBlock.currentItemCount); + values[7] = BoolGetDatum(scan->executorReadBlock.isCompressed); + values[8] = BoolGetDatum(scan->executorReadBlock.isLarge); + values[9] = Int32GetDatum(scan->executorReadBlock.dataLen); AppendOnlyExecutorReadBlock_GetContents( &scan->executorReadBlock); @@ -206,6 +213,8 @@ get_ao_headers_info(PG_FUNCTION_ARGS) break; } + elog(DEBUG3, "large read pos after block content %d, offset %d", scan->storageRead.bufferedRead.largeReadPosition, scan->storageRead.bufferedRead.bufferOffset); + relation_close(r, AccessShareLock); SRF_RETURN_NEXT(funcctx, result); } @@ -367,7 +376,6 @@ get_aocs_headers_info(PG_FUNCTION_ARGS) values[1] = Int64GetDatum(scan->ds[i]->ao_read.bufferedRead.largeReadPosition); values[2] = Int32GetDatum(scan->ds[i]->ao_read.bufferedRead.bufferOffset); - switch (scan->ds[i]->ao_read.current.headerKind) { case AoHeaderKind_SmallContent: @@ -427,4 +435,48 @@ get_aocs_headers_info(PG_FUNCTION_ARGS) relation_close(r, AccessShareLock); SRF_RETURN_DONE(funcctx); +} + + +/* aobloomfilter.c */ +Datum +check_bloom_filter(PG_FUNCTION_ARGS) +{ + bool res; + bloom_filter * blf; + Relation r; + int64 offset; + Form_pg_attribute att; + Oid oid = PG_GETARG_OID(0); + int64 varblocknum = PG_GETARG_INT64(1); + Datum d = PG_GETARG_DATUM(2); + + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to use raw page functions")))); + + r = relation_open(oid, AccessShareLock); + + if (!RelationIsAoRows(r)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + (errmsg("must be an Append-Optimized relation to use raw headerfunctions")))); + + + att = TupleDescAttr(RelationGetDescr(r), 0); + if (!att->attbyval) + elog(ERROR, "FAILED TO BLOOM BY REF TUP"); + + blf = FetchBloomFilterForVarblock(r, varblocknum, &offset); + + if (blf != NULL) + res = !bloom_lacks_element(blf, &d, att->attlen); + else + res = true; + + relation_close(r, AccessShareLock); + 
+ PG_RETURN_BOOL(res); } \ No newline at end of file diff --git a/contrib/pageinspect/pageinspect--1.8.sql b/contrib/pageinspect/pageinspect--1.8.sql index b01a3d9898..f08b32db22 100644 --- a/contrib/pageinspect/pageinspect--1.8.sql +++ b/contrib/pageinspect/pageinspect--1.8.sql @@ -178,6 +178,7 @@ RETURNS TABLE ( "first row number" BIGINT, "large read position" BIGINT, "buffer offset" INTEGER, + "total varblock count" BIGINT, "block kind" TEXT, "header kind" TEXT, "current item count" INTEGER, @@ -205,3 +206,14 @@ RETURNS TABLE ( AS 'MODULE_PATHNAME', 'get_aocs_headers_info' LANGUAGE C STRICT EXECUTE ON ALL SEGMENTS; + + +CREATE FUNCTION check_bloom_filter( + reloid OID, + varblocknum BIGINT, + d INT +) +RETURNS BOOLEAN +AS 'MODULE_PATHNAME', 'check_bloom_filter' + LANGUAGE C STRICT +EXECUTE ON ALL SEGMENTS; diff --git a/src/backend/access/aocs/aocsam.c b/src/backend/access/aocs/aocsam.c index 0f02f8e714..0ef381c17a 100644 --- a/src/backend/access/aocs/aocsam.c +++ b/src/backend/access/aocs/aocsam.c @@ -875,9 +875,15 @@ OpenAOCSDatumStreams(AOCSInsertDesc desc) FormatAOSegmentFileName(basepath, seginfo->segno, i, &fileSegNo, fn); Assert(strlen(fn) + 1 <= MAXPGPATH); - datumstreamwrite_open_file(desc->ds[i], fn, e->eof, e->eof_uncompressed, seginfo->modcount, - &rnode, - fileSegNo, seginfo->formatversion); + datumstreamwrite_open_file(desc->ds[i], + fn, + e->eof, + e->eof_uncompressed, + 0, + seginfo->modcount, + &rnode, + fileSegNo, + seginfo->formatversion); } pfree(basepath); @@ -2041,7 +2047,10 @@ aocs_addcol_newsegfile(AOCSAddColumnDesc desc, &fileSegNo, fn); Assert(strlen(fn) + 1 <= MAXPGPATH); datumstreamwrite_open_file(desc->dsw[i], fn, - 0 /* eof */ , 0 /* eof_uncompressed */ , 0, /*modcount*/ + 0 /* eof */ , + 0 /* eof_uncompressed */ , + 0 /* varblockcount */, + 0, /*modcount*/ &relfilenode, fileSegNo, version); desc->dsw[i]->blockFirstRowNum = 1; diff --git a/src/backend/access/appendonly/Makefile b/src/backend/access/appendonly/Makefile index 
ba30f34fd9..94d74c16f4 100755 --- a/src/backend/access/appendonly/Makefile +++ b/src/backend/access/appendonly/Makefile @@ -10,7 +10,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) -OBJS = appendonlyam.o aosegfiles.o aomd.o appendonlywriter.o appendonlytid.o \ +OBJS = aobloomfilter.o appendonlyam.o aosegfiles.o aomd.o appendonlywriter.o appendonlytid.o \ appendonlyblockdirectory.o appendonly_visimap.o \ appendonly_visimap_entry.o appendonly_visimap_store.o \ appendonly_compaction.o appendonly_visimap_udf.o \ diff --git a/src/backend/access/appendonly/aobloomfilter.c b/src/backend/access/appendonly/aobloomfilter.c new file mode 100644 index 0000000000..532d8b778f --- /dev/null +++ b/src/backend/access/appendonly/aobloomfilter.c @@ -0,0 +1,154 @@ + +#include "postgres.h" + +#include "utils/rel.h" +#include "storage/buf_internals.h" +#include "storage/bufpage.h" +#include "lib/bloomfilter.h" + +#include "access/heapam_xlog.h" + +#include "access/aobloomfilter.h" + +/* + * Does slot 'slot' of a bloom filter page lie entirely inside the page? + * Slots are laid out back to back starting at PageGetContents(). + */ +static bool +ao_bloom_slot_fits(OffsetNumber slot) +{ + return MAXALIGN(SizeOfPageHeaderData) + ((Size) slot + 1) * sizeof(BloomFilterFixed) <= BLCKSZ; +} + +/* + * Store 'filter' in the slot reserved for varblock 'varblocknum' in the FSM + * fork of 'aorel'. 'offset' is recorded in the slot alongside the filter. + * + * A slot that would not fit inside its page is not written at all, so the + * buffer is never overrun; FetchBloomFilterForVarblock() reports such a + * varblock as having no filter. + * + * NOTE(review): the page change is not WAL-logged here - confirm whether + * that is intended. + */ +void +SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int64 varblocknum) +{ + Buffer buffer; + Page page; + BlockNumber blkno; + OffsetNumber page_offset; + BloomFilterFixed *fixed_filter; + char * pointer; + + blkno = AOBLF_CALC_PAGE(varblocknum); + page_offset = AOBLF_CALC_OFFSET(varblocknum); + + if (!ao_bloom_slot_fits(page_offset)) + { + elog(DEBUG1, "bloom filter slot %d for varblock " INT64_FORMAT " does not fit in a page, not stored", (int) page_offset, varblocknum); + return; + } + + fixed_filter = ao_bloom_filter_serealize(filter, offset); + fixed_filter->magic = BLOOM_F_MAGIC; + + buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + page = BufferGetPage(buffer); + + pointer = PageGetContents(page) + page_offset * sizeof(BloomFilterFixed); + + memcpy(pointer, fixed_filter, sizeof(BloomFilterFixed)); + + MarkBufferDirty(buffer); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + + /* The page now holds its own copy of the image. */ + pfree(fixed_filter); +} + +/* + * Read back the bloom filter stored for varblock 'varblocknum' of 'aorel'. + * + * Returns a freshly allocated filter and stores the offset recorded with it + * in *next_offset. Returns NULL, leaving *next_offset untouched, when the + * slot lies outside its page or does not hold a valid filter image. + */ +bloom_filter * +FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum, int64 *next_offset) +{ + Buffer buffer;
+ Page page; + BlockNumber blkno; + OffsetNumber page_offset; + BloomFilterFixed fixed_filter; + char * pointer; + + blkno = AOBLF_CALC_PAGE(varblocknum); + page_offset = AOBLF_CALC_OFFSET(varblocknum); + + if (!ao_bloom_slot_fits(page_offset)) + return NULL; + + buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + + pointer = PageGetContents(page) + page_offset * sizeof(BloomFilterFixed); + + /* Copy the slot out so the buffer lock is not held while decoding. */ + memcpy(&fixed_filter, pointer, sizeof(BloomFilterFixed)); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + + /* An unwritten or damaged slot carries no usable filter. */ + if (fixed_filter.magic != BLOOM_F_MAGIC || + fixed_filter.k_hash_funcs < 1 || + fixed_filter.k_hash_funcs > MAX_HASH_FUNCS) + return NULL; + + *next_offset = fixed_filter.offset; + + return ao_bloom_filter_deserealize(&fixed_filter); +} + +/* + * Build an in-memory bloom_filter from the fixed-size image 'bf'. + * The caller owns the returned filter. + */ +bloom_filter * +ao_bloom_filter_deserealize(BloomFilterFixed *bf) +{ + bloom_filter * blf; + + /* bloom_create_nbytes() records k_hash_funcs, seed and the size itself. */ + blf = bloom_create_nbytes(AOBLF_SIZE, bf->k_hash_funcs, bf->seed); + memcpy(blf->bitset, bf->bitset, sizeof(char) * AOBLF_SIZE); + + return blf; +} + +/* + * Flatten 'bf' into a newly palloc'd fixed-size image tagged with 'off'. + * The magic field is left zero; the caller sets it before storing the image. + * AOBLF_SIZE bytes are copied out of bf->bitset, so 'bf' must hold that many. + */ +BloomFilterFixed * +ao_bloom_filter_serealize(bloom_filter *bf, int64 off) +{ + BloomFilterFixed *blff; + + blff = palloc0(sizeof(BloomFilterFixed)); + + blff->offset = off; + blff->k_hash_funcs = bf->k_hash_funcs; + blff->seed = bf->seed; + + memcpy(blff->bitset, bf->bitset, sizeof(char) * AOBLF_SIZE); + + return blff; +} \ No newline at end of file diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index d926b22f7f..f5562d21c4 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -34,6 +34,7 @@ #include "access/appendonlytid.h" #include "access/appendonlywriter.h" #include "access/aomd.h" +#include "access/aobloomfilter.h" #include "access/transam.h" #include "access/tupdesc.h" #include "access/tuptoaster.h" @@ -510,6 +511,7 @@ SetCurrentFileSegForWrite(AppendOnlyInsertDesc aoInsertDesc)
aoInsertDesc->fsInfo->formatversion, eof, eof_uncompressed, + fsinfo->varblockcount, aoInsertDesc->fsInfo->modcount, &rnode, aoInsertDesc->cur_segno); @@ -594,11 +596,12 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read small non-compressed block for table '%s' " - "(length = %d, segment file '%s', block offset in file = " INT64_FORMAT ")", + "(length = %d, segment file '%s', block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->dataLen, AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); } else { @@ -655,11 +658,12 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read large row for table '%s' " "(length = %d, segment file '%s', " - "block offset in file = " INT64_FORMAT ")", + "block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->dataLen, AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); } } else @@ -685,12 +689,13 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read decompressed block for table '%s' " "(compressed length %d, length = %d, segment file '%s', " - "block offset in file = " INT64_FORMAT ")", + "block 
offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), compressedLen, executorReadBlock->dataLen, AppendOnlyStorageRead_SegmentFileName(executorReadBlock->storageRead), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); } /* @@ -735,10 +740,11 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea } elogif(Debug_appendonly_print_scan, LOG, - "append-only scan read VarBlock for table '%s' with %d items (block offset in file = " INT64_FORMAT ")", + "append-only scan read VarBlock for table '%s' with %d items (block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->readerItemCount, - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); break; case AoExecutorBlockKind_SingleRow: @@ -754,10 +760,11 @@ AppendOnlyExecutorReadBlock_GetContents(AppendOnlyExecutorReadBlock *executorRea executorReadBlock->singleRow = executorReadBlock->dataBuffer; executorReadBlock->singleRowLen = executorReadBlock->dataLen; - elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read single row for table '%s' with length %d (block offset in file = " INT64_FORMAT ")", + elogif(Debug_appendonly_print_scan, LOG, "Append-only scan read single row for table '%s' with length %d (block offset in file = " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), executorReadBlock->singleRowLen, - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + 
executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); break; @@ -823,6 +830,7 @@ AppendOnlyExecutionReadBlock_SetPositionInfo(AppendOnlyExecutorReadBlock *execut static void AppendOnlyExecutionReadBlock_FinishedScanBlock(AppendOnlyExecutorReadBlock *executorReadBlock) { + ++executorReadBlock->totalVarblockCount; executorReadBlock->blockFirstRowNum += executorReadBlock->rowCount; } @@ -872,6 +880,7 @@ static void AppendOnlyExecutorReadBlock_ResetCounts(AppendOnlyExecutorReadBlock *executorReadBlock) { executorReadBlock->totalRowsScannned = 0; + executorReadBlock->totalVarblockCount = 0; } /* @@ -1047,12 +1056,13 @@ AppendOnlyExecutorReadBlock_ProcessTuple(AppendOnlyExecutorReadBlock *executorRe elogif(Debug_appendonly_print_scan_tuple && valid, LOG, "Append-only scan tuple for table '%s' " - "(AOTupleId %s, tuple length %d, memtuple length %d, block offset in file " INT64_FORMAT ")", + "(AOTupleId %s, tuple length %d, memtuple length %d, block offset in file " INT64_FORMAT ", total offset = " INT64_FORMAT ")", AppendOnlyStorageRead_RelationName(executorReadBlock->storageRead), AOTupleIdToString(aoTupleId), tupleLen, memtuple_get_size(tuple), - executorReadBlock->headerOffsetInFile); + executorReadBlock->headerOffsetInFile, + executorReadBlock->storageRead->bufferedRead.largeReadPosition + executorReadBlock->storageRead->bufferedRead.bufferOffset); return valid; } @@ -1258,8 +1268,12 @@ AppendOnlyExecutorReadBlock_FetchTuple(AppendOnlyExecutorReadBlock *executorRead * You can think of this scan routine as get next "executor" AO block. 
*/ static bool -getNextBlock(AppendOnlyScanDesc scan) +getNextBlock(AppendOnlyScanDesc scan, int nkeys, ScanKey key) { + int64 offset; + int64 tmp_offset; + bloom_filter *blf; + if (scan->aos_need_new_segfile) { /* @@ -1269,6 +1283,30 @@ getNextBlock(AppendOnlyScanDesc scan) return false; } + offset = scan->executorReadBlock.headerOffsetInFile; + + while (1) + { + for (int i = 0; i < nkeys; ++i) + { + if (key[i].sk_attno == 1 && key[i].sk_strategy == BTEqualStrategyNumber) + { + blf = FetchBloomFilterForVarblock(scan->aos_rd, scan->executorReadBlock.totalVarblockCount, &tmp_offset); + + if (blf != NULL && bloom_lacks_element(blf, key[i].sk_argument, sizeof(int))) + { + elog(LOG, "skip bytes " INT64_FORMAT " - " INT64_FORMAT "", offset, tmp_offset); + offset = tmp_offset; + } + + break; + } + } + break; + } + + scan->executorReadBlock.headerOffsetInFile = offset; + if (!AppendOnlyExecutorReadBlock_GetBlockInfo( &scan->storageRead, &scan->executorReadBlock)) @@ -1335,7 +1373,7 @@ appendonlygettup(AppendOnlyScanDesc scan, * get a block to process, or finished reading all the data (all * 'segment' files) for this relation. */ - while (!getNextBlock(scan)) + while (!getNextBlock(scan, nkeys, key)) { /* have we read all this relation's data. done! 
*/ if (scan->aos_done_all_segfiles) @@ -1416,7 +1454,8 @@ setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->nonCompressedData, aoInsertDesc->maxDataLen, aoInsertDesc->tempSpace, - aoInsertDesc->tempSpaceLen); + aoInsertDesc->tempSpaceLen, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition); } else @@ -1430,18 +1469,71 @@ setupNextWriteBlock(AppendOnlyInsertDesc aoInsertDesc) aoInsertDesc->uncompressedBuffer, aoInsertDesc->maxDataLen, aoInsertDesc->tempSpace, - aoInsertDesc->tempSpaceLen); + aoInsertDesc->tempSpaceLen, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition); } aoInsertDesc->bufferCount++; } +static void +ao_fsm_extend(Relation rel, BlockNumber fsm_nblocks) +{ + BlockNumber fsm_nblocks_now; + PGAlignedBlock pg; + + PageInit((Page) pg.data, BLCKSZ, 0); + + /* + * We use the relation extension lock to lock out other backends trying to + * extend the FSM at the same time. It also locks out extension of the + * main fork, unnecessarily, but extending the FSM happens seldom enough + * that it doesn't seem worthwhile to have a separate lock tag type for + * it. + * + * Note that another backend might have extended or created the relation + * by the time we get the lock. + */ + LockRelationForExtension(rel, ExclusiveLock); + + /* Might have to re-open if a cache flush happened */ + RelationOpenSmgr(rel); + + /* + * Create the FSM file first if it doesn't exist. If smgr_fsm_nblocks is + * positive then it must exist, no need for an smgrexists call. 
+ */ + if ((rel->rd_smgr->smgr_fsm_nblocks == 0 || + rel->rd_smgr->smgr_fsm_nblocks == InvalidBlockNumber) && + !smgrexists(rel->rd_smgr, FSM_FORKNUM)) + smgrcreate(rel->rd_smgr, FSM_FORKNUM, false); + + fsm_nblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM); + + while (fsm_nblocks_now < fsm_nblocks) + { + PageSetChecksumInplace((Page) pg.data, fsm_nblocks_now); + + smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now, + pg.data, false); + fsm_nblocks_now++; + } + + /* Update local cache with the up-to-date size */ + rel->rd_smgr->smgr_fsm_nblocks = fsm_nblocks_now; + + UnlockRelationForExtension(rel, ExclusiveLock); +} + + static void finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) { int executorBlockKind; int itemCount; int32 dataLen; + int64 varblkcnt; + BlockNumber aoblknum; executorBlockKind = AoExecutorBlockKind_VarBlock; /* Assume. */ @@ -1458,6 +1550,9 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) dataLen = VarBlockMakerFinish(&aoInsertDesc->varBlockMaker); + if (RelationIsAoRows(aoInsertDesc->aoi_rel)) + varblkcnt = aoInsertDesc->storageWrite.bufferedAppend.largeWriteVarBlock + aoInsertDesc->varblockCount; + aoInsertDesc->varblockCount++; if (!aoInsertDesc->shouldCompress) @@ -1533,6 +1628,29 @@ finishWriteBlock(AppendOnlyInsertDesc aoInsertDesc) itemCount); } + + if (RelationIsAoRows(aoInsertDesc->aoi_rel)) + { + aoblknum = 1 + AOBLF_CALC_PAGE(varblkcnt); + + RelationOpenSmgr(aoInsertDesc->aoi_rel); + + smgrcreate(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM, true); + + if (aoblknum > smgrnblocks(aoInsertDesc->aoi_rel->rd_smgr, FSM_FORKNUM)) + ao_fsm_extend(aoInsertDesc->aoi_rel, aoblknum); + + elog(LOG, + "writing hint bloom for varblock " INT64_FORMAT " in offset " INT64_FORMAT" - "INT64_FORMAT"", + varblkcnt, aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition + aoInsertDesc->storageWrite.bufferedAppend.largeWriteLen); + + 
SaveBloomFilterForBlock(aoInsertDesc->aoi_rel, + aoInsertDesc->varBlockMaker.blf, + aoInsertDesc->storageWrite.bufferedAppend.largeWritePosition + aoInsertDesc->storageWrite.bufferedAppend.largeWriteLen, + varblkcnt); + } + /* Insert an entry to the block directory */ AppendOnlyBlockDirectory_InsertEntry( &aoInsertDesc->blockDirectory, @@ -2901,6 +3019,9 @@ appendonly_insert(AppendOnlyInsertDesc aoInsertDesc, MemTuple tup = NULL; bool need_toast; bool isLargeContent; + bool isnull; + Datum bloom_datum; + Form_pg_attribute att; Assert(aoInsertDesc->usableBlockSize > 0 && aoInsertDesc->tempSpaceLen > 0); Assert(aoInsertDesc->toast_tuple_threshold > 0 && aoInsertDesc->toast_tuple_target > 0); @@ -3064,6 +3185,17 @@ appendonly_insert(AppendOnlyInsertDesc aoInsertDesc, } } + bloom_datum = memtuple_getattr(tup, aoInsertDesc->mt_bind, 1, &isnull); + + if (isnull) + elog(ERROR, "FAILED TO BLOOM NULL TUP"); + + + att = TupleDescAttr(RelationGetDescr(aoInsertDesc->aoi_rel), 0); + if (!att->attbyval) + elog(ERROR, "FAILED TO BLOOM BY REF TUP"); + bloom_add_element(aoInsertDesc->varBlockMaker.blf, &bloom_datum, att->attlen); + if (!isLargeContent) { /* diff --git a/src/backend/cdb/cdbappendonlystoragewrite.c b/src/backend/cdb/cdbappendonlystoragewrite.c index b4cb5c902c..b3d5e6e7b4 100755 --- a/src/backend/cdb/cdbappendonlystoragewrite.c +++ b/src/backend/cdb/cdbappendonlystoragewrite.c @@ -315,6 +315,7 @@ AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite, int version, int64 logicalEof, int64 fileLen_uncompressed, + int64 varblockcount, int64 modcount, RelFileNodeBackend *relFileNode, int32 segmentFileNum) @@ -407,7 +408,8 @@ AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite, storageWrite->segmentFileNum, storageWrite->segmentFileName, logicalEof, - fileLen_uncompressed); + fileLen_uncompressed, + varblockcount); } /* diff --git a/src/backend/cdb/cdbbufferedappend.c b/src/backend/cdb/cdbbufferedappend.c index 49006e7a08..864bf268cc 100644 
--- a/src/backend/cdb/cdbbufferedappend.c +++ b/src/backend/cdb/cdbbufferedappend.c @@ -114,7 +114,8 @@ BufferedAppendSetFile(BufferedAppend *bufferedAppend, int32 segmentFileNum, char *filePathName, int64 eof, - int64 eof_uncompressed) + int64 eof_uncompressed, + int64 varblock) { Assert(bufferedAppend != NULL); Assert(bufferedAppend->largeWritePosition == 0); @@ -128,6 +129,7 @@ BufferedAppendSetFile(BufferedAppend *bufferedAppend, Assert(eof >= 0); bufferedAppend->largeWritePosition = eof; + bufferedAppend->largeWriteVarBlock = varblock; bufferedAppend->file = file; bufferedAppend->relFileNode = relFileNode; bufferedAppend->segmentFileNum = segmentFileNum; diff --git a/src/backend/cdb/cdbbufferedread.c b/src/backend/cdb/cdbbufferedread.c index 3deb9e6d56..a1550657a2 100644 --- a/src/backend/cdb/cdbbufferedread.c +++ b/src/backend/cdb/cdbbufferedread.c @@ -223,11 +223,12 @@ BufferedReadIo( bufferedRead->largeReadLen))); elogif(Debug_appendonly_print_read_block, LOG, - "Append-Only storage read: table \"%s\", segment file \"%s\", read position " INT64_FORMAT " (small offset %d), " + "Append-Only storage read: table \"%s\", segment file \"%s\", read position " INT64_FORMAT ", buffer offset (%d) (small offset %d), " "actual read length %d (equals large read length %d is %s)", bufferedRead->relationName, bufferedRead->filePathName, bufferedRead->largeReadPosition, + bufferedRead->bufferOffset, offset, actualLen, bufferedRead->largeReadLen, diff --git a/src/backend/cdb/cdbvarblock.c b/src/backend/cdb/cdbvarblock.c index 53c329819f..816cbfee73 100644 --- a/src/backend/cdb/cdbvarblock.c +++ b/src/backend/cdb/cdbvarblock.c @@ -19,6 +19,7 @@ #include "postgres.h" #include "cdb/cdbvarblock.h" +#include "access/aobloomfilter.h" static VarBlockByteLen VarBlockGetItemLen( VarBlockReader *varBlockReader, @@ -53,7 +54,8 @@ VarBlockMakerInit( uint8 *buffer, VarBlockByteLen maxBufferLen, uint8 *tempScratchSpace, - int tempScratchSpaceLen) + int tempScratchSpaceLen, + int64 
largeWritePosition) { Assert(varBlockMaker != NULL); Assert(buffer != NULL); @@ -83,6 +85,9 @@ VarBlockMakerInit( memset(buffer, 0, VARBLOCK_HEADER_LEN); VarBlockSet_version(varBlockMaker->header, InitialVersion); VarBlockSet_offsetsAreSmall(varBlockMaker->header, true); + + varBlockMaker->blf = bloom_create_nbytes(AOBLF_SIZE, 4 /* default */, random() % PG_INT32_MAX); + varBlockMaker->largeWritePosition = largeWritePosition; } /* diff --git a/src/backend/lib/bloomfilter.c b/src/backend/lib/bloomfilter.c index fc48ee09ff..7b4426917b 100644 --- a/src/backend/lib/bloomfilter.c +++ b/src/backend/lib/bloomfilter.c @@ -38,18 +38,6 @@ #include "access/hash.h" #include "lib/bloomfilter.h" -#define MAX_HASH_FUNCS 10 - -struct bloom_filter -{ - /* K hash functions are used, seeded by caller's seed */ - int k_hash_funcs; - uint64 seed; - /* m is bitset size, in bits. Must be a power of two <= 2^32. */ - uint64 m; - unsigned char bitset[FLEXIBLE_ARRAY_MEMBER]; -}; - static int my_bloom_power(uint64 target_bitset_bits); static int optimal_k(uint64 bitset_bits, int64 total_elems); static void k_hashes(bloom_filter *filter, uint32 *hashes, unsigned char *elem, @@ -119,6 +107,21 @@ bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed) return filter; } +bloom_filter * +bloom_create_nbytes(int64 nbytes, int k_hash_funcs, uint64 seed) +{ + bloom_filter *filter; + + /* Allocate bloom filter with unset bitset */ + filter = palloc0(offsetof(bloom_filter, bitset) + + sizeof(unsigned char) * nbytes); + filter->k_hash_funcs = k_hash_funcs; + filter->seed = seed; + filter->m = nbytes; + + return filter; +} + /* * Free Bloom filter */ diff --git a/src/backend/utils/datumstream/datumstream.c b/src/backend/utils/datumstream/datumstream.c index af5a1248fa..72027a10a2 100644 --- a/src/backend/utils/datumstream/datumstream.c +++ b/src/backend/utils/datumstream/datumstream.c @@ -818,8 +818,16 @@ destroy_datumstreamread(DatumStreamRead * ds) void 
-datumstreamwrite_open_file(DatumStreamWrite *ds, char *fn, int64 eof, int64 eofUncompressed, int64 modcount, - RelFileNodeBackend *relFileNode, int32 segmentFileNum, int version) +datumstreamwrite_open_file( + DatumStreamWrite *ds, + char *fn, + int64 eof, + int64 eofUncompressed, + int64 varblockcount, + int64 modcount, + RelFileNodeBackend *relFileNode, + int32 segmentFileNum, + int version) { ds->eof = eof; ds->eofUncompress = eofUncompressed; @@ -847,6 +855,7 @@ datumstreamwrite_open_file(DatumStreamWrite *ds, char *fn, int64 eof, int64 eofU version, eof, eofUncompressed, + varblockcount, modcount, relFileNode, segmentFileNum); diff --git a/src/include/access/aobloomfilter.h b/src/include/access/aobloomfilter.h new file mode 100644 index 0000000000..c413824b63 --- /dev/null +++ b/src/include/access/aobloomfilter.h @@ -0,0 +1,50 @@ +#ifndef AOBLOOMFILTER_H +#define AOBLOOMFILTER_H + +#include "access/hash.h" +#include "lib/bloomfilter.h" +#include "storage/bufpage.h" +#include "utils/relcache.h" + +/* Size in bytes of the bitset stored for each varblock. */ +#define AOBLF_SIZE 128 + +/* Tag identifying a slot that holds a stored filter. */ +#define BLOOM_F_MAGIC 0xD0D0CACA + +/* Fixed-size image of one bloom filter as it is copied into a page slot. */ +typedef struct BloomFilterFixed { + uint32 magic; /* BLOOM_F_MAGIC when the slot holds a filter */ + int64 offset; /* offset recorded by the writer along with the filter */ + int k_hash_funcs; /* number of hash functions the filter uses */ + uint64 seed; /* seed of those hash functions */
+ unsigned char bitset[AOBLF_SIZE]; /* the filter's bits */ +} BloomFilterFixed; + +/* NOTE(review): not referenced anywhere in this patch and not equal to sizeof(BloomFilterFixed) - confirm it is still needed. */ +#define AO_BLF_SIZE (4 + 8 + AOBLF_SIZE) + +/* + * Number of whole BloomFilterFixed slots that fit in the content area of one + * page. It has to be derived from the full slot size, not just the bitset + * size, or the last slots of a page would extend past the end of the page. + */ +#define AOBLF_N_FILTERS ((int) ((BLCKSZ - MAXALIGN(SizeOfPageHeaderData)) / sizeof(BloomFilterFixed))) + +typedef struct AOBFPageData { + BloomFilterFixed filters[AOBLF_N_FILTERS]; +} AOBFPageData; + +typedef AOBFPageData *AOBFPage; + +/* Page number, and slot within that page, of the filter for varblock x. */ +#define AOBLF_CALC_PAGE(x) ((x) / AOBLF_N_FILTERS) +#define AOBLF_CALC_OFFSET(x) ((x) % AOBLF_N_FILTERS) + +extern bloom_filter *ao_bloom_filter_deserealize(BloomFilterFixed *bf); +extern BloomFilterFixed *ao_bloom_filter_serealize(bloom_filter *bf, int64 off); + +extern void SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int64 varblocknum); +extern bloom_filter *FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum, int64 *next_offset); + +#endif /* AOBLOOMFILTER_H */ \ No newline at end of file diff --git a/src/include/cdb/cdbappendonlyam.h b/src/include/cdb/cdbappendonlyam.h index 33e089477a..43da9cbb17 100644 --- a/src/include/cdb/cdbappendonlyam.h +++ b/src/include/cdb/cdbappendonlyam.h @@ -141,6 +141,7 @@ typedef struct AppendOnlyExecutorReadBlock int segmentFileNum; int64 totalRowsScannned; + int64 totalVarblockCount; int64 blockFirstRowNum; int64 headerOffsetInFile; diff --git a/src/include/cdb/cdbappendonlystoragewrite.h b/src/include/cdb/cdbappendonlystoragewrite.h index bc2e5ac9aa..5c56047d6d 100755 --- a/src/include/cdb/cdbappendonlystoragewrite.h +++ b/src/include/cdb/cdbappendonlystoragewrite.h @@ -199,6 +199,7 @@ extern void AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite int version, int64 logicalEof, int64 fileLen_uncompressed, + int64 varblockcount, int64 modcount, RelFileNodeBackend *relFileNode, int32 segmentFileNum); diff --git a/src/include/cdb/cdbbufferedappend.h b/src/include/cdb/cdbbufferedappend.h index a315a727fc..d558190e36 100644 --- a/src/include/cdb/cdbbufferedappend.h +++ b/src/include/cdb/cdbbufferedappend.h @@ -47,6 +47,7 @@ typedef struct BufferedAppend */ int64 largeWritePosition; + int64 
largeWriteVarBlock; int32 largeWriteLen; /* * The position within the current file for the next write @@ -113,7 +114,8 @@ extern void BufferedAppendSetFile( int32 segmentFileNum, char *filePathName, int64 eof, - int64 eof_uncompressed); + int64 eof_uncompressed, + int64 varblockcount); /* * Return the position of the current write buffer in bytes. diff --git a/src/include/cdb/cdbvarblock.h b/src/include/cdb/cdbvarblock.h index bcc19a299f..4231a82bc1 100644 --- a/src/include/cdb/cdbvarblock.h +++ b/src/include/cdb/cdbvarblock.h @@ -90,6 +90,8 @@ #ifndef CDBVARBLOCK_H #define CDBVARBLOCK_H +#include "lib/bloomfilter.h" + typedef int32 VarBlockByteLen; typedef int32 VarBlockByteOffset; @@ -224,6 +226,8 @@ typedef struct VarBlockMaker /* The maximum number of items for the VarBlock. * Based on the length of the scratch area. */ + bloom_filter * blf; + int64 largeWritePosition; } VarBlockMaker; // ---------------------------------------------------------- @@ -282,7 +286,8 @@ extern void VarBlockMakerInit( uint8 *buffer, VarBlockByteLen maxBufferLen, uint8 *tempScratchSpace, - int tempScratchSpaceLen); + int tempScratchSpaceLen, + int64 largeWritePos); /* * Get a pointer to the next variable-length item so it can diff --git a/src/include/lib/bloomfilter.h b/src/include/lib/bloomfilter.h index 6cbdd9bfd9..95250d670f 100644 --- a/src/include/lib/bloomfilter.h +++ b/src/include/lib/bloomfilter.h @@ -13,10 +13,24 @@ #ifndef BLOOMFILTER_H #define BLOOMFILTER_H +#define MAX_HASH_FUNCS 10 + +struct bloom_filter /* full definition is visible to includers; aobloomfilter.c accesses these fields directly */ +{ + /* K hash functions are used, seeded by caller's seed */ + int k_hash_funcs; + uint64 seed; /* seed handed to the hash functions */ + /* m is bitset size, in bits. Must be a power of two <= 2^32. 
*/ + uint64 m; + unsigned char bitset[FLEXIBLE_ARRAY_MEMBER]; /* the bits themselves; storage is allocated together with the struct */ +}; + typedef struct bloom_filter bloom_filter; extern bloom_filter *bloom_create(int64 total_elems, int bloom_work_mem, uint64 seed); +extern bloom_filter *bloom_create_nbytes(int64 nbytes, int k_hash_funcs, uint64 seed); + extern void bloom_free(bloom_filter *filter); extern void bloom_add_element(bloom_filter *filter, unsigned char *elem, size_t len); diff --git a/src/include/utils/datumstream.h b/src/include/utils/datumstream.h index 85154cb22c..2a2ebbd8ae 100644 --- a/src/include/utils/datumstream.h +++ b/src/include/utils/datumstream.h @@ -282,6 +282,7 @@ extern void datumstreamwrite_open_file( char *fn, int64 eof, int64 eofUncompressed, + int64 varblockcount, int64 modcount, RelFileNodeBackend *relFileNode, int32 segmentFileNum,