Skip to content

Commit

Permalink
UCP eager
Browse files Browse the repository at this point in the history
  • Loading branch information
bureddy committed Nov 19, 2017
1 parent ac5779b commit f610803
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 61 deletions.
14 changes: 14 additions & 0 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
{
ucp_context_h context = worker->context;
ucp_ep_rma_config_t *rma_config;
ucp_ep_addr_domain_config_t *domain_config;
uct_iface_attr_t *iface_attr;
uct_md_attr_t *md_attr;
ucp_rsc_index_t rsc_index;
Expand Down Expand Up @@ -824,6 +825,19 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
}
}

/* Configuration for memory domains */
for (lane = 0; lane < config->key.num_lanes; ++lane) {

domain_config = &config->domain[lane];
rsc_index = config->key.lanes[lane].rsc_index;
iface_attr = &worker->ifaces[rsc_index].attr;

domain_config->tag.eager.max_short = iface_attr->cap.am.max_short;
/* TODO: the zcopy threshold should be based on the endpoint's AM lane capability for the domain address (i.e. whether UCT can do zcopy from domain memory) */
memset(domain_config->tag.eager.zcopy_thresh, 0, UCP_MAX_IOV * sizeof(size_t));

}

/* Configuration for remote memory access */
for (lane = 0; lane < config->key.num_lanes; ++lane) {
if (ucp_ep_config_get_multi_lane_prio(config->key.rma_lanes, lane) == -1) {
Expand Down
14 changes: 13 additions & 1 deletion src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ typedef struct ucp_ep_rma_config {
} ucp_ep_rma_config_t;


/*
 * Per-lane configuration for memory (address) domains.
 * ucp_ep_config_init() fills one entry of this type for every lane in the
 * endpoint configuration key.
 */
typedef struct ucp_ep_addr_domain_config {
struct {
struct {
/* Set by ucp_ep_config_init() from the lane interface's cap.am.max_short */
ssize_t max_short;
/* Zero-copy thresholds, UCP_MAX_IOV entries; currently zero-filled by
 * ucp_ep_config_init(). NOTE(review): presumably indexed by IOV count,
 * like other zcopy_thresh arrays - confirm. */
size_t zcopy_thresh[UCP_MAX_IOV];
} eager;
} tag;
} ucp_ep_addr_domain_config_t;

/*
* Configuration for AM and tag offload protocols
*/
Expand Down Expand Up @@ -185,8 +194,11 @@ typedef struct ucp_ep_config {
* (currently it's only AM based). */
const ucp_proto_t *proto;
} stream;
} ucp_ep_config_t;

/* Configuration of all domains */
ucp_ep_addr_domain_config_t domain[UCP_MAX_LANES];

} ucp_ep_config_t;

/**
* UCP_FEATURE_STREAM specific extension of the remote protocol layer endpoint
Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ enum {
struct ucp_request {
ucs_status_t status; /* Operation status */
uint16_t flags; /* Request flags */
uct_memory_type_t mem_type;

union {
struct {
Expand Down
10 changes: 10 additions & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,10 @@ static void ucp_worker_close_ifaces(ucp_worker_h worker)
ucs_list_del(&wiface->arm_list);
}

if (wiface->on_mem_type_list) {
ucs_list_del(&wiface->mem_type_list);
}

if (wiface->attr.cap.flags & UCP_WORKER_UCT_ALL_EVENT_CAP_FLAGS) {
status = ucs_async_remove_handler(wiface->event_fd, 1);
if (status != UCS_OK) {
Expand Down Expand Up @@ -844,6 +848,11 @@ ucp_worker_add_iface(ucp_worker_h worker, ucp_rsc_index_t tl_id,
ucp_context_tag_offload_enable(context);
}

if (context->tl_mds[resource->md_index].attr.cap.mem_type != UCT_MD_MEM_TYPE_HOST) {
wiface->on_mem_type_list = 0;
ucs_list_add_tail(&worker->mem_type_ifaces, &wiface->mem_type_list);
}

return UCS_OK;

out_close_iface:
Expand Down Expand Up @@ -1143,6 +1152,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->ep_config_count = 0;
ucs_list_head_init(&worker->arm_ifaces);
ucs_list_head_init(&worker->stream_eps);
ucs_list_head_init(&worker->mem_type_ifaces);

if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) {
worker->user_data = params->user_data;
Expand Down
3 changes: 3 additions & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ typedef struct ucp_worker_iface {
ucp_worker_h worker; /* The parent worker */
ucs_queue_elem_t queue; /* Element in tm.offload_ifaces */
ucs_list_link_t arm_list; /* Element in arm_ifaces list */
ucs_list_link_t mem_type_list; /* Element in mem_type_ifaces list */
ucp_rsc_index_t rsc_index; /* Resource index */
int event_fd; /* Event FD, or -1 if undefined */
unsigned activate_count;/* How times this iface has been activated */
int on_arm_list; /* Is the interface on arm_list */
int on_mem_type_list;/* Is the interface on mem_type_list */
int check_events_id;/* Callback id for check_events */
int proxy_am_count;/* Counts active messages on proxy handler */
} ucp_worker_iface_t;
Expand All @@ -162,6 +164,7 @@ typedef struct ucp_worker {
int eventfd; /* Event fd to support signal() calls */
unsigned uct_events; /* UCT arm events */
ucs_list_link_t arm_ifaces; /* List of interfaces to arm */
ucs_list_link_t mem_type_ifaces;/* List of interfaces to memory types */

void *user_data; /* User-defined data */
ucs_list_link_t stream_eps; /* List of EPs with received stream data */
Expand Down
109 changes: 109 additions & 0 deletions src/ucp/dt/dt.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "dt.h"
#include <ucp/core/ucp_request.inl>

#include <ucp/core/ucp_request.h>
#include <ucs/debug/profile.h>
Expand Down Expand Up @@ -46,3 +47,111 @@ size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
state->offset += result_len;
return result_len;
}

/**
 * Copy contiguous received data into a destination buffer that resides in a
 * non-host memory type.
 *
 * The worker's mem_type_ifaces list is searched for an interface whose memory
 * domain matches req->mem_type and which advertises UCT_IFACE_FLAG_PUT_ZCOPY.
 * The destination is registered with that memory domain, the data is moved
 * with the interface's ep_put_zcopy operation, and the registration is then
 * released.
 *
 * @param req          Receive request; supplies the worker (req->recv.worker)
 *                     and the memory type of the destination (req->mem_type).
 * @param buffer       Destination address.
 * @param buffer_size  Number of bytes registered starting at @a buffer; must
 *                     be at least @a recv_length.
 * @param recv_data    Source of the received data.
 * @param recv_length  Number of bytes to copy.
 *
 * @return UCS_OK on success or when @a recv_length is 0;
 *         UCS_ERR_UNSUPPORTED if no suitable interface is on the list;
 *         otherwise the error returned by registration, put or deregistration.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t ucp_dn_dt_unpack(ucp_request_t *req, void *buffer, size_t buffer_size,
                                                         const void *recv_data, size_t recv_length)
{
    ucp_worker_h worker          = req->recv.worker;
    ucp_context_h context        = worker->context;
    ucp_worker_iface_t *selected = NULL;
    ucp_worker_iface_t *wiface;
    ucs_status_t status;
    unsigned md_index            = 0;
    uct_mem_h memh;
    uct_iov_t iov;

    if (recv_length == 0) {
        return UCS_OK;
    }

    /* Validate the sizes before any resource is acquired */
    ucs_assert(buffer_size >= recv_length);

    /* Go over mem_type_list of active interfaces which support
     * data movement to/from the memory type domain */
    ucs_list_for_each(wiface, &worker->mem_type_ifaces, mem_type_list) {
        md_index = context->tl_rscs[wiface->rsc_index].md_index;
        if ((context->tl_mds[md_index].attr.cap.mem_type == req->mem_type) &&
            (wiface->attr.cap.flags & UCT_IFACE_FLAG_PUT_ZCOPY)) {
            selected = wiface;
            break;
        }
    }

    /* After a full traversal the loop cursor no longer refers to a real
     * interface and md_index may describe an unrelated memory domain, so
     * neither may be used unless a match was actually found. */
    if (selected == NULL) {
        ucs_error("No interface supports put_zcopy to memory type %d",
                  (int)req->mem_type);
        return UCS_ERR_UNSUPPORTED;
    }

    status = uct_md_mem_reg(context->tl_mds[md_index].md, buffer, buffer_size,
                            UCT_MD_MEM_ACCESS_REMOTE_PUT, &memh);
    if (status != UCS_OK) {
        ucs_error("Failed to reg address %p with md %s", buffer,
                  context->tl_mds[md_index].rsc.md_name);
        return status;
    }

    iov.buffer = (void *)recv_data;
    iov.length = recv_length;
    iov.count  = 1;
    iov.memh   = UCT_MEM_HANDLE_NULL;

    /* NOTE(review): the operation is invoked with a NULL endpoint and a NULL
     * completion, and any status other than UCS_OK (including a possible
     * UCS_INPROGRESS) is treated as failure - confirm that the transports
     * placed on mem_type_ifaces accept this and complete synchronously. */
    status = selected->iface->ops.ep_put_zcopy(NULL, &iov, 1, (uint64_t)buffer,
                                               (uct_rkey_t)memh, NULL);
    if (status != UCS_OK) {
        uct_md_mem_dereg(context->tl_mds[md_index].md, memh);
        ucs_error("Failed to perform uct_ep_put_zcopy to address %p", buffer);
        return status;
    }

    status = uct_md_mem_dereg(context->tl_mds[md_index].md, memh);
    if (status != UCS_OK) {
        ucs_error("Failed to dereg address %p with md %s", buffer,
                  context->tl_mds[md_index].rsc.md_name);
        return status;
    }

    return UCS_OK;
}


/**
 * Unpack one chunk of received data into the user receive buffer according to
 * the datatype class.
 *
 * The chunk is placed at the current unpack offset (state->offset). This
 * function only reads the offset; it does not advance it.
 *
 * @param req          Receive request; req->mem_type selects between a host
 *                     memcpy and the non-host memory path for contiguous data.
 * @param datatype     Datatype of the receive buffer.
 * @param buffer       Start of the receive buffer.
 * @param buffer_size  Total size of the receive buffer.
 * @param state        Datatype unpack state (offset, IOV or generic state).
 * @param recv_data    Received data to unpack.
 * @param recv_length  Length of the received data.
 * @param last         Non-zero if this is the last chunk; for generic
 *                     datatypes the finish() operation is then invoked.
 *
 * @return UCS_OK on success; UCS_ERR_MESSAGE_TRUNCATED if the chunk does not
 *         fit in the buffer at the current offset; UCS_ERR_INVALID_PARAM for
 *         an unknown datatype class; otherwise the status of the generic
 *         unpack operation or of the non-host memory copy.
 */
ucs_status_t ucp_dt_unpack(ucp_request_t *req, ucp_datatype_t datatype, void *buffer, size_t buffer_size,
                           ucp_dt_state_t *state, const void *recv_data, size_t recv_length, int last)
{
    ucp_dt_generic_t *dt_gen;
    size_t offset = state->offset;
    ucs_status_t status;

    if (ucs_unlikely((recv_length + offset) > buffer_size)) {
        ucs_trace_req("message truncated: recv_length %zu offset %zu buffer_size %zu",
                      recv_length, offset, buffer_size);
        if (UCP_DT_IS_GENERIC(datatype) && last) {
            ucp_dt_generic(datatype)->ops.finish(state->dt.generic.state);
        }
        return UCS_ERR_MESSAGE_TRUNCATED;
    }

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_CONTIG:
        if (ucs_likely(req->mem_type == UCT_MD_MEM_TYPE_HOST)) {
            UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer + offset,
                                   recv_data, recv_length);
            return UCS_OK;
        }

        /* The non-host path must honor the unpack offset just like the host
         * memcpy does; otherwise every fragment after the first would
         * overwrite the beginning of the buffer. The truncation check above
         * guarantees offset <= buffer_size, so the remaining size cannot
         * underflow and is at least recv_length. */
        return ucp_dn_dt_unpack(req, buffer + offset, buffer_size - offset,
                                recv_data, recv_length);

    case UCP_DATATYPE_IOV:
        UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, state->dt.iov.iovcnt,
                         recv_data, recv_length, &state->dt.iov.iov_offset,
                         &state->dt.iov.iovcnt_offset);
        return UCS_OK;

    case UCP_DATATYPE_GENERIC:
        dt_gen = ucp_dt_generic(datatype);
        status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack,
                                        state->dt.generic.state, offset,
                                        recv_data, recv_length);
        if (last) {
            /* Release the generic datatype state once the final chunk has
             * been handed to the unpack operation */
            UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
                                        state->dt.generic.state);
        }
        return status;

    default:
        ucs_error("unexpected datatype=%lx", datatype);
        return UCS_ERR_INVALID_PARAM;
    }
}
9 changes: 8 additions & 1 deletion src/ucp/dt/dt.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <ucp/core/ucp_types.h>
#include <uct/api/uct.h>
#include <ucp/api/ucp.h>
#include <ucs/debug/profile.h>
#include <string.h>
#include <ucp/core/ucp_types.h>


