Skip to content

Commit

Permalink
prov/tcp: Derive cq flags from op and msg flags
Browse files Browse the repository at this point in the history
This patch adds the setting of FI_COMPLETION to tx entry cq flags
combining with default opflags or  msg flags depending on the
transfer interface being used.

Current implemntation

Signed-off-by: Nikhil Nanal <[email protected]>
  • Loading branch information
nikhilnanal committed Oct 20, 2023
1 parent c68c5b3 commit dc10729
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 32 deletions.
1 change: 0 additions & 1 deletion include/ofi_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ ssize_t ofi_cq_read_entries(struct util_cq *cq, void *buf, size_t count,
struct fi_cq_tagged_entry *entry;
struct util_cq_aux_entry *aux_entry;
ssize_t i;

ofi_genlock_lock(&cq->cq_lock);
if (ofi_cirque_isempty(cq->cirq)) {
i = -FI_EAGAIN;
Expand Down
1 change: 0 additions & 1 deletion prov/rxm/src/rxm_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ rxm_ep_rma_common(struct rxm_ep *rxm_ep, const struct fi_msg_rma *msg,
ret = -FI_EAGAIN;
goto unlock;
}

rma_buf->hdr.state = RXM_RMA;
rma_buf->pkt.ctrl_hdr.type = rxm_ctrl_eager;
rma_buf->app_context = msg->context;
Expand Down
19 changes: 12 additions & 7 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -581,14 +581,13 @@ xnet_set_commit_flags(struct xnet_xfer_entry *xfer, uint64_t flags)
xfer->ctrl_flags |= XNET_NEED_ACK;
}
}

static inline uint64_t
xnet_tx_completion_flag(struct xnet_ep *ep, uint64_t op_flags)
xnet_tx_completion_get_msgflags(struct xnet_ep *ep, uint64_t flags)
{
/* Generate a completion if op flags indicate or we generate
/* Generate a completion if msg flags indicate or we generate
* completions by default
*/
return (ep->util_ep.tx_op_flags | op_flags) & FI_COMPLETION;
return (ep->util_ep.tx_msg_flags | flags) & FI_COMPLETION;
}

static inline uint64_t
Expand All @@ -597,6 +596,15 @@ xnet_rx_completion_flag(struct xnet_ep *ep)
return ep->util_ep.rx_op_flags & FI_COMPLETION;
}

static inline uint64_t
xnet_tx_completion_get_opflags(struct xnet_ep *ep)
{
/* Generate a completion if op flags indicate or we generate
* completions by default
*/
return ep->util_ep.tx_op_flags & FI_COMPLETION;
}

static inline struct xnet_xfer_entry *
xnet_alloc_xfer(struct xnet_progress *progress)
{
Expand All @@ -623,7 +631,6 @@ static inline void
xnet_free_xfer(struct xnet_progress *progress, struct xnet_xfer_entry *xfer)
{
assert(xnet_progress_locked(progress));

if (xfer->ctrl_flags & XNET_FREE_BUF)
free(xfer->user_buf);

Expand All @@ -643,7 +650,6 @@ xnet_alloc_rx(struct xnet_ep *ep)
xfer->cntr = ep->util_ep.cntrs[CNTR_RX];
xfer->cq = xnet_ep_rx_cq(ep);
}

return xfer;
}

Expand All @@ -659,7 +665,6 @@ xnet_alloc_tx(struct xnet_ep *ep)
xfer->hdr.base_hdr.op_data = 0;
xfer->cq = xnet_ep_tx_cq(ep);
}

return xfer;
}

Expand Down
2 changes: 0 additions & 2 deletions prov/tcp/src/xnet_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ void xnet_report_success(struct xnet_xfer_entry *xfer_entry)
struct util_cq *cq;
uint64_t flags, data, tag;
size_t len;

if (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_SAVED_XFER))
return;

Expand Down Expand Up @@ -170,7 +169,6 @@ void xnet_report_success(struct xnet_xfer_entry *xfer_entry)
data = 0;
tag = 0;
}

