diff --git a/lci/runtime/1sided_primitive.c b/lci/runtime/1sided_primitive.c index c286ca83..49db3933 100644 --- a/lci/runtime/1sided_primitive.c +++ b/lci/runtime/1sided_primitive.c @@ -49,7 +49,8 @@ LCI_error_t LCI_putmac(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank, "(set by LCI_plist_set_default_comp, " "the default value is LCI_UR_CQ)\n"); LCI_error_t ret = LCI_OK; - bool is_user_provided_packet = LCII_is_packet(buffer.address); + bool is_user_provided_packet = + LCII_is_packet(ep->device->heap, buffer.address); if (local_completion == NULL && buffer.length <= LCI_SHORT_SIZE) { /* if data is this short, we will be able to inline it * no reason to get a packet, allocate a ctx, etc */ diff --git a/lci/runtime/2sided_primitive.c b/lci/runtime/2sided_primitive.c index 0d77586a..4b063854 100644 --- a/lci/runtime/2sided_primitive.c +++ b/lci/runtime/2sided_primitive.c @@ -26,7 +26,8 @@ LCI_error_t LCI_sendmc(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank, "buffer is too large %lu (maximum: %d)\n", buffer.length, LCI_MEDIUM_SIZE); LCI_error_t ret = LCI_OK; - bool is_user_provided_packet = LCII_is_packet(buffer.address); + bool is_user_provided_packet = + LCII_is_packet(ep->device->heap, buffer.address); if (completion == NULL && buffer.length <= LCI_SHORT_SIZE) { /* if data is this short, we will be able to inline it * no reason to get a packet, allocate a ctx, etc */ diff --git a/lci/runtime/device.c b/lci/runtime/device.c index ceea1f94..b3f41593 100644 --- a/lci/runtime/device.c +++ b/lci/runtime/device.c @@ -60,7 +60,9 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr) LCII_bq_init(&device->bq); LCIU_spinlock_init(&device->bq_spinlock); - LCI_memory_register(device, g_heap, g_heap_size, &device->heap_segment); + device->heap = &g_heap; + LCI_memory_register(device, device->heap->address, device->heap->length, + &device->heap_segment); if (LCI_ENABLE_PRG_NET_ENDPOINT) LCII_fill_rq(device->endpoint_progress, true); @@ -80,10 +82,10 @@ LCI_error_t LCI_device_free(LCI_device_t* device_ptr) LCI_device_t device = *device_ptr; LCI_Log(LCI_LOG_INFO, "device", "free device %p\n", device); LCI_barrier(); - g_total_recv_posted += + device->heap->total_recv_posted += LCII_endpoint_get_recv_posted(device->endpoint_worker); if (LCI_ENABLE_PRG_NET_ENDPOINT) { - g_total_recv_posted += + device->heap->total_recv_posted += LCII_endpoint_get_recv_posted(device->endpoint_progress); } LCII_matchtable_free(&device->mt); diff --git a/lci/runtime/endpoint.c b/lci/runtime/endpoint.c index 04f62c13..f70a0de9 100644 --- a/lci/runtime/endpoint.c +++ b/lci/runtime/endpoint.c @@ -14,7 +14,7 @@ LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device, *ep_ptr = ep; ep->device = device; - ep->pkpool = g_pkpool; + ep->pkpool = device->heap->pool; ep->mt = device->mt; ep->ctx_archive_p = &device->ctx_archive; ep->bq_p = &device->bq; diff --git a/lci/runtime/lci.c b/lci/runtime/lci.c index 46f5c041..d69dc834 100644 --- a/lci/runtime/lci.c +++ b/lci/runtime/lci.c @@ -8,11 +8,47 @@ int LCIU_nthreads = 0; __thread int LCIU_thread_id = -1; __thread unsigned int LCIU_rand_seed = 0; LCIS_server_t g_server; -void *g_heap; -size_t g_heap_size; -void *g_base_packet; -LCII_pool_t* g_pkpool; -int g_total_recv_posted; +LCII_packet_heap_t g_heap; + +void initialize_packet_heap(LCII_packet_heap_t* heap) +{ + heap->length = LCI_CACHE_LINE + (size_t)LCI_SERVER_NUM_PKTS * LCI_PACKET_SIZE; + heap->address = LCIU_memalign(LCI_PAGESIZE, heap->length); + LCI_Assert(LCI_CACHE_LINE >= sizeof(struct LCII_packet_context), + "packet context is too large!\n"); + heap->base_packet_p = + heap->address + LCI_CACHE_LINE - sizeof(struct LCII_packet_context); + LCI_Assert(LCI_PACKET_SIZE % LCI_CACHE_LINE == 0, + "The size of packets should be a multiple of cache line size\n"); + LCII_pool_create(&heap->pool); + for (size_t i = 0; i < LCI_SERVER_NUM_PKTS; i++) { + LCII_packet_t* packet = + (LCII_packet_t*)(heap->base_packet_p + i * LCI_PACKET_SIZE); + LCI_Assert(((uint64_t) & (packet->data)) % LCI_CACHE_LINE == 0, + "packet.data is not well-aligned\n"); + LCI_Assert(LCII_is_packet(heap, packet->data.address), + "Not a packet. The computation is wrong!\n"); + packet->context.pkpool = heap->pool; + packet->context.poolid = 0; +#ifdef LCI_DEBUG + packet->context.isInPool = true; +#endif + LCII_pool_put(heap->pool, packet); + } + LCI_Assert(LCI_SERVER_NUM_PKTS > 2 * LCI_SERVER_MAX_RECVS, + "The packet number is too small!\n"); + heap->total_recv_posted = 0; +} + +void finalize_packet_heap(LCII_packet_heap_t* heap) +{ + int total_num = LCII_pool_count(heap->pool) + heap->total_recv_posted; + if (total_num != LCI_SERVER_NUM_PKTS) + LCI_Warn("Potentially losing packets %d != %d\n", total_num, + LCI_SERVER_NUM_PKTS); + LCII_pool_destroy(heap->pool); + LCIU_free(heap->address); +} LCI_error_t LCI_initialize() { @@ -43,34 +79,7 @@ LCI_error_t LCI_initialize() } // initialize global data structure LCIS_server_init(&g_server); - g_heap_size = - LCI_CACHE_LINE + (size_t)LCI_SERVER_NUM_PKTS * LCI_PACKET_SIZE; - g_heap = LCIU_memalign(LCI_PAGESIZE, g_heap_size); - LCI_Assert(LCI_CACHE_LINE >= sizeof(struct LCII_packet_context), - "packet context is too large!\n"); - g_base_packet = - g_heap + LCI_CACHE_LINE - sizeof(struct LCII_packet_context); - LCI_Assert(LCI_PACKET_SIZE % LCI_CACHE_LINE == 0, - "The size of packets should be a multiple of cache line size\n"); - - LCII_pool_create(&g_pkpool); - for (size_t i = 0; i < LCI_SERVER_NUM_PKTS; i++) { - LCII_packet_t* packet = - (LCII_packet_t*)(g_base_packet + i * LCI_PACKET_SIZE); - LCI_Assert(((uint64_t) & (packet->data)) % LCI_CACHE_LINE == 0, - "packet.data is not well-aligned\n"); - LCI_Assert(LCII_is_packet(packet->data.address), - "Not a packet. The computation is wrong!\n"); - packet->context.pkpool = g_pkpool; - packet->context.poolid = 0; -#ifdef LCI_DEBUG - packet->context.isInPool = true; -#endif - LCII_pool_put(g_pkpool, packet); - } - LCI_Assert(LCI_SERVER_NUM_PKTS > 2 * LCI_SERVER_MAX_RECVS, - "The packet number is too small!\n"); - g_total_recv_posted = 0; + initialize_packet_heap(&g_heap); // UR objects LCI_device_init(&LCI_UR_DEVICE); LCI_queue_create(LCI_UR_DEVICE, &LCI_UR_CQ); @@ -101,13 +110,8 @@ LCI_error_t LCI_finalize() LCI_endpoint_free(&LCI_UR_ENDPOINT); LCI_queue_free(&LCI_UR_CQ); LCI_device_free(&LCI_UR_DEVICE); - int total_num = LCII_pool_count(g_pkpool) + g_total_recv_posted; - if (total_num != LCI_SERVER_NUM_PKTS) - LCI_Warn("Potentially losing packets %d != %d\n", total_num, - LCI_SERVER_NUM_PKTS); - LCII_pool_destroy(g_pkpool); - LCIU_free(g_heap); LCIS_server_fina(g_server); + finalize_packet_heap(&g_heap); if (LCI_USE_DREG) { #ifdef LCI_COMPILE_DREG LCII_ucs_cleanup(); diff --git a/lci/runtime/lcii.h b/lci/runtime/lcii.h index 06cb2a5e..7b476a4c 100644 --- a/lci/runtime/lcii.h +++ b/lci/runtime/lcii.h @@ -58,22 +58,29 @@ struct LCII_endpoint_t { }; typedef struct LCII_endpoint_t LCII_endpoint_t; +struct LCII_packet_heap_t { + void* address; + size_t length; + void* base_packet_p; + LCII_pool_t* pool; + int total_recv_posted; // for debugging purpose +}; +typedef struct LCII_packet_heap_t LCII_packet_heap_t; + extern LCIS_server_t g_server; -extern void *g_heap; -extern size_t g_heap_size; -extern void *g_base_packet; -extern LCII_pool_t* g_pkpool; -extern int g_total_recv_posted; +extern LCII_packet_heap_t g_heap; struct __attribute__((aligned(LCI_CACHE_LINE))) LCI_device_s { // the following will not be changed after initialization LCII_endpoint_t* endpoint_worker; // 8B LCII_endpoint_t* endpoint_progress; // 8B LCI_matchtable_t mt; // 8B + LCII_packet_heap_t* heap; // 8B LCII_rcache_t rcache; // 8B LCI_segment_t heap_segment; // 8B LCIU_CACHE_PADDING(2 * sizeof(LCIS_endpoint_t) + sizeof(LCI_matchtable_t) + - sizeof(LCII_rcache_t) + sizeof(LCI_segment_t)); + sizeof(LCII_packet_heap_t*) + sizeof(LCII_rcache_t) + + sizeof(LCI_segment_t)); // the following is shared by both progress threads and worker threads LCM_archive_t ctx_archive; // used for long message protocol LCIU_CACHE_PADDING(sizeof(LCM_archive_t)); diff --git a/lci/runtime/memory_registration.c b/lci/runtime/memory_registration.c index 602293ae..2335b6ad 100644 --- a/lci/runtime/memory_registration.c +++ b/lci/runtime/memory_registration.c @@ -33,7 +33,7 @@ LCI_error_t LCI_memory_deregister(LCI_segment_t* segment) LCI_error_t LCI_mbuffer_alloc(LCI_device_t device, LCI_mbuffer_t* mbuffer) { - LCII_packet_t* packet = LCII_alloc_packet_nb(g_pkpool); + LCII_packet_t* packet = LCII_alloc_packet_nb(device->heap->pool); if (packet == NULL) // no packet is available return LCI_ERR_RETRY; @@ -41,7 +41,7 @@ LCI_error_t LCI_mbuffer_alloc(LCI_device_t device, LCI_mbuffer_t* mbuffer) mbuffer->address = packet->data.address; mbuffer->length = LCI_MEDIUM_SIZE; - LCI_DBG_Assert(LCII_is_packet(mbuffer->address), ""); + LCI_DBG_Assert(LCII_is_packet(device->heap, mbuffer->address), ""); return LCI_OK; } diff --git a/lci/runtime/packet.h b/lci/runtime/packet.h index 40e77858..f51abaf2 100644 --- a/lci/runtime/packet.h +++ b/lci/runtime/packet.h @@ -102,12 +102,12 @@ static inline LCII_packet_t* LCII_mbuffer2packet(LCI_mbuffer_t mbuffer) return (LCII_packet_t*)(mbuffer.address - offsetof(LCII_packet_t, data)); } -static inline bool LCII_is_packet(void* address) +static inline bool LCII_is_packet(LCII_packet_heap_t* heap, void* address) { void* packet_address = (LCII_packet_t*)(address - offsetof(LCII_packet_t, data)); - uintptr_t offset = (uintptr_t)packet_address - (uintptr_t)g_base_packet; - return (uintptr_t)packet_address >= (uintptr_t)g_base_packet && + uintptr_t offset = (uintptr_t)packet_address - (uintptr_t)heap->base_packet_p; + return (uintptr_t)packet_address >= (uintptr_t)heap->base_packet_p && offset % LCI_PACKET_SIZE == 0 && offset / LCI_PACKET_SIZE < LCI_SERVER_NUM_PKTS; } diff --git a/lci/runtime/progress.c b/lci/runtime/progress.c index d3b89f2b..79b02b94 100644 --- a/lci/runtime/progress.c +++ b/lci/runtime/progress.c @@ -113,7 +113,7 @@ LCI_error_t LCII_fill_rq(LCII_endpoint_t* endpoint, bool block) // First, get a packet. LCII_PCOUNTER_START(get_recv_packet_timer); - LCII_packet_t* packet = LCII_alloc_packet_nb(g_pkpool); + LCII_packet_t* packet = LCII_alloc_packet_nb(endpoint->device->heap->pool); LCII_PCOUNTER_END(get_recv_packet_timer); if (packet == NULL) { LCII_PCOUNTER_ADD(net_recv_failed_nopacket, 1);