/**
Expand Down Expand Up @@ -63,5 +66,9 @@ ucp_dt_have_rndv_lanes(ucp_dt_state_t *state)
return !ucp_dt_is_empty_rndv_lane(state, 0);
}

#endif /* UCP_DT_H_ */

/**
 * Unpack one chunk of received data into a receive buffer according to the
 * class of @a datatype (contiguous, IOV or generic).
 *
 * @param req          Receive request; its mem_type field decides whether
 *                     contiguous data is copied with memcpy (host memory) or
 *                     through the non-host memory path.
 * @param datatype     Datatype describing the receive buffer.
 * @param buffer       Receive buffer.
 * @param buffer_size  Size of the receive buffer.
 * @param state        Datatype state; state->offset is read as the current
 *                     unpack offset.
 * @param recv_data    Received data.
 * @param recv_length  Length of the received data.
 * @param last         Non-zero for the last chunk; for generic datatypes the
 *                     finish() operation is called in that case.
 *
 * @return UCS_OK on success, UCS_ERR_MESSAGE_TRUNCATED if
 *         recv_length + state->offset exceeds buffer_size,
 *         UCS_ERR_INVALID_PARAM for an unknown datatype class, or the status
 *         of the underlying unpack operation.
 */
ucs_status_t ucp_dt_unpack(ucp_request_t *req, ucp_datatype_t datatype,
void *buffer, size_t buffer_size, ucp_dt_state_t *state,
const void *recv_data, size_t recv_length, int last);

