Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write bloom filter for AO #25

Open
wants to merge 6 commits into
base: OPENGPDB_STABLE
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 72 additions & 20 deletions contrib/pageinspect/aofuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@
#include "catalog/pg_namespace.h"
#include "utils/syscache.h"

#include "access/aobloomfilter.h"

#include "cdb/cdbaocsam.h"
#include "cdb/cdbappendonlyam.h"

PG_FUNCTION_INFO_V1(get_ao_headers_info);
PG_FUNCTION_INFO_V1(get_aocs_headers_info);
PG_FUNCTION_INFO_V1(check_bloom_filter);

/*
 * AOHeadersInfoCxt
 *
 * Bundles an append-only scan descriptor with a tuple table slot.
 * NOTE(review): presumably this is the state carried between successive
 * calls of the header-inspection set-returning functions in this file --
 * confirm against the code that allocates it.
 */
typedef struct AOHeadersInfoCxt {
/* scan descriptor for the append-only relation being inspected */
AppendOnlyScanDesc scan;
/* tuple slot kept alongside the scan */
TupleTableSlot *slot;
} AOHeadersInfoCxt;

#define NUM_GET_AO_HEADERS_INFO 9
#define NUM_GET_AO_HEADERS_INFO 10
#define NUM_GET_AOCS_HEADERS_INFO 8

Datum
Expand Down Expand Up @@ -82,17 +85,19 @@ get_ao_headers_info(PG_FUNCTION_ARGS)
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)3, "buffer offset", INT4OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "block kind", TEXTOID,
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)4, "total Varblock count", INT8OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "block kind", TEXTOID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)5, "header kind", TEXTOID,
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "header kind", TEXTOID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)6, "current item count", INT4OID,
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "current item count", INT4OID,
-1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)7, "isCompressed",
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "isCompressed",
BOOLOID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)8, "isLarge",
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9, "isLarge",
BOOLOID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)9,
TupleDescInitEntry(funcctx->tuple_desc, (AttrNumber)10,
"dataLen", INT4OID, -1 /* typmod */,
0 /* attdim */);

Expand Down Expand Up @@ -145,43 +150,45 @@ get_ao_headers_info(PG_FUNCTION_ARGS)
values[0] = Int64GetDatum(scan->executorReadBlock.blockFirstRowNum);
values[1] = Int64GetDatum(scan->storageRead.bufferedRead.largeReadPosition);
values[2] = Int32GetDatum(scan->storageRead.bufferedRead.bufferOffset);
values[3] = Int64GetDatum(scan->executorReadBlock.totalVarblockCount);


switch (scan->executorReadBlock.executorBlockKind)
{
case AoExecutorBlockKind_VarBlock:
values[3] = CStringGetTextDatum("varblock");
values[4] = CStringGetTextDatum("varblock");
break;
case AoExecutorBlockKind_SingleRow:
values[3] = CStringGetTextDatum("single row");
values[4] = CStringGetTextDatum("single row");
break;
default:
values[3] = CStringGetTextDatum("unknown");
values[4] = CStringGetTextDatum("unknown");
break;
}

switch (scan->storageRead.current.headerKind)
{
case AoHeaderKind_SmallContent:
values[4] = CStringGetTextDatum("small content");
values[5] = CStringGetTextDatum("small content");
break;
case AoHeaderKind_LargeContent:
values[4] = CStringGetTextDatum("large content");
values[5] = CStringGetTextDatum("large content");
break;
case AoHeaderKind_NonBulkDenseContent:
values[4] = CStringGetTextDatum("non bulk dense content");
values[5] = CStringGetTextDatum("non bulk dense content");
break;
case AoHeaderKind_BulkDenseContent:
values[4] = CStringGetTextDatum("bulk dense content");
values[5] = CStringGetTextDatum("bulk dense content");
break;
default:
values[4] = CStringGetTextDatum("unknown");
values[5] = CStringGetTextDatum("unknown");
break;
}

