Skip to content

Commit

Permalink
AtomicDict now uses regular linear probing instead of Robin Hood hashing
Browse files Browse the repository at this point in the history
  • Loading branch information
dpdani committed Feb 4, 2025
1 parent df99f2e commit 3d4ec2a
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 1,445 deletions.
2 changes: 0 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ Python3_add_library(_cereggii MODULE
"cereggii/atomic_dict/meta.c"
"cereggii/atomic_dict/migrate.c"
"cereggii/atomic_dict/node_ops.c"
"cereggii/atomic_dict/node_sizes_table.c"
"cereggii/atomic_dict/robin_hood.c"
"cereggii/atomic_int/atomic_int.c"
"cereggii/atomic_int/handle.c"
"cereggii/atomic_event.c"
Expand Down
1 change: 0 additions & 1 deletion src/cereggii/_cereggii.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ class AtomicDict:
`AtomicDict`. This can greatly reduce contention when the keys in the input are repeated.
"""

def compact(self) -> None: ...
def _debug(self) -> dict:
"""
Provide some debugging information.
Expand Down
36 changes: 9 additions & 27 deletions src/cereggii/atomic_dict/atomic_dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ PyObject *
AtomicDict_new(PyTypeObject *type, PyObject *Py_UNUSED(args), PyObject *Py_UNUSED(kwds))
{
AtomicDict *self = NULL;
self = PyObject_GC_New(AtomicDict, &AtomicDict_Type);
self = PyObject_GC_New(AtomicDict, type);
if (self != NULL) {
self->metadata = NULL;
self->metadata = (AtomicRef *) AtomicRef_new(&AtomicRef_Type, NULL, NULL);
Expand Down Expand Up @@ -327,27 +327,18 @@ AtomicDict_UnsafeInsert(AtomicDict_Meta *meta, Py_hash_t hash, uint64_t pos)
.index = pos,
.tag = hash,
};
uint64_t ix = AtomicDict_Distance0Of(hash, meta);
const uint64_t d0 = AtomicDict_Distance0Of(hash, meta);

for (int probe = 0; probe < meta->max_distance; probe++) {
AtomicDict_ReadNodeAt((ix + probe) % meta->size, &temp, meta);
for (uint64_t distance = 0; distance < SIZE_OF(meta); distance++) {
AtomicDict_ReadNodeAt((d0 + distance) & (SIZE_OF(meta) - 1), &temp, meta);

if (temp.node == 0) {
node.distance = probe;
AtomicDict_WriteNodeAt((ix + probe) % meta->size, &node, meta);
AtomicDict_WriteNodeAt((d0 + distance) & (SIZE_OF(meta) - 1), &node, meta);
goto done;
}

if (temp.distance < probe) {
// non-atomic robin hood
node.distance = probe;
AtomicDict_WriteNodeAt((ix + probe) % meta->size, &node, meta);
ix = ix + probe - temp.distance;
probe = temp.distance;
node = temp;
}
}
// probes exhausted

// full
return -1;
done:
return 0;
Expand Down Expand Up @@ -548,18 +539,9 @@ AtomicDict_Debug(AtomicDict *self)
PyObject *block_info = NULL;

meta = (AtomicDict_Meta *) AtomicRef_Get(self->metadata);
metadata = Py_BuildValue("{sOsOsOsOsOsOsOsOsOsOsOsOsOsOsO}",
metadata = Py_BuildValue("{sOsOsOsOsOsO}",
"log_size\0", Py_BuildValue("B", meta->log_size),
"generation\0", Py_BuildValue("n", (Py_ssize_t) meta->generation),
"node_size\0", Py_BuildValue("B", meta->node_size),
"distance_size\0", Py_BuildValue("B", meta->distance_size),
"tag_size\0", Py_BuildValue("B", meta->tag_size),
"node_mask\0", Py_BuildValue("k", meta->node_mask),
"index_mask\0", Py_BuildValue("k", meta->index_mask),
"distance_mask\0", Py_BuildValue("k", meta->distance_mask),
"tag_mask\0", Py_BuildValue("k", meta->tag_mask),
"tombstone\0", Py_BuildValue("k", meta->tombstone.node),
"is_compact\0", Py_BuildValue("B", meta->is_compact),
"inserting_block\0", Py_BuildValue("l", meta->inserting_block),
"greatest_allocated_block\0", Py_BuildValue("l", meta->greatest_allocated_block),
"greatest_deleted_block\0", Py_BuildValue("l", meta->greatest_deleted_block),
Expand All @@ -572,7 +554,7 @@ AtomicDict_Debug(AtomicDict *self)
goto fail;

AtomicDict_Node node;
for (uint64_t i = 0; i < meta->size; i++) {
for (uint64_t i = 0; i < SIZE_OF(meta); i++) {
AtomicDict_ReadNodeAt(i, &node, meta);
PyObject *n = Py_BuildValue("k", node.node);
if (n == NULL)
Expand Down
14 changes: 7 additions & 7 deletions src/cereggii/atomic_dict/blocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ AtomicDictBlock_traverse(AtomicDict_Block *self, visitproc visit, void *arg)
for (int i = 0; i < ATOMIC_DICT_ENTRIES_IN_BLOCK; ++i) {
entry = self->entries[i];

if (entry.value == NULL || entry.flags & ENTRY_FLAGS_TOMBSTONE || entry.flags & ENTRY_FLAGS_SWAPPED)
if (entry.value == NULL)
continue;

Py_VISIT(entry.key);
Expand All @@ -50,7 +50,7 @@ AtomicDictBlock_clear(AtomicDict_Block *self)
for (int i = 0; i < ATOMIC_DICT_ENTRIES_IN_BLOCK; ++i) {
entry = self->entries[i];

if (entry.flags & ENTRY_FLAGS_TOMBSTONE || entry.flags & ENTRY_FLAGS_SWAPPED)
if (entry.value == NULL)
continue;

self->entries[i].key = NULL;
Expand Down Expand Up @@ -108,10 +108,10 @@ AtomicDict_GetEmptyEntry(AtomicDict *self, AtomicDict_Meta *meta, AtomicDict_Res
CereggiiAtomic_CompareExchangeInt64(&meta->inserting_block, inserting_block, inserting_block + 1);
goto reserve_in_inserting_block; // even if the above CAS fails
}
if (greatest_allocated_block + 1 >= meta->size >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK) {
if (greatest_allocated_block + 1 >= SIZE_OF(meta) >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK) {
return 0; // must grow
}
assert(greatest_allocated_block + 1 <= meta->size >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK);
assert(greatest_allocated_block + 1 <= SIZE_OF(meta) >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK);

AtomicDict_Block *block = NULL;
block = AtomicDictBlock_New(meta);
Expand All @@ -121,7 +121,7 @@ AtomicDict_GetEmptyEntry(AtomicDict *self, AtomicDict_Meta *meta, AtomicDict_Res
block->entries[0].flags = ENTRY_FLAGS_RESERVED;

if (CereggiiAtomic_CompareExchangePtr((void **) &meta->blocks[greatest_allocated_block + 1], NULL, block)) {
if (greatest_allocated_block + 2 < meta->size >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK) {
if (greatest_allocated_block + 2 < SIZE_OF(meta) >> ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK) {
CereggiiAtomic_StorePtr((void **) &meta->blocks[greatest_allocated_block + 2], NULL);
}
CereggiiAtomic_CompareExchangeInt64(&meta->greatest_allocated_block,
Expand All @@ -144,7 +144,7 @@ AtomicDict_GetEmptyEntry(AtomicDict *self, AtomicDict_Meta *meta, AtomicDict_Res
done:
assert(entry_loc->entry != NULL);
assert(entry_loc->entry->key == NULL);
assert(entry_loc->location < meta->size);
assert(entry_loc->location < SIZE_OF(meta));
return 1;
fail:
entry_loc->entry = NULL;
Expand Down Expand Up @@ -178,7 +178,7 @@ AtomicDict_ReadEntry(AtomicDict_Entry *entry_p, AtomicDict_Entry *entry)
{
entry->flags = entry_p->flags;
entry->value = entry_p->value;
if (entry->value == NULL || entry->flags & ENTRY_FLAGS_TOMBSTONE || entry->flags & ENTRY_FLAGS_SWAPPED) {
if (entry->value == NULL) {
entry->key = NULL;
entry->value = NULL;
entry->hash = -1;
Expand Down
138 changes: 6 additions & 132 deletions src/cereggii/atomic_dict/delete.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,11 @@
#include "atomic_ops.h"


/*
 * Try to advance meta->greatest_deleted_block from gdb to gdb + 1 and report
 * whether the table now looks sparse enough to be shrunk.
 *
 * meta: metadata whose greatest_deleted_block counter is advanced.
 * gab:  the caller's view of the greatest allocated block (the visible caller
 *       passes a value read from meta->greatest_allocated_block).
 * gdb:  the caller's view of the greatest deleted block; used as the expected
 *       value of the compare-exchange.
 *
 * Returns 1 when (gab - gdb + meta->greatest_refilled_block) blocks' worth of
 * entries is at most one third of meta->size, 0 otherwise. The visible caller
 * stores this result in its should_shrink flag.
 */
inline int
AtomicDict_IncrementGreatestDeletedBlock(AtomicDict_Meta *meta, int64_t gab, int64_t gdb)
{
// The result of the compare-exchange is not checked: the threshold test below
// runs whether or not this call was the one that advanced the counter.
// NOTE(review): presumably a failed CAS means another thread already advanced it — confirm.
CereggiiAtomic_CompareExchangeInt64(&meta->greatest_deleted_block, gdb, gdb + 1);

// Compares an entry count (blocks * entries per block) against meta->size / 3,
// using the caller-supplied gab/gdb rather than re-reading them from meta.
if ((gab - gdb + meta->greatest_refilled_block) * ATOMIC_DICT_ENTRIES_IN_BLOCK <= meta->size * 1 / 3) {
return 1;
}

return 0;
}

int
AtomicDict_Delete(AtomicDict_Meta *meta, PyObject *key, Py_hash_t hash)
{
AtomicDict_SearchResult result;
AtomicDict_Lookup(meta, key, hash, &result);
int should_shrink = 0;

if (result.error)
goto fail;
Expand All @@ -44,120 +31,13 @@ AtomicDict_Delete(AtomicDict_Meta *meta, PyObject *key, Py_hash_t hash)
Py_DECREF(result.entry.value);
result.entry.value = NULL;

// do {
// if (CereggiiAtomic_CompareExchangeUInt8(
// &result.entry_p->flags,
// result.entry.flags,
// result.entry.flags | ENTRY_FLAGS_TOMBSTONE
// )) {
// result.entry.flags |= ENTRY_FLAGS_TOMBSTONE;
// } else {
// // what if swapped?
// AtomicDict_ReadEntry(result.entry_p, &result.entry);
// }
// } while (!(result.entry.flags & ENTRY_FLAGS_TOMBSTONE));
//
// uint64_t entry_ix = result.node.index;
// AtomicDict_BufferedNodeReader reader;
// AtomicDict_Node temp[16];
// int begin_write, end_write;
//
// do {
// AtomicDict_LookupEntry(meta, entry_ix, hash, &result);
// assert(!result.error);
// assert(result.found);
// reader.zone = -1;
// AtomicDict_ReadNodesFromZoneStartIntoBuffer(result.position, &reader, meta);
// AtomicDict_CopyNodeBuffers(reader.buffer, temp);
// AtomicDict_RobinHoodDelete(meta, temp, reader.idx_in_buffer);
// AtomicDict_ComputeBeginEndWrite(meta, reader.buffer, temp, &begin_write, &end_write);
// } while (!AtomicDict_AtomicWriteNodesAt(result.position - reader.idx_in_buffer + begin_write,
// end_write - begin_write,
// &reader.buffer[begin_write], &temp[begin_write], meta));
//
uint64_t block_num;
int64_t gab, gdb;
AtomicDict_EntryLoc swap_loc;
AtomicDict_Entry swap;
//
// recycle_entry:
block_num = AtomicDict_BlockOf(result.node.index);
gab = meta->greatest_allocated_block;
gdb = meta->greatest_deleted_block;
//
// if (gdb > gab)
// goto recycle_entry;

if (block_num == gdb + 1) {
int all_deleted = 1;
AtomicDict_Node tombstone = {
.index = 0,
.tag = TOMBSTONE(meta),
};

for (int i = 0; i < ATOMIC_DICT_ENTRIES_IN_BLOCK; ++i) {
swap_loc.location = ((gdb + 1) << ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK) + i;
swap_loc.entry = AtomicDict_GetEntryAt(swap_loc.location, meta);
AtomicDict_ReadEntry(swap_loc.entry, &swap);

if (block_num == 0 && i == 0)
continue;

if (!(swap.flags & ENTRY_FLAGS_TOMBSTONE || swap.flags & ENTRY_FLAGS_SWAPPED)) {
all_deleted = 0;
break;
}
}

if (all_deleted) {
should_shrink = AtomicDict_IncrementGreatestDeletedBlock(meta, gab, gdb);
}
}

if (block_num > gdb + 1) {
// for (int i = 0; i < ATOMIC_DICT_ENTRIES_IN_BLOCK; ++i) {
// swap_loc.location = ((gdb + 1) << ATOMIC_DICT_LOG_ENTRIES_IN_BLOCK) +
// ((i + hash) % ATOMIC_DICT_ENTRIES_IN_BLOCK);
// swap_loc.entry = AtomicDict_GetEntryAt(swap_loc.location, meta);
// AtomicDict_ReadEntry(swap_loc.entry, &swap);
//
// if (!(swap.value == NULL || swap.flags & ENTRY_FLAGS_TOMBSTONE || swap.flags & ENTRY_FLAGS_SWAPPED))
// goto swap_found;
// }

// should_shrink = AtomicDict_IncrementGreatestDeletedBlock(meta, gab, gdb);
// goto recycle_entry; // don't handle failure

// swap_found:
// result.entry_p->key = swap.key;
// result.entry_p->value = swap.value; // todo: what if value was updated? => use AtomicRef
// result.entry_p->hash = swap.hash;
// if (!CereggiiAtomic_CompareExchangeUInt8(
// &swap_loc.entry->flags,
// swap.flags,
// swap.flags | ENTRY_FLAGS_SWAPPED
// )) {
// AtomicDict_ReadEntry(swap_loc.entry, &swap);
// if (swap.value == NULL || swap.flags & ENTRY_FLAGS_TOMBSTONE || swap.flags & ENTRY_FLAGS_SWAPPED)
// goto recycle_entry;
// }
//
// CereggiiAtomic_StoreUInt8(&result.entry_p->flags, result.entry.flags & ~ENTRY_FLAGS_TOMBSTONE);
//
// AtomicDict_SearchResult swap_search;
// do_swap:
// AtomicDict_LookupEntry(meta, swap_loc.location, swap.hash, &swap_search);
// AtomicDict_Node swapped = {
// .tag = swap_search.node.tag,
// .distance = swap_search.node.distance,
// .index = entry_ix,
// };
//
// if (!AtomicDict_AtomicWriteNodesAt(swap_search.position, 1, &swap_search.node, &swapped, meta)) {
// goto do_swap;
// }
// swap_loc.entry->key = NULL;
// swap_loc.entry->value = NULL;
}

if (should_shrink)
return 2;
int ok = AtomicDict_AtomicWriteNodeAt(result.position, &result.node, &tombstone, meta);
assert(ok);

return 1;

Expand Down Expand Up @@ -215,12 +95,6 @@ AtomicDict_DelItem(AtomicDict *self, PyObject *key)
goto fail;
}

if (deleted == 2) { // should shrink
int success = AtomicDict_Shrink(self);
if (success < 0)
goto fail;
}

Py_DECREF(meta);
return 0;

Expand Down
Loading

0 comments on commit 3d4ec2a

Please sign in to comment.