#endif /* UCP_DT_H_ */
48 changes: 0 additions & 48 deletions src/ucp/dt/dt.inl
Original file line number Diff line number Diff line change
Expand Up @@ -35,51 +35,3 @@ size_t ucp_dt_length(ucp_datatype_t datatype, size_t count,

return 0;
}

/**
 * Unpack a chunk of received data into the receive buffer, dispatching on the
 * datatype class.
 *
 * The chunk is placed at state->offset. A chunk that does not fit yields
 * UCS_ERR_MESSAGE_TRUNCATED; for generic datatypes the finish() operation is
 * still called when UCP_RECV_DESC_FLAG_LAST is set in @a flags.
 *
 * @return UCS_OK for contiguous and IOV datatypes, the unpack operation's
 *         status for generic datatypes, UCS_ERR_MESSAGE_TRUNCATED on overflow
 *         and UCS_ERR_INVALID_PARAM for an unknown datatype class.
 */
static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size,
              ucp_dt_state_t *state, const void *recv_data,
              size_t recv_length, unsigned flags)
{
    const size_t recv_offset = state->offset;
    const int is_last        = (flags & UCP_RECV_DESC_FLAG_LAST) != 0;
    ucp_dt_generic_t *generic_dt;
    ucs_status_t unpack_status;

    /* Reject data that would run past the end of the receive buffer */
    if (ucs_unlikely((recv_length + recv_offset) > buffer_size)) {
        ucs_debug("message truncated: recv_length %zu offset %zu buffer_size %zu",
                  recv_length, recv_offset, buffer_size);
        if (is_last && UCP_DT_IS_GENERIC(datatype)) {
            ucp_dt_generic(datatype)->ops.finish(state->dt.generic.state);
        }

        return UCS_ERR_MESSAGE_TRUNCATED;
    }

    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
    case UCP_DATATYPE_GENERIC:
        generic_dt    = ucp_dt_generic(datatype);
        unpack_status = UCS_PROFILE_NAMED_CALL("dt_unpack",
                                               generic_dt->ops.unpack,
                                               state->dt.generic.state,
                                               recv_offset, recv_data,
                                               recv_length);
        if (is_last) {
            UCS_PROFILE_NAMED_CALL_VOID("dt_finish", generic_dt->ops.finish,
                                        state->dt.generic.state);
        }
        return unpack_status;

    case UCP_DATATYPE_IOV:
        UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, state->dt.iov.iovcnt,
                         recv_data, recv_length, &state->dt.iov.iov_offset,
                         &state->dt.iov.iovcnt_offset);
        return UCS_OK;

    case UCP_DATATYPE_CONTIG:
        UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer + recv_offset,
                               recv_data, recv_length);
        return UCS_OK;

    default:
        break;
    }

    ucs_error("unexpected datatype=%lx", datatype);
    return UCS_ERR_INVALID_PARAM;
}
4 changes: 2 additions & 2 deletions src/ucp/tag/eager.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t
ucp_eager_unexp_match(ucp_worker_h worker, ucp_recv_desc_t *rdesc, ucp_tag_t tag,
unsigned flags, void *buffer, size_t count,
ucp_datatype_t datatype, ucp_dt_state_t *state,
ucp_tag_recv_info_t *info)
ucp_request_t *req, ucp_tag_recv_info_t *info)
{
size_t recv_len, hdr_len;
ucs_status_t status;
Expand All @@ -111,7 +111,7 @@ ucp_eager_unexp_match(ucp_worker_h worker, ucp_recv_desc_t *rdesc, ucp_tag_t tag
UCP_WORKER_STAT_EAGER_CHUNK(worker, UNEXP);
hdr_len = rdesc->hdr_len;
recv_len = rdesc->length - hdr_len;
status = ucp_dt_unpack(datatype, buffer, count, state, data + hdr_len,
status = ucp_dt_unpack(req, datatype, buffer, count, state, data + hdr_len,
recv_len, flags & UCP_RECV_DESC_FLAG_LAST);
state->offset += recv_len;

Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/eager_rcv.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
if (req != NULL) {
UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len);

status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer,
status = ucp_dt_unpack(req, req->recv.datatype, req->recv.buffer,
req->recv.length, &req->recv.state,
data + hdr_len, recv_len,
flags & UCP_RECV_DESC_FLAG_LAST);
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/tag/offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag,
}

if (req->recv.rdesc != NULL) {
status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer,
status = ucp_dt_unpack(req, req->recv.datatype, req->recv.buffer,
req->recv.length, &req->recv.state,
req->recv.rdesc + 1, length,
UCP_RECV_DESC_FLAG_LAST);
Expand Down
4 changes: 2 additions & 2 deletions src/ucp/tag/rndv.c
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_handler,
}

UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_recv", recv_len);
status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer,
status = ucp_dt_unpack(rreq, rreq->recv.datatype, rreq->recv.buffer,
rreq->recv.length, &rreq->recv.state,
data + hdr_len, recv_len, 0);
if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
Expand Down Expand Up @@ -775,7 +775,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_last_handler,
ucs_assert(rreq->recv.tag.info.length ==
rreq->recv.state.offset + recv_len);
UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_last_recv", recv_len);
status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer,
status = ucp_dt_unpack(rreq, rreq->recv.datatype, rreq->recv.buffer,
rreq->recv.length, &rreq->recv.state,
data + hdr_len, recv_len,
UCP_RECV_DESC_FLAG_LAST);
Expand Down
4 changes: 3 additions & 1 deletion src/ucp/tag/tag_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size,
UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0);
status = ucp_eager_unexp_match(worker, rdesc, recv_tag, flags,
buffer, buffer_size, datatype,
&req->recv.state, info);
&req->recv.state, req, info);
ucs_trace_req("release receive descriptor %p", rdesc);
if (status != UCS_INPROGRESS) {
goto out_release_desc;
Expand Down Expand Up @@ -128,6 +128,8 @@ ucp_tag_recv_request_init(ucp_request_t *req, ucp_worker_h worker, void* buffer,
req->recv.state.offset = 0;
req->recv.worker = worker;

ucp_memory_type_detect_mds(worker->context, buffer, count, &req->mem_type);

switch (datatype & UCP_DATATYPE_CLASS_MASK) {
case UCP_DATATYPE_IOV:
req->recv.state.dt.iov.iov_offset = 0;
Expand Down
Loading

0 comments on commit f610803

Please sign in to comment.