Skip to content

Commit

Permalink
prov/xnet,common: Disable the EP if an error is detected for zero-copy
Browse files Browse the repository at this point in the history
This patch disables the endpoint if zero-copy reports an error. The
goal is to prevent the stream to be corrupted if zero-copy gets disabled
after the transfer has started.

Signed-off-by: Sylvain Didelot <[email protected]>
Change-Id: If14d99892904b64ba550b1b800d902e0c8041376
  • Loading branch information
sydidelot authored and shefty committed Sep 20, 2023
1 parent 123339c commit 99f8770
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
4 changes: 2 additions & 2 deletions include/ofi_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ int ofi_bsock_sendv(struct ofi_bsock *bsock, const struct iovec *iov,
int ofi_bsock_recv(struct ofi_bsock *bsock, void *buf, size_t *len);
int ofi_bsock_recvv(struct ofi_bsock *bsock, struct iovec *iov,
size_t cnt, size_t *len);
uint32_t ofi_bsock_async_done(const struct fi_provider *prov,
struct ofi_bsock *bsock);
int ofi_bsock_async_done(const struct fi_provider *prov,
struct ofi_bsock *bsock);
void ofi_bsock_prefetch_done(struct ofi_bsock *bsock, size_t len);


Expand Down
11 changes: 8 additions & 3 deletions prov/tcp/src/xnet_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -1136,14 +1136,19 @@ void xnet_progress_rx(struct xnet_ep *ep)
void xnet_progress_async(struct xnet_ep *ep)
{
struct xnet_xfer_entry *xfer;
uint32_t done;
int ret;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
done = ofi_bsock_async_done(&xnet_prov, &ep->bsock);
ret = ofi_bsock_async_done(&xnet_prov, &ep->bsock);
if (ret) {
xnet_ep_disable(ep, 0, NULL, 0);
return;
}

while (!slist_empty(&ep->async_queue)) {
xfer = container_of(ep->async_queue.head,
struct xnet_xfer_entry, entry);
if (ofi_val32_gt(xfer->async_index, done))
if (ofi_val32_gt(xfer->async_index, ep->bsock.done_index))
break;

slist_remove_head(&ep->async_queue);
Expand Down
17 changes: 8 additions & 9 deletions src/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,8 +1402,8 @@ void ofi_bsock_prefetch_done(struct ofi_bsock *bsock, size_t len)
}

#ifdef MSG_ZEROCOPY
uint32_t ofi_bsock_async_done(const struct fi_provider *prov,
struct ofi_bsock *bsock)
int ofi_bsock_async_done(const struct fi_provider *prov,
struct ofi_bsock *bsock)
{
struct msghdr msg = {};
struct sock_extended_err *serr;
Expand All @@ -1418,7 +1418,7 @@ uint32_t ofi_bsock_async_done(const struct fi_provider *prov,
if (ret < 0) {
FI_WARN(prov, FI_LOG_EP_DATA,
"Error reading MSG_ERRQUEUE (%s)\n", strerror(errno));
goto disable;
return -errno;
}

assert(!(msg.msg_flags & MSG_CTRUNC));
Expand All @@ -1427,31 +1427,30 @@ uint32_t ofi_bsock_async_done(const struct fi_provider *prov,
(cmsg->cmsg_level != SOL_IPV6 && cmsg->cmsg_type != IPV6_RECVERR)) {
FI_WARN(prov, FI_LOG_EP_DATA,
"Unexpected cmsg level (!IP) or type (!RECVERR)\n");
goto disable;
return -FI_EINVAL;
}

serr = (void *) CMSG_DATA(cmsg);
if ((serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) || serr->ee_errno) {
FI_WARN(prov, FI_LOG_EP_DATA,
"Unexpected sock err origin or errno\n");
goto disable;
return -FI_EINVAL;
}

bsock->done_index = serr->ee_data;
if (serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) {
FI_WARN(prov, FI_LOG_EP_DATA,
"Zerocopy data was copied\n");
disable:
if (bsock->zerocopy_size != SIZE_MAX) {
FI_WARN(prov, FI_LOG_EP_DATA, "disabling zerocopy\n");
bsock->zerocopy_size = SIZE_MAX;
}
}
return bsock->done_index;
return 0;
}
#else
uint32_t ofi_bsock_async_done(const struct fi_provider *prov,
struct ofi_bsock *bsock)
int ofi_bsock_async_done(const struct fi_provider *prov,
struct ofi_bsock *bsock)
{
return 0;
}
Expand Down

0 comments on commit 99f8770

Please sign in to comment.