values[5] = Int32GetDatum(scan->executorReadBlock.currentItemCount);
values[6] = BoolGetDatum(scan->executorReadBlock.isCompressed);
values[7] = BoolGetDatum(scan->executorReadBlock.isLarge);
values[8] = Int32GetDatum(scan->executorReadBlock.dataLen);
values[6] = Int32GetDatum(scan->executorReadBlock.currentItemCount);
values[7] = BoolGetDatum(scan->executorReadBlock.isCompressed);
values[8] = BoolGetDatum(scan->executorReadBlock.isLarge);
values[9] = Int32GetDatum(scan->executorReadBlock.dataLen);

AppendOnlyExecutorReadBlock_GetContents(
&scan->executorReadBlock);
Expand All @@ -206,6 +213,8 @@ get_ao_headers_info(PG_FUNCTION_ARGS)
break;
}

elog(DEBUG3, "large read pos after block content %d, offset %d", scan->storageRead.bufferedRead.largeReadPosition, scan->storageRead.bufferedRead.bufferOffset);

relation_close(r, AccessShareLock);
SRF_RETURN_NEXT(funcctx, result);
}
Expand Down Expand Up @@ -367,7 +376,6 @@ get_aocs_headers_info(PG_FUNCTION_ARGS)
values[1] = Int64GetDatum(scan->ds[i]->ao_read.bufferedRead.largeReadPosition);
values[2] = Int32GetDatum(scan->ds[i]->ao_read.bufferedRead.bufferOffset);


switch (scan->ds[i]->ao_read.current.headerKind)
{
case AoHeaderKind_SmallContent:
Expand Down Expand Up @@ -427,4 +435,48 @@ get_aocs_headers_info(PG_FUNCTION_ARGS)

relation_close(r, AccessShareLock);
SRF_RETURN_DONE(funcctx);
}


/* aobloomfilter.c */
Datum
check_bloom_filter(PG_FUNCTION_ARGS)
{
bool res;
bloom_filter * blf;
Relation r;
int64 offset;
Form_pg_attribute att;
Oid oid = PG_GETARG_OID(0);
int64 varblocknum = PG_GETARG_INT64(1);
Datum d = PG_GETARG_DATUM(2);


if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to use raw page functions"))));

r = relation_open(oid, AccessShareLock);

if (!RelationIsAoRows(r))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
(errmsg("must be an Append-Optimized relation to use raw headerfunctions"))));


att = TupleDescAttr(RelationGetDescr(r), 0);
if (!att->attbyval)
elog(ERROR, "FAILED TO BLOOM BY REF TUP");

blf = FetchBloomFilterForVarblock(r, varblocknum, &offset);

if (blf != NULL)
res = !bloom_lacks_element(blf, &d, att->attlen);
else
res = true;

relation_close(r, AccessShareLock);

