Skip to content

Commit

Permalink
DAOS-14535 pool: start/stop ds_pool_child individually (#13347) (#14431)
Browse files Browse the repository at this point in the history
Reorganize ds_pool_child start/stop code to make it able to be
started/stopped individually:

- Introduced four states for ds_pool_child: NEW, STARTING, STARTED
  and STOPPING.
- ds_pool_child is added/removed to/from cache through the collective
  call of pool_child_add/delete_one(), now the cache doesn't hold the
  ds_pool_child reference anymore.
- Introduced ds_pool_child_start/stop() for callers to start/stop
  ds_pool_child individually, ds_pool_child_state() to query the state
  of ds_pool_child.
- Removed ds_pool_child_get();

Signed-off-by: Niu Yawei <yawei.niu@intel.com>
jolivier23 authored May 24, 2024
1 parent 7687853 commit 28d189a
Showing 3 changed files with 285 additions and 113 deletions.
2 changes: 1 addition & 1 deletion src/container/srv_target.c
Original file line number Diff line number Diff line change
@@ -1539,7 +1539,7 @@ ds_cont_local_open(uuid_t pool_uuid, uuid_t cont_hdl_uuid, uuid_t cont_uuid,
if (ddra == NULL)
D_GOTO(err_dtx, rc = -DER_NOMEM);

ddra->pool = ds_pool_child_get(hdl->sch_cont->sc_pool);
ddra->pool = ds_pool_child_lookup(hdl->sch_cont->sc_pool->spc_uuid);
uuid_copy(ddra->co_uuid, cont_uuid);
rc = dss_ult_create(ds_dtx_resync, ddra, DSS_XS_SELF,
0, 0, NULL);
22 changes: 19 additions & 3 deletions src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -57,6 +57,7 @@ struct ds_pool {
ABT_cond sp_fetch_hdls_cond;
ABT_cond sp_fetch_hdls_done_cond;
struct ds_iv_ns *sp_iv_ns;
uint32_t *sp_states; /* pool child state array */

/* structure related to EC aggregate epoch query */
d_list_t sp_ec_ephs_list;
@@ -127,6 +128,13 @@ struct ds_pool_hdl {
struct ds_pool_hdl *ds_pool_hdl_lookup(const uuid_t uuid);
void ds_pool_hdl_put(struct ds_pool_hdl *hdl);

enum pool_child_state {
POOL_CHILD_NEW = 0,
POOL_CHILD_STARTING,
POOL_CHILD_STARTED,
POOL_CHILD_STOPPING,
};

/*
* Per-thread pool object
*
@@ -159,8 +167,9 @@ struct ds_pool_child {
int spc_ref;
ABT_eventual spc_ref_eventual;

uint64_t spc_discard_done:1;
uint32_t spc_discard_done:1;
uint32_t spc_reint_mode;
uint32_t *spc_state; /* Pointer to ds_pool->sp_states[i] */
/**
* Per-pool per-module metrics, see ${modname}_pool_metrics for the
* actual structure. Initialized only for modules that specified a
@@ -170,9 +179,16 @@ struct ds_pool_child {
void *spc_metrics[DAOS_NR_MODULE];
};

/* Find ds_pool_child in cache, hold one reference */
struct ds_pool_child *ds_pool_child_lookup(const uuid_t uuid);
struct ds_pool_child *ds_pool_child_get(struct ds_pool_child *child);
/* Put the reference held by ds_pool_child_lookup() */
void ds_pool_child_put(struct ds_pool_child *child);
/* Start ds_pool child */
int ds_pool_child_start(uuid_t pool_uuid);
/* Stop ds_pool_child */
int ds_pool_child_stop(uuid_t pool_uuid);
/* Query pool child state */
uint32_t ds_pool_child_state(struct ds_pool *pool, uint32_t tgt_id);

int ds_pool_bcast_create(crt_context_t ctx, struct ds_pool *pool,
enum daos_module_id module, crt_opcode_t opcode,
374 changes: 265 additions & 109 deletions src/pool/srv_target.c
Original file line number Diff line number Diff line change
@@ -72,24 +72,37 @@ stop_flush_ult(struct ds_pool_child *child)
child->spc_flush_req = NULL;
}

struct ds_pool_child *
ds_pool_child_lookup(const uuid_t uuid)
static struct ds_pool_child *
pool_child_lookup_noref(const uuid_t uuid)
{
struct ds_pool_child *child;
struct pool_tls *tls = pool_tls_get();

d_list_for_each_entry(child, &tls->dt_pool_list, spc_list) {
if (uuid_compare(uuid, child->spc_uuid) == 0) {
child->spc_ref++;
return child;
}
}
return NULL;
}

struct ds_pool_child *
ds_pool_child_get(struct ds_pool_child *child)
ds_pool_child_lookup(const uuid_t uuid)
{
struct ds_pool_child *child;

child = pool_child_lookup_noref(uuid);
if (child == NULL) {
D_ERROR(DF_UUID": Pool child isn't found.\n", DP_UUID(uuid));
return child;
}

if (*child->spc_state == POOL_CHILD_NEW || *child->spc_state == POOL_CHILD_STOPPING) {
D_ERROR(DF_UUID": Pool child isn't ready. (%u)\n",
DP_UUID(uuid), *child->spc_state);
return NULL;
}

child->spc_ref++;
return child;
}
@@ -99,23 +112,9 @@ ds_pool_child_put(struct ds_pool_child *child)
{
D_ASSERTF(child->spc_ref > 0, "%d\n", child->spc_ref);
child->spc_ref--;
if (child->spc_ref == 0) {
D_DEBUG(DB_MGMT, DF_UUID": destroying\n",
DP_UUID(child->spc_uuid));
D_ASSERT(d_list_empty(&child->spc_list));
D_ASSERT(d_list_empty(&child->spc_cont_list));

ds_stop_chkpt_ult(child);
/* only stop gc ULT when all ops ULTs are done */
stop_gc_ult(child);
stop_flush_ult(child);

vos_pool_close(child->spc_hdl);
dss_module_fini_metrics(DAOS_TGT_TAG, child->spc_metrics);
ABT_eventual_set(child->spc_ref_eventual,
(void *)&child->spc_ref,
if (child->spc_ref == 0 && *child->spc_state == POOL_CHILD_STOPPING)
ABT_eventual_set(child->spc_ref_eventual, (void *)&child->spc_ref,
sizeof(child->spc_ref));
}
}

static int
@@ -260,155 +259,304 @@ start_flush_ult(struct ds_pool_child *child)
return 0;
}

struct pool_child_lookup_arg {
struct ds_pool *pla_pool;
void *pla_uuid;
uint32_t pla_map_version;
};
/* This query API could be called from any xstream */
uint32_t
ds_pool_child_state(struct ds_pool *pool, uint32_t tgt_id)
{
struct dss_module_info *info = dss_get_module_info();

/*
* Called via dss_thread_collective() to create and add the ds_pool_child object
* for one thread. This opens the matching VOS pool.
*/
static int
pool_child_add_one(void *varg)
D_ASSERT(info->dmi_tgt_id < dss_tgt_nr);
return pool->sp_states[info->dmi_tgt_id];
}

static void
pool_child_free(struct ds_pool_child *child)
{
struct pool_child_lookup_arg *arg = varg;
struct pool_tls *tls = pool_tls_get();
struct ds_pool_child *child;
struct dss_module_info *info = dss_get_module_info();
char *path;
int rc;
D_ASSERTF(*child->spc_state == POOL_CHILD_NEW, "state:%u", *child->spc_state);
D_ASSERT(child->spc_ref == 0);

child = ds_pool_child_lookup(arg->pla_uuid);
if (child != NULL) {
ds_pool_child_put(child);
return 0;
}
/* Remove from cache */
d_list_del_init(&child->spc_list);
dss_module_fini_metrics(DAOS_TGT_TAG, child->spc_metrics);
ABT_eventual_free(&child->spc_ref_eventual);
D_FREE(child);
}

static struct ds_pool_child *
pool_child_create(uuid_t pool_uuid, struct ds_pool *pool, uint32_t pool_map_ver)
{
struct dss_module_info *info = dss_get_module_info();
struct pool_tls *tls = pool_tls_get();
struct ds_pool_child *child;
int rc;

D_DEBUG(DB_MGMT, DF_UUID": creating\n", DP_UUID(arg->pla_uuid));
D_DEBUG(DB_MGMT, DF_UUID": Create pool child\n", DP_UUID(pool_uuid));

D_ALLOC_PTR(child);
if (child == NULL)
return -DER_NOMEM;
return NULL;

/* initialize metrics on the target xstream for each module */
rc = dss_module_init_metrics(DAOS_TGT_TAG, child->spc_metrics,
arg->pla_pool->sp_path, info->dmi_tgt_id);
rc = dss_module_init_metrics(DAOS_TGT_TAG, child->spc_metrics, pool->sp_path,
info->dmi_tgt_id);
if (rc != 0) {
D_ERROR(DF_UUID ": failed to initialize module metrics for pool"
"." DF_RC "\n", DP_UUID(child->spc_uuid), DP_RC(rc));
"." DF_RC "\n", DP_UUID(pool_uuid), DP_RC(rc));
goto out_free;
}

rc = ds_mgmt_tgt_file(arg->pla_uuid, VOS_FILE, &info->dmi_tgt_id,
&path);
if (rc != 0)
rc = ABT_eventual_create(sizeof(child->spc_ref), &child->spc_ref_eventual);
if (rc != ABT_SUCCESS)
goto out_metrics;

D_ASSERT(child->spc_metrics[DAOS_VOS_MODULE] != NULL);
rc = vos_pool_open_metrics(path, arg->pla_uuid, VOS_POF_EXCL | VOS_POF_EXTERNAL_FLUSH,
child->spc_metrics[DAOS_VOS_MODULE], &child->spc_hdl);
uuid_copy(child->spc_uuid, pool_uuid);
child->spc_map_version = pool_map_ver;
child->spc_pool = pool;
D_INIT_LIST_HEAD(&child->spc_list);
D_INIT_LIST_HEAD(&child->spc_cont_list);

D_FREE(path);
D_ASSERT(info->dmi_tgt_id < dss_tgt_nr);
child->spc_state = &pool->sp_states[info->dmi_tgt_id];
*child->spc_state = POOL_CHILD_NEW;

/* Add to cache */
d_list_add(&child->spc_list, &tls->dt_pool_list);

return child;

out_metrics:
dss_module_fini_metrics(DAOS_TGT_TAG, child->spc_metrics);
out_free:
D_FREE(child);
return NULL;
}

static int
pool_child_start(struct ds_pool_child *child)
{
struct dss_module_info *info = dss_get_module_info();
char *path;
int rc;

D_ASSERTF(*child->spc_state == POOL_CHILD_NEW, "state:%u", *child->spc_state);
D_ASSERT(!d_list_empty(&child->spc_list));

*child->spc_state = POOL_CHILD_STARTING;

rc = ds_mgmt_tgt_file(child->spc_uuid, VOS_FILE, &info->dmi_tgt_id, &path);
if (rc != 0)
goto out_metrics;
goto out;

uuid_copy(child->spc_uuid, arg->pla_uuid);
child->spc_map_version = arg->pla_map_version;
child->spc_ref = 1; /* 1 for the list */
D_ASSERT(child->spc_metrics[DAOS_VOS_MODULE] != NULL);
rc = vos_pool_open_metrics(path, child->spc_uuid, VOS_POF_EXCL | VOS_POF_EXTERNAL_FLUSH,
child->spc_metrics[DAOS_VOS_MODULE], &child->spc_hdl);

rc = ABT_eventual_create(sizeof(child->spc_ref),
&child->spc_ref_eventual);
if (rc != ABT_SUCCESS) {
rc = dss_abterr2der(rc);
goto out_vos;
}
D_FREE(path);

child->spc_pool = arg->pla_pool;
D_INIT_LIST_HEAD(&child->spc_list);
D_INIT_LIST_HEAD(&child->spc_cont_list);
if (rc) {
D_ERROR(DF_UUID": Open VOS pool failed. "DF_RC"\n",
DP_UUID(child->spc_uuid), DP_RC(rc));
goto out;
}

rc = start_gc_ult(child);
if (rc != 0)
goto out_eventual;
goto out_close;

rc = start_flush_ult(child);
if (rc != 0)
goto out_gc;

rc = ds_start_scrubbing_ult(child);
rc = ds_start_chkpt_ult(child);
if (rc != 0)
goto out_flush;

rc = ds_start_chkpt_ult(child);
rc = ds_start_scrubbing_ult(child);
if (rc != 0)
goto out_scrub;
goto out_chkpt;

d_list_add(&child->spc_list, &tls->dt_pool_list);

/* Load all containers */
/* Start all containers */
rc = ds_cont_child_start_all(child);
if (rc)
goto out_list;
goto out_cont;

*child->spc_state = POOL_CHILD_STARTED;
return 0;

out_list:
d_list_del_init(&child->spc_list);
out_cont:
ds_cont_child_stop_all(child);
ds_stop_chkpt_ult(child);
out_scrub:
ds_stop_scrubbing_ult(child);
out_chkpt:
ds_stop_chkpt_ult(child);
out_flush:
stop_flush_ult(child);
out_gc:
stop_gc_ult(child);
out_eventual:
ABT_eventual_free(&child->spc_ref_eventual);
out_vos:
out_close:
vos_pool_close(child->spc_hdl);
out_metrics:
dss_module_fini_metrics(DAOS_TGT_TAG, child->spc_metrics);
out_free:
D_FREE(child);
out:
*child->spc_state = POOL_CHILD_NEW;
return rc;
}

int
ds_pool_child_start(uuid_t pool_uuid)
{
struct ds_pool_child *child;
int rc;

child = pool_child_lookup_noref(pool_uuid);
if (child == NULL) {
D_ERROR(DF_UUID": Pool child not found.\n", DP_UUID(pool_uuid));
return -DER_NONEXIST;
}

if (*child->spc_state == POOL_CHILD_STOPPING) {
D_ERROR(DF_UUID": Pool is in stopping.\n", DP_UUID(pool_uuid));
return -DER_BUSY;
} else if (*child->spc_state == POOL_CHILD_STARTING) {
D_DEBUG(DB_MGMT, DF_UUID": Pool is already in starting.\n", DP_UUID(pool_uuid));
return 1;
} else if (*child->spc_state == POOL_CHILD_STARTED) {
D_DEBUG(DB_MGMT, DF_UUID": Pool is already started.\n", DP_UUID(pool_uuid));
return 0;
}

rc = pool_child_start(child);
if (rc)
D_ERROR(DF_UUID": Pool start failed. "DF_RC"\n", DP_UUID(pool_uuid), DP_RC(rc));

return rc;
}

/*
* Called via dss_thread_collective() to delete the ds_pool_child object for one
* thread. If nobody else is referencing this object, then its VOS pool handle
* is closed and the object itself is freed.
*/
static int
pool_child_delete_one(void *uuid)
pool_child_stop(struct ds_pool_child *child)
{
struct ds_pool_child *child;
int *ref, rc;
int *ref, rc;

child = ds_pool_child_lookup(uuid);
if (child == NULL)
D_ASSERT(!d_list_empty(&child->spc_list));

if (*child->spc_state == POOL_CHILD_STARTING) {
D_ERROR(DF_UUID": Pool is in starting.\n", DP_UUID(child->spc_uuid));
return -DER_BUSY;
} else if (*child->spc_state == POOL_CHILD_STOPPING) {
D_DEBUG(DB_MGMT, DF_UUID": Pool is already in stopping.\n",
DP_UUID(child->spc_uuid));
return 1;
} else if (*child->spc_state == POOL_CHILD_NEW) {
D_DEBUG(DB_MGMT, DF_UUID": Pool isn't started.\n", DP_UUID(child->spc_uuid));
return 0;
}

D_DEBUG(DB_MGMT, DF_UUID": Stopping pool child.\n", DP_UUID(child->spc_uuid));

*child->spc_state = POOL_CHILD_STOPPING;
/* First stop all the ULTs who might need to hold ds_pool_child (or ds_cont_child) */
ds_cont_child_stop_all(child);
D_ASSERT(d_list_empty(&child->spc_cont_list));
d_list_del_init(&child->spc_list);
ds_stop_chkpt_ult(child);
ds_stop_scrubbing_ult(child);
ds_pool_child_put(child); /* -1 for the list */

ds_pool_child_put(child); /* -1 for lookup */
/* Wait for all references dropped */
if (child->spc_ref > 0) {
D_DEBUG(DB_MGMT, DF_UUID": Wait on pool child refs (%d) dropping.\n",
DP_UUID(child->spc_uuid), child->spc_ref);
rc = ABT_eventual_wait(child->spc_ref_eventual, (void **)&ref);
D_ASSERT(rc == ABT_SUCCESS);
ABT_eventual_reset(child->spc_ref_eventual);
}
D_DEBUG(DB_MGMT, DF_UUID": Pool child refs dropped.\n", DP_UUID(child->spc_uuid));

rc = ABT_eventual_wait(child->spc_ref_eventual, (void **)&ref);
if (rc != ABT_SUCCESS)
return dss_abterr2der(rc);
/* Stop all pool child owned ULTs which doesn't hold ds_pool_child reference */
ds_stop_chkpt_ult(child);
stop_gc_ult(child);
stop_flush_ult(child);

ABT_eventual_free(&child->spc_ref_eventual);
/* Close VOS pool at the end */
vos_pool_close(child->spc_hdl);
child->spc_hdl = DAOS_HDL_INVAL;

/* ds_pool_child must be freed here to keep
* spc_ref_enventual usage safe
*/
D_FREE(child);
*child->spc_state = POOL_CHILD_NEW;
return 0;
}

int
ds_pool_child_stop(uuid_t pool_uuid)
{
struct ds_pool_child *child;

child = pool_child_lookup_noref(pool_uuid);
if (child == NULL) {
D_ERROR(DF_UUID": Pool child not found.\n", DP_UUID(pool_uuid));
return -DER_NONEXIST;
}

return pool_child_stop(child);
}

struct pool_child_lookup_arg {
struct ds_pool *pla_pool;
void *pla_uuid;
uint32_t pla_map_version;
};

/*
* Called via dss_thread_collective() to create and add the ds_pool_child object
* for one thread. This opens the matching VOS pool.
*/
static int
pool_child_add_one(void *varg)
{
struct pool_child_lookup_arg *arg = varg;
struct ds_pool_child *child;
int rc;

child = pool_child_lookup_noref(arg->pla_uuid);
if (child != NULL)
return 0;

child = pool_child_create(arg->pla_uuid, arg->pla_pool, arg->pla_map_version);
if (child == NULL) {
D_ERROR(DF_UUID ": Create pool child failed.\n", DP_UUID(child->spc_uuid));
return -DER_NOMEM;
}

rc = pool_child_start(child);
if (rc) {
D_ERROR(DF_UUID": Pool start failed. "DF_RC"\n",
DP_UUID(child->spc_uuid), DP_RC(rc));
pool_child_free(child);
}

return rc;
}

/*
* Called via dss_thread_collective() to delete the ds_pool_child object for one
* thread. If nobody else is referencing this object, then its VOS pool handle
* is closed and the object itself is freed.
*/
static int
pool_child_delete_one(void *uuid)
{
struct ds_pool_child *child;
int rc, retry_cnt = 0;

child = pool_child_lookup_noref(uuid);
if (child == NULL)
return 0;
retry:
rc = pool_child_stop(child);
if (rc) {
D_ASSERT(rc == 1 || rc == -DER_BUSY);
/* Rare race case, simply retry */
retry_cnt++;
if (retry_cnt % 10 == 0)
D_WARN(DF_UUID": Pool stop race (%d). retry:%d\n",
DP_UUID(uuid), rc, retry_cnt);
dss_sleep(500);
goto retry;
}
pool_child_free(child);

return 0;
}
@@ -462,9 +610,14 @@ pool_alloc_ref(void *key, unsigned int ksize, void *varg,
if (pool == NULL)
D_GOTO(err, rc = -DER_NOMEM);

D_ASSERT(dss_tgt_nr > 0);
D_ALLOC_ARRAY(pool->sp_states, dss_tgt_nr);
if (pool->sp_states == NULL)
D_GOTO(err_pool, rc = -DER_NOMEM);

rc = ABT_rwlock_create(&pool->sp_lock);
if (rc != ABT_SUCCESS)
D_GOTO(err_pool, rc = dss_abterr2der(rc));
D_GOTO(err_states, rc = dss_abterr2der(rc));

rc = ABT_mutex_create(&pool->sp_mutex);
if (rc != ABT_SUCCESS)
@@ -541,6 +694,8 @@ pool_alloc_ref(void *key, unsigned int ksize, void *varg,
ABT_mutex_free(&pool->sp_mutex);
err_lock:
ABT_rwlock_free(&pool->sp_lock);
err_states:
D_FREE(pool->sp_states);
err_pool:
D_FREE(pool);
err:
@@ -582,6 +737,7 @@ pool_free_ref(struct daos_llink *llink)
ABT_cond_free(&pool->sp_fetch_hdls_done_cond);
ABT_mutex_free(&pool->sp_mutex);
ABT_rwlock_free(&pool->sp_lock);
D_FREE(pool->sp_states);
D_FREE(pool);
}

0 comments on commit 28d189a

Please sign in to comment.