diff --git a/Makefile b/Makefile index eeb3141a..77e774d1 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,10 @@ fmt: docker build -f docker/format/Dockerfile --tag=odyssey/clang-format-runner . docker run -v .:/odyssey:ro odyssey/clang-format-runner modules sources stress test third_party +format: + docker build -f docker/format/Dockerfile --tag=odyssey/clang-format-runner . + docker run --user=`stat -c "%u:%g" .` -v .:/odyssey:rw odyssey/clang-format-runner -i modules sources stress test third_party + apply_fmt: for d in sources test third_party stress modules ; do \ find $$d -maxdepth 5 -iname '*.h' -o -iname '*.c' | xargs -n 1 -t -P $(CONCURRENCY) $(FMT_BIN) -i ; \ diff --git a/sources/system.c b/sources/system.c index bb73fd26..e5603cf3 100644 --- a/sources/system.c +++ b/sources/system.c @@ -100,18 +100,6 @@ static inline void od_system_server(void *arg) od_worker_pool_t *worker_pool = server->global->worker_pool; od_atomic_u32_inc(&router->clients_routing); od_worker_pool_feed(worker_pool, msg); - bool warning_emitted = false; - while (od_atomic_u32_of(&router->clients_routing) >= - (uint32_t)instance->config.client_max_routing) { - if (!warning_emitted) { - /* TODO: AB: Use WARNING here, it's not an error */ - od_error(&instance->logger, - "client_max_routing", client, NULL, - "client is waiting in routing queue"); - warning_emitted = true; - } - machine_sleep(1); - } } } diff --git a/sources/worker.c b/sources/worker.c index f3eeb038..03506246 100644 --- a/sources/worker.c +++ b/sources/worker.c @@ -12,6 +12,104 @@ #include #endif +static inline int max_routing_limit_reached(od_router_t *router, + od_instance_t *instance) +{ + uint32_t current; + current = od_atomic_u32_of(&router->clients_routing); + + uint32_t limit; + limit = (uint32_t)instance->config.client_max_routing; + + return current >= limit; +} + +static inline void log_max_routing(od_worker_t *worker, od_instance_t *instance, + od_client_t *client) +{ + // write log message no more than 1 per second + + uint64_t now = machine_time_ms(); + uint64_t gap = worker->last_routing_warn_time_ms - now; + + if (gap > 1000 /* 1 second */) { + /* TODO: AB: Use WARNING here, it's not an error */ + od_error(&instance->logger, "client_max_routing", client, NULL, + "client is waiting in routing queue of worker %d", + worker->id); + + worker->last_routing_warn_time_ms = now; + } +} + +static inline int process_new_client_msg(od_worker_t *worker, + od_router_t *router, + od_instance_t *instance, + machine_msg_t *msg) +{ + od_client_t *client; + client = *(od_client_t **)machine_msg_data(msg); + + if (max_routing_limit_reached(router, instance)) { + // message will be processed later + machine_channel_write(worker->task_channel, msg); + + log_max_routing(worker, instance, client); + + // sleep here in hoping some client will finish his routing + machine_sleep(1); + + return; + } + + client->global = worker->global; + + int64_t coroutine_id; + coroutine_id = machine_coroutine_create(od_frontend, client); + if (coroutine_id != -1) { + client->coroutine_id = coroutine_id; + worker->clients_processed++; + } else { + od_error(&instance->logger, "worker", client, NULL, + "failed to create coroutine"); + od_io_close(&client->io); + od_client_free(client); + od_atomic_u32_dec(&router->clients_routing); + } + + machine_msg_free(msg); +} + +static inline void process_stat_msg(od_worker_t *worker, + od_instance_t *instance, machine_msg_t *msg) +{ + uint64_t count_coroutine = 0; + uint64_t count_coroutine_cache = 0; + uint64_t msg_allocated = 0; + uint64_t msg_cache_count = 0; + uint64_t msg_cache_gc_count = 0; + uint64_t msg_cache_size = 0; + machine_stat(&count_coroutine, &count_coroutine_cache, &msg_allocated, + &msg_cache_count, &msg_cache_gc_count, &msg_cache_size); +#ifdef PROM_FOUND + od_prom_metrics_write_worker_stat( + ((od_cron_t *)(worker->global->cron))->metrics, worker->id, + msg_allocated, msg_cache_count, msg_cache_gc_count, + msg_cache_size, count_coroutine, count_coroutine_cache, + worker->clients_processed); +#endif + od_log(&instance->logger, "stats", NULL, NULL, + "worker[%d]: msg (%" PRIu64 " allocated, %" PRIu64 + " cached, %" PRIu64 " freed, %" PRIu64 " cache_size), " + "coroutines (%" PRIu64 " active, %" PRIu64 + " cached), clients_processed: %" PRIu64, + worker->id, msg_allocated, msg_cache_count, msg_cache_gc_count, + msg_cache_size, count_coroutine, count_coroutine_cache, + worker->clients_processed); + + machine_msg_free(msg); +} + static inline void od_worker(void *arg) { od_worker_t *worker = arg; @@ -41,64 +139,20 @@ static inline void od_worker(void *arg) od_msg_t msg_type; msg_type = machine_msg_type(msg); + switch (msg_type) { case OD_MSG_CLIENT_NEW: { - od_client_t *client; - client = *(od_client_t **)machine_msg_data(msg); - client->global = worker->global; - - int64_t coroutine_id; - coroutine_id = - machine_coroutine_create(od_frontend, client); - if (coroutine_id == -1) { - od_error(&instance->logger, "worker", client, - NULL, "failed to create coroutine"); - od_io_close(&client->io); - od_client_free(client); - od_atomic_u32_dec(&router->clients_routing); - break; - } - client->coroutine_id = coroutine_id; - - worker->clients_processed++; + process_new_client_msg(worker, router, instance, msg); break; } case OD_MSG_STAT: { - uint64_t count_coroutine = 0; - uint64_t count_coroutine_cache = 0; - uint64_t msg_allocated = 0; - uint64_t msg_cache_count = 0; - uint64_t msg_cache_gc_count = 0; - uint64_t msg_cache_size = 0; - machine_stat(&count_coroutine, &count_coroutine_cache, - &msg_allocated, &msg_cache_count, - &msg_cache_gc_count, &msg_cache_size); -#ifdef PROM_FOUND - od_prom_metrics_write_worker_stat( - ((od_cron_t *)(worker->global->cron))->metrics, - worker->id, msg_allocated, msg_cache_count, - msg_cache_gc_count, msg_cache_size, - count_coroutine, count_coroutine_cache, - worker->clients_processed); -#endif - od_log(&instance->logger, "stats", NULL, NULL, - "worker[%d]: msg (%" PRIu64 - " allocated, %" PRIu64 " cached, %" PRIu64 - " freed, %" PRIu64 " cache_size), " - "coroutines (%" PRIu64 " active, %" PRIu64 - " cached), clients_processed: %" PRIu64, - worker->id, msg_allocated, msg_cache_count, - msg_cache_gc_count, msg_cache_size, - count_coroutine, count_coroutine_cache, - worker->clients_processed); + process_stat_msg(worker, instance, msg); break; } default: assert(0); break; } - - machine_msg_free(msg); } od_thread_global_free(*gl); @@ -112,6 +166,7 @@ void od_worker_init(od_worker_t *worker, od_global_t *global, int id) worker->id = id; worker->global = global; worker->clients_processed = 0; + worker->last_routing_warn_time_ms = 0; } int od_worker_start(od_worker_t *worker) diff --git a/sources/worker.h b/sources/worker.h index f9eba5a3..f008b7ac 100644 --- a/sources/worker.h +++ b/sources/worker.h @@ -15,6 +15,8 @@ struct od_worker { machine_channel_t *task_channel; uint64_t clients_processed; od_global_t *global; + + uint64_t last_routing_warn_time_ms; }; void od_worker_init(od_worker_t *, od_global_t *, int);