PG_RETURN_BOOL(res);
}
12 changes: 12 additions & 0 deletions contrib/pageinspect/pageinspect--1.8.sql
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ RETURNS TABLE (
"first row number" BIGINT,
"large read position" BIGINT,
"buffer offset" INTEGER,
"total varblock count" BIGINT,
"block kind" TEXT,
"header kind" TEXT,
"current item count" INTEGER,
Expand Down Expand Up @@ -205,3 +206,14 @@ RETURNS TABLE (
AS 'MODULE_PATHNAME', 'get_aocs_headers_info'
LANGUAGE C STRICT
EXECUTE ON ALL SEGMENTS;


--
-- check_bloom_filter()
--
-- Binds the C function check_bloom_filter from this module.  Takes a
-- relation OID, a varblock number and an integer value, and returns a
-- boolean.  Declared STRICT and executed on all segments.
--
CREATE FUNCTION check_bloom_filter(
reloid OID,
varblocknum BIGINT,
d INT
)
RETURNS BOOLEAN
AS 'MODULE_PATHNAME', 'check_bloom_filter'
LANGUAGE C STRICT
EXECUTE ON ALL SEGMENTS;
17 changes: 13 additions & 4 deletions src/backend/access/aocs/aocsam.c
Original file line number Diff line number Diff line change
Expand Up @@ -875,9 +875,15 @@ OpenAOCSDatumStreams(AOCSInsertDesc desc)
FormatAOSegmentFileName(basepath, seginfo->segno, i, &fileSegNo, fn);
Assert(strlen(fn) + 1 <= MAXPGPATH);

datumstreamwrite_open_file(desc->ds[i], fn, e->eof, e->eof_uncompressed, seginfo->modcount,
&rnode,
fileSegNo, seginfo->formatversion);
datumstreamwrite_open_file(desc->ds[i],
fn,
e->eof,
e->eof_uncompressed,
0,
seginfo->modcount,
&rnode,
fileSegNo,
seginfo->formatversion);
}

pfree(basepath);
Expand Down Expand Up @@ -2041,7 +2047,10 @@ aocs_addcol_newsegfile(AOCSAddColumnDesc desc,
&fileSegNo, fn);
Assert(strlen(fn) + 1 <= MAXPGPATH);
datumstreamwrite_open_file(desc->dsw[i], fn,
0 /* eof */ , 0 /* eof_uncompressed */ , 0, /*modcount*/
0 /* eof */ ,
0 /* eof_uncompressed */ ,
0 /* varblockcount */,
0, /*modcount*/
&relfilenode, fileSegNo,
version);
desc->dsw[i]->blockFirstRowNum = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/backend/access/appendonly/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)

OBJS = appendonlyam.o aosegfiles.o aomd.o appendonlywriter.o appendonlytid.o \
OBJS = aobloomfilter.o appendonlyam.o aosegfiles.o aomd.o appendonlywriter.o appendonlytid.o \
appendonlyblockdirectory.o appendonly_visimap.o \
appendonly_visimap_entry.o appendonly_visimap_store.o \
appendonly_compaction.o appendonly_visimap_udf.o \
Expand Down
105 changes: 105 additions & 0 deletions src/backend/access/appendonly/aobloomfilter.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@

#include "postgres.h"

#include "utils/rel.h"
#include "storage/buf_internals.h"
#include "storage/bufpage.h"
#include "lib/bloomfilter.h"

#include "access/heapam_xlog.h"

#include "access/aobloomfilter.h"

/*
 * SaveBloomFilterForBlock
 *
 * Store the bloom filter of one varblock in the FSM fork of "aorel".
 *
 *	aorel		- relation whose FSM fork holds the filters
 *	filter		- in-memory filter to persist
 *	offset		- offset value recorded next to the filter
 *	varblocknum - varblock number; selects the page and the slot in the page
 *
 * The filter is flattened into a fixed-size BloomFilterFixed record, stamped
 * with BLOOM_F_MAGIC, and copied into its slot while the buffer is held
 * under an exclusive lock.
 *
 * NOTE(review): no WAL record is written here and the page is not
 * initialised by this function -- confirm that both are taken care of
 * elsewhere.
 */
void
SaveBloomFilterForBlock(Relation aorel, bloom_filter *filter, int64 offset, int64 varblocknum)
{
	Buffer		buffer;
	Page		page;
	BlockNumber blkno;
	OffsetNumber page_offset;
	BloomFilterFixed *fixed_filter;
	char	   *pointer;

	/* A negative varblock number would map to a bogus page or slot. */
	Assert(varblocknum >= 0);

	fixed_filter = ao_bloom_filter_serealize(filter, offset);
	fixed_filter->magic = BLOOM_F_MAGIC;

	blkno = AOBLF_CALC_PAGE(varblocknum);
	page_offset = AOBLF_CALC_OFFSET(varblocknum);

	buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL);
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	page = BufferGetPage(buffer);

	/* Slots are laid out back to back in the page's content area. */
	pointer = PageGetContents(page) + page_offset * sizeof(BloomFilterFixed);

	memcpy(pointer, fixed_filter, sizeof(BloomFilterFixed));

	MarkBufferDirty(buffer);

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buffer);

	/* The flattened copy was only scratch space for the memcpy above. */
	pfree(fixed_filter);
}

