diff --git a/lci/api/lci.h b/lci/api/lci.h index 8ee74770..5f9ab289 100644 --- a/lci/api/lci.h +++ b/lci/api/lci.h @@ -592,13 +592,6 @@ typedef enum { } LCI_rdv_protocol_t; extern LCI_rdv_protocol_t LCI_RDV_PROTOCOL; -/** - * @ingroup LCI_COMM - * @brief For the libfabric cxi provider, Try turning off the hacking to see - * whether cxi has fixed the double mr_bind error. - */ -extern bool LCI_OFI_CXI_TRY_NO_HACK; - /** * @ingroup LCI_COMM * @brief For the UCX backend, use try_lock to wrap the ucx function calls. @@ -692,6 +685,8 @@ LCI_error_t LCI_barrier(); */ LCI_API LCI_error_t LCI_device_init(LCI_device_t* device_ptr); +LCI_API +LCI_error_t LCI_device_initx(LCI_device_t* device_ptr); /** * @ingroup LCI_DEVICE * @brief Initialize a device. diff --git a/lci/backend/ibv/server_ibv.c b/lci/backend/ibv/server_ibv.c index d92d7909..59ad892c 100644 --- a/lci/backend/ibv/server_ibv.c +++ b/lci/backend/ibv/server_ibv.c @@ -73,11 +73,10 @@ void LCISI_event_polling_thread_fina(LCISI_server_t* server) } } -void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) +void LCISD_server_init(LCIS_server_t* s) { LCISI_server_t* server = LCIU_malloc(sizeof(LCISI_server_t)); *s = (LCIS_server_t)server; - server->device = device; int num_devices; server->dev_list = ibv_get_device_list(&num_devices); diff --git a/lci/backend/ibv/server_ibv.h b/lci/backend/ibv/server_ibv.h index db955eeb..cb9d47e0 100644 --- a/lci/backend/ibv/server_ibv.h +++ b/lci/backend/ibv/server_ibv.h @@ -18,8 +18,6 @@ ; typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t { - LCI_device_t device; - // Device fields. struct ibv_device** dev_list; struct ibv_device* ib_dev; @@ -62,10 +60,11 @@ typedef struct LCISI_endpoint_t { int qp2rank_mod; } LCISI_endpoint_t; -static inline void* LCISI_real_server_reg(LCIS_server_t s, void* buf, - size_t size) +static inline void* LCISI_real_server_reg(LCIS_endpoint_t endpoint_pp, + void* buf, size_t size) { - LCISI_server_t* server = (LCISI_server_t*)s; + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + LCISI_server_t* server = endpoint_p->server; int mr_flags; if (LCI_IBV_USE_ODP) { mr_flags = IBV_ACCESS_ON_DEMAND | IBV_ACCESS_LOCAL_WRITE | @@ -87,16 +86,18 @@ static inline uint32_t ibv_rma_lkey(LCIS_mr_t mr) return ((struct ibv_mr*)mr.mr_p)->lkey; } -static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size) +static inline LCIS_mr_t LCISD_rma_reg(LCIS_endpoint_t endpoint_pp, void* buf, + size_t size) { - LCISI_server_t* server = (LCISI_server_t*)s; + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + LCISI_server_t* server = endpoint_p->server; LCIS_mr_t mr; if (LCI_IBV_USE_ODP == 2) { mr.mr_p = server->odp_mr; mr.address = buf; mr.length = size; } else { - mr.mr_p = LCISI_real_server_reg(s, buf, size); + mr.mr_p = LCISI_real_server_reg(endpoint_pp, buf, size); mr.address = buf; mr.length = size; } @@ -139,7 +140,8 @@ static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp, #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS LCIU_release_spinlock(&endpoint_p->cq_lock); #endif - if (ne > 0) LCII_PCOUNTER_ADD(net_poll_cq_num, ne); + LCII_PCOUNTER_ADD(net_poll_cq_calls, 1); + if (ne > 0) LCII_PCOUNTER_ADD(net_poll_cq_entry_count, ne); for (int i = 0; i < ne; i++) { LCI_DBG_Assert( wc[i].status == IBV_WC_SUCCESS, "Failed status %s (%d) for wr_id %d\n", diff --git a/lci/backend/ofi/server_ofi.c b/lci/backend/ofi/server_ofi.c index dd3f1eef..ff89b093 100644 --- a/lci/backend/ofi/server_ofi.c +++ b/lci/backend/ofi/server_ofi.c @@ -15,11 +15,10 @@ static struct fi_info* search_for_prov(struct fi_info* info, char* prov_name) return NULL; } -void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) +void LCISD_server_init(LCIS_server_t* s) { LCISI_server_t* server = LCIU_malloc(sizeof(LCISI_server_t)); *s = (LCIS_server_t)server; - server->device = device; // Create hint. char* p = getenv("LCI_OFI_PROVIDER_HINT"); @@ -104,6 +103,10 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) LCI_Assert(LCI_USE_DREG == 0, "The registration cache should be turned off " "for libfabric cxi backend. Use `export LCI_USE_DREG=0`.\n"); + LCI_Assert(LCI_ENABLE_PRG_NET_ENDPOINT == 0, + "The progress-specific network endpoint " + "for libfabric cxi backend. Use `export " + "LCI_ENABLE_PRG_NET_ENDPOINT=0`.\n"); if (LCI_RDV_PROTOCOL != LCI_RDV_WRITE) { LCI_RDV_PROTOCOL = LCI_RDV_WRITE; LCI_Warn( @@ -114,19 +117,11 @@ void LCISD_server_init(LCI_device_t device, LCIS_server_t* s) // Create libfabric obj. FI_SAFECALL(fi_fabric(server->info->fabric_attr, &server->fabric, NULL)); - - // Create domain. - FI_SAFECALL(fi_domain(server->fabric, server->info, &server->domain, NULL)); - - server->endpoint_count = 0; } void LCISD_server_fina(LCIS_server_t s) { LCISI_server_t* server = (LCISI_server_t*)s; - LCI_Assert(server->endpoint_count == 0, "Endpoint count is not zero (%d)\n", - server->endpoint_count); - FI_SAFECALL(fi_close((struct fid*)&server->domain->fid)); FI_SAFECALL(fi_close((struct fid*)&server->fabric->fid)); fi_freeinfo(server->info); free(s); @@ -139,23 +134,16 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, LCISI_endpoint_t* endpoint_p = LCIU_malloc(sizeof(LCISI_endpoint_t)); *endpoint_pp = (LCIS_endpoint_t)endpoint_p; endpoint_p->server = (LCISI_server_t*)server_pp; - endpoint_p->server->endpoints[endpoint_p->server->endpoint_count++] = - endpoint_p; endpoint_p->is_single_threaded = single_threaded; - if (!LCI_OFI_CXI_TRY_NO_HACK && - strcmp(endpoint_p->server->info->fabric_attr->prov_name, "cxi") == 0 && - endpoint_p->server->info->domain_attr->mr_mode & FI_MR_ENDPOINT && - endpoint_p->server->endpoint_count > 1) { - // We are using more than one endpoint per server, but the cxi provider - // can only bind mr to one endpoint. We have to guess here. - endpoint_p->server->cxi_mr_bind_hack = true; - } else { - endpoint_p->server->cxi_mr_bind_hack = false; - } + + // Create domain. + FI_SAFECALL(fi_domain(endpoint_p->server->fabric, endpoint_p->server->info, + &endpoint_p->domain, NULL)); + // Create end-point; endpoint_p->server->info->tx_attr->size = LCI_SERVER_MAX_SENDS; endpoint_p->server->info->rx_attr->size = LCI_SERVER_MAX_RECVS; - FI_SAFECALL(fi_endpoint(endpoint_p->server->domain, endpoint_p->server->info, + FI_SAFECALL(fi_endpoint(endpoint_p->domain, endpoint_p->server->info, &endpoint_p->ep, NULL)); // Create cq. @@ -163,8 +151,7 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, memset(&cq_attr, 0, sizeof(struct fi_cq_attr)); cq_attr.format = FI_CQ_FORMAT_DATA; cq_attr.size = LCI_SERVER_MAX_CQES; - FI_SAFECALL( - fi_cq_open(endpoint_p->server->domain, &cq_attr, &endpoint_p->cq, NULL)); + FI_SAFECALL(fi_cq_open(endpoint_p->domain, &cq_attr, &endpoint_p->cq, NULL)); // Bind my ep to cq. FI_SAFECALL( @@ -173,8 +160,7 @@ void LCISD_endpoint_init(LCIS_server_t server_pp, LCIS_endpoint_t* endpoint_pp, struct fi_av_attr av_attr; memset(&av_attr, 0, sizeof(av_attr)); av_attr.type = FI_AV_MAP; - FI_SAFECALL( - fi_av_open(endpoint_p->server->domain, &av_attr, &endpoint_p->av, NULL)); + FI_SAFECALL(fi_av_open(endpoint_p->domain, &av_attr, &endpoint_p->av, NULL)); FI_SAFECALL(fi_ep_bind(endpoint_p->ep, (fid_t)endpoint_p->av, 0)); FI_SAFECALL(fi_enable(endpoint_p->ep)); @@ -224,11 +210,8 @@ void LCISD_endpoint_fina(LCIS_endpoint_t endpoint_pp) LCT_pmi_barrier(); LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; LCIU_free(endpoint_p->peer_addrs); - int my_idx = --endpoint_p->server->endpoint_count; - LCI_Assert(endpoint_p->server->endpoints[my_idx] == endpoint_p, - "This is not me!\n"); - endpoint_p->server->endpoints[my_idx] = NULL; FI_SAFECALL(fi_close((struct fid*)&endpoint_p->ep->fid)); FI_SAFECALL(fi_close((struct fid*)&endpoint_p->cq->fid)); FI_SAFECALL(fi_close((struct fid*)&endpoint_p->av->fid)); + FI_SAFECALL(fi_close((struct fid*)&endpoint_p->domain->fid)); } \ No newline at end of file diff --git a/lci/backend/ofi/server_ofi.h b/lci/backend/ofi/server_ofi.h index 96643a66..1a25dbed 100644 --- a/lci/backend/ofi/server_ofi.h +++ b/lci/backend/ofi/server_ofi.h @@ -36,18 +36,14 @@ struct LCISI_endpoint_t; typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_server_t { - LCI_device_t device; struct fi_info* info; struct fid_fabric* fabric; - struct fid_domain* domain; - struct LCISI_endpoint_t* endpoints[LCI_SERVER_MAX_ENDPOINTS]; - int endpoint_count; - bool cxi_mr_bind_hack; } LCISI_server_t; typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_endpoint_t { struct LCISI_endpoint_super_t super; LCISI_server_t* server; + struct fid_domain* domain; struct fid_ep* ep; struct fid_cq* cq; struct fid_av* av; @@ -57,10 +53,11 @@ typedef struct __attribute__((aligned(LCI_CACHE_LINE))) LCISI_endpoint_t { extern int g_next_rdma_key; -static inline void* LCISI_real_server_reg(LCIS_server_t s, void* buf, - size_t size) +static inline void* LCISI_real_server_reg(LCIS_endpoint_t endpoint_pp, + void* buf, size_t size) { - LCISI_server_t* server = (LCISI_server_t*)s; + LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; + LCISI_server_t* server = endpoint_p->server; int rdma_key; if (server->info->domain_attr->mr_mode & FI_MR_PROV_KEY) { rdma_key = 0; @@ -68,21 +65,11 @@ static inline void* LCISI_real_server_reg(LCIS_server_t s, void* buf, rdma_key = __sync_fetch_and_add(&g_next_rdma_key, 1); } struct fid_mr* mr; - FI_SAFECALL(fi_mr_reg(server->domain, buf, size, + FI_SAFECALL(fi_mr_reg(endpoint_p->domain, buf, size, FI_READ | FI_WRITE | FI_REMOTE_WRITE, 0, rdma_key, 0, &mr, 0)); if (server->info->domain_attr->mr_mode & FI_MR_ENDPOINT) { - LCI_DBG_Assert(server->endpoint_count >= 1, "No endpoints available!\n"); - if (server->cxi_mr_bind_hack) { - // A temporary fix for the cxi provider, currently cxi cannot bind a - // memory region to more than one endpoint. - FI_SAFECALL(fi_mr_bind( - mr, &server->endpoints[server->endpoint_count - 1]->ep->fid, 0)); - } else { - for (int i = 0; i < server->endpoint_count; ++i) { - FI_SAFECALL(fi_mr_bind(mr, &server->endpoints[i]->ep->fid, 0)); - } - } + FI_SAFECALL(fi_mr_bind(mr, &endpoint_p->ep->fid, 0)); FI_SAFECALL(fi_mr_enable(mr)); } return (void*)mr; @@ -94,10 +81,11 @@ static inline void LCISI_real_server_dereg(void* mr_opaque) FI_SAFECALL(fi_close((struct fid*)&mr->fid)); } -static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size) +static inline LCIS_mr_t LCISD_rma_reg(LCIS_endpoint_t endpoint_pp, void* buf, + size_t size) { LCIS_mr_t mr; - mr.mr_p = LCISI_real_server_reg(s, buf, size); + mr.mr_p = LCISI_real_server_reg(endpoint_pp, buf, size); mr.address = buf; mr.length = size; return mr; @@ -132,8 +120,9 @@ static inline int LCISD_poll_cq(LCIS_endpoint_t endpoint_pp, ne = fi_cq_read(endpoint_p->cq, &fi_entry, LCI_CQ_MAX_POLL); LCISI_OFI_CS_EXIT(endpoint_p, LCI_BACKEND_TRY_LOCK_POLL) ret = ne; + LCII_PCOUNTER_ADD(net_poll_cq_calls, 1); if (ne > 0) { - LCII_PCOUNTER_ADD(net_poll_cq_num, ne); + LCII_PCOUNTER_ADD(net_poll_cq_entry_count, ne); // Got an entry here for (int i = 0; i < ne; i++) { if (fi_entry[i].flags & FI_RECV) { @@ -240,12 +229,6 @@ static inline LCI_error_t LCISD_post_puts(LCIS_endpoint_t endpoint_pp, int rank, LCIS_rkey_t rkey) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; - LCI_Assert( - !endpoint_p->server->cxi_mr_bind_hack || - endpoint_p == endpoint_p->server - ->endpoints[endpoint_p->server->endpoint_count - 1], - "We are using cxi mr_bind hacking mode but unexpected endpoint is " - "performing remote put. Try `export LCI_ENABLE_PRG_NET_ENDPOINT=0`.\n"); uintptr_t addr; if (endpoint_p->server->info->domain_attr->mr_mode & FI_MR_VIRT_ADDR || endpoint_p->server->info->domain_attr->mr_mode & FI_MR_BASIC) { @@ -292,12 +275,6 @@ static inline LCI_error_t LCISD_post_put(LCIS_endpoint_t endpoint_pp, int rank, LCIS_rkey_t rkey, void* ctx) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; - LCI_Assert( - !endpoint_p->server->cxi_mr_bind_hack || - endpoint_p == endpoint_p->server - ->endpoints[endpoint_p->server->endpoint_count - 1], - "We are using cxi mr_bind hacking mode but an unexpected endpoint is " - "performing remote put. Try `export LCI_ENABLE_PRG_NET_ENDPOINT=0`.\n"); uintptr_t addr; if (endpoint_p->server->info->domain_attr->mr_mode & FI_MR_VIRT_ADDR || endpoint_p->server->info->domain_attr->mr_mode & FI_MR_BASIC) { @@ -345,12 +322,6 @@ static inline LCI_error_t LCISD_post_putImms(LCIS_endpoint_t endpoint_pp, LCIS_rkey_t rkey, uint32_t meta) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; - LCI_Assert( - !endpoint_p->server->cxi_mr_bind_hack || - endpoint_p == endpoint_p->server - ->endpoints[endpoint_p->server->endpoint_count - 1], - "We are using cxi mr_bind hacking mode but an unexpected endpoint is " - "performing remote put. Try `export LCI_ENABLE_PRG_NET_ENDPOINT=0`.\n"); uintptr_t addr; if (endpoint_p->server->info->domain_attr->mr_mode & FI_MR_VIRT_ADDR || endpoint_p->server->info->domain_attr->mr_mode & FI_MR_BASIC) { @@ -381,12 +352,6 @@ static inline LCI_error_t LCISD_post_putImm(LCIS_endpoint_t endpoint_pp, void* ctx) { LCISI_endpoint_t* endpoint_p = (LCISI_endpoint_t*)endpoint_pp; - LCI_Assert( - !endpoint_p->server->cxi_mr_bind_hack || - endpoint_p == endpoint_p->server - ->endpoints[endpoint_p->server->endpoint_count - 1], - "We are using cxi mr_bind hacking mode but an unexpected endpoint is " - "performing remote put. Try `export LCI_ENABLE_PRG_NET_ENDPOINT=0`.\n"); uintptr_t addr; if (endpoint_p->server->info->domain_attr->mr_mode & FI_MR_VIRT_ADDR || endpoint_p->server->info->domain_attr->mr_mode & FI_MR_BASIC) { diff --git a/lci/backend/server.h b/lci/backend/server.h index b6c1aea1..777a91d2 100644 --- a/lci/backend/server.h +++ b/lci/backend/server.h @@ -49,9 +49,10 @@ static inline void LCIS_serve_send(void* ctx); /* Following functions are required to be implemented by each server backend. */ -void LCISD_server_init(LCI_device_t device, LCIS_server_t* s); +void LCISD_server_init(LCIS_server_t* s); void LCISD_server_fina(LCIS_server_t s); -static inline LCIS_mr_t LCISD_rma_reg(LCIS_server_t s, void* buf, size_t size); +static inline LCIS_mr_t LCISD_rma_reg(LCIS_endpoint_t endpoint_pp, void* buf, + size_t size); static inline void LCISD_rma_dereg(LCIS_mr_t mr); static inline LCIS_rkey_t LCISD_rma_rkey(LCIS_mr_t mr); @@ -116,10 +117,7 @@ static inline LCI_error_t LCISD_post_recv(LCIS_endpoint_t endpoint_pp, LCIU_release_spinlock(&LCIS_endpoint_super(endpoint).lock); /* Wrapper functions */ -static inline void LCIS_server_init(LCI_device_t device, LCIS_server_t* s) -{ - LCISD_server_init(device, s); -} +static inline void LCIS_server_init(LCIS_server_t* s) { LCISD_server_init(s); } static inline void LCIS_server_fina(LCIS_server_t s) { LCISD_server_fina(s); } @@ -128,10 +126,11 @@ static inline LCIS_rkey_t LCIS_rma_rkey(LCIS_mr_t mr) return LCISD_rma_rkey(mr); } -static inline LCIS_mr_t LCIS_rma_reg(LCIS_server_t s, void* buf, size_t size) +static inline LCIS_mr_t LCIS_rma_reg(LCIS_endpoint_t endpoint_pp, void* buf, + size_t size) { LCII_PCOUNTER_START(net_mem_reg_timer); - LCIS_mr_t mr = LCISD_rma_reg(s, buf, size); + LCIS_mr_t mr = LCISD_rma_reg(endpoint_pp, buf, size); LCII_PCOUNTER_END(net_mem_reg_timer); LCI_DBG_Log(LCI_LOG_TRACE, "server-reg", "LCIS_rma_reg: mr %p buf %p size %lu rkey %lu\n", mr.mr_p, buf, @@ -166,6 +165,7 @@ static inline void LCIS_endpoint_fina(LCIS_endpoint_t endpoint_pp) static inline int LCIS_poll_cq(LCIS_endpoint_t endpoint_pp, LCIS_cq_entry_t* entry) { + LCII_PCOUNTER_ADD(net_poll_cq_attempts, 1); LCISI_CS_ENTER(endpoint_pp, 0); int ret = LCISD_poll_cq(endpoint_pp, entry); LCISI_CS_EXIT(endpoint_pp); diff --git a/lci/profile/performance_counter.h b/lci/profile/performance_counter.h index eca53982..a825aff5 100644 --- a/lci/profile/performance_counter.h +++ b/lci/profile/performance_counter.h @@ -30,7 +30,9 @@ extern LCT_pcounter_ctx_t LCII_pcounter_ctx; _macro(net_send_failed_lock) \ _macro(net_send_failed_nomem) \ _macro(net_recv_failed_nopacket) \ - _macro(net_poll_cq_num) \ + _macro(net_poll_cq_attempts) \ + _macro(net_poll_cq_calls) \ + _macro(net_poll_cq_entry_count) \ _macro(progress_call) \ _macro(packet_get) \ _macro(packet_put) \ diff --git a/lci/runtime/1sided_primitive.c b/lci/runtime/1sided_primitive.c index 4c525bd2..49db3933 100644 --- a/lci/runtime/1sided_primitive.c +++ b/lci/runtime/1sided_primitive.c @@ -49,7 +49,8 @@ LCI_error_t LCI_putmac(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank, "(set by LCI_plist_set_default_comp, " "the default value is LCI_UR_CQ)\n"); LCI_error_t ret = LCI_OK; - bool is_user_provided_packet = LCII_is_packet(ep->device, buffer.address); + bool is_user_provided_packet = + LCII_is_packet(ep->device->heap, buffer.address); if (local_completion == NULL && buffer.length <= LCI_SHORT_SIZE) { /* if data is this short, we will be able to inline it * no reason to get a packet, allocate a ctx, etc */ @@ -94,7 +95,7 @@ LCI_error_t LCI_putmac(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank, } ret = LCIS_post_send( ep->device->endpoint_worker->endpoint, rank, packet->data.address, - buffer.length, ep->device->heap.segment->mr, + buffer.length, ep->device->heap_segment->mr, LCII_MAKE_PROTO(ep->gid, LCI_MSG_RDMA_MEDIUM, tag), ctx); if (ret == LCI_ERR_RETRY) { if (!is_user_provided_packet) LCII_free_packet(packet); @@ -179,7 +180,7 @@ LCI_error_t LCI_putla(LCI_endpoint_t ep, LCI_lbuffer_t buffer, packet->data.rts.size); LCI_error_t ret = LCIS_post_send( ep->device->endpoint_worker->endpoint, rank, packet->data.address, - sizeof(struct LCII_packet_rts_t), ep->device->heap.segment->mr, + sizeof(struct LCII_packet_rts_t), ep->device->heap_segment->mr, LCII_MAKE_PROTO(ep->gid, LCI_MSG_RTS, tag), rts_ctx); if (ret == LCI_ERR_RETRY) { LCII_free_packet(packet); @@ -278,7 +279,7 @@ LCI_error_t LCI_putva(LCI_endpoint_t ep, LCI_iovec_t iovec, (uintptr_t)packet->data.address + iovec.piggy_back.length; LCI_error_t ret = LCIS_post_send(ep->device->endpoint_worker->endpoint, rank, - packet->data.address, length, ep->device->heap.segment->mr, + packet->data.address, length, ep->device->heap_segment->mr, LCII_MAKE_PROTO(ep->gid, LCI_MSG_RTS, tag), rts_ctx); if (ret == LCI_ERR_RETRY) { LCII_free_packet(packet); diff --git a/lci/runtime/2sided_primitive.c b/lci/runtime/2sided_primitive.c index f74a4231..4b063854 100644 --- a/lci/runtime/2sided_primitive.c +++ b/lci/runtime/2sided_primitive.c @@ -26,7 +26,8 @@ LCI_error_t LCI_sendmc(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank, "buffer is too large %lu (maximum: %d)\n", buffer.length, LCI_MEDIUM_SIZE); LCI_error_t ret = LCI_OK; - bool is_user_provided_packet = LCII_is_packet(ep->device, buffer.address); + bool is_user_provided_packet = + LCII_is_packet(ep->device->heap, buffer.address); if (completion == NULL && buffer.length <= LCI_SHORT_SIZE) { /* if data is this short, we will be able to inline it * no reason to get a packet, allocate a ctx, etc */ @@ -72,7 +73,7 @@ LCI_error_t LCI_sendmc(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank, ret = LCIS_post_send(ep->device->endpoint_worker->endpoint, rank, packet->data.address, buffer.length, - ep->device->heap.segment->mr, + ep->device->heap_segment->mr, LCII_MAKE_PROTO(ep->gid, LCI_MSG_MEDIUM, tag), ctx); if (ret == LCI_ERR_RETRY) { if (!is_user_provided_packet) LCII_free_packet(packet); @@ -141,7 +142,7 @@ LCI_error_t LCI_sendl(LCI_endpoint_t ep, LCI_lbuffer_t buffer, int rank, LCI_error_t ret = LCIS_post_send( ep->device->endpoint_worker->endpoint, rank, packet->data.address, - sizeof(struct LCII_packet_rts_t), ep->device->heap.segment->mr, + sizeof(struct LCII_packet_rts_t), ep->device->heap_segment->mr, LCII_MAKE_PROTO(ep->gid, LCI_MSG_RTS, tag), rts_ctx); if (ret == LCI_ERR_RETRY) { LCII_free_packet(packet); diff --git a/lci/runtime/device.c b/lci/runtime/device.c index 6127addd..e351fa76 100644 --- a/lci/runtime/device.c +++ b/lci/runtime/device.c @@ -15,7 +15,7 @@ void LCII_device_endpoint_init(LCI_device_t device, bool single_threaded, endpoint_p->recv_posted = 0; #endif endpoint_p->device = device; - LCIS_endpoint_init(device->server, &endpoint_p->endpoint, single_threaded); + LCIS_endpoint_init(g_server, &endpoint_p->endpoint, single_threaded); } void LCII_endpoint_fina(LCII_endpoint_t** endpoint_pp) @@ -44,7 +44,6 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr) #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS single_threaded_prg = false; #endif - LCIS_server_init(device, &device->server); LCII_device_endpoint_init(device, false, &device->endpoint_worker); if (LCI_ENABLE_PRG_NET_ENDPOINT) { LCII_device_endpoint_init(device, single_threaded_prg, @@ -61,37 +60,10 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr) LCII_bq_init(&device->bq); LCIU_spinlock_init(&device->bq_spinlock); - const size_t heap_size = - LCI_CACHE_LINE + (size_t)LCI_SERVER_NUM_PKTS * LCI_PACKET_SIZE; - LCI_error_t ret = - LCI_lbuffer_memalign(device, heap_size, LCI_PAGESIZE, &device->heap); - LCI_Assert(ret == LCI_OK, "Device heap memory allocation failed\n"); - uintptr_t base_addr = (uintptr_t)device->heap.address; + device->heap = &g_heap; + LCI_memory_register(device, device->heap->address, device->heap->length, + &device->heap_segment); - LCI_Assert(sizeof(struct LCII_packet_context) <= LCI_CACHE_LINE, - "Unexpected packet_context size\n"); - device->base_packet = - base_addr + LCI_CACHE_LINE - sizeof(struct LCII_packet_context); - LCI_Assert(LCI_PACKET_SIZE % LCI_CACHE_LINE == 0, - "The size of packets should be a multiple of cache line size\n"); - - LCII_pool_create(&device->pkpool); - for (size_t i = 0; i < LCI_SERVER_NUM_PKTS; i++) { - LCII_packet_t* packet = - (LCII_packet_t*)(device->base_packet + i * LCI_PACKET_SIZE); - LCI_Assert(((uint64_t) & (packet->data)) % LCI_CACHE_LINE == 0, - "packet.data is not well-aligned\n"); - LCI_Assert(LCII_is_packet(device, packet->data.address), - "Not a packet. The computation is wrong!\n"); - packet->context.pkpool = device->pkpool; - packet->context.poolid = 0; -#ifdef LCI_DEBUG - packet->context.isInPool = true; -#endif - LCII_pool_put(device->pkpool, packet); - } - LCI_Assert(LCI_SERVER_NUM_PKTS > 2 * LCI_SERVER_MAX_RECVS, - "The packet number is too small!\n"); if (LCI_ENABLE_PRG_NET_ENDPOINT) LCII_fill_rq(device->endpoint_progress, true); LCII_fill_rq(device->endpoint_worker, true); @@ -100,27 +72,27 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr) return LCI_OK; } +LCI_error_t LCI_device_initx(LCI_device_t* device_ptr) +{ + return LCI_device_init(device_ptr); +} + LCI_error_t LCI_device_free(LCI_device_t* device_ptr) { LCI_device_t device = *device_ptr; LCI_Log(LCI_LOG_INFO, "device", "free device %p\n", device); LCI_barrier(); - int total_recv_posted = + device->heap->total_recv_posted += LCII_endpoint_get_recv_posted(device->endpoint_worker); if (LCI_ENABLE_PRG_NET_ENDPOINT) { - total_recv_posted += + device->heap->total_recv_posted += LCII_endpoint_get_recv_posted(device->endpoint_progress); } - int total_num = LCII_pool_count(device->pkpool) + total_recv_posted; - if (total_num != LCI_SERVER_NUM_PKTS) - LCI_Warn("Potentially losing packets %d != %d\n", total_num, - LCI_SERVER_NUM_PKTS); + LCI_memory_deregister(&device->heap_segment); LCII_matchtable_free(&device->mt); LCM_archive_fini(&(device->ctx_archive)); LCII_bq_fini(&device->bq); LCIU_spinlock_fina(&device->bq_spinlock); - LCI_lbuffer_free(device->heap); - LCII_pool_destroy(device->pkpool); if (LCI_USE_DREG) { LCII_rcache_fina(device); } @@ -128,7 +100,6 @@ LCI_error_t LCI_device_free(LCI_device_t* device_ptr) LCII_endpoint_fina(&device->endpoint_progress); } LCII_endpoint_fina(&device->endpoint_worker); - LCIS_server_fina(device->server); LCIU_free(device); *device_ptr = NULL; return LCI_OK; diff --git a/lci/runtime/endpoint.c b/lci/runtime/endpoint.c index 1a56ad41..f70a0de9 100644 --- a/lci/runtime/endpoint.c +++ b/lci/runtime/endpoint.c @@ -14,7 +14,7 @@ LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, *ep_ptr = ep; ep->device = device; - ep->pkpool = device->pkpool; + ep->pkpool = device->heap->pool; ep->mt = device->mt; ep->ctx_archive_p = &device->ctx_archive; ep->bq_p = &device->bq; diff --git a/lci/runtime/lci.c b/lci/runtime/lci.c index 983ff290..d69dc834 100644 --- a/lci/runtime/lci.c +++ b/lci/runtime/lci.c @@ -7,6 +7,48 @@ static int opened = 0; int LCIU_nthreads = 0; __thread int LCIU_thread_id = -1; __thread unsigned int LCIU_rand_seed = 0; +LCIS_server_t g_server; +LCII_packet_heap_t g_heap; + +void initialize_packet_heap(LCII_packet_heap_t* heap) +{ + heap->length = LCI_CACHE_LINE + (size_t)LCI_SERVER_NUM_PKTS * LCI_PACKET_SIZE; + heap->address = LCIU_memalign(LCI_PAGESIZE, heap->length); + LCI_Assert(LCI_CACHE_LINE >= sizeof(struct LCII_packet_context), + "packet context is too large!\n"); + heap->base_packet_p = + heap->address + LCI_CACHE_LINE - sizeof(struct LCII_packet_context); + LCI_Assert(LCI_PACKET_SIZE % LCI_CACHE_LINE == 0, + "The size of packets should be a multiple of cache line size\n"); + LCII_pool_create(&heap->pool); + for (size_t i = 0; i < LCI_SERVER_NUM_PKTS; i++) { + LCII_packet_t* packet = + (LCII_packet_t*)(heap->base_packet_p + i * LCI_PACKET_SIZE); + LCI_Assert(((uint64_t) & (packet->data)) % LCI_CACHE_LINE == 0, + "packet.data is not well-aligned\n"); + LCI_Assert(LCII_is_packet(heap, packet->data.address), + "Not a packet. The computation is wrong!\n"); + packet->context.pkpool = heap->pool; + packet->context.poolid = 0; +#ifdef LCI_DEBUG + packet->context.isInPool = true; +#endif + LCII_pool_put(heap->pool, packet); + } + LCI_Assert(LCI_SERVER_NUM_PKTS > 2 * LCI_SERVER_MAX_RECVS, + "The packet number is too small!\n"); + heap->total_recv_posted = 0; +} + +void finalize_packet_heap(LCII_packet_heap_t* heap) +{ + int total_num = LCII_pool_count(heap->pool) + heap->total_recv_posted; + if (total_num != LCI_SERVER_NUM_PKTS) + LCI_Warn("Potentially losing packets %d != %d\n", total_num, + LCI_SERVER_NUM_PKTS); + LCII_pool_destroy(heap->pool); + LCIU_free(heap->address); +} LCI_error_t LCI_initialize() { @@ -35,9 +77,11 @@ LCI_error_t LCI_initialize() LCI_Assert(false, "LCI_COMPILE_DREG is not enabled!\n"); #endif } - + // initialize global data structure + LCIS_server_init(&g_server); + initialize_packet_heap(&g_heap); + // UR objects LCI_device_init(&LCI_UR_DEVICE); - LCI_queue_create(LCI_UR_DEVICE, &LCI_UR_CQ); LCI_plist_t plist; LCI_plist_create(&plist); @@ -66,6 +110,8 @@ LCI_error_t LCI_finalize() LCI_endpoint_free(&LCI_UR_ENDPOINT); LCI_queue_free(&LCI_UR_CQ); LCI_device_free(&LCI_UR_DEVICE); + LCIS_server_fina(g_server); + finalize_packet_heap(&g_heap); if (LCI_USE_DREG) { #ifdef LCI_COMPILE_DREG LCII_ucs_cleanup(); diff --git a/lci/runtime/lcii.h b/lci/runtime/lcii.h index f817ec90..7b476a4c 100644 --- a/lci/runtime/lcii.h +++ b/lci/runtime/lcii.h @@ -58,20 +58,29 @@ struct LCII_endpoint_t { }; typedef struct LCII_endpoint_t LCII_endpoint_t; +struct LCII_packet_heap_t { + void* address; + size_t length; + void* base_packet_p; + LCII_pool_t* pool; + int total_recv_posted; // for debugging purpose +}; +typedef struct LCII_packet_heap_t LCII_packet_heap_t; + +extern LCIS_server_t g_server; +extern LCII_packet_heap_t g_heap; + struct __attribute__((aligned(LCI_CACHE_LINE))) LCI_device_s { // the following will not be changed after initialization - LCIS_server_t server; // 8B LCII_endpoint_t* endpoint_worker; // 8B LCII_endpoint_t* endpoint_progress; // 8B - LCII_pool_t* pkpool; // 8B LCI_matchtable_t mt; // 8B + LCII_packet_heap_t* heap; // 8B LCII_rcache_t rcache; // 8B - LCI_lbuffer_t heap; // 24B - uintptr_t base_packet; // 8B - LCIU_CACHE_PADDING(sizeof(LCIS_server_t) + 2 * sizeof(LCIS_endpoint_t) + - sizeof(LCII_pool_t*) + sizeof(LCI_matchtable_t) + - sizeof(LCII_rcache_t*) + sizeof(LCI_lbuffer_t) + - sizeof(uintptr_t)); + LCI_segment_t heap_segment; // 8B + LCIU_CACHE_PADDING(2 * sizeof(LCIS_endpoint_t) + sizeof(LCI_matchtable_t) + + sizeof(LCII_packet_heap_t*) + sizeof(LCII_rcache_t) + + sizeof(LCI_segment_t)); // the following is shared by both progress threads and worker threads LCM_archive_t ctx_archive; // used for long message protocol LCIU_CACHE_PADDING(sizeof(LCM_archive_t)); diff --git a/lci/runtime/memory_registration.c b/lci/runtime/memory_registration.c index 9587c2a6..f92848e1 100644 --- a/lci/runtime/memory_registration.c +++ b/lci/runtime/memory_registration.c @@ -9,7 +9,7 @@ LCI_error_t LCI_memory_register(LCI_device_t device, void* address, if (LCI_USE_DREG) LCII_rcache_reg(device, address, length, mr); else { - mr->mr = LCIS_rma_reg(device->server, address, length); + mr->mr = LCIS_rma_reg(device->endpoint_progress->endpoint, address, length); } *segment = mr; LCII_PCOUNTER_END(mem_reg_timer); @@ -33,15 +33,14 @@ LCI_error_t LCI_memory_deregister(LCI_segment_t* segment) LCI_error_t LCI_mbuffer_alloc(LCI_device_t device, LCI_mbuffer_t* mbuffer) { - LCII_packet_t* packet = LCII_alloc_packet_nb(device->pkpool); + LCII_packet_t* packet = LCII_alloc_packet_nb(device->heap->pool); if (packet == NULL) // no packet is available return LCI_ERR_RETRY; - packet->context.poolid = LCII_POOLID_LOCAL; mbuffer->address = packet->data.address; mbuffer->length = LCI_MEDIUM_SIZE; - LCI_DBG_Assert(LCII_is_packet(device, mbuffer->address), ""); + LCI_DBG_Assert(LCII_is_packet(device->heap, mbuffer->address), ""); return LCI_OK; } diff --git a/lci/runtime/packet.h b/lci/runtime/packet.h index 429d597d..1f33b256 100644 --- a/lci/runtime/packet.h +++ b/lci/runtime/packet.h @@ -70,6 +70,7 @@ static inline LCII_packet_t* LCII_alloc_packet_nb(struct LCII_pool_t* pool) LCII_packet_t* packet = LCII_pool_get_nb(pool); if (packet != NULL) { LCII_PCOUNTER_ADD(packet_get, 1); + packet->context.poolid = LCII_POOLID_LOCAL; #ifdef LCI_DEBUG LCI_DBG_Assert(packet->context.isInPool, "This packet has already been allocated!\n"); @@ -102,12 +103,12 @@ static inline LCII_packet_t* LCII_mbuffer2packet(LCI_mbuffer_t mbuffer) return (LCII_packet_t*)(mbuffer.address - offsetof(LCII_packet_t, data)); } -static inline bool LCII_is_packet(LCI_device_t device, void* address) +static inline bool LCII_is_packet(LCII_packet_heap_t* heap, void* address) { void* packet_address = (LCII_packet_t*)(address - offsetof(LCII_packet_t, data)); - uintptr_t offset = (uintptr_t)packet_address - device->base_packet; - return (uintptr_t)packet_address >= device->base_packet && + uintptr_t offset = (uintptr_t)packet_address - (uintptr_t)heap->base_packet_p; + return (uintptr_t)packet_address >= (uintptr_t)heap->base_packet_p && offset % LCI_PACKET_SIZE == 0 && offset / LCI_PACKET_SIZE < LCI_SERVER_NUM_PKTS; } diff --git a/lci/runtime/progress.c b/lci/runtime/progress.c index 649b1081..79b02b94 100644 --- a/lci/runtime/progress.c +++ b/lci/runtime/progress.c @@ -113,7 +113,7 @@ LCI_error_t LCII_fill_rq(LCII_endpoint_t* endpoint, bool block) // First, get a packet. LCII_PCOUNTER_START(get_recv_packet_timer); - LCII_packet_t* packet = LCII_alloc_packet_nb(endpoint->device->pkpool); + LCII_packet_t* packet = LCII_alloc_packet_nb(endpoint->device->heap->pool); LCII_PCOUNTER_END(get_recv_packet_timer); if (packet == NULL) { LCII_PCOUNTER_ADD(net_recv_failed_nopacket, 1); @@ -124,7 +124,7 @@ LCI_error_t LCII_fill_rq(LCII_endpoint_t* endpoint, bool block) // packet->context.poolid = lc_pool_get_local(endpoint->device->pkpool); LCI_error_t rc = LCIS_post_recv( endpoint->endpoint, packet->data.address, LCI_MEDIUM_SIZE, - endpoint->device->heap.segment->mr, packet); + endpoint->device->heap_segment->mr, packet); if (rc == LCI_OK) { LCII_PCOUNTER_START(update_posted_recv); #ifdef LCI_ENABLE_MULTITHREAD_PROGRESS diff --git a/lci/runtime/rcache/lcii_rcache.c b/lci/runtime/rcache/lcii_rcache.c index f80b64f0..2b96699e 100644 --- a/lci/runtime/rcache/lcii_rcache.c +++ b/lci/runtime/rcache/lcii_rcache.c @@ -30,9 +30,9 @@ static ucs_status_t LCII_rcache_mem_reg_cb(void* context, ucs_rcache_t* rcache, { LCII_rcache_entry_t* region = ucs_derived_of(rregion, LCII_rcache_entry_t); LCI_device_t device = context; - region->mr = - LCIS_rma_reg(device->server, (void*)region->super.super.start, - region->super.super.end - region->super.super.start); + region->mr = LCIS_rma_reg( + device->endpoint_progress->endpoint, (void*)region->super.super.start, + region->super.super.end - region->super.super.start); return UCS_OK; } diff --git a/lci/runtime/rendezvous.h b/lci/runtime/rendezvous.h index 5baf9ed2..aa9149be 100644 --- a/lci/runtime/rendezvous.h +++ b/lci/runtime/rendezvous.h @@ -287,7 +287,7 @@ static inline void LCII_handle_rts(LCI_endpoint_t ep, LCII_packet_t* packet, LCII_PCOUNTER_START(rts_send_timer); LCIS_post_send_bq(ep->bq_p, ep->bq_spinlock_p, endpoint_to_use, (int)rdv_ctx->rank, packet->data.address, length, - ep->device->heap.segment->mr, + ep->device->heap_segment->mr, LCII_MAKE_PROTO(ep->gid, LCI_MSG_RTR, 0), rtr_ctx); LCII_PCOUNTER_END(rts_send_timer); } diff --git a/lct/pcounter/pcounter.cpp b/lct/pcounter/pcounter.cpp index 3001a5b7..c754d205 100644 --- a/lct/pcounter/pcounter.cpp +++ b/lct/pcounter/pcounter.cpp @@ -475,6 +475,8 @@ void record_thread_fn(ctx_t* ctx, uint64_t record_interval) else if (ctx->dump_record_on_the_fly) { ctx->record(); ctx->dump(ctx->dump_ofile); + } else { + break; } usleep(record_interval); }