Skip to content

Commit

Permalink
Fix ao_filehandler for new attnum-filenum changes
Browse files Browse the repository at this point in the history
The new attnum-filenum changes on pg_attribute_encoding introduced in
cdd03c165ad and the column rewrite operations introduced in last commit
changes the range of aocs files on disk.
The column filename is now dependent on the filenums and not attnums.
This commit changes how we look for possible aocs column files on disk
for a table
Earlier we used to only have files in order based on natts.
Now, as the column can be rewritten we also need to check the filenum
pair (i, i+MaxHeapAttributeNumber) of every filenum.

Co-authored-by: Soumyadeep Chakraborty <[email protected]>
Co-authored-by: Ashwin Agrawal <[email protected]>
Co-authored-by: Huansong Fu <[email protected]>
  • Loading branch information
4 people authored and Tao-Ma committed Jan 12, 2025
1 parent 8793a07 commit 993fe9d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 51 deletions.
99 changes: 59 additions & 40 deletions src/backend/access/appendonly/aomd_filehandler.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@
*
* Heap Tables: contiguous extensions, no upper bound
* AO Tables: non contiguous extensions [.1 - .127]
* CO Tables: non contiguous extensions
* [ .1 - .127] for first column; .0 reserved for utility and alter
* [.129 - .255] for second column; .128 reserved for utility and alter
* [.257 - .283] for third column; .256 reserved for utility and alter
* etc
* CO Tables: non contiguous extensions based on filenums in pg_attribute_encoding
* [ .1 - .127] for first filenum; .0 reserved for utility and alter
* [.129 - .255] for second filenum; .128 reserved for utility and alter
* [.257 - .283] for third filenum; .256 reserved for utility and alter
* etc upto (128 * MaxFileNumber(3200))
*
* Column rewrites use the filenum from pair (i, i+MaxAttributeNumber)
* where i is in range 1 to MaxAttributeNumber
* Algorithm is coded with the assumption for CO tables that for a given
* concurrency level, the relfiles exist OR stop existing for all columns thereafter.
* For instance, if .2 exists, then .(2 + 128N) MIGHT exist for N=1. But if it does
* not exist for N=1, then it doesn't exist for N>=2.
* concurrency level, the segfiles exist on one of the filenum pairs
* OR stop existing for all columns thereafter.
* For instance, if .2 exists, then .(2 + 128f) MIGHT exist for filenum f=1.
* But if it does not exist for f=1 OR f=1601 then it doesn't exist for f>=2.
*
* We can think of this function as operating on a two-dimensional array:
* column index x concurrency level. The operation is broken up into two
* steps:
*
* 1) Finds for which concurrency levels the table has files using [.1 - .127].
* 1) Finds for which concurrency levels the table has files using
* [.1 - .127] for filenumber = 1 and same for filenumber = 1601.
* Concurrency level 0 is always checked as its corresponding segno file
* must always exist. However, the caller is expected to handle the that
* file.
Expand All @@ -54,8 +58,8 @@
*
* Graphically, showing the step above that can possibly operate on each
* segment file:
* column
* 1 2 3 4 --- MaxHeapAttributeNumber
* filenumber
* 1 2 3 4 --- MaxFileNumber
* concurrency 0 x 2) 2) 2) 2)
* 1 1) 2) 2) 2) 2)
* 2 1) 2) 2) 2) 2)
Expand All @@ -66,36 +70,51 @@
void
ao_foreach_extent_file(ao_extent_callback callback, void *ctx)
{
int segno;
int colnum;
int concurrency[MAX_AOREL_CONCURRENCY];
int concurrencySize;
int segno;
int physicalsegno;
int physicalsegnopair;
int filenum;
int concurrency[MAX_AOREL_CONCURRENCY];
int concurrencySize;
bool segnofileexists;
bool segnopairfileexists;

/*
* We always check concurrency level 0 here as the 0 based extensions such
* as .128, .256, ... for CO tables are created by ALTER table or utility
* mode insert. These also need to be copied. Column 0 concurrency level 0
* file is always present and, as noted above, handled by our caller.
*/
concurrency[0] = 0;
concurrencySize = 1;
/*
* We always check concurrency level 0 here as the 0 based extensions such
* as .128, .256, ... for CO tables are created by ALTER table or utility
* mode insert. These also need to be copied. Column 0 concurrency level 0
* file is always present and, as noted above, handled by our caller.
*/
concurrency[0] = 0;
concurrencySize = 1;

/* discover any remaining concurrency levels */
for (segno = 1; segno < MAX_AOREL_CONCURRENCY; segno++)
{
if (!callback(segno, ctx))
continue;
concurrency[concurrencySize] = segno;
concurrencySize++;
}
/* discover any remaining concurrency levels */
for (segno = 1; segno < MAX_AOREL_CONCURRENCY; segno++)
{
/* For AOCO tables, each column has two possible file segnos from
* filenum pair (i, i+MaxHeapAttributeNumber). Check them both. */
physicalsegno = segno;
physicalsegnopair = MaxHeapAttributeNumber * AOTupleId_MultiplierSegmentFileNum + segno;
segnofileexists = callback(physicalsegno, ctx);
segnopairfileexists = callback(physicalsegnopair, ctx);
if (!(segnofileexists || segnopairfileexists))
continue;
concurrency[concurrencySize] = segno;
concurrencySize++;
}

for (int index = 0; index < concurrencySize; index++)
{
for (colnum = 1; colnum < MaxHeapAttributeNumber; colnum++)
{
segno = colnum * AOTupleId_MultiplierSegmentFileNum + concurrency[index];
if (!callback(segno, ctx))
break;
}
}
for (int index = 0; index < concurrencySize; index++)
{
for (filenum = 1; filenum < MaxHeapAttributeNumber; filenum++)
{
physicalsegno = filenum * AOTupleId_MultiplierSegmentFileNum + concurrency[index];
physicalsegnopair = (filenum + MaxHeapAttributeNumber) * AOTupleId_MultiplierSegmentFileNum + concurrency[index];
/* Call the callback function on both possible files in filenum pair */
segnofileexists = callback(physicalsegno, ctx);
segnopairfileexists = callback(physicalsegnopair, ctx);
/* If they both don't exist, that means none of the further ones exist */
if (!(segnofileexists || segnopairfileexists))
break;
}
}
}
34 changes: 23 additions & 11 deletions src/backend/access/appendonly/test/aomd_filehandler_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "access/aomd.h"
#include "access/appendonlytid.h"
#include "access/appendonlywriter.h"
#include "catalog/pg_attribute_encoding.h"

