From f6108035b0cb39131a78cecaef9ee71c198004b1 Mon Sep 17 00:00:00 2001 From: Devendar Bureddy Date: Wed, 8 Nov 2017 00:51:50 +0200 Subject: [PATCH] UCP eager --- src/ucp/core/ucp_ep.c | 14 +++++ src/ucp/core/ucp_ep.h | 14 ++++- src/ucp/core/ucp_request.h | 1 + src/ucp/core/ucp_worker.c | 10 ++++ src/ucp/core/ucp_worker.h | 3 + src/ucp/dt/dt.c | 109 +++++++++++++++++++++++++++++++++++++ src/ucp/dt/dt.h | 9 ++- src/ucp/dt/dt.inl | 48 ---------------- src/ucp/tag/eager.h | 4 +- src/ucp/tag/eager_rcv.c | 2 +- src/ucp/tag/offload.c | 2 +- src/ucp/tag/rndv.c | 4 +- src/ucp/tag/tag_recv.c | 4 +- src/ucp/tag/tag_send.c | 17 ++++-- 14 files changed, 180 insertions(+), 61 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index f1c24f24c66c..3721584c07bd 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -718,6 +718,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) { ucp_context_h context = worker->context; ucp_ep_rma_config_t *rma_config; + ucp_ep_addr_domain_config_t *domain_config; uct_iface_attr_t *iface_attr; uct_md_attr_t *md_attr; ucp_rsc_index_t rsc_index; @@ -824,6 +825,19 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config) } } + /* Configuration for memory domains */ + for (lane = 0; lane < config->key.num_lanes; ++lane) { + + domain_config = &config->domain[lane]; + rsc_index = config->key.lanes[lane].rsc_index; + iface_attr = &worker->ifaces[rsc_index].attr; + + domain_config->tag.eager.max_short = iface_attr->cap.am.max_short; + //TODO: zcopy threshold should be based on the ep AM lane capability with domain addr(i.e can UCT do zcopy from domain) + memset(domain_config->tag.eager.zcopy_thresh, 0, UCP_MAX_IOV * sizeof(size_t)); + + } + /* Configuration for remote memory access */ for (lane = 0; lane < config->key.num_lanes; ++lane) { if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) { diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index 
28d36061ac15..6bf9a2bf55fd 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -112,6 +112,15 @@ typedef struct ucp_ep_rma_config { } ucp_ep_rma_config_t; +typedef struct ucp_ep_addr_domain_config { + struct { + struct { + ssize_t max_short; + size_t zcopy_thresh[UCP_MAX_IOV]; + } eager; + } tag; +} ucp_ep_addr_domain_config_t; + /* * Configuration for AM and tag offload protocols */ @@ -185,8 +194,11 @@ typedef struct ucp_ep_config { * (currently it's only AM based). */ const ucp_proto_t *proto; } stream; -} ucp_ep_config_t; + /* Configuration of all domains */ + ucp_ep_addr_domain_config_t domain[UCP_MAX_LANES]; + +} ucp_ep_config_t; /** * UCP_FEATURE_STREAM specific extention of the remote protocol layer endpoint diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index 9c5801d6f7c2..433de36ccbe6 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -84,6 +84,7 @@ enum { struct ucp_request { ucs_status_t status; /* Operation status */ uint16_t flags; /* Request flags */ + uct_memory_type_t mem_type; union { struct { diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index ae527905bd5b..d05afe01d98b 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -721,6 +721,10 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker) ucs_list_del(&wiface->arm_list); } + if (wiface->on_mem_type_list) { + ucs_list_del(&wiface->mem_type_list); + } + if (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS) { status = ucs_async_remove_handler(wiface->event_fd, 1); if (status != UCS_OK) { @@ -844,6 +848,11 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id, ucp_context_tag_offload_enable(context); } + if (context->tl_mds[resource->md_index].attr.cap.mem_type != UCT_MD_MEM_TYPE_HOST) { + wiface->on_mem_type_list = 1; + ucs_list_add_tail(&worker->mem_type_ifaces, &wiface->mem_type_list); + } + return UCS_OK; out_close_iface: @@ -1143,6 +1152,7 @@ ucs_status_t
ucp_worker_create(ucp_context_h context, worker->ep_config_count = 0; ucs_list_head_init(&worker->arm_ifaces); ucs_list_head_init(&worker->stream_eps); + ucs_list_head_init(&worker->mem_type_ifaces); if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) { worker->user_data = params->user_data; diff --git a/src/ucp/core/ucp_worker.h b/src/ucp/core/ucp_worker.h index 0912be2f6bb4..c18e8d8d96f0 100644 --- a/src/ucp/core/ucp_worker.h +++ b/src/ucp/core/ucp_worker.h @@ -132,10 +132,12 @@ typedef struct ucp_worker_iface { ucp_worker_h worker; /* The parent worker */ ucs_queue_elem_t queue; /* Element in tm.offload_ifaces */ ucs_list_link_t arm_list; /* Element in arm_ifaces list */ + ucs_list_link_t mem_type_list; /* Element in mem_type_ifaces list */ ucp_rsc_index_t rsc_index; /* Resource index */ int event_fd; /* Event FD, or -1 if undefined */ unsigned activate_count;/* How times this iface has been activated */ int on_arm_list; /* Is the interface on arm_list */ + int on_mem_type_list;/* Is the interface on mem_type_list */ int check_events_id;/* Callback id for check_events */ int proxy_am_count;/* Counts active messages on proxy handler */ } ucp_worker_iface_t; @@ -162,6 +164,7 @@ typedef struct ucp_worker { int eventfd; /* Event fd to support signal() calls */ unsigned uct_events; /* UCT arm events */ ucs_list_link_t arm_ifaces; /* List of interfaces to arm */ + ucs_list_link_t mem_type_ifaces;/* List of interfaces to memory types */ void *user_data; /* User-defined data */ ucs_list_link_t stream_eps; /* List of EPs with received stream data */ diff --git a/src/ucp/dt/dt.c b/src/ucp/dt/dt.c index 1208f982c795..de24da405a98 100644 --- a/src/ucp/dt/dt.c +++ b/src/ucp/dt/dt.c @@ -5,6 +5,7 @@ */ #include "dt.h" +#include #include #include @@ -46,3 +47,111 @@ size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src, state->offset += result_len; return result_len; } + +static UCS_F_ALWAYS_INLINE ucs_status_t ucp_dn_dt_unpack(ucp_request_t *req, void 
*buffer, size_t buffer_size, + const void *recv_data, size_t recv_length) +{ + ucs_status_t status; + ucp_worker_iface_t *wiface; + ucp_worker_h worker = req->recv.worker; + ucp_context_h context = worker->context; + unsigned md_index; + uct_mem_h memh; + uct_iov_t iov; + + if (recv_length == 0) { + return UCS_OK; + } + + /* Go over mem_type_list of active interfaces which support + ** data movement to/from memory type domain */ + ucs_list_for_each(wiface, &worker->mem_type_ifaces, mem_type_list) { + md_index = context->tl_rscs[wiface->rsc_index].md_index; + if (context->tl_mds[md_index].attr.cap.mem_type == req->mem_type) { + if (wiface->attr.cap.flags & UCT_IFACE_FLAG_PUT_ZCOPY) { + break; + } + } + } + + status = uct_md_mem_reg(context->tl_mds[md_index].md, buffer, buffer_size, + UCT_MD_MEM_ACCESS_REMOTE_PUT, &memh); + if (status != UCS_OK) { + ucs_error("Failed to reg address %p with md %s", buffer, + context->tl_mds[md_index].rsc.md_name); + return status; + } + + ucs_assert(buffer_size >= recv_length); + iov.buffer = (void *)recv_data; + iov.length = recv_length; + iov.count = 1; + iov.memh = UCT_MEM_HANDLE_NULL; + + + status = wiface->iface->ops.ep_put_zcopy(NULL, &iov, 1, (uint64_t)buffer, (uct_rkey_t )memh, NULL); + if (status != UCS_OK) { + uct_md_mem_dereg(context->tl_mds[md_index].md, memh); + ucs_error("Failed to perform uct_ep_put_zcopy to address %p", recv_data); + return status; + } + + status = uct_md_mem_dereg(context->tl_mds[md_index].md, memh); + if (status != UCS_OK) { + ucs_error("Failed to dereg address %p with md %s", buffer, + context->tl_mds[md_index].rsc.md_name); + return status; + } + + return UCS_OK; +} + + +ucs_status_t ucp_dt_unpack(ucp_request_t *req, ucp_datatype_t datatype, void *buffer, size_t buffer_size, + ucp_dt_state_t *state, const void *recv_data, size_t recv_length, int last) +{ + ucp_dt_generic_t *dt_gen; + size_t offset = state->offset; + ucs_status_t status; + + if (ucs_unlikely((recv_length + offset) > buffer_size)) { 
+ ucs_trace_req("message truncated: recv_length %zu offset %zu buffer_size %zu", + recv_length, offset, buffer_size); + if (UCP_DT_IS_GENERIC(datatype) && last) { + ucp_dt_generic(datatype)->ops.finish(state->dt.generic.state); + } + return UCS_ERR_MESSAGE_TRUNCATED; + } + + switch (datatype & UCP_DATATYPE_CLASS_MASK) { + case UCP_DATATYPE_CONTIG: + if (ucs_likely(req->mem_type == UCT_MD_MEM_TYPE_HOST)) { + UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer + offset, + recv_data, recv_length); + return UCS_OK; + } else { + return ucp_dn_dt_unpack(req, buffer, buffer_size, recv_data, recv_length); + } + + case UCP_DATATYPE_IOV: + UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, state->dt.iov.iovcnt, + recv_data, recv_length, &state->dt.iov.iov_offset, + &state->dt.iov.iovcnt_offset); + return UCS_OK; + + case UCP_DATATYPE_GENERIC: + dt_gen = ucp_dt_generic(datatype); + status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack, + state->dt.generic.state, offset, + recv_data, recv_length); + if (last) { + UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, + state->dt.generic.state); + } + return status; + + default: + ucs_error("unexpected datatype=%lx", datatype); + return UCS_ERR_INVALID_PARAM; + } +} diff --git a/src/ucp/dt/dt.h b/src/ucp/dt/dt.h index 67b8b1b21f6a..d4e1cc94058f 100644 --- a/src/ucp/dt/dt.h +++ b/src/ucp/dt/dt.h @@ -15,6 +15,9 @@ #include #include #include +#include +#include +#include /** @@ -63,5 +66,9 @@ ucp_dt_have_rndv_lanes(ucp_dt_state_t *state) return !ucp_dt_is_empty_rndv_lane(state, 0); } -#endif /* UCP_DT_H_ */ +ucs_status_t ucp_dt_unpack(ucp_request_t *req, ucp_datatype_t datatype, + void *buffer, size_t buffer_size, ucp_dt_state_t *state, + const void *recv_data, size_t recv_length, int last); + +#endif /* UCP_DT_H_ */ diff --git a/src/ucp/dt/dt.inl b/src/ucp/dt/dt.inl index f267c911e074..d1886fac2212 100644 --- a/src/ucp/dt/dt.inl +++ b/src/ucp/dt/dt.inl @@ -35,51 +35,3 @@ size_t ucp_dt_length(ucp_datatype_t datatype, 
size_t count, return 0; } - -static UCS_F_ALWAYS_INLINE ucs_status_t -ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size, - ucp_dt_state_t *state, const void *recv_data, - size_t recv_length, unsigned flags) -{ - ucp_dt_generic_t *dt_gen; - size_t offset = state->offset; - ucs_status_t status = UCS_OK; - - if (ucs_unlikely((recv_length + offset) > buffer_size)) { - ucs_debug("message truncated: recv_length %zu offset %zu buffer_size %zu", - recv_length, offset, buffer_size); - if (UCP_DT_IS_GENERIC(datatype) && (flags & UCP_RECV_DESC_FLAG_LAST)) { - ucp_dt_generic(datatype)->ops.finish(state->dt.generic.state); - } - - return UCS_ERR_MESSAGE_TRUNCATED; - } - - switch (datatype & UCP_DATATYPE_CLASS_MASK) { - case UCP_DATATYPE_CONTIG: - UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer + offset, - recv_data, recv_length); - return status; - - case UCP_DATATYPE_IOV: - UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, state->dt.iov.iovcnt, - recv_data, recv_length, &state->dt.iov.iov_offset, - &state->dt.iov.iovcnt_offset); - return status; - - case UCP_DATATYPE_GENERIC: - dt_gen = ucp_dt_generic(datatype); - status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack, - state->dt.generic.state, offset, - recv_data, recv_length); - if (flags & UCP_RECV_DESC_FLAG_LAST) { - UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish, - state->dt.generic.state); - } - return status; - - default: - ucs_error("unexpected datatype=%lx", datatype); - return UCS_ERR_INVALID_PARAM; - } -} diff --git a/src/ucp/tag/eager.h b/src/ucp/tag/eager.h index 85c21c4aa67b..53205d4978f9 100644 --- a/src/ucp/tag/eager.h +++ b/src/ucp/tag/eager.h @@ -102,7 +102,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_eager_unexp_match(ucp_worker_h worker, ucp_recv_desc_t *rdesc, ucp_tag_t tag, unsigned flags, void *buffer, size_t count, ucp_datatype_t datatype, ucp_dt_state_t *state, - ucp_tag_recv_info_t *info) + ucp_request_t *req, ucp_tag_recv_info_t *info) { size_t 
recv_len, hdr_len; ucs_status_t status; @@ -111,7 +111,7 @@ ucp_eager_unexp_match(ucp_worker_h worker, ucp_recv_desc_t *rdesc, ucp_tag_t tag UCP_WORKER_STAT_EAGER_CHUNK(worker, UNEXP); hdr_len = rdesc->hdr_len; recv_len = rdesc->length - hdr_len; - status = ucp_dt_unpack(datatype, buffer, count, state, data + hdr_len, + status = ucp_dt_unpack(req, datatype, buffer, count, state, data + hdr_len, recv_len, flags & UCP_RECV_DESC_FLAG_LAST); state->offset += recv_len; diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c index 01fae57927f3..a807ade8a6d8 100644 --- a/src/ucp/tag/eager_rcv.c +++ b/src/ucp/tag/eager_rcv.c @@ -71,7 +71,7 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags, if (req != NULL) { UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len); - status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer, + status = ucp_dt_unpack(req, req->recv.datatype, req->recv.buffer, req->recv.length, &req->recv.state, data + hdr_len, recv_len, flags & UCP_RECV_DESC_FLAG_LAST); diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c index 586d50a6540d..b731d6b65bc6 100644 --- a/src/ucp/tag/offload.c +++ b/src/ucp/tag/offload.c @@ -63,7 +63,7 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag, } if (req->recv.rdesc != NULL) { - status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer, + status = ucp_dt_unpack(req, req->recv.datatype, req->recv.buffer, req->recv.length, &req->recv.state, req->recv.rdesc + 1, length, UCP_RECV_DESC_FLAG_LAST); diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c index e45731fb0ce8..b9469fa01368 100644 --- a/src/ucp/tag/rndv.c +++ b/src/ucp/tag/rndv.c @@ -741,7 +741,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_handler, } UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_recv", recv_len); - status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer, + status = ucp_dt_unpack(rreq, rreq->recv.datatype, rreq->recv.buffer, rreq->recv.length, &rreq->recv.state, data + hdr_len, 
recv_len, 0); if ((status == UCS_OK) || (status == UCS_INPROGRESS)) { @@ -775,7 +775,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_last_handler, ucs_assert(rreq->recv.tag.info.length == rreq->recv.state.offset + recv_len); UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_last_recv", recv_len); - status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer, + status = ucp_dt_unpack(rreq, rreq->recv.datatype, rreq->recv.buffer, rreq->recv.length, &rreq->recv.state, data + hdr_len, recv_len, UCP_RECV_DESC_FLAG_LAST); diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c index 228c2479ad82..cdced57ea90c 100644 --- a/src/ucp/tag/tag_recv.c +++ b/src/ucp/tag/tag_recv.c @@ -85,7 +85,7 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size, UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0); status = ucp_eager_unexp_match(worker, rdesc, recv_tag, flags, buffer, buffer_size, datatype, - &req->recv.state, info); + &req->recv.state, req, info); ucs_trace_req("release receive descriptor %p", rdesc); if (status != UCS_INPROGRESS) { goto out_release_desc; @@ -128,6 +128,8 @@ ucp_tag_recv_request_init(ucp_request_t *req, ucp_worker_h worker, void* buffer, req->recv.state.offset = 0; req->recv.worker = worker; + ucp_memory_type_detect_mds(worker->context, buffer, count, &req->mem_type); + switch (datatype & UCP_DATATYPE_CLASS_MASK) { case UCP_DATATYPE_IOV: req->recv.state.dt.iov.iov_offset = 0; diff --git a/src/ucp/tag/tag_send.c b/src/ucp/tag/tag_send.c index ee73751bab4b..96e1db4a1699 100644 --- a/src/ucp/tag/tag_send.c +++ b/src/ucp/tag/tag_send.c @@ -99,7 +99,8 @@ ucp_tag_send_req(ucp_request_t *req, size_t count, static UCS_F_ALWAYS_INLINE void ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep, const void* buffer, uintptr_t datatype, - size_t count, ucp_tag_t tag, uint16_t flags) + size_t count, ucp_tag_t tag, uint16_t flags, + uct_memory_type_t mem_type) { req->flags = flags; req->send.ep = ep; @@ -112,6 +113,7 @@ 
ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep, req->send.buffer, &req->send.state.dt); req->send.lane = ucp_ep_config(ep)->tag.lane; + req->mem_type = mem_type; } UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb, @@ -123,13 +125,17 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb, ucp_request_t *req; size_t length; ucs_status_ptr_t ret; + uct_memory_type_t mem_type; UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); ucs_trace_req("send_nb buffer %p count %zu tag %"PRIx64" to %s cb %p", buffer, count, tag, ucp_ep_peer_name(ep), cb); - if (ucs_likely(UCP_DT_IS_CONTIG(datatype))) { + ucp_memory_type_detect_mds(ep->worker->context, (void *)buffer, count, &mem_type); + + if (ucs_likely(mem_type == UCT_MD_MEM_TYPE_HOST) && + ucs_likely(UCP_DT_IS_CONTIG(datatype))) { length = ucp_contig_dt_length(datatype, count); if (ucs_likely((ssize_t)length <= ucp_ep_config(ep)->tag.eager.max_short)) { status = UCS_PROFILE_CALL(ucp_tag_send_eager_short, ep, tag, buffer, @@ -148,7 +154,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb, goto out; } - ucp_tag_send_req_init(req, ep, buffer, datatype, count, tag, 0); + ucp_tag_send_req_init(req, ep, buffer, datatype, count, tag, 0, mem_type); ret = ucp_tag_send_req(req, count, &ucp_ep_config(ep)->tag.eager, ucp_ep_config(ep)->tag.rndv.rma_thresh, @@ -166,6 +172,7 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_sync_nb, { ucp_request_t *req; ucs_status_ptr_t ret; + uct_memory_type_t mem_type; UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock); @@ -186,8 +193,10 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_sync_nb, /* Remote side needs to send reply, so have it connect to us */ ucp_ep_connect_remote(ep); + ucp_memory_type_detect_mds(ep->worker->context, (void *)buffer, count, &mem_type); + ucp_tag_send_req_init(req, ep, buffer, datatype, count, tag, - UCP_REQUEST_FLAG_SYNC); + UCP_REQUEST_FLAG_SYNC, mem_type); ret = ucp_tag_send_req(req, count, &ucp_ep_config(ep)->tag.eager, 
ucp_ep_config(ep)->tag.rndv.rma_thresh,