/*
 * FetchBloomFilterForVarblock
 *
 * Read the bloom filter stored for one varblock from the FSM fork of "aorel".
 *
 *	aorel		- relation whose FSM fork holds the filters
 *	varblocknum - varblock number; selects the page and the slot in the page
 *	next_offset - output: the offset value stored with the filter; written
 *				  only when a filter is returned
 *
 * Returns a freshly built bloom_filter, or NULL when the slot does not carry
 * BLOOM_F_MAGIC (i.e. no filter has been saved there).
 */
bloom_filter *
FetchBloomFilterForVarblock(Relation aorel, int64 varblocknum, int64 *next_offset)
{
	Buffer		buffer;
	Page		page;
	BlockNumber blkno;
	OffsetNumber page_offset;
	BloomFilterFixed *fixed_filter;
	bloom_filter *result;
	char	   *pointer;

	/* A negative varblock number would map to a bogus page or slot. */
	Assert(varblocknum >= 0);

	blkno = AOBLF_CALC_PAGE(varblocknum);
	page_offset = AOBLF_CALC_OFFSET(varblocknum);

	fixed_filter = palloc0(sizeof(BloomFilterFixed));

	buffer = ReadBufferExtended(aorel, FSM_FORKNUM, blkno, RBM_NORMAL, NULL);
	LockBuffer(buffer, BUFFER_LOCK_SHARE);
	page = BufferGetPage(buffer);

	pointer = PageGetContents(page) + page_offset * sizeof(BloomFilterFixed);

	/* Copy the slot out so the buffer lock can be dropped right away. */
	memcpy(fixed_filter, pointer, sizeof(BloomFilterFixed));

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
	ReleaseBuffer(buffer);

	if (fixed_filter->magic != BLOOM_F_MAGIC)
	{
		/* Nothing stored in this slot; do not leak the scratch copy. */
		pfree(fixed_filter);
		return NULL;
	}

	*next_offset = fixed_filter->offset;

	result = ao_bloom_filter_deserealize(fixed_filter);

	/* The in-memory filter holds its own copy of the bitset. */
	pfree(fixed_filter);

	return result;
}

/*
 * ao_bloom_filter_deserealize
 *
 * Build an in-memory bloom_filter from its fixed-size stored form.  The
 * filter is created with AOBLF_SIZE bytes and the stored hash-function
 * count and seed; its m and seed fields are then set explicitly and the
 * stored bitset is copied in.
 */
bloom_filter *
ao_bloom_filter_deserealize(BloomFilterFixed *bf)
{
	bloom_filter *result = bloom_create_nbytes(AOBLF_SIZE,
											   bf->k_hash_funcs,
											   bf->seed);

	result->seed = bf->seed;
	result->m = AOBLF_SIZE;

	memcpy(result->bitset, bf->bitset, AOBLF_SIZE);

	return result;
}

/*
 * ao_bloom_filter_serealize
 *
 * Flatten an in-memory bloom_filter into a newly allocated, zero-filled
 * BloomFilterFixed record, recording "off" as its offset.  The magic field
 * is not set here and stays zero.
 */
BloomFilterFixed *
ao_bloom_filter_serealize(bloom_filter *bf, int64 off)
{
	BloomFilterFixed *out = palloc0(sizeof(*out));

	memcpy(out->bitset, bf->bitset, AOBLF_SIZE);

	out->seed = bf->seed;
	out->k_hash_funcs = bf->k_hash_funcs;
	out->offset = off;

	return out;
}
Loading
Loading