diff --git a/src/client/api/SConscript b/src/client/api/SConscript index 62ba96ef6002..43af58a6c868 100644 --- a/src/client/api/SConscript +++ b/src/client/api/SConscript @@ -1,7 +1,7 @@ """Build DAOS client""" LIBDAOS_SRC = ['agent.c', 'array.c', 'container.c', 'event.c', 'init.c', 'job.c', 'kv.c', 'mgmt.c', - 'object.c', 'pool.c', 'rpc.c', 'task.c', 'tx.c', 'pipeline.c'] + 'object.c', 'pool.c', 'rpc.c', 'task.c', 'tx.c', 'pipeline.c', 'metrics.c'] def scons(): diff --git a/src/client/api/init.c b/src/client/api/init.c index da02c71631ce..b357e2088da7 100644 --- a/src/client/api/init.c +++ b/src/client/api/init.c @@ -23,6 +23,7 @@ #include #include #include +#include #if BUILD_PIPELINE #include #endif @@ -242,19 +243,25 @@ daos_init(void) if (rc != 0) D_GOTO(out_co, rc); + rc = dc_tm_init(); + if (rc) + D_GOTO(out_obj, rc); + #if BUILD_PIPELINE /** set up pipeline */ rc = dc_pipeline_init(); if (rc != 0) - D_GOTO(out_obj, rc); + D_GOTO(out_tm, rc); #endif module_initialized++; D_GOTO(unlock, rc = 0); #if BUILD_PIPELINE +out_tm: + dc_tm_fini(); +#endif out_obj: dc_obj_fini(); -#endif out_co: dc_cont_fini(); out_pool: @@ -322,6 +329,7 @@ daos_fini(void) D_ERROR("failed to disconnect some resources may leak, " DF_RC"\n", DP_RC(rc)); + dc_tm_fini(); dc_agent_fini(); dc_job_fini(); diff --git a/src/client/api/metrics.c b/src/client/api/metrics.c new file mode 100644 index 000000000000..56e8510a5d72 --- /dev/null +++ b/src/client/api/metrics.c @@ -0,0 +1,146 @@ +/* + * (C) Copyright 2020-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define INIT_JOB_NUM 1024 +bool daos_client_metric; +bool daos_client_metric_retain; + +#define MAX_IDS_SIZE(num) (num * D_TM_METRIC_SIZE) +/* The client side metrics structure looks like + * root/job_id/pid/.... 
+ */ +int +dc_tm_init(void) +{ + int metrics_tag; + pid_t pid; + int rc; + + d_getenv_bool(DAOS_CLIENT_METRICS_ENV, &daos_client_metric); + if (!daos_client_metric) + return 0; + + rc = dc_tls_key_create(); + if (rc) + D_GOTO(out, rc); + + metrics_tag = D_TM_OPEN_OR_CREATE; + d_getenv_bool(DAOS_CLIENT_METRICS_RETAIN_ENV, &daos_client_metric_retain); + if (daos_client_metric_retain) + metrics_tag |= D_TM_RETAIN_SHMEM; + else + metrics_tag |= D_TM_RETAIN_SHMEM_IF_NON_EMPTY; + + rc = d_tm_init(DC_TM_JOB_ROOT_ID, MAX_IDS_SIZE(INIT_JOB_NUM), metrics_tag); + if (rc != 0) { + DL_ERROR(rc, "init job root id."); + return rc; + } + + pid = getpid(); + D_INFO("INIT %s/%u metrics\n", dc_jobid, pid); + + /** create new shmem space for per-pool metrics */ + rc = d_tm_add_ephemeral_dir(NULL, MAX_IDS_SIZE(INIT_JOB_NUM), "%s/%u", + dc_jobid, pid); + if (rc != 0) { + DL_ERROR(rc, "add metric %s/%u failed.\n", dc_jobid, pid); + D_GOTO(out, rc); + } + +out: + if (rc) + d_tm_fini(); + + return rc; +} + +static void +iter_dump(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + +static int +dump_tm_file(const char *dump_path) +{ + struct d_tm_context *ctx; + struct d_tm_node_t *root; + char dirname[D_TM_MAX_NAME_LEN] = {0}; + uint32_t filter; + FILE *dump_file; + int rc; + + dump_file = fopen(dump_path, "w+"); + if (dump_file == NULL) { + D_INFO("cannot open %s", dump_path); + return -DER_INVAL; + } + + filter = D_TM_COUNTER | D_TM_DURATION | D_TM_TIMESTAMP | D_TM_MEMINFO | + D_TM_TIMER_SNAPSHOT | D_TM_GAUGE | D_TM_STATS_GAUGE; + + ctx = d_tm_open(DC_TM_JOB_ROOT_ID); + if (ctx == NULL) + D_GOTO(close, rc = -DER_NOMEM); + + snprintf(dirname, sizeof(dirname), "%s/%u", dc_jobid, getpid()); + root = d_tm_find_metric(ctx, dirname); + if (root == NULL) { + printf("No metrics found at: '%s'\n", dirname); + D_GOTO(close_ctx, rc = -DER_NONEXIST); + } 
+ + d_tm_print_field_descriptors(0, dump_file); + + d_tm_iterate(ctx, root, 0, filter, NULL, D_TM_CSV, 0, iter_dump, dump_file); + +close_ctx: + d_tm_close(&ctx); +close: + fclose(dump_file); + return rc; +} + +void +dc_tm_fini() +{ + pid_t pid = getpid(); + char *dump_path; + int rc; + + if (!daos_client_metric) + return; + + dump_path = getenv(METRIC_DUMP_ENV); + D_INFO("dump path is %s\n", dump_path); + if (dump_path != NULL) + dump_tm_file(dump_path); + + dc_tls_fini(); + dc_tls_key_delete(); + + if (!daos_client_metric_retain) { + rc = d_tm_del_ephemeral_dir("%s/%d", dc_jobid, pid); + if (rc != 0) + DL_ERROR(rc, "delete tm directory %s/%d.", dc_jobid, pid); + } + + D_INFO("delete pid %s/%u\n", dc_jobid, pid); + d_tm_fini(); +} diff --git a/src/common/SConscript b/src/common/SConscript index 151ba5f0ed46..432b72403e51 100644 --- a/src/common/SConscript +++ b/src/common/SConscript @@ -9,7 +9,7 @@ COMMON_FILES = ['debug.c', 'mem.c', 'fail_loc.c', 'lru.c', 'dedup.c', 'profile.c', 'compression.c', 'compression_isal.c', 'compression_qat.c', 'multihash.c', 'multihash_isal.c', 'cipher.c', 'cipher_isal.c', 'qat.c', 'fault_domain.c', - 'policy.c'] + 'policy.c', 'tls.c'] def build_daos_common(denv, client): diff --git a/src/common/tls.c b/src/common/tls.c new file mode 100644 index 000000000000..68bdef8a4db7 --- /dev/null +++ b/src/common/tls.c @@ -0,0 +1,230 @@ +/** + * (C) Copyright 2016-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +/** + * It implements thread-local storage (TLS) for DAOS. + */ +#include +#include + +/* The array remember all of registered module keys on one node. 
*/ +static struct daos_module_key *daos_module_keys[DAOS_MODULE_KEYS_NR] = { NULL }; +pthread_mutex_t daos_module_keys_lock = PTHREAD_MUTEX_INITIALIZER; + +static __thread bool dc_tls_thread_init; + +static pthread_key_t dss_tls_key; +static pthread_key_t dc_tls_key; + +void +daos_register_key(struct daos_module_key *key) +{ + int i; + + D_MUTEX_LOCK(&daos_module_keys_lock); + for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { + if (daos_module_keys[i] == NULL) { + daos_module_keys[i] = key; + key->dmk_index = i; + break; + } + } + D_MUTEX_UNLOCK(&daos_module_keys_lock); + D_ASSERT(i < DAOS_MODULE_KEYS_NR); +} + +void +daos_unregister_key(struct daos_module_key *key) +{ + if (key == NULL) + return; + D_ASSERT(key->dmk_index >= 0); + D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); + D_MUTEX_LOCK(&daos_module_keys_lock); + daos_module_keys[key->dmk_index] = NULL; + D_MUTEX_UNLOCK(&daos_module_keys_lock); +} + +struct daos_module_key* +daos_get_module_key(int index) +{ + D_ASSERT(index < DAOS_MODULE_KEYS_NR); + D_ASSERT(index >= 0); + + return daos_module_keys[index]; +} + +static int +daos_thread_local_storage_init(struct daos_thread_local_storage *dtls, + int xs_id, int tgt_id) +{ + int rc = 0; + int i; + + if (dtls->dtls_values == NULL) { + D_ALLOC_ARRAY(dtls->dtls_values, DAOS_MODULE_KEYS_NR); + if (dtls->dtls_values == NULL) + return -DER_NOMEM; + } + + for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { + struct daos_module_key *dmk = daos_module_keys[i]; + + if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { + D_ASSERT(dmk->dmk_init != NULL); + dtls->dtls_values[i] = dmk->dmk_init(dtls->dtls_tag, xs_id, tgt_id); + if (dtls->dtls_values[i] == NULL) { + rc = -DER_NOMEM; + break; + } + } + } + return rc; +} + +static void +daos_thread_local_storage_fini(struct daos_thread_local_storage *dtls) +{ + int i; + + if (dtls->dtls_values != NULL) { + for (i = DAOS_MODULE_KEYS_NR - 1; i >= 0; i--) { + struct daos_module_key *dmk = daos_module_keys[i]; + + if (dmk != NULL && 
dtls->dtls_tag & dmk->dmk_tags) { + D_ASSERT(dtls->dtls_values[i] != NULL); + D_ASSERT(dmk->dmk_fini != NULL); + dmk->dmk_fini(dtls->dtls_tag, dtls->dtls_values[i]); + } + } + } + + D_FREE(dtls->dtls_values); +} + + +/* + * Allocate daos_thread_local_storage for a particular thread on server and + * store the pointer in a thread-specific value which can be fetched at any + * time with daos_tls_get(). + */ +static struct daos_thread_local_storage * +daos_tls_init(int tag, int xs_id, int tgt_id, bool server) +{ + struct daos_thread_local_storage *dtls; + int rc; + + D_ALLOC_PTR(dtls); + if (dtls == NULL) + return NULL; + + dtls->dtls_tag = tag; + rc = daos_thread_local_storage_init(dtls, xs_id, tgt_id); + if (rc != 0) { + D_FREE(dtls); + return NULL; + } + + if (server) { + rc = pthread_setspecific(dss_tls_key, dtls); + } else { + rc = pthread_setspecific(dc_tls_key, dtls); + if (rc == 0) + dc_tls_thread_init = true; + } + + if (rc) { + D_ERROR("failed to initialize tls: %d\n", rc); + daos_thread_local_storage_fini(dtls); + D_FREE(dtls); + return NULL; + } + + return dtls; +} + +int +ds_tls_key_create(void) +{ + return pthread_key_create(&dss_tls_key, NULL); +} + +int +dc_tls_key_create(void) +{ + return pthread_key_create(&dc_tls_key, NULL); +} + +void +ds_tls_key_delete() +{ + pthread_key_delete(dss_tls_key); +} + +void +dc_tls_key_delete(void) +{ + pthread_key_delete(dc_tls_key); +} + +/* Free DTC for a particular thread. */ +static void +daos_tls_fini(struct daos_thread_local_storage *dtls, bool server) +{ + daos_thread_local_storage_fini(dtls); + D_FREE(dtls); + if (server) + pthread_setspecific(dss_tls_key, NULL); + else + pthread_setspecific(dc_tls_key, NULL); +} + +/* Allocate local per thread storage. */ +struct daos_thread_local_storage * +dc_tls_init(int tag, uint32_t pid) +{ + return daos_tls_init(tag, -1, pid, false); +} + +/* Free DTC for a particular thread. 
*/ +void +dc_tls_fini(void) +{ + struct daos_thread_local_storage *dtls; + + dtls = (struct daos_thread_local_storage *)pthread_getspecific(dc_tls_key); + if (dtls != NULL) + daos_tls_fini(dtls, false); +} + +struct daos_thread_local_storage * +dc_tls_get(unsigned int tag) +{ + if (!dc_tls_thread_init) + return dc_tls_init(tag, getpid()); + + return (struct daos_thread_local_storage *)pthread_getspecific(dc_tls_key); +} + +struct daos_thread_local_storage * +dss_tls_get() +{ + return (struct daos_thread_local_storage *) + pthread_getspecific(dss_tls_key); +} + +/* Allocate local per thread storage. */ +struct daos_thread_local_storage * +dss_tls_init(int tag, int xs_id, int tgt_id) +{ + return daos_tls_init(tag, xs_id, tgt_id, true); +} + +/* Free DTC for a particular thread. */ +void +dss_tls_fini(struct daos_thread_local_storage *dtls) +{ + daos_tls_fini(dtls, true); +} diff --git a/src/engine/SConscript b/src/engine/SConscript index ceb00a409d09..e94b6a83dd61 100644 --- a/src/engine/SConscript +++ b/src/engine/SConscript @@ -29,7 +29,7 @@ def scons(): 'drpc_handler.c', 'drpc_listener.c', 'drpc_progress.c', 'init.c', 'module.c', 'srv_cli.c', 'profile.c', 'rpc.c', - 'server_iv.c', 'srv.c', 'srv.pb-c.c', 'tls.c', + 'server_iv.c', 'srv.c', 'srv.pb-c.c', 'sched.c', 'ult.c', 'event.pb-c.c', 'srv_metrics.c'] + libdaos_tgts diff --git a/src/engine/init.c b/src/engine/init.c index eb3bca9edb14..6b2125e81193 100644 --- a/src/engine/init.c +++ b/src/engine/init.c @@ -22,6 +22,7 @@ #include #include #include +#include #include "srv_internal.h" #include "drpc_internal.h" #include @@ -618,14 +619,14 @@ server_id_cb(uint32_t *tid, uint64_t *uid) } if (tid != NULL) { - struct dss_thread_local_storage *dtc; - struct dss_module_info *dmi; + struct daos_thread_local_storage *dtc; + struct daos_module_info *dmi; int index = daos_srv_modkey.dmk_index; - /* Avoid assertion in dss_module_key_get() */ + /* Avoid assertion in daos_module_key_get() */ dtc = dss_tls_get(); if (dtc != NULL 
&& index >= 0 && index < DAOS_MODULE_KEYS_NR && - dss_module_keys[index] == &daos_srv_modkey) { + daos_get_module_key(index) == &daos_srv_modkey) { dmi = dss_get_module_info(); if (dmi != NULL) *tid = dmi->dmi_xs_id; diff --git a/src/engine/srv.c b/src/engine/srv.c index aa6cbd706e8f..ff6629c362a2 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -382,9 +382,9 @@ wait_all_exited(struct dss_xstream *dx, struct dss_module_info *dmi) static void dss_srv_handler(void *arg) { - struct dss_xstream *dx = (struct dss_xstream *)arg; - struct dss_thread_local_storage *dtc; - struct dss_module_info *dmi; + struct dss_xstream *dx = (struct dss_xstream *)arg; + struct daos_thread_local_storage *dtc; + struct dss_module_info *dmi; int rc; bool track_mem = false; bool signal_caller = true; @@ -1292,7 +1292,7 @@ dss_srv_fini(bool force) vos_standalone_tls_fini(); /* fall through */ case XD_INIT_TLS_REG: - pthread_key_delete(dss_tls_key); + ds_tls_key_delete(); /* fall through */ case XD_INIT_ULT_BARRIER: ABT_cond_free(&xstream_data.xd_ult_barrier); @@ -1389,7 +1389,7 @@ dss_srv_init(void) xstream_data.xd_init_step = XD_INIT_ULT_BARRIER; /* register xstream-local storage key */ - rc = pthread_key_create(&dss_tls_key, NULL); + rc = ds_tls_key_create(); if (rc) { rc = dss_abterr2der(rc); D_ERROR("Failed to register storage key: "DF_RC"\n", DP_RC(rc)); diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 92504c026ca3..d3a06d79db37 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -314,10 +314,6 @@ sched_create_thread(struct dss_xstream *dx, void (*func)(void *), void *arg, return dss_abterr2der(rc); } -/* tls.c */ -void dss_tls_fini(struct dss_thread_local_storage *dtls); -struct dss_thread_local_storage *dss_tls_init(int tag, int xs_id, int tgt_id); - /* server_iv.c */ void ds_iv_init(void); void ds_iv_fini(void); diff --git a/src/engine/tls.c b/src/engine/tls.c deleted file mode 100644 index 90ea6cce7c58..000000000000 --- 
a/src/engine/tls.c +++ /dev/null @@ -1,155 +0,0 @@ -/** - * (C) Copyright 2016-2021 Intel Corporation. - * - * SPDX-License-Identifier: BSD-2-Clause-Patent - */ -/** - * This file is part of the DAOS server. It implements thread-local storage - * (TLS) for DAOS service threads. - */ -#define D_LOGFAC DD_FAC(server) - -#include -#include "srv_internal.h" - -/* The array remember all of registered module keys on one node. */ -struct dss_module_key *dss_module_keys[DAOS_MODULE_KEYS_NR] = { NULL }; - -pthread_mutex_t dss_module_keys_lock = PTHREAD_MUTEX_INITIALIZER; - -void -dss_register_key(struct dss_module_key *key) -{ - int i; - - D_MUTEX_LOCK(&dss_module_keys_lock); - for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { - if (dss_module_keys[i] == NULL) { - dss_module_keys[i] = key; - key->dmk_index = i; - break; - } - } - D_MUTEX_UNLOCK(&dss_module_keys_lock); - D_ASSERT(i < DAOS_MODULE_KEYS_NR); -} - -void -dss_unregister_key(struct dss_module_key *key) -{ - if (key == NULL) - return; - D_ASSERT(key->dmk_index >= 0); - D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); - D_MUTEX_LOCK(&dss_module_keys_lock); - dss_module_keys[key->dmk_index] = NULL; - D_MUTEX_UNLOCK(&dss_module_keys_lock); -} - -/** - * Init thread context - * - * \param[in]dtls Init the thread context to allocate the - * local thread variable for each module. 
- * - * \retval 0 if initialization succeeds - * \retval negative errno if initialization fails - */ -static int -dss_thread_local_storage_init(struct dss_thread_local_storage *dtls, - int xs_id, int tgt_id) -{ - int rc = 0; - int i; - - if (dtls->dtls_values == NULL) { - D_ALLOC_ARRAY(dtls->dtls_values, - (int)ARRAY_SIZE(dss_module_keys)); - if (dtls->dtls_values == NULL) - return -DER_NOMEM; - } - - for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { - struct dss_module_key *dmk = dss_module_keys[i]; - - if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { - D_ASSERT(dmk->dmk_init != NULL); - dtls->dtls_values[i] = dmk->dmk_init(dtls->dtls_tag, xs_id, tgt_id); - if (dtls->dtls_values[i] == NULL) { - rc = -DER_NOMEM; - break; - } - } - } - return rc; -} - -/** - * Finish module context - * - * \param[in]dtls Finish the thread context to free the - * local thread variable for each module. - */ -static void -dss_thread_local_storage_fini(struct dss_thread_local_storage *dtls) -{ - int i; - - if (dtls->dtls_values != NULL) { - for (i = DAOS_MODULE_KEYS_NR - 1; i >= 0; i--) { - struct dss_module_key *dmk = dss_module_keys[i]; - - if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { - D_ASSERT(dtls->dtls_values[i] != NULL); - D_ASSERT(dmk->dmk_fini != NULL); - dmk->dmk_fini(dtls->dtls_tag, dtls->dtls_values[i]); - } - } - } - - D_FREE(dtls->dtls_values); -} - -pthread_key_t dss_tls_key; - -/* - * Allocate dss_thread_local_storage for a particular thread and - * store the pointer in a thread-specific value which can be - * fetched at any time with dss_tls_get(). 
- */ -struct dss_thread_local_storage * -dss_tls_init(int tag, int xs_id, int tgt_id) -{ - struct dss_thread_local_storage *dtls; - int rc; - - D_ALLOC_PTR(dtls); - if (dtls == NULL) - return NULL; - - dtls->dtls_tag = tag; - rc = dss_thread_local_storage_init(dtls, xs_id, tgt_id); - if (rc != 0) { - D_FREE(dtls); - return NULL; - } - - rc = pthread_setspecific(dss_tls_key, dtls); - if (rc) { - D_ERROR("failed to initialize tls: %d\n", rc); - dss_thread_local_storage_fini(dtls); - D_FREE(dtls); - return NULL; - } - - return dtls; -} - -/* Free DTC for a particular thread. */ -void -dss_tls_fini(struct dss_thread_local_storage *dtls) -{ - dss_thread_local_storage_fini(dtls); - D_FREE(dtls); - pthread_setspecific(dss_tls_key, NULL); -} diff --git a/src/gurt/examples/telem_consumer_example.c b/src/gurt/examples/telem_consumer_example.c index 6b7b1653a163..f2b506bbc79d 100644 --- a/src/gurt/examples/telem_consumer_example.c +++ b/src/gurt/examples/telem_consumer_example.c @@ -147,6 +147,13 @@ void read_metrics(struct d_tm_context *ctx, struct d_tm_node_t *root, d_tm_list_free(head); } +static void +iter_print(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + int main(int argc, char **argv) { @@ -178,7 +185,7 @@ main(int argc, char **argv) D_TM_DURATION | D_TM_GAUGE | D_TM_DIRECTORY); show_meta = true; d_tm_iterate(ctx, root, 0, filter, NULL, D_TM_STANDARD, - D_TM_INCLUDE_METADATA, D_TM_ITER_READ, stdout); + D_TM_INCLUDE_METADATA, iter_print, stdout); sprintf(dirname, "manually added"); filter = (D_TM_COUNTER | D_TM_TIMESTAMP | D_TM_TIMER_SNAPSHOT | diff --git a/src/gurt/telemetry.c b/src/gurt/telemetry.c index 3294c0662262..c812cab36f9b 100644 --- a/src/gurt/telemetry.c +++ b/src/gurt/telemetry.c @@ -69,8 +69,9 @@ static struct d_tm_shmem { struct d_tm_context *ctx; /** context for the producer */ struct d_tm_node_t 
*root; /** root node of shmem */ pthread_mutex_t add_lock; /** for synchronized access */ - bool sync_access; /** whether to sync access */ - bool retain; /** retain shmem region on exit */ + uint32_t retain:1, /* retain shmem region during exit */ + sync_access:1, + retain_non_empty:1; /** retain shmem region if it is not empty */ int id; /** Instance ID */ } tm_shmem; @@ -200,6 +201,7 @@ attach_shmem(key_t key, size_t size, int flags, struct d_tm_shmem_hdr **shmem) return -DER_SHMEM_PERMS; } + D_INFO("allocate shmid %d key 0x%x addr %p\n", shmid, key, addr); *shmem = addr; return shmid; } @@ -529,7 +531,7 @@ init_node(struct d_tm_shmem_hdr *shmem, struct d_tm_node_t *node, D_ERROR("cannot allocate node name [%s]\n", name); return -DER_NO_SHMEM; } - strncpy(node->dtn_name, name, buff_len); + strncpy(conv_ptr(shmem, node->dtn_name), name, buff_len); node->dtn_shmem_key = shmem->sh_key; node->dtn_child = NULL; /* may be reinitializing an existing node, in which case we shouldn't @@ -557,6 +559,7 @@ alloc_node(struct d_tm_shmem_hdr *shmem, struct d_tm_node_t **newnode, const char *name) { struct d_tm_node_t *node = NULL; + struct d_tm_node_t *tmp; int rc = DER_SUCCESS; if (shmem == NULL || newnode == NULL || name == NULL) { @@ -569,13 +572,16 @@ alloc_node(struct d_tm_shmem_hdr *shmem, struct d_tm_node_t **newnode, rc = -DER_NO_SHMEM; goto out; } - rc = init_node(shmem, node, name); + + tmp = conv_ptr(shmem, node); + + rc = init_node(shmem, tmp, name); if (rc != 0) goto out; - node->dtn_metric = NULL; - node->dtn_sibling = NULL; - *newnode = node; + tmp->dtn_metric = NULL; + tmp->dtn_sibling = NULL; + *newnode = node; out: return rc; } @@ -624,10 +630,10 @@ add_child(struct d_tm_node_t **newnode, struct d_tm_node_t *parent, * 1) a previously-cleared link node that can be reused, or * 2) the right place to attach a newly allocated node. 
*/ - child = parent->dtn_child; + child = conv_ptr(shmem, parent->dtn_child); while (child != NULL && !is_cleared_link(tm_shmem.ctx, child)) { sibling = child; - child = child->dtn_sibling; + child = conv_ptr(shmem, child->dtn_sibling); } if (is_cleared_link(tm_shmem.ctx, child)) { @@ -657,6 +663,7 @@ add_child(struct d_tm_node_t **newnode, struct d_tm_node_t *parent, else sibling->dtn_sibling = *newnode; + *newnode = conv_ptr(shmem, *newnode); return 0; failure: @@ -772,7 +779,7 @@ destroy_shmem_with_key(key_t key) int d_tm_init(int id, uint64_t mem_size, int flags) { - struct d_tm_shmem_hdr *new_shmem; + struct d_tm_shmem_hdr *new_shmem = NULL; key_t key; int shmid; char tmp[D_TM_MAX_NAME_LEN]; @@ -780,31 +787,47 @@ d_tm_init(int id, uint64_t mem_size, int flags) memset(&tm_shmem, 0, sizeof(tm_shmem)); - if ((flags & ~(D_TM_SERIALIZATION | D_TM_RETAIN_SHMEM)) != 0) { - D_ERROR("Invalid flags\n"); + if ((flags & ~(D_TM_SERIALIZATION | D_TM_RETAIN_SHMEM | + D_TM_RETAIN_SHMEM_IF_NON_EMPTY | D_TM_OPEN_OR_CREATE)) != 0) { + D_ERROR("Invalid flags 0x%x\n", flags); rc = -DER_INVAL; goto failure; } if (flags & D_TM_SERIALIZATION) { - tm_shmem.sync_access = true; + tm_shmem.sync_access = 1; D_INFO("Serialization enabled for id %d\n", id); } if (flags & D_TM_RETAIN_SHMEM) { - tm_shmem.retain = true; + tm_shmem.retain = 1; D_INFO("Retaining shared memory for id %d\n", id); } + if (flags & D_TM_RETAIN_SHMEM_IF_NON_EMPTY) { + tm_shmem.retain_non_empty = 1; + D_INFO("Retaining shared memory for id %d if not empty\n", id); + } + tm_shmem.id = id; snprintf(tmp, sizeof(tmp), "ID: %d", id); key = d_tm_get_srv_key(id); - rc = destroy_shmem_with_key(key); - if (rc != 0) - goto failure; - rc = create_shmem(tmp, key, mem_size, &shmid, &new_shmem); - if (rc != 0) - goto failure; + if (flags & D_TM_OPEN_OR_CREATE) { + rc = open_shmem(key, &new_shmem); + if (rc > 0) { + D_ASSERT(new_shmem != NULL); + shmid = rc; + } + } + + if (new_shmem == NULL) { + rc = destroy_shmem_with_key(key); + 
if (rc != 0) + goto failure; + rc = create_shmem(tmp, key, mem_size, &shmid, &new_shmem); + if (rc != 0) + goto failure; + } rc = alloc_ctx(&tm_shmem.ctx, new_shmem, shmid); if (rc != 0) @@ -837,13 +860,21 @@ d_tm_init(int id, uint64_t mem_size, int flags) void d_tm_fini(void) { - bool destroy_shmem = false; + bool destroy_shmem = true; if (tm_shmem.ctx == NULL) goto out; - if (!tm_shmem.retain) - destroy_shmem = true; + if (tm_shmem.retain) + destroy_shmem = false; + + if (tm_shmem.retain_non_empty) { + struct d_tm_node_t *root; + + root = d_tm_get_root(tm_shmem.ctx); + if (root->dtn_child != NULL) + destroy_shmem = false; + } /* close with the option to destroy the shmem region if needed */ close_all_shmem(tm_shmem.ctx, destroy_shmem); @@ -1452,9 +1483,9 @@ _reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node) return DER_SUCCESS; } -static void -reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, - char *path, int format, int opt_fields, FILE *stream) +void +d_tm_reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, FILE *stream) { char *name = NULL; @@ -1468,7 +1499,7 @@ reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, switch (node->dtn_type) { case D_TM_LINK: node = d_tm_follow_link(ctx, node); - reset_node(ctx, node, level, path, format, opt_fields, stream); + d_tm_reset_node(ctx, node, level, path, format, opt_fields, stream); break; case D_TM_DIRECTORY: case D_TM_COUNTER: @@ -1508,20 +1539,19 @@ reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, * Choose D_TM_CSV for comma separated values. * \param[in] opt_fields A bitmask. Set D_TM_INCLUDE_* as desired for * the optional output fields. - * \param[in] show_timestamp Set to true to print the timestamp the metric - * was read by the consumer. - * \param[in] stream Direct output to this stream (stdout, stderr) + * \param[in] iter_cb iterate callback. 
+ * \param[in] cb_arg argument for iterate callback. */ void d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, int filter, char *path, int format, - int opt_fields, uint32_t ops, FILE *stream) + int opt_fields, d_tm_iter_cb_t iter_cb, void *cb_arg) { struct d_tm_shmem_hdr *shmem = NULL; char *fullpath = NULL; char *parent_name = NULL; - if ((node == NULL) || (stream == NULL)) + if (node == NULL) return; if (node->dtn_type == D_TM_LINK) { @@ -1534,14 +1564,8 @@ d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, if (shmem == NULL) return; - if (node->dtn_type & filter) { - if (ops & D_TM_ITER_READ) - d_tm_print_node(ctx, node, level, path, format, - opt_fields, stream); - if (ops & D_TM_ITER_RESET) - reset_node(ctx, node, level, path, format, - opt_fields, stream); - } + if (node->dtn_type & filter) + iter_cb(ctx, node, level, path, format, opt_fields, cb_arg); parent_name = conv_ptr(shmem, node->dtn_name); node = node->dtn_child; @@ -1557,7 +1581,7 @@ d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, D_ASPRINTF(fullpath, "%s/%s", path, parent_name); d_tm_iterate(ctx, node, level + 1, filter, fullpath, format, - opt_fields, ops, stream); + opt_fields, iter_cb, cb_arg); D_FREE(fullpath); node = node->dtn_sibling; node = conv_ptr(shmem, node); @@ -2106,6 +2130,29 @@ is_initialized(void) tm_shmem.ctx->shmem_root != NULL; } +/* + * Get a pointer to the last token in the path without modifying the original + * string. 
+ */ +static const char * +get_last_token(const char *path) +{ + const char *substr = path; + const char *ch; + bool next_token = false; + + for (ch = path; *ch != '\0'; ch++) { + if (*ch == '/') { + next_token = true; + } else if (next_token) { + substr = ch; + next_token = false; + } + } + + return substr; +} + static int add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, char *desc, char *units, char *path) @@ -2114,6 +2161,7 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, struct d_tm_node_t *parent_node; struct d_tm_node_t *temp = NULL; struct d_tm_shmem_hdr *shmem; + struct d_tm_metric_t *metric; char *token; char *rest; char *unit_string; @@ -2155,11 +2203,11 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, } } - temp->dtn_metric->dtm_stats = NULL; + metric = conv_ptr(shmem, temp->dtn_metric); + metric->dtm_stats = NULL; if (has_stats(temp)) { - temp->dtn_metric->dtm_stats = - shmalloc(shmem, sizeof(struct d_tm_stats_t)); - if (temp->dtn_metric->dtm_stats == NULL) { + metric->dtm_stats = shmalloc(shmem, sizeof(struct d_tm_stats_t)); + if (metric->dtm_stats == NULL) { rc = -DER_NO_SHMEM; goto out; } @@ -2176,14 +2224,14 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, if (buff_len > 0) { buff_len += 1; /** make room for the trailing null */ - temp->dtn_metric->dtm_desc = shmalloc(shmem, buff_len); - if (temp->dtn_metric->dtm_desc == NULL) { + metric->dtm_desc = shmalloc(shmem, buff_len); + if (metric->dtm_desc == NULL) { rc = -DER_NO_SHMEM; goto out; } - strncpy(temp->dtn_metric->dtm_desc, desc, buff_len); + strncpy(conv_ptr(shmem, metric->dtm_desc), desc, buff_len); } else { - temp->dtn_metric->dtm_desc = NULL; + metric->dtm_desc = NULL; } unit_string = units; @@ -2217,14 +2265,14 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, if (buff_len > 0) { buff_len += 1; /** make room for the 
trailing null */ - temp->dtn_metric->dtm_units = shmalloc(shmem, buff_len); - if (temp->dtn_metric->dtm_units == NULL) { + metric->dtm_units = shmalloc(shmem, buff_len); + if (metric->dtm_units == NULL) { rc = -DER_NO_SHMEM; goto out; } - strncpy(temp->dtn_metric->dtm_units, unit_string, buff_len); + strncpy(conv_ptr(shmem, metric->dtm_units), unit_string, buff_len); } else { - temp->dtn_metric->dtm_units = NULL; + metric->dtm_units = NULL; } temp->dtn_protect = false; @@ -2359,12 +2407,17 @@ static int get_free_region_entry(struct d_tm_shmem_hdr *shmem, struct shmem_region_list **entry) { + d_list_t *cur; + d_list_t *head; + d_list_t *next; struct shmem_region_list *tmp; D_ASSERT(shmem != NULL); D_ASSERT(entry != NULL); - d_list_for_each_entry(tmp, &shmem->sh_subregions, rl_link) { + head = &shmem->sh_subregions; + for (cur = conv_ptr(shmem, head->next); cur != head; cur = conv_ptr(shmem, cur->next)) { + tmp = d_list_entry(cur, __typeof__(*tmp), rl_link); if (tmp->rl_link_node == NULL) { *entry = tmp; return 0; @@ -2377,7 +2430,17 @@ get_free_region_entry(struct d_tm_shmem_hdr *shmem, shmem->sh_key); return -DER_NO_SHMEM; } - d_list_add(&tmp->rl_link, &shmem->sh_subregions); + + next = conv_ptr(shmem, head->next); + cur = head->next; + + head->next = &tmp->rl_link; + next->prev = &tmp->rl_link; + + tmp = conv_ptr(shmem, tmp); + tmp->rl_link.next = cur; + tmp->rl_link.prev = (d_list_t *)(shmem->sh_base_addr + + (uint64_t)(&((struct d_tm_shmem_hdr *)(0))->sh_subregions)); *entry = tmp; return 0; @@ -2413,29 +2476,6 @@ get_unique_shmem_key(const char *path, int id) return (key_t)d_hash_string_u32(salted, sizeof(salted)); } -/* - * Get a pointer to the last token in the path without modifying the original - * string. 
- */ -static const char * -get_last_token(const char *path) -{ - const char *substr = path; - const char *ch; - bool next_token = false; - - for (ch = path; *ch != '\0'; ch++) { - if (*ch == '/') { - next_token = true; - } else if (next_token) { - substr = ch; - next_token = false; - } - } - - return substr; -} - /** * Creates a directory in the metric tree at the path designated by fmt that * can be deleted later, with all its children. @@ -2460,6 +2500,7 @@ d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, struct d_tm_context *ctx = tm_shmem.ctx; struct d_tm_shmem_hdr *parent_shmem; struct d_tm_shmem_hdr *new_shmem; + struct d_tm_metric_t *link_metric; struct shmem_region_list *region_entry; va_list args; key_t key; @@ -2522,8 +2563,6 @@ d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, D_ERROR("can't set up the link node, " DF_RC "\n", DP_RC(rc)); D_GOTO(fail_tracking, rc); } - D_ASSERT(link_node->dtn_type == D_TM_LINK); - link_node->dtn_metric->dtm_data.value = key; /* track attached regions within the parent shmem */ parent_shmem = get_shmem_for_key(ctx, link_node->dtn_shmem_key); @@ -2531,6 +2570,11 @@ d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, D_ERROR("failed to get parent shmem pointer\n"); D_GOTO(fail_link, rc = -DER_NO_SHMEM); } + + D_ASSERT(link_node->dtn_type == D_TM_LINK); + link_metric = conv_ptr(parent_shmem, link_node->dtn_metric); + link_metric->dtm_data.value = key; + rc = get_free_region_entry(parent_shmem, ®ion_entry); if (rc != 0) D_GOTO(fail_link, rc); @@ -2562,9 +2606,13 @@ d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, static void clear_region_entry_for_key(struct d_tm_shmem_hdr *shmem, key_t key) { + d_list_t *cur; + d_list_t *head; struct shmem_region_list *tmp; - d_list_for_each_entry(tmp, &shmem->sh_subregions, rl_link) { + head = &shmem->sh_subregions; + for (cur = conv_ptr(shmem, head->next); cur != head; cur = conv_ptr(shmem, cur->next)) { + tmp = 
d_list_entry(cur, __typeof__(*tmp), rl_link); if (tmp->rl_key == key) { D_DEBUG(DB_TRACE, "cleared shmem metadata for key 0x%x\n", key); @@ -2583,6 +2631,8 @@ rm_ephemeral_dir(struct d_tm_context *ctx, struct d_tm_node_t *link) struct d_tm_shmem_hdr *parent_shmem; struct d_tm_shmem_hdr *shmem; struct d_tm_node_t *node; + d_list_t *cur; + d_list_t *head; struct shmem_region_list *curr; key_t key; int rc = 0; @@ -2616,7 +2666,9 @@ rm_ephemeral_dir(struct d_tm_context *ctx, struct d_tm_node_t *link) } /* delete sub-regions recursively */ - d_list_for_each_entry(curr, &shmem->sh_subregions, rl_link) { + head = &shmem->sh_subregions; + for (cur = conv_ptr(shmem, head->next); cur != head; cur = conv_ptr(shmem, cur->next)) { + curr = d_list_entry(cur, __typeof__(*curr), rl_link); rc = rm_ephemeral_dir(ctx, curr->rl_link_node); if (rc != 0) /* nothing much we can do to recover here */ D_ERROR("error removing tmp dir [%s]: "DF_RC"\n", @@ -3669,7 +3721,7 @@ shmalloc(struct d_tm_shmem_hdr *shmem, int length) D_DEBUG(DB_TRACE, "Allocated %d bytes. 
Now %" PRIu64 " remain\n", length, shmem->sh_bytes_free); - memset(new_mem, 0, length); + memset(conv_ptr(shmem, new_mem), 0, length); return new_mem; } diff --git a/src/gurt/tests/test_gurt_telem_producer.c b/src/gurt/tests/test_gurt_telem_producer.c index bf3db9d19c95..76a9b7b27f25 100644 --- a/src/gurt/tests/test_gurt_telem_producer.c +++ b/src/gurt/tests/test_gurt_telem_producer.c @@ -1226,6 +1226,13 @@ test_verify_object_count(void **state) assert_int_equal(num, exp_total); } +static void +iter_print(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + static void test_print_metrics(void **state) { @@ -1239,14 +1246,14 @@ test_print_metrics(void **state) D_TM_DURATION | D_TM_GAUGE | D_TM_DIRECTORY); d_tm_iterate(cli_ctx, node, 0, filter, NULL, D_TM_STANDARD, - D_TM_INCLUDE_METADATA, D_TM_ITER_READ, stdout); + D_TM_INCLUDE_METADATA, iter_print, stdout); d_tm_print_field_descriptors(D_TM_INCLUDE_TIMESTAMP | D_TM_INCLUDE_METADATA, stdout); filter &= ~D_TM_DIRECTORY; d_tm_iterate(cli_ctx, node, 0, filter, NULL, D_TM_CSV, - D_TM_INCLUDE_METADATA, D_TM_ITER_READ, stdout); + D_TM_INCLUDE_METADATA, iter_print, stdout); } static void diff --git a/src/include/daos/metric.h b/src/include/daos/metric.h new file mode 100644 index 000000000000..9417b52fdc93 --- /dev/null +++ b/src/include/daos/metric.h @@ -0,0 +1,19 @@ +/* + * (C) Copyright 2020-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +#ifndef __DAOS_METRIC_H__ +#define __DAOS_METRIC_H__ + +/** + * Called during library initialization to init metrics. 
+ */ +int dc_tm_init(void); + +/** + * Called during library finalization to free metrics resources + */ +void dc_tm_fini(void); + +#endif /* __DAOS_METRIC_H__ */ diff --git a/src/include/daos/tls.h b/src/include/daos/tls.h new file mode 100644 index 000000000000..446ff53c1800 --- /dev/null +++ b/src/include/daos/tls.h @@ -0,0 +1,114 @@ +/** + * (C) Copyright 2016-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +/** + * This file is part of daos + * + * src/include/daos/tls.h + */ + +#ifndef __DAOS_TLS_H__ +#define __DAOS_TLS_H__ + +#include +#include + +/** + * Stackable Module API + * Provides a modular interface to load and register server-side code on + * demand. A module is composed of: + * - a set of request handlers which are registered when the module is loaded. + * - a server-side API (see header files suffixed by "_srv") used for + * inter-module direct calls. + * + * For now, all loaded modules are assumed to be trustful, but sandboxes can be + * implemented in the future. 
+ */ +/* + * Thread-local storage + */ +struct daos_thread_local_storage { + uint32_t dtls_tag; + void **dtls_values; +}; + +enum daos_module_tag { + DAOS_SYS_TAG = 1 << 0, /** only run on system xstream */ + DAOS_TGT_TAG = 1 << 1, /** only run on target xstream */ + DAOS_RDB_TAG = 1 << 2, /** only run on rdb xstream */ + DAOS_OFF_TAG = 1 << 3, /** only run on offload/helper xstream */ + DAOS_CLI_TAG = 1 << 4, /** only run on client stack */ + DAOS_SERVER_TAG = 0xff, /** run on all xstream */ +}; + +/* The module key descriptor for each xstream */ +struct daos_module_key { + /* Indicate where the keys should be instantiated */ + enum daos_module_tag dmk_tags; + + /* The position inside the daos_module_keys */ + int dmk_index; + /* init keys for context */ + void *(*dmk_init)(int tags, int xs_id, int tgt_id); + + /* fini keys for context */ + void (*dmk_fini)(int tags, void *data); +}; + +#define DAOS_MODULE_KEYS_NR 10 +struct daos_thread_local_storage *dss_tls_get(void); +struct daos_thread_local_storage *dc_tls_get(unsigned int tag); + +int ds_tls_key_create(void); +int dc_tls_key_create(void); +void ds_tls_key_delete(void); +void dc_tls_key_delete(void); +/* For now TLS is only enabled if metrics are enabled */ +#define METRIC_DUMP_ENV "DAOS_METRIC_DUMP_ENV" +#define DAOS_CLIENT_METRICS_ENV "DAOS_CLIENT_METRICS" +#define DAOS_CLIENT_METRICS_RETAIN_ENV "DAOS_CLIENT_METRICS_RETAIN" +extern bool daos_client_metric; +extern bool daos_client_metric_retain; +struct daos_module_key* daos_get_module_key(int index); + +/** + * Get value from context by the key + * + * Get value inside dtls by key. So each module will use this API to + * retrieve their own value in the thread context. + * + * \param[in] dtls the thread context. + * \param[in] key key used to retrieve the dtls_value. + * + * \retval the dtls_value retrieved by key. 
+ */ +static inline void * +daos_module_key_get(struct daos_thread_local_storage *dtls, + struct daos_module_key *key) +{ + D_ASSERT(key->dmk_index >= 0); + D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); + D_ASSERT(daos_get_module_key(key->dmk_index) == key); + D_ASSERT(dtls != NULL); + + return dtls->dtls_values[key->dmk_index]; +} + +#define dss_module_key_get daos_module_key_get +#define dss_register_key daos_register_key +#define dss_unregister_key daos_unregister_key +#define dss_module_info daos_module_info +#define dss_module_tag daos_module_tag +#define dss_module_key daos_module_key +#define dss_thread_local_storage daos_thread_local_storage + +void daos_register_key(struct daos_module_key *key); +void daos_unregister_key(struct daos_module_key *key); +struct daos_thread_local_storage * dc_tls_init(int tag, uint32_t pid); +void dc_tls_fini(void); +struct daos_thread_local_storage * dss_tls_init(int tag, int xs_id, int tgt_id); +void dss_tls_fini(struct daos_thread_local_storage *dtls); + +#endif /*__DAOS_TLS_H__*/ diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index be491483fbcf..db7418a21e95 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -54,84 +55,6 @@ extern unsigned int dss_instance_idx; /** Bypass for the nvme health check */ extern bool dss_nvme_bypass_health_check; -/** - * Stackable Module API - * Provides a modular interface to load and register server-side code on - * demand. A module is composed of: - * - a set of request handlers which are registered when the module is loaded. - * - a server-side API (see header files suffixed by "_srv") used for - * inter-module direct calls. - * - * For now, all loaded modules are assumed to be trustful, but sandboxes can be - * implemented in the future. 
- */ -/* - * Thead-local storage - */ -struct dss_thread_local_storage { - uint32_t dtls_tag; - void **dtls_values; -}; - -enum dss_module_tag { - DAOS_SYS_TAG = 1 << 0, /** only run on system xstream */ - DAOS_TGT_TAG = 1 << 1, /** only run on target xstream */ - DAOS_RDB_TAG = 1 << 2, /** only run on rdb xstream */ - DAOS_OFF_TAG = 1 << 3, /** only run on offload/helper xstream */ - DAOS_SERVER_TAG = 0xff, /** run on all xstream */ -}; - -/* The module key descriptor for each xstream */ -struct dss_module_key { - /* Indicate where the keys should be instantiated */ - enum dss_module_tag dmk_tags; - - /* The position inside the dss_module_keys */ - int dmk_index; - /* init keys for context */ - void *(*dmk_init)(int tags, int xs_id, int tgt_id); - - /* fini keys for context */ - void (*dmk_fini)(int tags, void *data); -}; - -extern pthread_key_t dss_tls_key; -extern struct dss_module_key *dss_module_keys[]; -#define DAOS_MODULE_KEYS_NR 10 - -static inline struct dss_thread_local_storage * -dss_tls_get() -{ - return (struct dss_thread_local_storage *) - pthread_getspecific(dss_tls_key); -} - -/** - * Get value from context by the key - * - * Get value inside dtls by key. So each module will use this API to - * retrieve their own value in the thread context. - * - * \param[in] dtls the thread context. - * \param[in] key key used to retrieve the dtls_value. - * - * \retval the dtls_value retrieved by key. 
- */ -static inline void * -dss_module_key_get(struct dss_thread_local_storage *dtls, - struct dss_module_key *key) -{ - D_ASSERT(key->dmk_index >= 0); - D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); - D_ASSERT(dss_module_keys[key->dmk_index] == key); - D_ASSERT(dtls != NULL); - - return dtls->dtls_values[key->dmk_index]; -} - -void dss_register_key(struct dss_module_key *key); -void dss_unregister_key(struct dss_module_key *key); - /** pthread names are limited to 16 chars */ #define DSS_XS_NAME_LEN (32) @@ -172,7 +95,7 @@ static inline struct dss_module_info * dss_get_module_info(void) { struct dss_module_info *dmi; - struct dss_thread_local_storage *dtc; + struct daos_thread_local_storage *dtc; dtc = dss_tls_get(); dmi = (struct dss_module_info *) diff --git a/src/include/gurt/telemetry_common.h b/src/include/gurt/telemetry_common.h index 983ec2553f23..a3ba89020102 100644 --- a/src/include/gurt/telemetry_common.h +++ b/src/include/gurt/telemetry_common.h @@ -155,6 +155,8 @@ enum { D_TM_SERVER_PROCESS = 0x000, D_TM_SERIALIZATION = 0x001, D_TM_RETAIN_SHMEM = 0x002, + D_TM_RETAIN_SHMEM_IF_NON_EMPTY = 0x004, + D_TM_OPEN_OR_CREATE = 0x008, }; /** Output formats */ @@ -176,6 +178,7 @@ enum { D_TM_ITER_RESET = 0x002, }; +#define DC_TM_JOB_ROOT_ID 256 /** * @brief Statistics for gauge and duration metrics * diff --git a/src/include/gurt/telemetry_consumer.h b/src/include/gurt/telemetry_consumer.h index f0b1d706be71..9b8de3d70fae 100644 --- a/src/include/gurt/telemetry_consumer.h +++ b/src/include/gurt/telemetry_consumer.h @@ -49,12 +49,21 @@ int d_tm_list(struct d_tm_context *ctx, struct d_tm_nodeList_t **head, int d_tm_list_subdirs(struct d_tm_context *ctx, struct d_tm_nodeList_t **head, struct d_tm_node_t *node, uint64_t *node_count, int max_depth); + +typedef void (*d_tm_iter_cb_t)(struct d_tm_context *ctx, struct d_tm_node_t *node, + int level, char *path, int format, int opt_fields, + void *cb_arg); + void d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t 
*node, int level, int filter, char *path, int format, - int opt_fields, uint32_t ops, FILE *stream); + int opt_fields, d_tm_iter_cb_t iter_cb, void *cb_arg); void d_tm_print_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, char *name, int format, int opt_fields, FILE *stream); + +void d_tm_reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, FILE *stream); + void d_tm_print_field_descriptors(int opt_fields, FILE *stream); void d_tm_print_counter(uint64_t val, char *name, int format, char *units, int opt_fields, FILE *stream); diff --git a/src/object/cli_mod.c b/src/object/cli_mod.c index 79c13fee9489..97fcdec23721 100644 --- a/src/object/cli_mod.c +++ b/src/object/cli_mod.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2022 Intel Corporation. + * (C) Copyright 2016-2023 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -12,6 +12,10 @@ #include #include #include +#include +#include +#include +#include #include #include "obj_rpc.h" #include "obj_internal.h" @@ -19,14 +23,99 @@ unsigned int srv_io_mode = DIM_DTX_FULL_ENABLED; int dc_obj_proto_version; +static void* +dc_obj_tls_init(int tags, int xs_id, int pid) +{ + struct dc_obj_tls *tls; + int opc; + int rc; + unsigned long tid = pthread_self(); + + D_ALLOC_PTR(tls); + if (tls == NULL) + return NULL; + + /** register different per-opcode sensors */ + for (opc = 0; opc < OBJ_PROTO_CLI_COUNT; opc++) { + /** Start with number of active requests, of type gauge */ + rc = d_tm_add_metric(&tls->cot_op_active[opc], D_TM_STATS_GAUGE, + "number of active object RPCs", "ops", + "%s/%u/%lu/ops/%s/active", dc_jobid, pid, tid, + obj_opc_to_str(opc)); + if (rc) { + D_WARN("Failed to create active counter: "DF_RC"\n", DP_RC(rc)); + D_GOTO(out, rc); + } + + if (opc == DAOS_OBJ_RPC_UPDATE || + opc == DAOS_OBJ_RPC_TGT_UPDATE || + opc == DAOS_OBJ_RPC_FETCH) + /** See below, latency reported per size for those */ + continue; + + /** 
And finally the per-opcode latency, of type gauge */ + rc = d_tm_add_metric(&tls->cot_op_lat[opc], D_TM_STATS_GAUGE, + "object RPC processing time", "us", + "%s/%u/%lu/ops/%s/latency", dc_jobid, pid, tid, + obj_opc_to_str(opc)); + if (rc) { + D_WARN("Failed to create latency sensor: "DF_RC"\n", DP_RC(rc)); + D_GOTO(out, rc); + } + } + + /** + * Maintain per-I/O size latency for update & fetch RPCs + * of type gauge + */ + rc = obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, pid, tls->cot_update_lat, + obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), + "update RPC processing time", false); + if (rc) + D_GOTO(out, rc); + + rc = obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, pid, tls->cot_fetch_lat, + obj_opc_to_str(DAOS_OBJ_RPC_FETCH), + "fetch RPC processing time", false); + if (rc) + D_GOTO(out, rc); + +out: + if (rc) { + D_FREE(tls); + tls = NULL; + } + + return tls; +} + +static void +dc_obj_tls_fini(int tags, void *data) +{ + struct dc_obj_tls *tls = data; + + D_FREE(tls); +} + +struct daos_module_key dc_obj_module_key = { + .dmk_tags = DAOS_CLI_TAG, + .dmk_index = -1, + .dmk_init = dc_obj_tls_init, + .dmk_fini = dc_obj_tls_fini, +}; + /** * Initialize object interface */ int dc_obj_init(void) { - uint32_t ver_array[2] = {DAOS_OBJ_VERSION - 1, DAOS_OBJ_VERSION}; - int rc; + uint32_t ver_array[2] = {DAOS_OBJ_VERSION - 1, DAOS_OBJ_VERSION}; + int rc; + + d_getenv_bool(DAOS_CLIENT_METRICS_ENV, &daos_client_metric); + if (daos_client_metric) + daos_register_key(&dc_obj_module_key); rc = obj_utils_init(); if (rc) @@ -78,6 +167,7 @@ dc_obj_init(void) out_utils: if (rc) obj_utils_fini(); + return rc; } @@ -94,4 +184,6 @@ dc_obj_fini(void) obj_ec_codec_fini(); obj_class_fini(); obj_utils_fini(); + if (daos_client_metric) + daos_unregister_key(&dc_obj_module_key); } diff --git a/src/object/cli_shard.c b/src/object/cli_shard.c index 2dd9ef9ac398..dec1bec13630 100644 --- a/src/object/cli_shard.c +++ b/src/object/cli_shard.c @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include 
"cli_csum.h" #include "obj_rpc.h" #include "obj_internal.h" @@ -105,6 +107,7 @@ struct rw_cb_args { daos_iom_t *maps; crt_endpoint_t tgt_ep; struct shard_rw_args *shard_args; + uint64_t send_time; }; static d_iov_t * @@ -640,6 +643,94 @@ dc_shard_update_size(struct rw_cb_args *rw_args, int fetch_rc) return rc; } +daos_size_t +obj_get_fetch_size(struct rw_cb_args *arg) +{ + struct obj_rw_v10_out *orwo; + daos_size_t size = 0; + + orwo = crt_reply_get(arg->rpc); + + if (orwo->orw_sgls.ca_count > 0) { + /* inline transfer */ + size = daos_sgls_packed_size(orwo->orw_sgls.ca_arrays, + orwo->orw_sgls.ca_count, NULL); + } else if (arg->rwaa_sgls != NULL) { + /* bulk transfer */ + daos_size_t *replied_sizes = orwo->orw_data_sizes.ca_arrays; + int i; + + for (i = 0; i < orwo->orw_data_sizes.ca_count; i++) + size += replied_sizes[i]; + } + + return size; +} + +static void +obj_shard_update_metrics_begin(crt_rpc_t *rpc) +{ + struct dc_obj_tls *tls; + int opc; + + if (!daos_client_metric) + return; + + tls = dc_obj_tls_get(); + D_ASSERT(tls != NULL); + opc = opc_get(rpc->cr_opc); + d_tm_inc_gauge(tls->cot_op_active[opc], 1); +} + +static void +obj_shard_update_metrics_end(crt_rpc_t *rpc, uint64_t send_time, void *arg, int ret) +{ + struct dc_obj_tls *tls; + struct rw_cb_args *rw_args; + struct obj_rw_in *orw; + struct d_tm_node_t *lat = NULL; + daos_size_t size; + uint64_t time; + int opc; + + if (!daos_client_metric) + return; + + tls = dc_obj_tls_get(); + D_ASSERT(tls != NULL); + opc = opc_get(rpc->cr_opc); + orw = crt_req_get(rpc); + d_tm_dec_gauge(tls->cot_op_active[opc], 1); + + if (ret != 0) + return; + /** + * Measure latency of successful I/O only. + * Use bit shift for performance and tolerate some inaccuracy. 
+ */ + time = daos_get_ntime() - send_time; + time >>= 10; + + switch (opc) { + case DAOS_OBJ_RPC_UPDATE: + rw_args = arg; + size = daos_sgls_packed_size(rw_args->rwaa_sgls, orw->orw_nr, NULL); + lat = tls->cot_update_lat[lat_bucket(size)]; + break; + case DAOS_OBJ_RPC_FETCH: + rw_args = arg; + size = obj_get_fetch_size(rw_args); + lat = tls->cot_fetch_lat[lat_bucket(size)]; + break; + default: + lat = tls->cot_op_lat[opc]; + break; + } + + if (lat != NULL) + d_tm_set_gauge(lat, time); +} + static int dc_rw_cb(tse_task_t *task, void *arg) { @@ -956,10 +1047,15 @@ dc_rw_cb(tse_task_t *task, void *arg) out: if (rc == -DER_CSUM && opc == DAOS_OBJ_RPC_FETCH) dc_shard_csum_report(task, &rw_args->tgt_ep, rw_args->rpc); + + obj_shard_update_metrics_end(rw_args->rpc, rw_args->send_time, rw_args, + ret == 0 ? rc : ret); + crt_req_decref(rw_args->rpc); if (ret == 0 || obj_retry_error(rc)) ret = rc; + return ret; } @@ -1129,7 +1225,9 @@ dc_obj_shard_rw(struct dc_obj_shard *shard, enum obj_rpc_opc opc, rw_args.co = shard->do_co; rw_args.shard_args = args; /* remember the sgl to copyout the data inline for fetch */ - rw_args.rwaa_sgls = (opc == DAOS_OBJ_RPC_FETCH) ? 
sgls : NULL; + rw_args.rwaa_sgls = sgls; + rw_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); if (args->reasb_req && args->reasb_req->orr_recov) { rw_args.maps = NULL; orw->orw_flags |= ORF_EC_RECOV; @@ -1189,6 +1287,7 @@ struct obj_punch_cb_args { crt_rpc_t *rpc; unsigned int *map_ver; struct shard_punch_args *shard_args; + uint64_t send_time; }; static int @@ -1217,7 +1316,11 @@ obj_shard_punch_cb(tse_task_t *task, void *data) } } + obj_shard_update_metrics_end(cb_args->rpc, cb_args->send_time, cb_args, + task->dt_result); + crt_req_decref(rpc); + return task->dt_result; } @@ -1262,6 +1365,8 @@ dc_obj_shard_punch(struct dc_obj_shard *shard, enum obj_rpc_opc opc, cb_args.rpc = req; cb_args.map_ver = &args->pa_auxi.map_ver; cb_args.shard_args = args; + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, obj_shard_punch_cb, &cb_args, sizeof(cb_args)); if (rc != 0) @@ -1324,6 +1429,7 @@ struct obj_enum_args { struct dtx_epoch *epoch; daos_handle_t *th; uint64_t *enqueue_id; + uint64_t send_time; uint32_t *max_delay; }; @@ -1652,10 +1758,15 @@ dc_enumerate_cb(tse_task_t *task, void *arg) crt_bulk_free(oei->oei_bulk); if (oei->oei_kds_bulk != NULL) crt_bulk_free(oei->oei_kds_bulk); + + obj_shard_update_metrics_end(enum_args->rpc, enum_args->send_time, + enum_args, ret == 0 ? 
rc : ret); + crt_req_decref(enum_args->rpc); if (ret == 0 || obj_retry_error(rc)) ret = rc; + return ret; } @@ -1805,6 +1916,8 @@ dc_obj_shard_list(struct dc_obj_shard *obj_shard, enum obj_rpc_opc opc, enum_args.th = &obj_args->th; enum_args.enqueue_id = &args->la_auxi.enqueue_id; enum_args.max_delay = &args->la_auxi.obj_auxi->max_delay; + enum_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, dc_enumerate_cb, &enum_args, sizeof(enum_args)); if (rc != 0) @@ -1838,6 +1951,7 @@ struct obj_query_key_cb_args { daos_handle_t th; uint32_t *max_delay; uint64_t *queue_id; + uint64_t send_time; }; static void @@ -2048,6 +2162,7 @@ obj_shard_query_key_cb(tse_task_t *task, void *data) D_SPIN_UNLOCK(&cb_args->obj->cob_spin); out: + obj_shard_update_metrics_end(rpc, cb_args->send_time, cb_args, ret == 0 ? rc : ret); crt_req_decref(rpc); if (ret == 0 || obj_retry_error(rc)) ret = rc; @@ -2101,6 +2216,8 @@ dc_obj_shard_query_key(struct dc_obj_shard *shard, struct dtx_epoch *epoch, uint cb_args.max_epoch = max_epoch; cb_args.queue_id = queue_id; cb_args.max_delay = max_delay; + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, obj_shard_query_key_cb, &cb_args, sizeof(cb_args)); if (rc != 0) @@ -2147,6 +2264,7 @@ struct obj_shard_sync_cb_args { uint32_t *map_ver; uint32_t *max_delay; uint64_t *enqueue_id; + uint64_t send_time; }; static int @@ -2202,6 +2320,8 @@ obj_shard_sync_cb(tse_task_t *task, void *data) oso->oso_epoch, oso->oso_map_version); out: + obj_shard_update_metrics_end(rpc, cb_args->send_time, cb_args, rc); + crt_req_decref(rpc); return rc; } @@ -2248,7 +2368,8 @@ dc_obj_shard_sync(struct dc_obj_shard *shard, enum obj_rpc_opc opc, cb_args.map_ver = &args->sa_auxi.map_ver; cb_args.max_delay = &args->sa_auxi.obj_auxi->max_delay; cb_args.enqueue_id = &args->sa_auxi.enqueue_id; - + cb_args.send_time = daos_get_ntime(); + 
obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, obj_shard_sync_cb, &cb_args, sizeof(cb_args)); if (rc != 0) @@ -2284,8 +2405,9 @@ struct obj_k2a_args { struct dtx_epoch *epoch; daos_handle_t *th; daos_anchor_t *anchor; - uint32_t shard; uint64_t *enqueue_id; + uint64_t send_time; + uint32_t shard; uint32_t *max_delay; }; @@ -2353,6 +2475,8 @@ dc_k2a_cb(tse_task_t *task, void *arg) enum_anchor_copy(k2a_args->anchor, &oko->oko_anchor); dc_obj_shard2anchor(k2a_args->anchor, k2a_args->shard); out: + obj_shard_update_metrics_end(k2a_args->rpc, k2a_args->send_time, k2a_args, + ret == 0 ? rc : ret); if (k2a_args->eaa_obj != NULL) obj_shard_decref(k2a_args->eaa_obj); crt_req_decref(k2a_args->rpc); @@ -2429,6 +2553,8 @@ dc_obj_shard_key2anchor(struct dc_obj_shard *obj_shard, enum obj_rpc_opc opc, cb_args.shard = obj_shard->do_shard_idx; cb_args.enqueue_id = &args->ka_auxi.enqueue_id; cb_args.max_delay = &args->ka_auxi.obj_auxi->max_delay; + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, dc_k2a_cb, &cb_args, sizeof(cb_args)); if (rc != 0) D_GOTO(out_eaa, rc); diff --git a/src/object/obj_internal.h b/src/object/obj_internal.h index 8a2b12fff55a..d13015a7c27b 100644 --- a/src/object/obj_internal.h +++ b/src/object/obj_internal.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "obj_rpc.h" #include "obj_ec.h" @@ -539,6 +540,60 @@ struct dc_obj_verify_args { struct dc_obj_verify_cursor cursor; }; +/* + * Report latency on a per-I/O size. + * Buckets starts at [0; 256B[ and are increased by power of 2 + * (i.e. [256B; 512B[, [512B; 1KB[) up to [4MB; infinity[ + * Since 4MB = 2^22 and 256B = 2^8, this means + * (22 - 8 + 1) = 15 buckets plus the 4MB+ bucket, so + * 16 buckets in total. 
+ */ +#define NR_LATENCY_BUCKETS 16 + +struct dc_obj_tls { + /** Measure update/fetch latency based on I/O size (type = gauge) */ + struct d_tm_node_t *cot_update_lat[NR_LATENCY_BUCKETS]; + struct d_tm_node_t *cot_fetch_lat[NR_LATENCY_BUCKETS]; + + /** Measure per-operation latency in us (type = gauge) */ + struct d_tm_node_t *cot_op_lat[OBJ_PROTO_CLI_COUNT]; + /** Count number of per-opcode active requests (type = gauge) */ + struct d_tm_node_t *cot_op_active[OBJ_PROTO_CLI_COUNT]; +}; + +int +obj_latency_tm_init(uint32_t opc, int tgt_id, struct d_tm_node_t **tm, + char *op, char *desc, bool server); +extern struct daos_module_key dc_obj_module_key; + +static inline struct dc_obj_tls * +dc_obj_tls_get() +{ + struct daos_thread_local_storage *dtls; + + dtls = dc_tls_get(dc_obj_module_key.dmk_tags); + D_ASSERT(dtls != NULL); + return daos_module_key_get(dtls, &dc_obj_module_key); +} + +static inline unsigned int +lat_bucket(uint64_t size) +{ + int nr; + + if (size <= 256) + return 0; + + /** return number of leading zero-bits */ + nr = __builtin_clzl(size - 1); + + /** >4MB, return last bucket */ + if (nr < 42) + return NR_LATENCY_BUCKETS - 1; + + return 56 - nr; +} + static inline int dc_cont2uuid(struct dc_cont *dc_cont, uuid_t *hdl_uuid, uuid_t *uuid) { diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index 8312c6719d89..758ca0d8fac5 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2018-2022 Intel Corporation. + * (C) Copyright 2018-2023 Intel Corporation. 
* * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -10,6 +10,10 @@ #define DDSUBSYS DDFAC(object) #include +#include +#include +#include +#include #include "obj_internal.h" static daos_size_t @@ -86,6 +90,61 @@ daos_iods_free(daos_iod_t *iods, int nr, bool need_free) D_FREE(iods); } +int +obj_latency_tm_init(uint32_t opc, int tgt_id, struct d_tm_node_t **tm, char *op, + char *desc, bool server) +{ + unsigned int bucket_max = 256; + int i; + int rc = 0; + + for (i = 0; i < NR_LATENCY_BUCKETS; i++) { + char *path; + + if (server) { + if (bucket_max < 1024) /** B */ + D_ASPRINTF(path, "io/latency/%s/%uB/tgt_%u", + op, bucket_max, tgt_id); + else if (bucket_max < 1024 * 1024) /** KB */ + D_ASPRINTF(path, "io/latency/%s/%uKB/tgt_%u", + op, bucket_max / 1024, tgt_id); + else if (bucket_max <= 1024 * 1024 * 4) /** MB */ + D_ASPRINTF(path, "io/latency/%s/%uMB/tgt_%u", + op, bucket_max / (1024 * 1024), tgt_id); + else /** >4MB */ + D_ASPRINTF(path, "io/latency/%s/GT4MB/tgt_%u", + op, tgt_id); + } else { + pid_t pid = getpid(); + unsigned long tid = pthread_self(); + + if (bucket_max < 1024) /** B */ + D_ASPRINTF(path, "%s/%u/%lu/io/latency/%s/%uB", + dc_jobid, pid, tid, op, bucket_max); + else if (bucket_max < 1024 * 1024) /** KB */ + D_ASPRINTF(path, "%s/%u/%lu/io/latency/%s/%uKB", + dc_jobid, pid, tid, op, bucket_max / 1024); + else if (bucket_max <= 1024 * 1024 * 4) /** MB */ + D_ASPRINTF(path, "%s/%u/%lu/io/latency/%s/%uMB", + dc_jobid, pid, tid, op, bucket_max / (1024 * 1024)); + else /** >4MB */ + D_ASPRINTF(path, "%s/%u/%lu/io/latency/%s/GT4MB", + dc_jobid, pid, tid, op); + + } + rc = d_tm_add_metric(&tm[i], D_TM_STATS_GAUGE, desc, "us", path); + if (rc) + D_WARN("Failed to create per-I/O size latency " + "sensor: "DF_RC"\n", DP_RC(rc)); + D_FREE(path); + + bucket_max <<= 1; + } + + return rc; +} + + struct recx_rec { daos_recx_t *rr_recx; }; diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 4452e0404861..4ac391678b24 100644 --- 
a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -107,16 +107,6 @@ struct migrate_pool_tls { void migrate_pool_tls_destroy(struct migrate_pool_tls *tls); -/* - * Report latency on a per-I/O size. - * Buckets starts at [0; 256B[ and are increased by power of 2 - * (i.e. [256B; 512B[, [512B; 1KB[) up to [4MB; infinity[ - * Since 4MB = 2^22 and 256B = 2^8, this means - * (22 - 8 + 1) = 15 buckets plus the 4MB+ bucket, so - * 16 buckets in total. - */ -#define NR_LATENCY_BUCKETS 16 - struct obj_pool_metrics { /** Count number of total per-opcode requests (type = counter) */ struct d_tm_node_t *opm_total[OBJ_PROTO_CLI_COUNT]; @@ -168,24 +158,6 @@ obj_tls_get() return dss_module_key_get(dss_tls_get(), &obj_module_key); } -static inline unsigned int -lat_bucket(uint64_t size) -{ - int nr; - - if (size <= 256) - return 0; - - /** return number of leading zero-bits */ - nr = __builtin_clzl(size - 1); - - /** >4MB, return last bucket */ - if (nr < 42) - return NR_LATENCY_BUCKETS - 1; - - return 56 - nr; -} - enum latency_type { BULK_LATENCY, BIO_LATENCY, diff --git a/src/object/srv_mod.c b/src/object/srv_mod.c index 72a25ba97de1..de3436513e60 100644 --- a/src/object/srv_mod.c +++ b/src/object/srv_mod.c @@ -77,41 +77,6 @@ static struct daos_rpc_handler obj_handlers_v10[] = { #undef X -static int -obj_latency_tm_init(uint32_t opc, int tgt_id, struct d_tm_node_t **tm, char *op, char *desc) -{ - unsigned int bucket_max = 256; - int i; - int rc = 0; - - for (i = 0; i < NR_LATENCY_BUCKETS; i++) { - char *path; - - if (bucket_max < 1024) /** B */ - D_ASPRINTF(path, "io/latency/%s/%uB/tgt_%u", - op, bucket_max, tgt_id); - else if (bucket_max < 1024 * 1024) /** KB */ - D_ASPRINTF(path, "io/latency/%s/%uKB/tgt_%u", - op, bucket_max / 1024, tgt_id); - else if (bucket_max <= 1024 * 1024 * 4) /** MB */ - D_ASPRINTF(path, "io/latency/%s/%uMB/tgt_%u", - op, bucket_max / (1024 * 1024), tgt_id); - else /** >4MB */ - D_ASPRINTF(path, "io/latency/%s/GT4MB/tgt_%u", - op, tgt_id); 
- - rc = d_tm_add_metric(&tm[i], D_TM_STATS_GAUGE, desc, "us", path); - if (rc) - D_WARN("Failed to create per-I/O size latency " - "sensor: "DF_RC"\n", DP_RC(rc)); - D_FREE(path); - - bucket_max <<= 1; - } - - return rc; -} - static void * obj_tls_init(int tags, int xs_id, int tgt_id) { @@ -162,27 +127,33 @@ obj_tls_init(int tags, int xs_id, int tgt_id) */ obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_lat, - obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), "update RPC processing time"); + obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), "update RPC processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_lat, - obj_opc_to_str(DAOS_OBJ_RPC_FETCH), "fetch RPC processing time"); + obj_opc_to_str(DAOS_OBJ_RPC_FETCH), "fetch RPC processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_TGT_UPDATE, tgt_id, tls->ot_tgt_update_lat, obj_opc_to_str(DAOS_OBJ_RPC_TGT_UPDATE), - "update tgt RPC processing time"); + "update tgt RPC processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_bulk_lat, - "bulk_update", "Bulk update processing time"); + "bulk_update", "Bulk update processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_bulk_lat, - "bulk_fetch", "Bulk fetch processing time"); + "bulk_fetch", "Bulk fetch processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_vos_lat, - "vos_update", "VOS update processing time"); + "vos_update", "VOS update processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_vos_lat, - "vos_fetch", "VOS fetch processing time"); + "vos_fetch", "VOS fetch processing time", true); obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_bio_lat, - "bio_update", "BIO update processing time"); + "bio_update", "BIO update processing time", true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_bio_lat, - "bio_fetch", "BIO fetch processing time"); + "bio_fetch", "BIO fetch 
processing time", true); return tls; } diff --git a/src/utils/daos_metrics/daos_metrics.c b/src/utils/daos_metrics/daos_metrics.c index 8a8190d5203c..9182aa1de878 100644 --- a/src/utils/daos_metrics/daos_metrics.c +++ b/src/utils/daos_metrics/daos_metrics.c @@ -53,29 +53,91 @@ print_usage(const char *prog_name) "\tInclude gauges\n" "--read, -r\n" "--reset, -e\n" + "\tReset metrics value to 0\n" + "--jobid, -j\n" + "\tDisplay metrics of the specified job\n" "\tInclude timestamp of when metric was read\n", prog_name); } -int -main(int argc, char **argv) +static int +process_metrics(int metric_id, char *dirname, int format, int filter, + int extra_descriptors, int delay, int num_iter, + d_tm_iter_cb_t iter_cb, void *arg) { struct d_tm_node_t *root = NULL; struct d_tm_node_t *node = NULL; struct d_tm_context *ctx = NULL; + int iteration = 0; + int rc = 0; + + ctx = d_tm_open(metric_id); + if (!ctx) + D_GOTO(out, rc = 0); + + root = d_tm_get_root(ctx); + if (!root) + D_GOTO(out, rc = -DER_NONEXIST); + + if (strncmp(dirname, "/", D_TM_MAX_NAME_LEN) != 0) { + node = d_tm_find_metric(ctx, dirname); + if (node != NULL) { + root = node; + } else { + printf("No metrics found at: '%s'\n", dirname); + D_GOTO(out, rc = 0); + } + } + + if (format == D_TM_CSV) + d_tm_print_field_descriptors(extra_descriptors, (FILE *)arg); + + while ((num_iter == 0) || (iteration < num_iter)) { + d_tm_iterate(ctx, root, 0, filter, NULL, format, extra_descriptors, + iter_cb, arg); + iteration++; + sleep(delay); + if (format == D_TM_STANDARD) + printf("\n\n"); + } + +out: + if (ctx != NULL) + d_tm_close(&ctx); + return rc; +} + +static void +iter_print(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + +static void +iter_reset(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + 
d_tm_reset_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + +int +main(int argc, char **argv) +{ char dirname[D_TM_MAX_NAME_LEN] = {0}; + char jobid[D_TM_MAX_NAME_LEN] = {0}; bool show_meta = false; bool show_when_read = false; bool show_type = false; int srv_idx = 0; - int iteration = 0; int num_iter = 1; int filter = 0; int delay = 1; int format = D_TM_STANDARD; int opt; int extra_descriptors = 0; - uint32_t ops = 0; + d_tm_iter_cb_t iter_cb = NULL; + int rc; sprintf(dirname, "/"); @@ -97,11 +159,12 @@ main(int argc, char **argv) {"type", no_argument, NULL, 'T'}, {"read", no_argument, NULL, 'r'}, {"reset", no_argument, NULL, 'e'}, + {"jobid", required_argument, NULL, 'j'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; - opt = getopt_long_only(argc, argv, "S:cCdtsgi:p:D:MmTrhe", + opt = getopt_long_only(argc, argv, "S:cCdtsgi:p:D:MmTrj:he", long_options, NULL); if (opt == -1) break; @@ -150,7 +213,10 @@ main(int argc, char **argv) delay = atoi(optarg); break; case 'e': - ops |= D_TM_ITER_RESET; + iter_cb = iter_reset; + break; + case 'j': + snprintf(jobid, sizeof(jobid), "%s", optarg); break; case 'h': case '?': @@ -160,37 +226,13 @@ main(int argc, char **argv) } } - if (ops == 0) - ops |= D_TM_ITER_READ; + if (iter_cb == NULL) + iter_cb = iter_print; if (filter == 0) filter = D_TM_COUNTER | D_TM_DURATION | D_TM_TIMESTAMP | D_TM_MEMINFO | D_TM_TIMER_SNAPSHOT | D_TM_GAUGE | D_TM_STATS_GAUGE; - ctx = d_tm_open(srv_idx); - if (!ctx) - goto failure; - - root = d_tm_get_root(ctx); - if (!root) - goto failure; - - if (strncmp(dirname, "/", D_TM_MAX_NAME_LEN) != 0) { - node = d_tm_find_metric(ctx, dirname); - if (node != NULL) { - root = node; - } else { - printf("No metrics found at: '%s'\n", dirname); - exit(0); - } - } - - if (format == D_TM_CSV) - filter &= ~D_TM_DIRECTORY; - else - filter |= D_TM_DIRECTORY; - - if (show_when_read) extra_descriptors |= D_TM_INCLUDE_TIMESTAMP; if (show_meta) @@ -199,27 +241,23 @@ main(int argc, char 
**argv) extra_descriptors |= D_TM_INCLUDE_TYPE; if (format == D_TM_CSV) - d_tm_print_field_descriptors(extra_descriptors, stdout); + filter &= ~D_TM_DIRECTORY; + else + filter |= D_TM_DIRECTORY; - while ((num_iter == 0) || (iteration < num_iter)) { - d_tm_iterate(ctx, root, 0, filter, NULL, format, extra_descriptors, - ops, stdout); - iteration++; - sleep(delay); - if (format == D_TM_STANDARD) - printf("\n\n"); + if (strlen(jobid) > 0) { + srv_idx = DC_TM_JOB_ROOT_ID; + snprintf(dirname, sizeof(dirname), "%s", jobid); } - d_tm_close(&ctx); - return 0; - -failure: - printf("Unable to attach to the shared memory for the server index: %d" - "\nMake sure to run the I/O Engine with the same index to " - "initialize the shared memory and populate it with metrics.\n" - "Verify user/group settings match those that started the I/O " - "Engine.\n", - srv_idx); - d_tm_close(&ctx); - return -1; + /* fetch metrics from server side */ + rc = process_metrics(srv_idx, dirname, format, filter, extra_descriptors, + delay, num_iter, iter_cb, stdout); + if (rc) + printf("Unable to attach to the shared memory for the server index: %d" + "\nMake sure to run the I/O Engine with the same index to " + "initialize the shared memory and populate it with metrics.\n" + "Verify user/group settings match those that started the I/O " + "Engine.\n", srv_idx); + return rc != 0 ? -1 : 0; }