Skip to content

Commit

Permalink
sources/worker.c: perform max routing check
Browse files Browse the repository at this point in the history
When checking the `client_max_routing` option in od_system(),
it creates some problems with monitoring tools such as pgconsul
(which assumes that Odyssey is not accepting new connections
and needs to be restarted), and it is a poor design choice overall,
aimed at solving the problem of CPU consumption by TLS handshakes.

To solve this issue, tls threads needs to be created.
They will only do the tls handshakes, it will allows to unload workers
threads, which will be just ready to send the bytes between clients and servers,
and even accept new connections.

This patch performs one of the first steps to introduce tls threads: it moves
the logic of 'client max routing' into worker from od_system().

Signed-off-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov committed Nov 5, 2024
1 parent 7b9453f commit 3d1a156
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
12 changes: 0 additions & 12 deletions sources/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,6 @@ static inline void od_system_server(void *arg)
od_worker_pool_t *worker_pool = server->global->worker_pool;
od_atomic_u32_inc(&router->clients_routing);
od_worker_pool_feed(worker_pool, msg);
bool warning_emitted = false;
while (od_atomic_u32_of(&router->clients_routing) >=
(uint32_t)instance->config.client_max_routing) {
if (!warning_emitted) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(&instance->logger,
"client_max_routing", client, NULL,
"client is waiting in routing queue");
warning_emitted = true;
}
machine_sleep(1);
}
}
}

Expand Down
50 changes: 42 additions & 8 deletions sources/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,51 @@
#include <prom_metric.h>
#endif

static inline int process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
static inline int max_routing_limit_reached(od_router_t *router,
od_instance_t *instance)
{
uint32_t current;
current = od_atomic_u32_of(&router->clients_routing);

uint32_t limit;
limit = (uint32_t)instance->config.client_max_routing;

return current > limit;
}

static inline void wait_max_routing(od_worker_t *worker, od_router_t *router,
od_instance_t *instance,
od_client_t *client)
{
int log_emitted = 0;

while (max_routing_limit_reached(router, instance)) {
if (!log_emitted, 0) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(
&instance->logger, "client_max_routing", client,
NULL,
"client is waiting in routing queue of worker %d",
worker->id);
log_emitted = 1;
}

// sleep here in hoping some client will finish his routing
machine_sleep(1);
}
}

static inline void process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
{
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);
client->global = worker->global;

wait_max_routing(worker, router, instance, client);

int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_frontend, client);
if (coroutine_id != -1) {
Expand All @@ -33,8 +69,6 @@ static inline int process_new_client_msg(od_worker_t *worker,
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
}

machine_msg_free(msg);
}

static inline void process_stat_msg(od_worker_t *worker,
Expand Down Expand Up @@ -63,8 +97,6 @@ static inline void process_stat_msg(od_worker_t *worker,
worker->id, msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);

machine_msg_free(msg);
}

static inline void od_worker(void *arg)
Expand Down Expand Up @@ -110,6 +142,8 @@ static inline void od_worker(void *arg)
assert(0);
break;
}

machine_msg_free(msg);
}

od_thread_global_free(*gl);
Expand Down

0 comments on commit 3d1a156

Please sign in to comment.