Skip to content

Commit

Permalink
sources/worker.c: perform max routing check
Browse files Browse the repository at this point in the history
When checking the `client_max_routing` option in od_system(),
it creates some problems with monitoring tools such as pgconsul
(which assumes that Odyssey is not accepting new connections
and needs to be restarted), and it is a poor design choice overall,
aimed at solving the problem of CPU consumption by TLS handshakes.

To solve this issue, tls threads needs to be created.
They will only do the tls handshakes, it will allows to unload workers
threads, which will be just ready to send the bytes between clients and servers,
and even accept new connections.

This patch performs one of the first steps to introduce tls threads: it moves
the logic of 'client max routing' into worker from od_system().

Signed-off-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov committed Nov 3, 2024
1 parent d088bba commit d34fb0b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
12 changes: 0 additions & 12 deletions sources/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,6 @@ static inline void od_system_server(void *arg)
od_worker_pool_t *worker_pool = server->global->worker_pool;
od_atomic_u32_inc(&router->clients_routing);
od_worker_pool_feed(worker_pool, msg);
bool warning_emitted = false;
while (od_atomic_u32_of(&router->clients_routing) >=
(uint32_t)instance->config.client_max_routing) {
if (!warning_emitted) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(&instance->logger,
"client_max_routing", client, NULL,
"client is waiting in routing queue");
warning_emitted = true;
}
machine_sleep(1);
}
}
}

Expand Down
52 changes: 48 additions & 4 deletions sources/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,56 @@
#include <prom_metric.h>
#endif

static inline int process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
static inline int max_routing_limit_reached(od_router_t *router,
od_instance_t *instance)
{
uint32_t current;
current = od_atomic_u32_of(&router->clients_routing);

uint32_t limit;
limit = (uint32_t)instance->config.client_max_routing;

return current >= limit;
}

static inline void log_max_routing(od_worker_t *worker, od_instance_t *instance,
od_client_t *client)
{
// write log message no more than 1 per second

uint64_t now = machine_time_ms();
uint64_t gap = worker->last_routing_warn_time_ms - now;

if (gap > 1000 /* 1 second */) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(&instance->logger, "client_max_routing", client, NULL,
"client is waiting in routing queue of worker %d",
worker->id);

worker->last_routing_warn_time_ms = now;
}
}

static inline void process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
{
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);

if (max_routing_limit_reached(router, instance)) {
// message will be processed later
machine_channel_write(worker->task_channel, msg);

log_max_routing(worker, instance, client);

// sleep here in hoping some client will finish his routing
machine_sleep(1);

return;
}

client->global = worker->global;

int64_t coroutine_id;
Expand Down Expand Up @@ -123,6 +166,7 @@ void od_worker_init(od_worker_t *worker, od_global_t *global, int id)
worker->id = id;
worker->global = global;
worker->clients_processed = 0;
worker->last_routing_warn_time_ms = 0;
}

int od_worker_start(od_worker_t *worker)
Expand Down
2 changes: 2 additions & 0 deletions sources/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct od_worker {
machine_channel_t *task_channel;
uint64_t clients_processed;
od_global_t *global;

uint64_t last_routing_warn_time_ms;
};

void od_worker_init(od_worker_t *, od_global_t *, int);
Expand Down

0 comments on commit d34fb0b

Please sign in to comment.