if (cq->src) {
ofi_cq_write_src(cq, xfer_entry->context, flags, len,
xfer_entry->user_buf, data, tag,
Expand Down
16 changes: 8 additions & 8 deletions prov/tcp/src/xnet_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ xnet_sendmsg(struct fid_ep *ep_fid, const struct fi_msg *msg, uint64_t flags)
}

xnet_init_tx_iov(tx_entry, hdr_len, msg->msg_iov, msg->iov_count);
tx_entry->cq_flags = xnet_tx_completion_flag(ep, flags) |
tx_entry->cq_flags = xnet_tx_completion_get_msgflags(ep, flags) |
FI_MSG | FI_SEND;
xnet_set_ack_flags(tx_entry, flags);
tx_entry->context = msg->context;
Expand Down Expand Up @@ -370,7 +370,7 @@ xnet_send(struct fid_ep *ep_fid, const void *buf, size_t len,

xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.base_hdr), buf, len);
tx_entry->context = context;
tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) |
tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) |
FI_MSG | FI_SEND;
xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags);

Expand Down Expand Up @@ -399,7 +399,7 @@ xnet_sendv(struct fid_ep *ep_fid, const struct iovec *iov,

xnet_init_tx_iov(tx_entry, sizeof(tx_entry->hdr.base_hdr), iov, count);
tx_entry->context = context;
tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) |
tx_entry->cq_flags = xnet_tx_completion_opflag(ep) |
FI_MSG | FI_SEND;
xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags);

Expand Down Expand Up @@ -462,7 +462,7 @@ xnet_senddata(struct fid_ep *ep_fid, const void *buf, size_t len,
xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.cq_data_hdr),
buf, len);
tx_entry->context = context;
tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) |
tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) |
FI_MSG | FI_SEND;
xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags);

Expand Down Expand Up @@ -545,7 +545,7 @@ xnet_tsendmsg(struct fid_ep *fid_ep, const struct fi_msg_tagged *msg,
}

xnet_init_tx_iov(tx_entry, hdr_len, msg->msg_iov, msg->iov_count);
tx_entry->cq_flags = xnet_tx_completion_flag(ep, flags) |
tx_entry->cq_flags = xnet_tx_completion_get_msgflags(ep, flags) |
FI_TAGGED | FI_SEND;
xnet_set_ack_flags(tx_entry, flags);
tx_entry->context = msg->context;
Expand Down Expand Up @@ -579,7 +579,7 @@ xnet_tsend(struct fid_ep *fid_ep, const void *buf, size_t len,

xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.tag_hdr), buf, len);
tx_entry->context = context;
tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) |
tx_entry->cq_flags = xnet_tx_completion_opflag(ep) |
FI_TAGGED | FI_SEND;
xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags);

Expand Down Expand Up @@ -612,7 +612,7 @@ xnet_tsendv(struct fid_ep *fid_ep, const struct iovec *iov, void **desc,

xnet_init_tx_iov(tx_entry, sizeof(tx_entry->hdr.tag_hdr), iov, count);
tx_entry->context = context;
tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) |
tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) |
FI_TAGGED | FI_SEND;
xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags);

Expand Down Expand Up @@ -678,7 +678,7 @@ xnet_tsenddata(struct fid_ep *fid_ep, const void *buf, size_t len, void *desc,
xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.tag_data_hdr),
buf, len);
tx_entry->context = context;
tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) |
tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) |
FI_TAGGED | FI_SEND;
xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags);

Expand Down
3 changes: 2 additions & 1 deletion prov/tcp/src/xnet_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ static int xnet_handle_ack(struct xnet_ep *ep)
assert(!slist_empty(&ep->need_ack_queue));
tx_entry = container_of(slist_remove_head(&ep->need_ack_queue),
struct xnet_xfer_entry, entry);

