From 071142f4f4de90d3cc6dc6efffdd1072688856b3 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sun, 19 Jan 2025 08:54:57 +0200 Subject: [PATCH] Store prefetch results in LFC cache once as soon as they are received --- pgxn/neon/file_cache.c | 471 ++++++++++++++++++++++++++--------- pgxn/neon/neon.c | 2 + pgxn/neon/neon.h | 2 + pgxn/neon/pagestore_client.h | 3 + pgxn/neon/pagestore_smgr.c | 7 +- 5 files changed, 367 insertions(+), 118 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 64b236061ddd..3663c09d530c 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -22,6 +22,7 @@ #include "neon_pgversioncompat.h" #include "access/parallel.h" +#include "access/xlog.h" #include "funcapi.h" #include "miscadmin.h" #include "pagestore_client.h" @@ -93,7 +94,23 @@ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) -#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) + +/* + * Blocks are read or written to LFC file outside LFC critical section. + * To synchronize access to such block, writer set state of such block to PENDING. + * If some other backend (read or writer) see PENDING status, it change it to REQUESTED and start + * waiting until status is changed on conditional variable. + * When writer completes is operation, it checks if status is REQUESTED and if so, broadcast conditional variable, + * waking up all backend waiting for access to this block. + */ +typedef enum FileCacheBlockState +{ + UNAVAILABLE, /* block is not present in cache */ + AVAILABLE, /* block can be used */ + PENDING, /* block is loaded */ + REQUESTED /* some other backend is waiting for block to be loaded */ +} FileCacheBlockState; + typedef struct FileCacheEntry { @@ -101,10 +118,16 @@ typedef struct FileCacheEntry uint32 hash; uint32 offset; uint32 access_count; - uint32 bitmap[CHUNK_BITMAP_SIZE]; + uint32 state[(BLOCKS_PER_CHUNK + 31) / 32 * 2]; /* two bits per block */ dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; +#define GET_STATE(entry, i) (((entry)->state[(i) / 16] >> ((i) % 16 * 2)) & 3) +#define SET_STATE(entry, i, new_state) (entry)->state[(i) / 16] = ((entry)->state[(i) / 16] & ~(3 << ((i) % 16 * 2))) | ((new_state) << ((i) % 16 * 2)) + +#define N_COND_VARS 64 +#define CV_WAIT_TIMEOUT 10 + typedef struct FileCacheControl { uint64 generation; /* generation is needed to handle correct hash @@ -122,6 +145,7 @@ typedef struct FileCacheControl * algorithm */ dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ + ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ } FileCacheControl; static HTAB *lfc_hash; @@ -186,6 +210,9 @@ lfc_disable(char const *op) if (rc < 0) elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path); } + /* Wakeup waiting backends */ + for (int i = 0; i < N_COND_VARS; i++) + ConditionVariableBroadcast(&lfc_ctl->cv[i]); } /* @@ -293,6 +320,11 @@ lfc_shmem_startup(void) close(fd); lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit); } + + /* Initialize turnstile of condition variables */ + for (int i = 0; i < N_COND_VARS; i++) + ConditionVariableInit(&lfc_ctl->cv[i]); + } LWLockRelease(AddinShmemInitLock); } @@ -367,7 +399,7 @@ lfc_change_limit_hook(int newval, void *extra) /* We remove the old entry, and re-enter a hole to the hash table */ for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1; + lfc_ctl->used_pages -= GET_STATE(victim, i) == AVAILABLE; } hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); @@ -480,7 +512,7 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) if (LFC_ENABLED()) { entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0; + found = entry != NULL && GET_STATE(entry, chunk_offs) == AVAILABLE; } LWLockRelease(lfc_lock); return found; @@ -526,8 +558,7 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, { for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++) { - if ((entry->bitmap[chunk_offs >> 5] & - (1 << (chunk_offs & 31))) != 0) + if (GET_STATE(entry, chunk_offs) == AVAILABLE) { BITMAP_SET(bitmap, i); found++; @@ -620,7 +651,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) } /* remove the page from the cache */ - entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1))); + SET_STATE(entry, chunk_offs, UNAVAILABLE); if (entry->access_count == 0) { @@ -628,28 +659,25 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) * If the chunk has no live entries, we can position the chunk to be * recycled first. */ - if (entry->bitmap[chunk_offs >> 5] == 0) - { - bool has_remaining_pages = false; + bool has_remaining_pages = false; - for (int i = 0; i < CHUNK_BITMAP_SIZE; i++) + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + { + if (GET_STATE(entry, i) != UNAVAILABLE) { - if (entry->bitmap[i] != 0) - { - has_remaining_pages = true; - break; - } + has_remaining_pages = true; + break; } + } - /* - * Put the entry at the position that is first to be reclaimed when we - * have no cached pages remaining in the chunk - */ - if (!has_remaining_pages) - { - dlist_delete(&entry->list_node); - dlist_push_head(&lfc_ctl->lru, &entry->list_node); - } + /* + * Put the entry at the position that is first to be reclaimed when we + * have no cached pages remaining in the chunk + */ + if (!has_remaining_pages) + { + dlist_delete(&entry->list_node); + dlist_push_head(&lfc_ctl->lru, &entry->list_node); } } @@ -698,7 +726,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); - /* + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header * 2. Check if the chunk actually has the blocks we're interested in @@ -715,6 +743,8 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int iteration_hits = 0; int iteration_misses = 0; uint64 io_time_us = 0; + ConditionVariable* cv; + Assert(blocks_in_chunk > 0); for (int i = 0; i < blocks_in_chunk; i++) @@ -725,6 +755,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); + cv = &lfc_ctl->cv[hash % N_COND_VARS]; LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -766,15 +797,32 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, generation = lfc_ctl->generation; entry_offset = entry->offset; - LWLockRelease(lfc_lock); - for (int i = 0; i < blocks_in_chunk; i++) { - /* - * If the page is valid, we consider it "read". - * All other pages will be fetched separately by the next cache - */ - if (entry->bitmap[(chunk_offs + i) / 32] & (1 << ((chunk_offs + i) % 32))) + FileCacheBlockState state = UNAVAILABLE; + bool sleeping = false; + while (lfc_ctl->generation == generation) + { + state = GET_STATE(entry, chunk_offs + i); + if (state == PENDING) { + SET_STATE(entry, chunk_offs + i, REQUESTED); + } else if (state != REQUESTED) { + break; + } + if (!sleeping) + { + ConditionVariablePrepareToSleep(cv); + sleeping = true; + } + LWLockRelease(lfc_lock); + ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + } + if (sleeping) + { + ConditionVariableCancelSleep(); + } + if (state == AVAILABLE) { BITMAP_SET(mask, buf_offset + i); iteration_hits++; @@ -782,6 +830,7 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, else iteration_misses++; } + LWLockRelease(lfc_lock); Assert(iteration_hits + iteration_misses > 0); @@ -838,6 +887,224 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, return blocks_read; } +/* + * Initialize new LFC hash entry, perform eviction if needed. + * Returns false if there are no unpinned entries and chunk can not be added. + */ +static bool +lfc_init_new_entry(FileCacheEntry* entry, uint32 hash) +{ + /*----------- + * If the chunk wasn't already in the LFC then we have these + * options, in order of preference: + * + * Unless there is no space available, we can: + * 1. Use an entry from the `holes` list, and + * 2. Create a new entry. + * We can always, regardless of space in the LFC: + * 3. evict an entry from LRU, and + * 4. ignore the write operation (the least favorite option) + */ + if (lfc_ctl->used < lfc_ctl->limit) + { + if (!dlist_is_empty(&lfc_ctl->holes)) + { + /* We can reuse a hole that was left behind when the LFC was shrunk previously */ + FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, + dlist_pop_head_node(&lfc_ctl->holes)); + uint32 offset = hole->offset; + bool hole_found; + + hash_search_with_hash_value(lfc_hash, &hole->key, + hole->hash, HASH_REMOVE, &hole_found); + CriticalAssert(hole_found); + + lfc_ctl->used += 1; + entry->offset = offset; /* reuse the hole */ + } + else + { + lfc_ctl->used += 1; + entry->offset = lfc_ctl->size++;/* allocate new chunk at end + * of file */ + } + } + /* + * We've already used up all allocated LFC entries. + * + * If we can clear an entry from the LRU, do that. + * If we can't (e.g. because all other slots are being accessed) + * then we will remove this entry from the hash and continue + * on to the next chunk, as we may not exceed the limit. + */ + else if (!dlist_is_empty(&lfc_ctl->lru)) + { + /* Cache overflow: evict least recently used chunk */ + FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, + dlist_pop_head_node(&lfc_ctl->lru)); + + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + { + lfc_ctl->used_pages -= GET_STATE(victim, i) == AVAILABLE; + } + + CriticalAssert(victim->access_count == 0); + entry->offset = victim->offset; /* grab victim's chunk */ + hash_search_with_hash_value(lfc_hash, &victim->key, + victim->hash, HASH_REMOVE, NULL); + neon_log(DEBUG2, "Swap file cache page"); + } + else + { + /* Can't add this chunk - we don't have the space for it */ + hash_search_with_hash_value(lfc_hash, &entry->key, hash, + HASH_REMOVE, NULL); + + return false; + } + + entry->access_count = 1; + entry->hash = hash; + + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + SET_STATE(entry, i, UNAVAILABLE); + + return true; +} + +/* + * Store received prefetch result in LFC cache. + * Unlike lfc_read/lfc_write this call is is not protected by shared buffer lock. + * So we should be ready that other backends will try to concurrently read or write this block. + * We do not store prefetched block if it already exists in LFC or it's not_modified_since LSN is smaller than current + * last written LSN. + */ +void +lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, + const void* buffer, XLogRecPtr lsn) +{ + BufferTag tag; + FileCacheEntry *entry; + ssize_t rc; + bool found; + uint32 hash; + uint64 generation; + uint32 entry_offset; + instr_time io_start, io_end; + ConditionVariable* cv; + FileCacheBlockState state; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); + + if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */ + return; + + if (!lfc_ensure_opened()) + return; + + CopyNRelFileInfoToBufTag(tag, rinfo); + tag.forkNum = forknum; + + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + + tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + hash = get_hash_value(lfc_hash, &tag); + cv = &lfc_ctl->cv[hash % N_COND_VARS]; + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (!LFC_ENABLED()) + { + LWLockRelease(lfc_lock); + return; + } + if (GetLastWrittenLSN(rinfo, forknum, blkno) > lsn) + { + LWLockRelease(lfc_lock); + return; + } + + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); + + if (found) + { + state = GET_STATE(entry, chunk_offs); + if (state != UNAVAILABLE) { + /* Do not rewrite existed LFC entry */ + LWLockRelease(lfc_lock); + return; + } + /* + * Unlink entry from LRU list to pin it for the duration of IO + * operation + */ + if (entry->access_count++ == 0) + dlist_delete(&entry->list_node); + } + else + { + if (!lfc_init_new_entry(entry, hash)) + { + /* + * We can't process this chunk due to lack of space in LFC, + * so skip to the next one + */ + LWLockRelease(lfc_lock); + return; + } + } + + generation = lfc_ctl->generation; + entry_offset = entry->offset; + + SET_STATE(entry, chunk_offs, PENDING); + + LWLockRelease(lfc_lock); + + pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); + INSTR_TIME_SET_CURRENT(io_start); + rc = pwrite(lfc_desc, buffer, BLCKSZ, + ((off_t) entry_offset * BLOCKS_PER_CHUNK + chunk_offs) * BLCKSZ); + INSTR_TIME_SET_CURRENT(io_end); + pgstat_report_wait_end(); + + if (rc != BLCKSZ) + { + lfc_disable("write"); + } + else + { + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + + if (lfc_ctl->generation == generation) + { + uint64 time_spent_us; + CriticalAssert(LFC_ENABLED()); + /* Place entry to the head of LRU list */ + CriticalAssert(entry->access_count > 0); + + lfc_ctl->writes += 1; + INSTR_TIME_SUBTRACT(io_start, io_end); + time_spent_us = INSTR_TIME_GET_MICROSEC(io_start); + lfc_ctl->time_write += time_spent_us; + inc_page_cache_write_wait(time_spent_us); + + if (--entry->access_count == 0) + dlist_push_tail(&lfc_ctl->lru, &entry->list_node); + + state = GET_STATE(entry, chunk_offs); + if (state == REQUESTED) { + ConditionVariableBroadcast(cv); + } + if (state != AVAILABLE) + { + lfc_ctl->used_pages += 1; + SET_STATE(entry, chunk_offs, AVAILABLE); + } + } + LWLockRelease(lfc_lock); + } +} + /* * Put page in local file cache. * If cache is full then evict some other page. @@ -881,6 +1148,8 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1); int blocks_in_chunk = Min(nblocks, BLOCKS_PER_CHUNK - (blkno % BLOCKS_PER_CHUNK)); instr_time io_start, io_end; + ConditionVariable* cv; + Assert(blocks_in_chunk > 0); for (int i = 0; i < blocks_in_chunk; i++) @@ -891,6 +1160,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); hash = get_hash_value(lfc_hash, &tag); + cv = &lfc_ctl->cv[hash % N_COND_VARS]; LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -911,92 +1181,52 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (entry->access_count++ == 0) dlist_delete(&entry->list_node); } - /*----------- - * If the chunk wasn't already in the LFC then we have these - * options, in order of preference: - * - * Unless there is no space available, we can: - * 1. Use an entry from the `holes` list, and - * 2. Create a new entry. - * We can always, regardless of space in the LFC: - * 3. evict an entry from LRU, and - * 4. ignore the write operation (the least favorite option) - */ - else if (lfc_ctl->used < lfc_ctl->limit) - { - if (!dlist_is_empty(&lfc_ctl->holes)) - { - /* We can reuse a hole that was left behind when the LFC was shrunk previously */ - FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, - dlist_pop_head_node(&lfc_ctl->holes)); - uint32 offset = hole->offset; - bool hole_found; - - hash_search_with_hash_value(lfc_hash, &hole->key, - hole->hash, HASH_REMOVE, &hole_found); - CriticalAssert(hole_found); - - lfc_ctl->used += 1; - entry->offset = offset; /* reuse the hole */ - } - else - { - lfc_ctl->used += 1; - entry->offset = lfc_ctl->size++;/* allocate new chunk at end - * of file */ - } - } - /* - * We've already used up all allocated LFC entries. - * - * If we can clear an entry from the LRU, do that. - * If we can't (e.g. because all other slots are being accessed) - * then we will remove this entry from the hash and continue - * on to the next chunk, as we may not exceed the limit. - */ - else if (!dlist_is_empty(&lfc_ctl->lru)) + else { - /* Cache overflow: evict least recently used chunk */ - FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, - dlist_pop_head_node(&lfc_ctl->lru)); - - for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + if (!lfc_init_new_entry(entry, hash)) { - lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1; + /* + * We can't process this chunk due to lack of space in LFC, + * so skip to the next one + */ + LWLockRelease(lfc_lock); + blkno += blocks_in_chunk; + buf_offset += blocks_in_chunk; + nblocks -= blocks_in_chunk; + continue; } - - CriticalAssert(victim->access_count == 0); - entry->offset = victim->offset; /* grab victim's chunk */ - hash_search_with_hash_value(lfc_hash, &victim->key, - victim->hash, HASH_REMOVE, NULL); - neon_log(DEBUG2, "Swap file cache page"); } - else - { - /* Can't add this chunk - we don't have the space for it */ - hash_search_with_hash_value(lfc_hash, &entry->key, hash, - HASH_REMOVE, NULL); - /* - * We can't process this chunk due to lack of space in LFC, - * so skip to the next one - */ - LWLockRelease(lfc_lock); - blkno += blocks_in_chunk; - buf_offset += blocks_in_chunk; - nblocks -= blocks_in_chunk; - continue; - } + generation = lfc_ctl->generation; + entry_offset = entry->offset; - if (!found) + for (int i = 0; i < blocks_in_chunk; i++) { - entry->access_count = 1; - entry->hash = hash; - memset(entry->bitmap, 0, sizeof entry->bitmap); + FileCacheBlockState state = UNAVAILABLE; + bool sleeping = false; + while (lfc_ctl->generation == generation) + { + state = GET_STATE(entry, chunk_offs + i); + if (state == PENDING) { + SET_STATE(entry, chunk_offs + i, REQUESTED); + } else if (state != REQUESTED) { + SET_STATE(entry, chunk_offs + i, PENDING); + break; + } + if (!sleeping) + { + ConditionVariablePrepareToSleep(cv); + sleeping = true; + } + LWLockRelease(lfc_lock); + ConditionVariableTimedSleep(cv, CV_WAIT_TIMEOUT, WAIT_EVENT_NEON_LFC_CV_WAIT); + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + } + if (sleeping) + { + ConditionVariableCancelSleep(); + } } - - generation = lfc_ctl->generation; - entry_offset = entry->offset; LWLockRelease(lfc_lock); pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE); @@ -1032,9 +1262,16 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, for (int i = 0; i < blocks_in_chunk; i++) { - lfc_ctl->used_pages += 1 - ((entry->bitmap[(chunk_offs + i) >> 5] >> ((chunk_offs + i) & 31)) & 1); - entry->bitmap[(chunk_offs + i) >> 5] |= - (1 << ((chunk_offs + i) & 31)); + FileCacheBlockState state = GET_STATE(entry, chunk_offs + i); + if (state == REQUESTED) + { + ConditionVariableBroadcast(cv); + } + if (state != AVAILABLE) + { + lfc_ctl->used_pages += 1; + SET_STATE(entry, chunk_offs + i, AVAILABLE); + } } } @@ -1253,8 +1490,8 @@ local_cache_pages(PG_FUNCTION_ARGS) hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { - for (int i = 0; i < CHUNK_BITMAP_SIZE; i++) - n_pages += pg_popcount32(entry->bitmap[i]); + for (int i = 0; i < BLOCKS_PER_CHUNK; i++) + n_pages += GET_STATE(entry, i) == AVAILABLE; } } } @@ -1282,7 +1519,7 @@ local_cache_pages(PG_FUNCTION_ARGS) { for (int i = 0; i < BLOCKS_PER_CHUNK; i++) { - if (entry->bitmap[i >> 5] & (1 << (i & 31))) + if (GET_STATE(entry, i) == AVAILABLE) { fctx->record[n].pageoffs = entry->offset * BLOCKS_PER_CHUNK + i; fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key)); diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index ff08f9164dc3..1b5aebf0c979 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -55,6 +55,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE; uint32 WAIT_EVENT_NEON_LFC_READ; uint32 WAIT_EVENT_NEON_LFC_TRUNCATE; uint32 WAIT_EVENT_NEON_LFC_WRITE; +uint32 WAIT_EVENT_NEON_LFC_CV_WAIT; uint32 WAIT_EVENT_NEON_PS_STARTING; uint32 WAIT_EVENT_NEON_PS_CONFIGURING; uint32 WAIT_EVENT_NEON_PS_SEND; @@ -528,6 +529,7 @@ neon_shmem_startup_hook(void) WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read"); WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate"); WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write"); + WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait"); WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting"); WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring"); WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO"); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 79aa88b8d36b..912e09c3d3ef 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE; extern uint32 WAIT_EVENT_NEON_LFC_READ; extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE; extern uint32 WAIT_EVENT_NEON_LFC_WRITE; +extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT; extern uint32 WAIT_EVENT_NEON_PS_STARTING; extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING; extern uint32 WAIT_EVENT_NEON_PS_SEND; @@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL; #define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ #define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE #define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE +#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ #define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION #define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION #define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 7b748d7252d9..1b1a992d1307 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -303,6 +303,9 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, int nblocks, bits8 *bitmap); extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno); extern void lfc_init(void); +extern void lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, + const void* buffer, XLogRecPtr lsn); + static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 54cacea98448..ab1e9661d098 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -714,6 +714,12 @@ prefetch_read(PrefetchRequest *slot) /* update slot state */ slot->status = PRFS_RECEIVED; slot->response = response; + + if (response->tag == T_NeonGetPageResponse) + { + /* store prefetched result in LFC */ + lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since); + } return true; } else @@ -3106,7 +3112,6 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block } } memcpy(buffer, getpage_resp->page, BLCKSZ); - lfc_write(rinfo, forkNum, blockno, buffer); break; } case T_NeonErrorResponse: