Skip to content

Commit

Permalink
sources/worker.c: little refactor
Browse files Browse the repository at this point in the history
Current od_worker function is quite heavy: it performs all message
type processing in single switch construction.

This patchs makes two new functions for every message types.

Signed-off-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov committed Nov 2, 2024
1 parent d6dcd86 commit d088bba
Showing 1 changed file with 58 additions and 47 deletions.
105 changes: 58 additions & 47 deletions sources/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,61 @@
#include <prom_metric.h>
#endif

static inline int process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
{
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);
client->global = worker->global;

int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_frontend, client);
if (coroutine_id != -1) {
client->coroutine_id = coroutine_id;
worker->clients_processed++;
} else {
od_error(&instance->logger, "worker", client, NULL,
"failed to create coroutine");
od_io_close(&client->io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
}

machine_msg_free(msg);
}

static inline void process_stat_msg(od_worker_t *worker,
od_instance_t *instance, machine_msg_t *msg)
{
uint64_t count_coroutine = 0;
uint64_t count_coroutine_cache = 0;
uint64_t msg_allocated = 0;
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
machine_stat(&count_coroutine, &count_coroutine_cache, &msg_allocated,
&msg_cache_count, &msg_cache_gc_count, &msg_cache_size);
#ifdef PROM_FOUND
od_prom_metrics_write_worker_stat(
((od_cron_t *)(worker->global->cron))->metrics, worker->id,
msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);
#endif
od_log(&instance->logger, "stats", NULL, NULL,
"worker[%d]: msg (%" PRIu64 " allocated, %" PRIu64
" cached, %" PRIu64 " freed, %" PRIu64 " cache_size), "
"coroutines (%" PRIu64 " active, %" PRIu64
" cached), clients_processed: %" PRIu64,
worker->id, msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);

machine_msg_free(msg);
}

static inline void od_worker(void *arg)
{
od_worker_t *worker = arg;
Expand Down Expand Up @@ -41,64 +96,20 @@ static inline void od_worker(void *arg)

od_msg_t msg_type;
msg_type = machine_msg_type(msg);

switch (msg_type) {
case OD_MSG_CLIENT_NEW: {
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);
client->global = worker->global;

int64_t coroutine_id;
coroutine_id =
machine_coroutine_create(od_frontend, client);
if (coroutine_id == -1) {
od_error(&instance->logger, "worker", client,
NULL, "failed to create coroutine");
od_io_close(&client->io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
break;
}
client->coroutine_id = coroutine_id;

worker->clients_processed++;
process_new_client_msg(worker, router, instance, msg);
break;
}
case OD_MSG_STAT: {
uint64_t count_coroutine = 0;
uint64_t count_coroutine_cache = 0;
uint64_t msg_allocated = 0;
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
machine_stat(&count_coroutine, &count_coroutine_cache,
&msg_allocated, &msg_cache_count,
&msg_cache_gc_count, &msg_cache_size);
#ifdef PROM_FOUND
od_prom_metrics_write_worker_stat(
((od_cron_t *)(worker->global->cron))->metrics,
worker->id, msg_allocated, msg_cache_count,
msg_cache_gc_count, msg_cache_size,
count_coroutine, count_coroutine_cache,
worker->clients_processed);
#endif
od_log(&instance->logger, "stats", NULL, NULL,
"worker[%d]: msg (%" PRIu64
" allocated, %" PRIu64 " cached, %" PRIu64
" freed, %" PRIu64 " cache_size), "
"coroutines (%" PRIu64 " active, %" PRIu64
" cached), clients_processed: %" PRIu64,
worker->id, msg_allocated, msg_cache_count,
msg_cache_gc_count, msg_cache_size,
count_coroutine, count_coroutine_cache,
worker->clients_processed);
process_stat_msg(worker, instance, msg);
break;
}
default:
assert(0);
break;
}

machine_msg_free(msg);
}

od_thread_global_free(*gl);
Expand Down

0 comments on commit d088bba

Please sign in to comment.