diff --git a/include/ofi_util.h b/include/ofi_util.h index d6e790f8691..34801f6aa01 100644 --- a/include/ofi_util.h +++ b/include/ofi_util.h @@ -554,7 +554,6 @@ ssize_t ofi_cq_read_entries(struct util_cq *cq, void *buf, size_t count, struct fi_cq_tagged_entry *entry; struct util_cq_aux_entry *aux_entry; ssize_t i; - ofi_genlock_lock(&cq->cq_lock); if (ofi_cirque_isempty(cq->cirq)) { i = -FI_EAGAIN; diff --git a/prov/rxm/src/rxm_rma.c b/prov/rxm/src/rxm_rma.c index ca79e36d729..8de5a1a41db 100644 --- a/prov/rxm/src/rxm_rma.c +++ b/prov/rxm/src/rxm_rma.c @@ -86,7 +86,6 @@ rxm_ep_rma_common(struct rxm_ep *rxm_ep, const struct fi_msg_rma *msg, ret = -FI_EAGAIN; goto unlock; } - rma_buf->hdr.state = RXM_RMA; rma_buf->pkt.ctrl_hdr.type = rxm_ctrl_eager; rma_buf->app_context = msg->context; diff --git a/prov/tcp/src/xnet.h b/prov/tcp/src/xnet.h index ffece256b36..169608c4d46 100644 --- a/prov/tcp/src/xnet.h +++ b/prov/tcp/src/xnet.h @@ -581,14 +581,13 @@ xnet_set_commit_flags(struct xnet_xfer_entry *xfer, uint64_t flags) xfer->ctrl_flags |= XNET_NEED_ACK; } } - static inline uint64_t -xnet_tx_completion_flag(struct xnet_ep *ep, uint64_t op_flags) +xnet_tx_completion_get_msgflags(struct xnet_ep *ep, uint64_t flags) { - /* Generate a completion if op flags indicate or we generate + /* Generate a completion if msg flags indicate or we generate * completions by default */ - return (ep->util_ep.tx_op_flags | op_flags) & FI_COMPLETION; + return (ep->util_ep.tx_msg_flags | flags) & FI_COMPLETION; } static inline uint64_t @@ -597,6 +596,15 @@ xnet_rx_completion_flag(struct xnet_ep *ep) return ep->util_ep.rx_op_flags & FI_COMPLETION; } +static inline uint64_t +xnet_tx_completion_get_opflags(struct xnet_ep *ep) +{ + /* Generate a completion if op flags indicate or we generate + * completions by default + */ + return ep->util_ep.tx_op_flags & FI_COMPLETION; +} + static inline struct xnet_xfer_entry * xnet_alloc_xfer(struct xnet_progress *progress) { @@ -623,7 +631,6 @@ static inline void xnet_free_xfer(struct xnet_progress *progress, struct xnet_xfer_entry *xfer) { assert(xnet_progress_locked(progress)); - if (xfer->ctrl_flags & XNET_FREE_BUF) free(xfer->user_buf); @@ -643,7 +650,6 @@ xnet_alloc_rx(struct xnet_ep *ep) xfer->cntr = ep->util_ep.cntrs[CNTR_RX]; xfer->cq = xnet_ep_rx_cq(ep); } - return xfer; } @@ -659,7 +665,6 @@ xnet_alloc_tx(struct xnet_ep *ep) xfer->hdr.base_hdr.op_data = 0; xfer->cq = xnet_ep_tx_cq(ep); } - return xfer; } diff --git a/prov/tcp/src/xnet_cq.c b/prov/tcp/src/xnet_cq.c index 03ea975371d..be83a85d0db 100644 --- a/prov/tcp/src/xnet_cq.c +++ b/prov/tcp/src/xnet_cq.c @@ -129,7 +129,6 @@ void xnet_report_success(struct xnet_xfer_entry *xfer_entry) struct util_cq *cq; uint64_t flags, data, tag; size_t len; - if (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_SAVED_XFER)) return; @@ -170,7 +169,6 @@ void xnet_report_success(struct xnet_xfer_entry *xfer_entry) data = 0; tag = 0; } - if (cq->src) { ofi_cq_write_src(cq, xfer_entry->context, flags, len, xfer_entry->user_buf, data, tag, diff --git a/prov/tcp/src/xnet_msg.c b/prov/tcp/src/xnet_msg.c index 08b91ca8932..6e040b2b9a9 100644 --- a/prov/tcp/src/xnet_msg.c +++ b/prov/tcp/src/xnet_msg.c @@ -340,7 +340,7 @@ xnet_sendmsg(struct fid_ep *ep_fid, const struct fi_msg *msg, uint64_t flags) } xnet_init_tx_iov(tx_entry, hdr_len, msg->msg_iov, msg->iov_count); - tx_entry->cq_flags = xnet_tx_completion_flag(ep, flags) | + tx_entry->cq_flags = xnet_tx_completion_get_msgflags(ep, flags) | FI_MSG | FI_SEND; xnet_set_ack_flags(tx_entry, flags); tx_entry->context = msg->context; @@ -370,7 +370,7 @@ xnet_send(struct fid_ep *ep_fid, const void *buf, size_t len, xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.base_hdr), buf, len); tx_entry->context = context; - tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) | + tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) | FI_MSG | FI_SEND; xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags); @@ -399,7 +399,7 @@ xnet_sendv(struct fid_ep *ep_fid, const struct iovec *iov, xnet_init_tx_iov(tx_entry, sizeof(tx_entry->hdr.base_hdr), iov, count); tx_entry->context = context; - tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) | + tx_entry->cq_flags = xnet_tx_completion_opflag(ep) | FI_MSG | FI_SEND; xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags); @@ -462,7 +462,7 @@ xnet_senddata(struct fid_ep *ep_fid, const void *buf, size_t len, xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.cq_data_hdr), buf, len); tx_entry->context = context; - tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) | + tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) | FI_MSG | FI_SEND; xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags); @@ -545,7 +545,7 @@ xnet_tsendmsg(struct fid_ep *fid_ep, const struct fi_msg_tagged *msg, } xnet_init_tx_iov(tx_entry, hdr_len, msg->msg_iov, msg->iov_count); - tx_entry->cq_flags = xnet_tx_completion_flag(ep, flags) | + tx_entry->cq_flags = xnet_tx_completion_get_msgflags(ep, flags) | FI_TAGGED | FI_SEND; xnet_set_ack_flags(tx_entry, flags); tx_entry->context = msg->context; @@ -579,7 +579,7 @@ xnet_tsend(struct fid_ep *fid_ep, const void *buf, size_t len, xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.tag_hdr), buf, len); tx_entry->context = context; - tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) | + tx_entry->cq_flags = xnet_tx_completion_opflag(ep) | FI_TAGGED | FI_SEND; xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags); @@ -612,7 +612,7 @@ xnet_tsendv(struct fid_ep *fid_ep, const struct iovec *iov, void **desc, xnet_init_tx_iov(tx_entry, sizeof(tx_entry->hdr.tag_hdr), iov, count); tx_entry->context = context; - tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) | + tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) | FI_TAGGED | FI_SEND; xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags); @@ -678,7 +678,7 @@ xnet_tsenddata(struct fid_ep *fid_ep, const void *buf, size_t len, void *desc, xnet_init_tx_buf(tx_entry, sizeof(tx_entry->hdr.tag_data_hdr), buf, len); tx_entry->context = context; - tx_entry->cq_flags = xnet_tx_completion_flag(ep, 0) | + tx_entry->cq_flags = xnet_tx_completion_get_opflags(ep) | FI_TAGGED | FI_SEND; xnet_set_ack_flags(tx_entry, ep->util_ep.tx_op_flags); diff --git a/prov/tcp/src/xnet_progress.c b/prov/tcp/src/xnet_progress.c index 378f34dab9b..00452749502 100644 --- a/prov/tcp/src/xnet_progress.c +++ b/prov/tcp/src/xnet_progress.c @@ -717,7 +717,7 @@ static int xnet_handle_ack(struct xnet_ep *ep) assert(!slist_empty(&ep->need_ack_queue)); tx_entry = container_of(slist_remove_head(&ep->need_ack_queue), struct xnet_xfer_entry, entry); - + printf("%d:handle_ack:%p", getpid(), tx_entry); xnet_report_success(tx_entry); xnet_free_xfer(xnet_ep2_progress(ep), tx_entry); xnet_reset_rx(ep); @@ -1101,6 +1101,7 @@ static void xnet_complete_rx(struct xnet_ep *ep, ssize_t ret) } if (!(rx_entry->ctrl_flags & XNET_SAVED_XFER)) { + printf("%d:IN complete_rx: rx_entry:%p, context:%p\n",getpid(), rx_entry, rx_entry->context); xnet_report_success(rx_entry); xnet_free_xfer(xnet_ep2_progress(ep), rx_entry); } else { diff --git a/prov/tcp/src/xnet_rma.c b/prov/tcp/src/xnet_rma.c index cdf4285b05c..8a6165c92d4 100644 --- a/prov/tcp/src/xnet_rma.c +++ b/prov/tcp/src/xnet_rma.c @@ -92,7 +92,7 @@ static void xnet_rma_read_recv_entry_fill(struct xnet_xfer_entry *recv_entry, recv_entry->iov_cnt = msg->iov_count; recv_entry->context = msg->context; - recv_entry->cq_flags = xnet_tx_completion_flag(ep, flags) | + recv_entry->cq_flags = xnet_tx_compget_get_msgflags(ep, flags) | FI_RMA | FI_READ; /* Read response completes the RMA read transmit */ @@ -166,8 +166,9 @@ xnet_rma_read(struct fid_ep *ep_fid, void *buf, size_t len, void *desc, .context = context, .data = 0, }; - - return xnet_rma_readmsg(ep_fid, &msg, 0); + struct xnet_ep *ep; + ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid); + return xnet_rma_readmsg(ep_fid, &msg, ep->util_ep.tx_op_flags); } static ssize_t @@ -190,8 +191,9 @@ xnet_rma_readv(struct fid_ep *ep_fid, const struct iovec *iov, void **desc, .context = context, .data = 0, }; - - return xnet_rma_readmsg(ep_fid, &msg, 0); + struct xnet_ep *ep; + ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid); + return xnet_rma_readmsg(ep_fid, &msg, ep->util_ep.tx_op_flags); } static ssize_t @@ -258,7 +260,7 @@ xnet_rma_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg, send_entry->iov[0].iov_base = (void *) &send_entry->hdr; send_entry->iov[0].iov_len = offset; - send_entry->cq_flags = xnet_tx_completion_flag(ep, flags) | + send_entry->cq_flags = xnet_tx_completion_get_msgflags(ep, flags) | FI_RMA | FI_WRITE; send_entry->cntr = ep->util_ep.cntrs[CNTR_WR]; xnet_set_commit_flags(send_entry, flags); @@ -293,8 +295,9 @@ xnet_rma_write(struct fid_ep *ep_fid, const void *buf, size_t len, void *desc, .context = context, .data = 0, }; - - return xnet_rma_writemsg(ep_fid, &msg, 0); + struct xnet_ep *ep; + ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid); + return xnet_rma_writemsg(ep_fid, &msg, ep->util_ep.tx_op_flags); } static ssize_t @@ -317,8 +320,9 @@ xnet_rma_writev(struct fid_ep *ep_fid, const struct iovec *iov, void **desc, .context = context, .data = 0, }; - - return xnet_rma_writemsg(ep_fid, &msg, 0); + struct xnet_ep *ep; + ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid); + return xnet_rma_writemsg(ep_fid, &msg, ep->util_ep.tx_op_flags); } @@ -346,8 +350,9 @@ xnet_rma_writedata(struct fid_ep *ep_fid, const void *buf, size_t len, .context = context, .data = data, }; - - return xnet_rma_writemsg(ep_fid, &msg, FI_REMOTE_CQ_DATA); + struct xnet_ep *ep; + ep = container_of(ep_fid, struct xnet_ep, util_ep.ep_fid); + return xnet_rma_writemsg(ep_fid, &msg, FI_REMOTE_CQ_DATA | ep->util_ep.tx_op_flags); } static ssize_t