/*
* ACHTUNG This module is trickier than you might initially have expected
Expand All @@ -18,7 +19,7 @@
* combinations here as that is a higher-level test than this unit test.
*/

#define MAX_SEGNO_FILES (MAX_AOREL_CONCURRENCY * MaxHeapAttributeNumber)
#define MAX_SEGNO_FILES (MAX_AOREL_CONCURRENCY * MaxFileNumber)
typedef struct {
bool present[MAX_SEGNO_FILES];
bool call_result[MAX_SEGNO_FILES];
Expand All @@ -36,8 +37,12 @@ setup_test_structures(aomd_filehandler_callback_ctx *ctx)

/* these files get checked for presence in the foreach() */
ctx->call_expected[AOTupleId_MultiplierSegmentFileNum] = true;
ctx->call_expected[MaxHeapAttributeNumber * AOTupleId_MultiplierSegmentFileNum + AOTupleId_MultiplierSegmentFileNum] = true;
for (int segno = 1; segno < MAX_AOREL_CONCURRENCY; segno++)
{
ctx->call_expected[segno] = true;
ctx->call_expected[MaxHeapAttributeNumber * AOTupleId_MultiplierSegmentFileNum + segno] = true;
}
}

/*
Expand All @@ -51,7 +56,10 @@ set_ctx_for_present_file(aomd_filehandler_callback_ctx *ctx, int segno)
{
ctx->present[segno] = true;
if (segno < (MAX_SEGNO_FILES - MAX_AOREL_CONCURRENCY))
ctx->call_expected[segno + MAX_AOREL_CONCURRENCY] = true;
{
ctx->call_expected[segno + MAX_AOREL_CONCURRENCY] = true;
ctx->call_expected[MAX_AOREL_CONCURRENCY * (MaxHeapAttributeNumber + 1) + segno] = true;
}
}

static int
Expand Down Expand Up @@ -105,7 +113,7 @@ test_no_files_present(void **state)

ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_AOREL_CONCURRENCY);
assert_int_equal(ctx.num_called, 2 * MAX_AOREL_CONCURRENCY);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -121,7 +129,7 @@ test_co_1_column_1_concurrency(void **state)

ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_AOREL_CONCURRENCY + 1*1);
assert_int_equal(ctx.num_called, 2 * MAX_AOREL_CONCURRENCY + 2 * 1 * 1);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -137,7 +145,7 @@ test_co_4_columns_1_concurrency(void **state)

ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_AOREL_CONCURRENCY + 4*1);
assert_int_equal(ctx.num_called, 2 * MAX_AOREL_CONCURRENCY + 2 * 4 * 1);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -156,7 +164,7 @@ test_co_3_columns_2_concurrency(void **state)

ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_AOREL_CONCURRENCY + 3*2);
assert_int_equal(ctx.num_called, 2 * MAX_AOREL_CONCURRENCY + 2 * 3 * 2);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -172,7 +180,7 @@ test_co_1_column_127_concurrency(void **state)

ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_AOREL_CONCURRENCY + 1*127);
assert_int_equal(ctx.num_called, 2 * MAX_AOREL_CONCURRENCY + 2 * 1 * 127);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -186,10 +194,12 @@ test_co_max_columns_0th_concurrency(void **state)
for (int col = 0; col < MaxHeapAttributeNumber; col++)
set_ctx_for_present_file(&ctx, col * MAX_AOREL_CONCURRENCY);

ctx.call_expected[MAX_AOREL_CONCURRENCY * MaxHeapAttributeNumber] = false;

ao_foreach_extent_file(file_callback, &ctx);

/* 0th file already acccounted for, hence the -1 */
assert_int_equal(ctx.num_called, (MAX_AOREL_CONCURRENCY-1) + (MaxHeapAttributeNumber * 1 - 1));
assert_int_equal(ctx.num_called, (2 * MAX_AOREL_CONCURRENCY - 1) + (MaxHeapAttributeNumber * 2 - 1) - 1);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -205,10 +215,11 @@ test_co_max_columns_0_1_concurrency(void **state)
set_ctx_for_present_file(&ctx, col * MAX_AOREL_CONCURRENCY + 1);
}

