Skip to content

Commit

Permalink
Simplify counter (#720)
Browse files Browse the repository at this point in the history
* sources/counter.h: remove unused TODO's

Currently there is no need in them

Signed-off-by: rkhapov <[email protected]>

* sources/counter.h: remove od_counter_erase

It is not implemented

Signed-off-by: rkhapov <[email protected]>

* sources/counter.c: hashmap -> plain array

Usaging of hashmap for counting small range of integers
is overkill (router and frontend statuses).

So this patch replace hash table with plain array of atomics.

Signed-off-by: rkhapov <[email protected]>

---------

Signed-off-by: rkhapov <[email protected]>
Co-authored-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov and rkhapov authored Nov 26, 2024
1 parent 8090ce8 commit 0a704db
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 238 deletions.
198 changes: 35 additions & 163 deletions sources/counter.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,202 +9,74 @@
#include <machinarium.h>
#include <odyssey.h>

inline od_counter_llist_t *od_counter_llist_create(void)
static inline void check_value(od_counter_t *counter, size_t value)
{
od_counter_llist_t *llist = malloc(sizeof(od_counter_llist_t));
if (llist == NULL)
return NULL;

llist->list = NULL;
llist->count = 0;

return llist;
}

od_bucket_t *od_bucket_create(void)
{
od_bucket_t *b = malloc(sizeof(od_bucket_t));
if (b == NULL)
return NULL;

b->l = od_counter_llist_create();
const int res = pthread_mutex_init(&b->mutex, NULL);
if (!res) {
return b;
if (od_unlikely(value >= counter->size)) {
// no need any special action, it is an error about constant sizes
abort();
}

free(b);
return NULL;
}

void od_bucket_free(od_bucket_t *b)
od_counter_t *od_counter_create(size_t max_value)
{
od_counter_llist_free(b->l);
pthread_mutex_destroy(&b->mutex);
free(b);
}

inline void od_counter_llist_add(od_counter_llist_t *llist,
const od_counter_item_t *it)
{
od_counter_litem_t *litem = malloc(sizeof(od_counter_litem_t));
if (litem == NULL)
return;
litem->value = *it;
litem->cnt = 1;

litem->next = llist->list;
llist->list = litem;

++llist->count;
}

inline od_retcode_t od_counter_llist_free(od_counter_llist_t *l)
{
od_counter_litem_t *next = NULL;

for (od_counter_litem_t *it = l->list; it != NULL; it = next) {
next = it->next;
free(it);
if (max_value > OD_COUNTER_MAX_POSSIBLE_VALUE) {
// counter should be used only for some
// small ranges, like enum for status
abort();
}
free(l);
return OK_RESPONSE;
}

static size_t od_counter_required_buf_size(int sz)
{
return sizeof(od_counter_t) + (sz * sizeof(od_bucket_t));
}
// 0..max_value
size_t values_count = max_value + 1;

od_counter_t *od_counter_create(size_t sz)
{
od_counter_t *t = malloc(od_counter_required_buf_size(sz));
if (t == NULL) {
goto error;
}
t->size = sz;

for (size_t i = 0; i < t->size; ++i) {
t->buckets[i] = od_bucket_create();
if (t->buckets[i] == NULL) {
goto error;
}
od_counter_t *counter = malloc(sizeof(od_counter_t) +
sizeof(od_atomic_u64_t) * values_count);
if (od_unlikely(counter == NULL)) {
return NULL;
}

return t;
error:
if (t) {
for (size_t i = 0; i < t->size; ++i) {
if (t->buckets[i] == NULL)
continue;
counter->size = values_count;

od_bucket_free(t->buckets[i]);
}
memset(counter->value_to_count, 0,
sizeof(od_atomic_u64_t) * counter->size);

free(t);
}
return NULL;
return counter;
}

od_counter_t *od_counter_create_default(void)
od_retcode_t od_counter_free(od_counter_t *counter)
{
return od_counter_create(OD_DEFAULT_HASH_TABLE_SIZE);
}

od_retcode_t od_counter_free(od_counter_t *t)
{
for (size_t i = 0; i < t->size; ++i) {
od_bucket_free(t->buckets[i]);
}

free(t);
free(counter);

return OK_RESPONSE;
}

void od_counter_inc(od_counter_t *t, od_counter_item_t item)
void od_counter_inc(od_counter_t *counter, size_t value)
{
od_counter_item_t key = od_hash_item(t, item);
/*
* prevent concurrent access to
* modify hash table section
*/
pthread_mutex_lock(&t->buckets[key]->mutex);
{
bool fnd = false;

for (od_counter_litem_t *it = t->buckets[key]->l->list;
it != NULL; it = it->next) {
if (it->value == item) {
++it->cnt;
fnd = true;
break;
}
}
if (!fnd)
od_counter_llist_add(t->buckets[key]->l, &item);
}
pthread_mutex_unlock(&t->buckets[key]->mutex);
check_value(counter, value);

od_atomic_u64_inc(&counter->value_to_count[value]);
}

od_count_t od_counter_get_count(od_counter_t *t, od_counter_item_t value)
uint64_t od_counter_get_count(od_counter_t *counter, size_t value)
{
od_counter_item_t key = od_hash_item(t, value);

od_count_t ret_val = 0;

pthread_mutex_lock(&t->buckets[key]->mutex);
{
for (od_counter_litem_t *it = t->buckets[key]->l->list;
it != NULL; it = it->next) {
if (it->value == value) {
ret_val = it->cnt;
break;
}
}
}
pthread_mutex_unlock(&t->buckets[key]->mutex);
check_value(counter, value);

return ret_val;
return od_atomic_u64_of(&counter->value_to_count[value]);
}

static inline od_retcode_t od_counter_reset_target_bucket(od_counter_t *t,
size_t bucket_key)
od_retcode_t od_counter_reset(od_counter_t *counter, size_t value)
{
pthread_mutex_lock(&t->buckets[bucket_key]->mutex);
{
for (od_counter_litem_t *it = t->buckets[bucket_key]->l->list;
it != NULL; it = it->next) {
it->value = 0;
}
}
pthread_mutex_unlock(&t->buckets[bucket_key]->mutex);
check_value(counter, value);

return OK_RESPONSE;
}
od_atomic_u64_set(&counter->value_to_count[value], 0ULL);

od_retcode_t od_counter_reset(od_counter_t *t, od_counter_item_t value)
{
od_counter_item_t key = od_hash_item(t, value);

pthread_mutex_lock(&t->buckets[key]->mutex);
{
for (od_counter_litem_t *it = t->buckets[key]->l->list;
it != NULL; it = it->next) {
if (it->value == value) {
it->value = 0;
break;
}
}
}
pthread_mutex_unlock(&t->buckets[key]->mutex);
return OK_RESPONSE;
}

od_retcode_t od_counter_reset_all(od_counter_t *t)
od_retcode_t od_counter_reset_all(od_counter_t *counter)
{
for (size_t i = 0; i < t->size; ++i) {
od_counter_reset_target_bucket(t, i);
for (size_t i = 0; i < counter->size; ++i) {
od_atomic_u64_set(&counter->value_to_count[i], 0ULL);
}

return OK_RESPONSE;
}
86 changes: 12 additions & 74 deletions sources/counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,82 +7,20 @@
* Scalable PostgreSQL connection pooler.
*/

/* llist stands for linked list */
typedef struct od_hash_litem od_counter_litem_t;
typedef struct od_hash_llist od_counter_llist_t;
#define OD_COUNTER_MAX_POSSIBLE_VALUE 1000UL

typedef size_t od_hash_table_key_t;
/* support only int values for now */
/* TODO: change value type to (void *) */
typedef size_t od_counter_item_t;
typedef size_t od_count_t;
#include <atomic.h>

struct od_hash_litem {
od_counter_item_t value;
od_count_t cnt;

od_counter_litem_t *next;
};

struct od_hash_llist {
size_t count;
od_counter_litem_t *list;
};

extern od_counter_llist_t *od_counter_llist_create(void);

extern void od_counter_llist_add(od_counter_llist_t *llist,
const od_counter_item_t *it);

extern od_retcode_t od_counter_llist_free(od_counter_llist_t *l);

#define OD_DEFAULT_HASH_TABLE_SIZE 15
typedef struct od_bucket {
od_counter_llist_t *l;
pthread_mutex_t mutex;
} od_bucket_t;

typedef struct od_counter od_counter_t;
struct od_counter {
typedef struct od_counter {
size_t size;
// ISO C99 flexible array member
od_bucket_t *buckets[FLEXIBLE_ARRAY_MEMBER];
};

extern od_counter_t *od_counter_create(size_t sz);

extern od_counter_t *od_counter_create_default(void);

extern od_retcode_t od_counter_free(od_counter_t *t);

extern od_count_t od_counter_get_count(od_counter_t *t,
od_counter_item_t value);

extern od_retcode_t od_counter_reset(od_counter_t *t, od_counter_item_t value);

extern od_retcode_t od_counter_reset_all(od_counter_t *t);

extern od_retcode_t od_counter_erase(od_counter_t *t, od_hash_table_key_t key,
od_counter_llist_t *item);

extern void od_counter_inc(od_counter_t *t, od_counter_item_t item);

static inline od_counter_item_t od_hash_item(od_counter_t *t,
od_counter_item_t item)
{
/*
* How to get hash of number in range 0 to MAXINT ?
* there are many ways, we choose simplest one
*/
return item % t->size;
}

/* these are function to perform dynamic counter resize in case of overfit
* TODO: implement them
* */

extern od_retcode_t od_counter_resize_up(od_counter_t *t);

extern od_retcode_t od_counter_resize_down(od_counter_t *t);
od_atomic_u64_t value_to_count[FLEXIBLE_ARRAY_MEMBER];
} od_counter_t;

od_counter_t *od_counter_create(size_t max_value);
od_retcode_t od_counter_free(od_counter_t *counter);
uint64_t od_counter_get_count(od_counter_t *counter, size_t value);
od_retcode_t od_counter_reset(od_counter_t *counter, size_t value);
od_retcode_t od_counter_reset_all(od_counter_t *counter);
void od_counter_inc(od_counter_t *counter, size_t item);

#endif // ODYSSEY_COUNTER_H
4 changes: 3 additions & 1 deletion sources/err_logger.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ od_error_logger_t *od_err_logger_create(size_t intervals_count)
err_logger->current_interval_num = 0;

for (size_t i = 0; i < intervals_count; ++i) {
err_logger->interval_counters[i] = od_counter_create_default();
// used for router and frontend statuses
// so, let it be 100
err_logger->interval_counters[i] = od_counter_create(100UL);
if (err_logger->interval_counters[i] == NULL) {
goto error;
}
Expand Down

0 comments on commit 0a704db

Please sign in to comment.