Skip to content

Commit

Permalink
Compute/LFC: Apply limits consistently (#10449)
Browse files Browse the repository at this point in the history
Without this, we might hit ERRORs in otherwise safe situations (such as user
queries), which isn't a great user experience.

## Problem

#10376

## Summary of changes

Instead of treating internal errors as acceptable, we ensure that we never
exceed our allocated usage.
  • Loading branch information
MMeent authored Jan 20, 2025
1 parent 72130d7 commit e781cf6
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 42 deletions.
110 changes: 69 additions & 41 deletions pgxn/neon/file_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -911,57 +911,85 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry->access_count++ == 0)
dlist_delete(&entry->list_node);
}
else
/*-----------
* If the chunk wasn't already in the LFC then we have these
* options, in order of preference:
*
* Unless there is no space available, we can:
* 1. Use an entry from the `holes` list, and
* 2. Create a new entry.
* We can always, regardless of space in the LFC:
* 3. evict an entry from LRU, and
* 4. ignore the write operation (the least favorite option)
*/
else if (lfc_ctl->used < lfc_ctl->limit)
{
/*
* We have two choices if all cache pages are pinned (i.e. used in IO
* operations):
*
* 1) Wait until some of these operations complete and pages are
* unpinned.
*
* 2) Allocate one more chunk, so that the specified cache size is more
* of a recommendation than a hard limit.
*
* As the probability of such an event (that all pages are pinned) is
* considered to be very small (there would have to be a very large
* number of concurrent IO operations, and those are limited by
* max_connections), we prefer not to complicate the code and use the
* second approach.
*/
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru));

for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
}
CriticalAssert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else if (!dlist_is_empty(&lfc_ctl->holes))
if (!dlist_is_empty(&lfc_ctl->holes))
{
/* We can reuse a hole that was left behind when the LFC was shrunk previously */
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;

hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &hole_found);
FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->holes));
uint32 offset = hole->offset;
bool hole_found;

hash_search_with_hash_value(lfc_hash, &hole->key,
hole->hash, HASH_REMOVE, &hole_found);
CriticalAssert(hole_found);

lfc_ctl->used += 1;
entry->offset = offset; /* reuse the hole */
entry->offset = offset; /* reuse the hole */
}
else
{
lfc_ctl->used += 1;
entry->offset = lfc_ctl->size++; /* allocate new chunk at end
* of file */
entry->offset = lfc_ctl->size++;/* allocate new chunk at end
* of file */
}
}
/*
* We've already used up all allocated LFC entries.
*
* If we can clear an entry from the LRU, do that.
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*/
else if (!dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
dlist_pop_head_node(&lfc_ctl->lru));

for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
lfc_ctl->used_pages -= (victim->bitmap[i >> 5] >> (i & 31)) & 1;
}

CriticalAssert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key,
victim->hash, HASH_REMOVE, NULL);
neon_log(DEBUG2, "Swap file cache page");
}
else
{
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);

/*
* We can't process this chunk due to lack of space in LFC,
* so skip to the next one
*/
LWLockRelease(lfc_lock);
blkno += blocks_in_chunk;
buf_offset += blocks_in_chunk;
nblocks -= blocks_in_chunk;
continue;
}

if (!found)
{
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
Expand Down
71 changes: 70 additions & 1 deletion test_runner/regress/test_local_file_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,78 @@
import time

import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import USE_LFC, query_scalar

"""
Test whether LFC doesn't error out when the LRU is empty, but the LFC is
already at its maximum size.
If we don't handle this safely, we might allocate more hash entries than
otherwise considered safe, thus causing ERRORs in hash_search(HASH_ENTER) once
we hit lfc->used >= lfc->limit.
"""


@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_all_pinned(neon_simple_env: NeonEnv) -> None:
    """Check that concurrent UPDATEs succeed with a 1MB local file cache.

    The endpoint is started with both ``neon.max_file_cache_size`` and
    ``neon.file_cache_size_limit`` set to 1MB. Several threads then issue
    random single-row UPDATEs for a fixed period. The test passes when no
    worker raised and the sum of the per-row counters equals the initial
    value of every row plus the number of UPDATEs the workers report.
    """
    env = neon_simple_env
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "neon.max_file_cache_size='1MB'",
            "neon.file_cache_size_limit='1MB'",
        ],
    )
    top_cur = endpoint.connect().cursor()

    stop = threading.Event()
    n_rows = 10000
    n_threads = 5
    n_updates_per_connection = 1000

    # NOTE(review): fillfactor=10 presumably spreads the rows over many
    # pages so the working set exceeds the tiny cache -- confirm.
    top_cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    top_cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Exceptions raised inside worker threads. Collected here so the main
    # thread can fail the test explicitly instead of silently losing them.
    worker_errors: list[BaseException] = []

    # Start threads that will perform random UPDATEs. Each UPDATE
    # increments the counter on the row, so that we can check at the
    # end that the sum of all the counters matches the number of updates
    # performed (plus the initial 1 on each row).
    #
    # Furthermore, each thread reconnects after every
    # `n_updates_per_connection` updates.
    def run_updates(n_updates_performed_q: queue.Queue[int]) -> None:
        n_updates_performed = 0
        try:
            conn = endpoint.connect()
            cur = conn.cursor()
            while not stop.is_set():
                row_id = random.randint(1, n_rows)
                cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {row_id}")
                n_updates_performed += 1
                if n_updates_performed % n_updates_per_connection == 0:
                    cur.close()
                    conn.close()
                    conn = endpoint.connect()
                    cur = conn.cursor()
            # Clean shutdown: release the last connection of this worker.
            cur.close()
            conn.close()
        except Exception as exc:
            # Thread boundary: remember the failure for the main thread.
            worker_errors.append(exc)
        finally:
            # Always report a count, otherwise the main thread would block
            # forever in queue.get() when a worker dies early.
            n_updates_performed_q.put(n_updates_performed)

    n_updates_performed_q: queue.Queue[int] = queue.Queue()
    threads: list[threading.Thread] = []
    for _i in range(n_threads):
        thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
        thread.start()
        threads.append(thread)

    # Let the workers hammer the table for a while before stopping them.
    time.sleep(15)

    stop.set()

    n_updates_performed = 0
    for thread in threads:
        thread.join()
        n_updates_performed += n_updates_performed_q.get()

    assert not worker_errors, f"update worker(s) failed: {worker_errors!r}"
    assert query_scalar(top_cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed


@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
Expand Down

1 comment on commit e781cf6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7510 tests run: 7118 passed, 1 failed, 391 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_layer_map[release-pg16-github-actions-selfhosted]"
Flaky tests (4)

Postgres 17

Postgres 16

Code coverage* (full report)

  • functions: 33.6% (8446 of 25104 functions)
  • lines: 49.2% (70723 of 143866 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
e781cf6 at 2025-01-20T20:51:40.535Z :recycle:

Please sign in to comment.