ctx.call_expected[MAX_AOREL_CONCURRENCY * MaxHeapAttributeNumber] = false;
ao_foreach_extent_file(file_callback, &ctx);

/* 0th file already acccount for, hence the -1 */
assert_int_equal(ctx.num_called, (MAX_AOREL_CONCURRENCY-1) + (MaxHeapAttributeNumber - 1) * 2);
assert_int_equal(ctx.num_called, 2* ((2 * MAX_AOREL_CONCURRENCY - 1) + (MaxHeapAttributeNumber * 2 - 1)) - 1);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -226,7 +237,7 @@ test_different_number_of_columns_per_concurrency_level(void **state)

ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_AOREL_CONCURRENCY + 5);
assert_int_equal(ctx.num_called, 2 * MAX_AOREL_CONCURRENCY + 10);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);
}

Expand All @@ -240,9 +251,10 @@ test_all_files_present(void **state)
memset(ctx.call_expected, true, sizeof(ctx.call_expected));

ctx.call_expected[0] = false; /* caller must deal with .0 file */
ctx.call_expected[MaxHeapAttributeNumber * AOTupleId_MultiplierSegmentFileNum] = false; /* caller must deal with this file */
ao_foreach_extent_file(file_callback, &ctx);

assert_int_equal(ctx.num_called, MAX_SEGNO_FILES - 1);
assert_int_equal(ctx.num_called, MAX_SEGNO_FILES - 2);
assert_int_equal(compareSegnoFiles(ctx.call_expected, ctx.call_result), 0);

return;
Expand Down

0 comments on commit 993fe9d

Please sign in to comment.