Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tls workers #710

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ fmt:
docker build -f docker/format/Dockerfile --tag=odyssey/clang-format-runner .
docker run -v .:/odyssey:ro odyssey/clang-format-runner modules sources stress test third_party

format:
docker build -f docker/format/Dockerfile --tag=odyssey/clang-format-runner .
docker run --user=`stat -c "%u:%g" .` -v .:/odyssey:rw odyssey/clang-format-runner -i modules sources stress test third_party

apply_fmt:
for d in sources test third_party stress modules ; do \
find $$d -maxdepth 5 -iname '*.h' -o -iname '*.c' | xargs -n 1 -t -P $(CONCURRENCY) $(FMT_BIN) -i ; \
Expand Down
12 changes: 0 additions & 12 deletions sources/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,6 @@ static inline void od_system_server(void *arg)
od_worker_pool_t *worker_pool = server->global->worker_pool;
od_atomic_u32_inc(&router->clients_routing);
od_worker_pool_feed(worker_pool, msg);
bool warning_emitted = false;
while (od_atomic_u32_of(&router->clients_routing) >=
(uint32_t)instance->config.client_max_routing) {
if (!warning_emitted) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(&instance->logger,
"client_max_routing", client, NULL,
"client is waiting in routing queue");
warning_emitted = true;
}
machine_sleep(1);
}
}
}

Expand Down
149 changes: 102 additions & 47 deletions sources/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,104 @@
#include <prom_metric.h>
#endif

static inline int max_routing_limit_reached(od_router_t *router,
od_instance_t *instance)
{
uint32_t current;
current = od_atomic_u32_of(&router->clients_routing);

uint32_t limit;
limit = (uint32_t)instance->config.client_max_routing;

return current >= limit;
}

static inline void log_max_routing(od_worker_t *worker, od_instance_t *instance,
od_client_t *client)
{
// write log message no more than 1 per second

uint64_t now = machine_time_ms();
uint64_t gap = worker->last_routing_warn_time_ms - now;

if (gap > 1000 /* 1 second */) {
/* TODO: AB: Use WARNING here, it's not an error */
od_error(&instance->logger, "client_max_routing", client, NULL,
"client is waiting in routing queue of worker %d",
worker->id);

worker->last_routing_warn_time_ms = now;
}
}

static inline int process_new_client_msg(od_worker_t *worker,
od_router_t *router,
od_instance_t *instance,
machine_msg_t *msg)
{
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);

if (max_routing_limit_reached(router, instance)) {
// message will be processed later
machine_channel_write(worker->task_channel, msg);

log_max_routing(worker, instance, client);

// sleep here in hoping some client will finish his routing
machine_sleep(1);

return;
}

client->global = worker->global;

int64_t coroutine_id;
coroutine_id = machine_coroutine_create(od_frontend, client);
if (coroutine_id != -1) {
client->coroutine_id = coroutine_id;
worker->clients_processed++;
} else {
od_error(&instance->logger, "worker", client, NULL,
"failed to create coroutine");
od_io_close(&client->io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
}

machine_msg_free(msg);
}

static inline void process_stat_msg(od_worker_t *worker,
od_instance_t *instance, machine_msg_t *msg)
{
uint64_t count_coroutine = 0;
uint64_t count_coroutine_cache = 0;
uint64_t msg_allocated = 0;
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
machine_stat(&count_coroutine, &count_coroutine_cache, &msg_allocated,
&msg_cache_count, &msg_cache_gc_count, &msg_cache_size);
#ifdef PROM_FOUND
od_prom_metrics_write_worker_stat(
((od_cron_t *)(worker->global->cron))->metrics, worker->id,
msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);
#endif
od_log(&instance->logger, "stats", NULL, NULL,
"worker[%d]: msg (%" PRIu64 " allocated, %" PRIu64
" cached, %" PRIu64 " freed, %" PRIu64 " cache_size), "
"coroutines (%" PRIu64 " active, %" PRIu64
" cached), clients_processed: %" PRIu64,
worker->id, msg_allocated, msg_cache_count, msg_cache_gc_count,
msg_cache_size, count_coroutine, count_coroutine_cache,
worker->clients_processed);

machine_msg_free(msg);
}

static inline void od_worker(void *arg)
{
od_worker_t *worker = arg;
Expand Down Expand Up @@ -41,64 +139,20 @@ static inline void od_worker(void *arg)

od_msg_t msg_type;
msg_type = machine_msg_type(msg);

switch (msg_type) {
case OD_MSG_CLIENT_NEW: {
od_client_t *client;
client = *(od_client_t **)machine_msg_data(msg);
client->global = worker->global;

int64_t coroutine_id;
coroutine_id =
machine_coroutine_create(od_frontend, client);
if (coroutine_id == -1) {
od_error(&instance->logger, "worker", client,
NULL, "failed to create coroutine");
od_io_close(&client->io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
break;
}
client->coroutine_id = coroutine_id;

worker->clients_processed++;
process_new_client_msg(worker, router, instance, msg);
break;
}
case OD_MSG_STAT: {
uint64_t count_coroutine = 0;
uint64_t count_coroutine_cache = 0;
uint64_t msg_allocated = 0;
uint64_t msg_cache_count = 0;
uint64_t msg_cache_gc_count = 0;
uint64_t msg_cache_size = 0;
machine_stat(&count_coroutine, &count_coroutine_cache,
&msg_allocated, &msg_cache_count,
&msg_cache_gc_count, &msg_cache_size);
#ifdef PROM_FOUND
od_prom_metrics_write_worker_stat(
((od_cron_t *)(worker->global->cron))->metrics,
worker->id, msg_allocated, msg_cache_count,
msg_cache_gc_count, msg_cache_size,
count_coroutine, count_coroutine_cache,
worker->clients_processed);
#endif
od_log(&instance->logger, "stats", NULL, NULL,
"worker[%d]: msg (%" PRIu64
" allocated, %" PRIu64 " cached, %" PRIu64
" freed, %" PRIu64 " cache_size), "
"coroutines (%" PRIu64 " active, %" PRIu64
" cached), clients_processed: %" PRIu64,
worker->id, msg_allocated, msg_cache_count,
msg_cache_gc_count, msg_cache_size,
count_coroutine, count_coroutine_cache,
worker->clients_processed);
process_stat_msg(worker, instance, msg);
break;
}
default:
assert(0);
break;
}

machine_msg_free(msg);
}

od_thread_global_free(*gl);
Expand All @@ -112,6 +166,7 @@ void od_worker_init(od_worker_t *worker, od_global_t *global, int id)
worker->id = id;
worker->global = global;
worker->clients_processed = 0;
worker->last_routing_warn_time_ms = 0;
}

int od_worker_start(od_worker_t *worker)
Expand Down
2 changes: 2 additions & 0 deletions sources/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct od_worker {
machine_channel_t *task_channel;
uint64_t clients_processed;
od_global_t *global;

uint64_t last_routing_warn_time_ms;
};

void od_worker_init(od_worker_t *, od_global_t *, int);
Expand Down
Loading