Skip to content

Commit

Permalink
DAOS-15829 object: fix potential DRAM leak when retry after DTX refre…
Browse files Browse the repository at this point in the history
…sh (#14394) (#14432)

Two possible DRAM leak when re-enter obj_local_rw_internal():

1. daos_iod_recx_dup() will allocate new iods to replace input one. But
   the input one may be former daos_iod_recx_dup() allocated.

2. obj_fetch_csum_init() may allocate new orw_iod_csums arrays that may
   overwrite former allocated ones.

Some other fixes:

a. obj_fetch_create_maps() may miss new iods wnen re-enter.

b. obj_prep_fetch_sgls() miss to handle re-enter case.

c. Drop redundant anthor restore for enumeration retry after DTX refresh.

d. Add some log message for the cases that need DTX refresh.

Signed-off-by: Fan Yong <[email protected]>
  • Loading branch information
jolivier23 authored May 25, 2024
1 parent 28d189a commit 868bf18
Showing 1 changed file with 98 additions and 99 deletions.
197 changes: 98 additions & 99 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -705,20 +705,21 @@ obj_set_reply_nrs(crt_rpc_t *rpc, daos_handle_t ioh, d_sg_list_t *echo_sgl, uint
/* Re-entry case. */
if (orwo->orw_nrs.ca_count != 0) {
D_ASSERT(orwo->orw_nrs.ca_count == nrs_count);
return 0;
}

/* return sg_nr_out and data size for sgl */
D_ALLOC(orwo->orw_nrs.ca_arrays,
nrs_count * (sizeof(uint32_t) + sizeof(daos_size_t)));
if (orwo->orw_nrs.ca_arrays == NULL)
return -DER_NOMEM;
D_ASSERT(orwo->orw_data_sizes.ca_count == nrs_count);
D_ASSERT(orwo->orw_nrs.ca_arrays != NULL);
D_ASSERT(orwo->orw_data_sizes.ca_arrays != NULL);
} else {
/* return sg_nr_out and data size for sgl */
D_ALLOC(orwo->orw_nrs.ca_arrays,
nrs_count * (sizeof(uint32_t) + sizeof(daos_size_t)));
if (orwo->orw_nrs.ca_arrays == NULL)
return -DER_NOMEM;

orwo->orw_nrs.ca_count = nrs_count;
orwo->orw_data_sizes.ca_count = nrs_count;
orwo->orw_data_sizes.ca_arrays =
(void *)((char *)orwo->orw_nrs.ca_arrays +
nrs_count * (sizeof(uint32_t)));
orwo->orw_nrs.ca_count = nrs_count;
orwo->orw_data_sizes.ca_count = nrs_count;
orwo->orw_data_sizes.ca_arrays = (void *)((char *)orwo->orw_nrs.ca_arrays +
nrs_count * (sizeof(uint32_t)));
}

nrs = orwo->orw_nrs.ca_arrays;
data_sizes = orwo->orw_data_sizes.ca_arrays;
Expand Down Expand Up @@ -858,13 +859,20 @@ obj_fetch_csum_init(struct ds_cont_child *cont, struct obj_rw_in *orw, struct ob
*
* The memory will be freed in obj_rw_reply
*/
rc = daos_csummer_alloc_iods_csums(cont->sc_csummer, orw->orw_iod_array.oia_iods,
orw->orw_iod_array.oia_iod_nr, false, NULL,
&orwo->orw_iod_csums.ca_arrays);

if (rc >= 0) {
orwo->orw_iod_csums.ca_count = (uint64_t)rc;
rc = 0;
/* Re-entry case. */
if (orwo->orw_iod_csums.ca_count != 0) {
D_ASSERT(orwo->orw_iod_csums.ca_arrays != NULL);
rc = 0;
} else {
rc = daos_csummer_alloc_iods_csums(cont->sc_csummer, orw->orw_iod_array.oia_iods,
orw->orw_iod_array.oia_iod_nr, false, NULL,
&orwo->orw_iod_csums.ca_arrays);

if (rc >= 0) {
orwo->orw_iod_csums.ca_count = rc;
rc = 0;
}
}

return rc;
Expand Down Expand Up @@ -1115,10 +1123,10 @@ obj_fetch_create_maps(crt_rpc_t *rpc, struct bio_desc *biod, daos_iod_t *iods, u
if (skips == NULL)
D_ASSERTF(total_nr == iods_nr, "total nr %d, iods_nr %d\n", total_nr, iods_nr);

/* Re-entry case. */
if (orwo->orw_maps.ca_count != 0) {
D_ASSERT(orwo->orw_maps.ca_count == total_nr);
return 0;
/* Re-entry case, iods may be changed, let's re-generate the maps. */
if (orwo->orw_maps.ca_arrays != NULL) {
ds_iom_free(&orwo->orw_maps.ca_arrays, orwo->orw_maps.ca_count);
orwo->orw_maps.ca_count = 0;
}

rc = ds_iom_create(biod, iods, iods_nr, flags, &maps);
Expand Down Expand Up @@ -1198,6 +1206,10 @@ obj_prep_fetch_sgls(crt_rpc_t *rpc, struct obj_io_context *ioc)
int j;
int rc = 0;

/* Re-entry case. */
if (ioc->ioc_free_sgls)
return 0;

for (i = 0; i < nr; i++) {
for (j = 0; j < sgls[i].sg_nr; j++) {
d_iov_t *iov = &sgls[i].sg_iovs[j];
Expand Down Expand Up @@ -1993,7 +2005,7 @@ obj_get_iods_offs(daos_unit_oid_t uoid, struct obj_iod_array *iod_array,
}

static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct obj_rw_in *orw = crt_req_get(rpc);
daos_iod_t iod = { 0 };
Expand All @@ -2007,32 +2019,13 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
uint8_t *skips = (uint8_t *)&local_skips;
uint32_t nr = 0;
int rc;
int count = 0;

rc = obj_get_iods_offs(orw->orw_oid, &orw->orw_iod_array, &ioc->ioc_oca,
orw->orw_dkey_hash, ioc->ioc_layout_ver, &iods,
&offs, &skips, &csums, &csum_info, &nr);
if (rc != 0)
D_GOTO(out, rc);
again:
rc = obj_local_rw_internal(rpc, ioc, iods, csums, offs, skips, nr, dth);
if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++count % 10 == 3)) {
struct dtx_share_peer *dsp;

dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, "
"maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid),
dth->dth_share_tbd_count, count);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
}
if (rc == 0)
rc = obj_local_rw_internal(rpc, ioc, iods, csums, offs, skips, nr, dth);

out:
if (csums != NULL && csums != &csum && csums != orw->orw_iod_array.oia_iod_csums) {
int i;

Expand All @@ -2052,6 +2045,32 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
return rc;
}

static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct dtx_share_peer *dsp;
uint32_t retry = 0;
int rc;

again:
rc = obj_local_rw_internal_wrap(rpc, ioc, dth);
if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, "
"maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid),
dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
}

return rc;
}

static int
obj_capa_check(struct ds_cont_hdl *coh, bool is_write, bool is_agg_migrate)
{
Expand Down Expand Up @@ -3098,45 +3117,12 @@ obj_enum_complete(crt_rpc_t *rpc, int status, int map_version,
D_FREE(oeo->oeo_csum_iov.iov_buf);
}

static int
obj_restore_enum_args(crt_rpc_t *rpc, struct ds_obj_enum_arg *des,
struct ds_obj_enum_arg *src)
{
struct obj_key_enum_out *oeo = crt_reply_get(rpc);
struct obj_key_enum_in *oei = crt_req_get(rpc);
int rc;

if (!des->fill_recxs && des->csum_iov.iov_buf != NULL)
daos_iov_free(&des->csum_iov);

*des = *src;

if (des->fill_recxs)
return 0;

if (des->kds != NULL)
memset(des->kds, 0, des->kds_cap * sizeof(daos_key_desc_t));
des->kds_len = 0;

if (oeo->oeo_sgl.sg_iovs == NULL)
return 0;

d_sgl_fini(&oeo->oeo_sgl, true);
rc = daos_sgls_alloc(&oeo->oeo_sgl, &oei->oei_sgl, 1);
if (rc != 0)
return rc;

des->sgl = &oeo->oeo_sgl;
return 0;
}

static int
obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
struct vos_iter_anchors *anchors, struct ds_obj_enum_arg *enum_arg,
daos_epoch_t *e_out)
{
vos_iter_param_t param = { 0 };
struct ds_obj_enum_arg saved_arg;
struct obj_key_enum_in *oei = crt_req_get(rpc);
struct dtx_handle *dth = NULL;
uint32_t flags = 0;
Expand Down Expand Up @@ -3199,7 +3185,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
D_ASSERT(opc == DAOS_OBJ_RPC_ENUMERATE);
type = VOS_ITER_DKEY;
param.ip_flags |= VOS_IT_RECX_VISIBLE;
if (daos_anchor_get_flags(&anchors[0].ia_dkey) &
if (daos_anchor_get_flags(&anchors->ia_dkey) &
DIOF_WITH_SPEC_EPOCH) {
/* For obj verification case. */
param.ip_epc_expr = VOS_IT_EPC_RR;
Expand Down Expand Up @@ -3229,7 +3215,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
* 'type' to indicate the anchor is on SV tree or EV tree.
*/
if (type == VOS_ITER_SINGLE)
anchors[0].ia_sv = anchors[0].ia_ev;
anchors->ia_sv = anchors->ia_ev;
else if (oei->oei_oid.id_shard % 3 == 1 &&
DAOS_FAIL_CHECK(DAOS_VC_LOST_REPLICA))
D_GOTO(failed, rc = -DER_NONEXIST);
Expand All @@ -3245,9 +3231,6 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
goto failed;
}

anchors[1] = anchors[0];
saved_arg = *enum_arg;

if (oei->oei_flags & ORF_FOR_MIGRATION)
flags = DTX_FOR_MIGRATION;

Expand All @@ -3258,16 +3241,12 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
goto failed;

re_pack:
rc = ds_obj_enum_pack(&param, type, recursive, &anchors[0], enum_arg, vos_iterate, dth);
rc = ds_obj_enum_pack(&param, type, recursive, anchors, enum_arg, vos_iterate, dth);
if (obj_dtx_need_refresh(dth, rc)) {
rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN) {
anchors[0] = anchors[1];
obj_restore_enum_args(rpc, enum_arg, &saved_arg);
if (opc == DAOS_OBJ_RPC_ENUMERATE)
fill_oid(oei->oei_oid, enum_arg);
/* After DTX refresh, re_pack will resume from the position at \@anchors. */
if (rc == -DER_AGAIN)
goto re_pack;
}
}

if ((rc == -DER_KEY2BIG) && opc == DAOS_OBJ_RPC_ENUMERATE &&
Expand All @@ -3293,7 +3272,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
rc = rc_tmp;

if (type == VOS_ITER_SINGLE)
anchors[0].ia_ev = anchors[0].ia_sv;
anchors->ia_ev = anchors->ia_sv;

D_DEBUG(DB_IO, ""DF_UOID" iterate "DF_X64"-"DF_X64" type %d tag %d"
" rc %d\n", DP_UOID(oei->oei_oid), param.ip_epr.epr_lo,
Expand Down Expand Up @@ -3390,13 +3369,13 @@ ds_obj_enum_handler(crt_rpc_t *rpc)
dss_get_module_info()->dmi_xs_id,
oei->oei_map_ver, ioc.ioc_map_ver);

D_ALLOC_ARRAY(anchors, 2);
D_ALLOC_PTR(anchors);
if (anchors == NULL)
D_GOTO(out, rc = -DER_NOMEM);

anchors[0].ia_dkey = oei->oei_dkey_anchor;
anchors[0].ia_akey = oei->oei_akey_anchor;
anchors[0].ia_ev = oei->oei_anchor;
anchors->ia_dkey = oei->oei_dkey_anchor;
anchors->ia_akey = oei->oei_akey_anchor;
anchors->ia_ev = oei->oei_anchor;

/* TODO: Transfer the inline_thres from enumerate RPC */
enum_arg.inline_thres = 32;
Expand Down Expand Up @@ -3447,9 +3426,9 @@ ds_obj_enum_handler(crt_rpc_t *rpc)
if (rc)
D_GOTO(out, rc);

oeo->oeo_dkey_anchor = anchors[0].ia_dkey;
oeo->oeo_akey_anchor = anchors[0].ia_akey;
oeo->oeo_anchor = anchors[0].ia_ev;
oeo->oeo_dkey_anchor = anchors->ia_dkey;
oeo->oeo_akey_anchor = anchors->ia_akey;
oeo->oeo_anchor = anchors->ia_ev;

if (enum_arg.eprs)
oeo->oeo_eprs.ca_count = enum_arg.eprs_len;
Expand Down Expand Up @@ -3498,7 +3477,9 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc,
struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct ds_cont_child *cont = ioc->ioc_coc;
struct dtx_share_peer *dsp;
uint64_t sched_seq;
uint32_t retry = 0;
int rc = 0;

if (daos_is_zero_dti(&opi->opi_dti)) {
Expand Down Expand Up @@ -3544,6 +3525,14 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc,
}

if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next,
struct dtx_share_peer, dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d "
"times, maybe dead loop\n", DP_DTI(&dth->dth_xid),
DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc != -DER_AGAIN)
goto out;
Expand Down Expand Up @@ -4679,11 +4668,21 @@ ds_cpd_handle_one_wrap(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh,
struct daos_cpd_disp_ent *dcde, struct daos_cpd_sub_req *dcsrs,
struct obj_io_context *ioc, struct dtx_handle *dth)
{
int rc;
struct dtx_share_peer *dsp;
uint32_t retry = 0;
int rc;

again:
rc = ds_cpd_handle_one(rpc, dcsh, dcde, dcsrs, ioc, dth);
if (obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next,
struct dtx_share_peer, dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d "
"times, maybe dead loop\n", DP_DTI(&dth->dth_xid),
DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
Expand Down

0 comments on commit 868bf18

Please sign in to comment.