diff --git a/include/ofi_net.h b/include/ofi_net.h index fdcd8853f15..1eeaea980d7 100644 --- a/include/ofi_net.h +++ b/include/ofi_net.h @@ -606,8 +606,8 @@ int ofi_bsock_sendv(struct ofi_bsock *bsock, const struct iovec *iov, int ofi_bsock_recv(struct ofi_bsock *bsock, void *buf, size_t *len); int ofi_bsock_recvv(struct ofi_bsock *bsock, struct iovec *iov, size_t cnt, size_t *len); -uint32_t ofi_bsock_async_done(const struct fi_provider *prov, - struct ofi_bsock *bsock); +int ofi_bsock_async_done(const struct fi_provider *prov, + struct ofi_bsock *bsock); void ofi_bsock_prefetch_done(struct ofi_bsock *bsock, size_t len); diff --git a/prov/tcp/src/xnet_progress.c b/prov/tcp/src/xnet_progress.c index 01fc74bd686..342dcf72e28 100644 --- a/prov/tcp/src/xnet_progress.c +++ b/prov/tcp/src/xnet_progress.c @@ -1136,14 +1136,19 @@ void xnet_progress_rx(struct xnet_ep *ep) void xnet_progress_async(struct xnet_ep *ep) { struct xnet_xfer_entry *xfer; - uint32_t done; + int ret; assert(xnet_progress_locked(xnet_ep2_progress(ep))); - done = ofi_bsock_async_done(&xnet_prov, &ep->bsock); + ret = ofi_bsock_async_done(&xnet_prov, &ep->bsock); + if (ret) { + xnet_ep_disable(ep, 0, NULL, 0); + return; + } + while (!slist_empty(&ep->async_queue)) { xfer = container_of(ep->async_queue.head, struct xnet_xfer_entry, entry); - if (ofi_val32_gt(xfer->async_index, done)) + if (ofi_val32_gt(xfer->async_index, ep->bsock.done_index)) break; slist_remove_head(&ep->async_queue); diff --git a/src/common.c b/src/common.c index 0d6e2de33e9..cac720c54ff 100644 --- a/src/common.c +++ b/src/common.c @@ -1402,8 +1402,8 @@ void ofi_bsock_prefetch_done(struct ofi_bsock *bsock, size_t len) } #ifdef MSG_ZEROCOPY -uint32_t ofi_bsock_async_done(const struct fi_provider *prov, - struct ofi_bsock *bsock) +int ofi_bsock_async_done(const struct fi_provider *prov, + struct ofi_bsock *bsock) { struct msghdr msg = {}; struct sock_extended_err *serr; @@ -1418,7 +1418,7 @@ uint32_t ofi_bsock_async_done(const struct fi_provider *prov, if (ret < 0) { FI_WARN(prov, FI_LOG_EP_DATA, "Error reading MSG_ERRQUEUE (%s)\n", strerror(errno)); - goto disable; + return -errno; } assert(!(msg.msg_flags & MSG_CTRUNC)); @@ -1427,31 +1427,30 @@ uint32_t ofi_bsock_async_done(const struct fi_provider *prov, (cmsg->cmsg_level != SOL_IPV6 && cmsg->cmsg_type != IPV6_RECVERR)) { FI_WARN(prov, FI_LOG_EP_DATA, "Unexpected cmsg level (!IP) or type (!RECVERR)\n"); - goto disable; + return -FI_EINVAL; } serr = (void *) CMSG_DATA(cmsg); if ((serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) || serr->ee_errno) { FI_WARN(prov, FI_LOG_EP_DATA, "Unexpected sock err origin or errno\n"); - goto disable; + return -FI_EINVAL; } bsock->done_index = serr->ee_data; if (serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) { FI_WARN(prov, FI_LOG_EP_DATA, "Zerocopy data was copied\n"); -disable: if (bsock->zerocopy_size != SIZE_MAX) { FI_WARN(prov, FI_LOG_EP_DATA, "disabling zerocopy\n"); bsock->zerocopy_size = SIZE_MAX; } } - return bsock->done_index; + return 0; } #else -uint32_t ofi_bsock_async_done(const struct fi_provider *prov, - struct ofi_bsock *bsock) +int ofi_bsock_async_done(const struct fi_provider *prov, + struct ofi_bsock *bsock) { return 0; }