printf("%d:handle_ack:%p", getpid(), tx_entry);
xnet_report_success(tx_entry);
xnet_free_xfer(xnet_ep2_progress(ep), tx_entry);
xnet_reset_rx(ep);
Expand Down Expand Up @@ -1101,6 +1101,7 @@ static void xnet_complete_rx(struct xnet_ep *ep, ssize_t ret)
}

if (!(rx_entry->ctrl_flags & XNET_SAVED_XFER)) {
printf("%d:IN complete_rx: rx_entry:%p, context:%p\n",getpid(), rx_entry, rx_entry->context);
xnet_report_success(rx_entry);
xnet_free_xfer(xnet_ep2_progress(ep), rx_entry);
} else {
Expand Down
29 changes: 17 additions & 12 deletions prov/tcp/src/xnet_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static void xnet_rma_read_recv_entry_fill(struct xnet_xfer_entry *recv_entry,

recv_entry->iov_cnt = msg->iov_count;
recv_entry->context = msg->context;
recv_entry->cq_flags = xnet_tx_completion_flag(ep, flags) |
recv_entry->cq_flags = xnet_tx_compget_get_msgflags(ep, flags) |
FI_RMA | FI_READ;

/* Read response completes the RMA read transmit */
Expand Down Expand Up @@ -166,8 +166,9 @@ xnet_rma_read(struct fid_ep *ep_fid, void *buf, size_t len, void *desc,
.context = context,
.data = 0,
};

return xnet_rma_readmsg(ep_fid, &msg, 0);
struct xnet_ep *ep;
ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid);
return xnet_rma_readmsg(ep_fid, &msg, ep->util_ep.tx_op_flags);
}

static ssize_t
Expand All @@ -190,8 +191,9 @@ xnet_rma_readv(struct fid_ep *ep_fid, const struct iovec *iov, void **desc,
.context = context,
.data = 0,
};

return xnet_rma_readmsg(ep_fid, &msg, 0);
struct xnet_ep *ep;
ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid);
return xnet_rma_readmsg(ep_fid, &msg, ep->util_ep.tx_op_flags);
}

static ssize_t
Expand Down Expand Up @@ -258,7 +260,7 @@ xnet_rma_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,
send_entry->iov[0].iov_base = (void *) &send_entry->hdr;
send_entry->iov[0].iov_len = offset;

send_entry->cq_flags = xnet_tx_completion_flag(ep, flags) |
send_entry->cq_flags = xnet_tx_completion_get_msgflags(ep, flags) |
FI_RMA | FI_WRITE;
send_entry->cntr = ep->util_ep.cntrs[CNTR_WR];
xnet_set_commit_flags(send_entry, flags);
Expand Down Expand Up @@ -293,8 +295,9 @@ xnet_rma_write(struct fid_ep *ep_fid, const void *buf, size_t len, void *desc,
.context = context,
.data = 0,
};

return xnet_rma_writemsg(ep_fid, &msg, 0);
struct xnet_ep *ep;
ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid);
return xnet_rma_writemsg(ep_fid, &msg, ep->util_ep.tx_op_flags);
}

static ssize_t
Expand All @@ -317,8 +320,9 @@ xnet_rma_writev(struct fid_ep *ep_fid, const struct iovec *iov, void **desc,
.context = context,
.data = 0,
};

return xnet_rma_writemsg(ep_fid, &msg, 0);
struct xnet_ep *ep;
ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid);
return xnet_rma_writemsg(ep_fid, &msg, ep->util_ep.tx_op_flags);
}


Expand Down Expand Up @@ -346,8 +350,9 @@ xnet_rma_writedata(struct fid_ep *ep_fid, const void *buf, size_t len,
.context = context,
.data = data,
};

return xnet_rma_writemsg(ep_fid, &msg, FI_REMOTE_CQ_DATA);
struct xnet_ep *ep;
ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid);
return xnet_rma_writemsg(ep_fid, &msg, FI_REMOTE_CQ_DATA | ep->util_ep.tx_op_flags);
}

static ssize_t
Expand Down

0 comments on commit dc10729

Please sign in to comment.