Skip to content

Commit

Permalink
sources/worker.c: little refactor
Browse files Browse the repository at this point in the history
Current od_worker function is quite heavy: it performs all message
type processing in single switch construction.

This patchs makes two new functions for every message types.

Signed-off-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov committed Nov 2, 2024
1 parent cba256c commit e963d32
Showing 1 changed file with 55 additions and 45 deletions.
100 changes: 55 additions & 45 deletions sources/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,59 @@
#include <prom_metric.h>
#endif

static inline void process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
{
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);
client->global = worker->global;

int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_frontend, client);
if (coroutine_id == -1) {
od_error(&instance->logger, "worker", client, NULL,
"failed to create coroutine");
od_io_close(&client->io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
return;
}

client->coroutine_id = coroutine_id;

worker->clients_processed++;
}

static inline void process_stat_msg(od_worker_t *worker,
od_instance_t *instance)
{
uint64_t count_coroutine = 0;
uint64_t count_coroutine_cache = 0;
uint64_t msg_allocated = 0;
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
machine_stat(&count_coroutine, &count_coroutine_cache, &msg_allocated,
&msg_cache_count, &msg_cache_gc_count, &msg_cache_size);
#ifdef PROM_FOUND
od_prom_metrics_write_worker_stat(
((od_cron_t *)(worker->global->cron))->metrics, worker->id,
msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);
#endif
od_log(&instance->logger, "stats", NULL, NULL,
"worker[%d]: msg (%" PRIu64 " allocated, %" PRIu64
" cached, %" PRIu64 " freed, %" PRIu64 " cache_size), "
"coroutines (%" PRIu64 " active, %" PRIu64
" cached), clients_processed: %" PRIu64,
worker->id, msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);
}

static inline void od_worker(void *arg)
{
od_worker_t *worker = arg;
Expand Down Expand Up @@ -43,54 +96,11 @@ static inline void od_worker(void *arg)
msg_type = machine_msg_type(msg);
switch (msg_type) {
case OD_MSG_CLIENT_NEW: {
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);
client->global = worker->global;

int64_t coroutine_id;
coroutine_id =
machine_coroutine_create(od_frontend, client);
if (coroutine_id == -1) {
od_error(&instance->logger, "worker", client,
NULL, "failed to create coroutine");
od_io_close(&client->io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
break;
}
client->coroutine_id = coroutine_id;

worker->clients_processed++;
process_new_client_msg(worker, router, instance, msg);
break;
}
case OD_MSG_STAT: {
uint64_t count_coroutine = 0;
uint64_t count_coroutine_cache = 0;
uint64_t msg_allocated = 0;
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
machine_stat(&count_coroutine, &count_coroutine_cache,
&msg_allocated, &msg_cache_count,
&msg_cache_gc_count, &msg_cache_size);
#ifdef PROM_FOUND
od_prom_metrics_write_worker_stat(
((od_cron_t *)(worker->global->cron))->metrics,
worker->id, msg_allocated, msg_cache_count,
msg_cache_gc_count, msg_cache_size,
count_coroutine, count_coroutine_cache,
worker->clients_processed);
#endif
od_log(&instance->logger, "stats", NULL, NULL,
"worker[%d]: msg (%" PRIu64
" allocated, %" PRIu64 " cached, %" PRIu64
" freed, %" PRIu64 " cache_size), "
"coroutines (%" PRIu64 " active, %" PRIu64
" cached), clients_processed: %" PRIu64,
worker->id, msg_allocated, msg_cache_count,
msg_cache_gc_count, msg_cache_size,
count_coroutine, count_coroutine_cache,
worker->clients_processed);
process_stat_msg(worker, instance);
break;
}
default:
Expand Down

0 comments on commit e963d32

Please sign in to comment.