Skip to content

Commit

Permalink
refactor heap with a new structure
Browse files Browse the repository at this point in the history
  • Loading branch information
JiakunYan committed May 20, 2024
1 parent 86335dd commit b47b07f
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 57 deletions.
3 changes: 2 additions & 1 deletion lci/runtime/1sided_primitive.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ LCI_error_t LCI_putmac(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank,
"(set by LCI_plist_set_default_comp, "
"the default value is LCI_UR_CQ)\n");
LCI_error_t ret = LCI_OK;
bool is_user_provided_packet = LCII_is_packet(buffer.address);
bool is_user_provided_packet =
LCII_is_packet(ep->device->heap, buffer.address);
if (local_completion == NULL && buffer.length <= LCI_SHORT_SIZE) {
/* if data is this short, we will be able to inline it
* no reason to get a packet, allocate a ctx, etc */
Expand Down
3 changes: 2 additions & 1 deletion lci/runtime/2sided_primitive.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ LCI_error_t LCI_sendmc(LCI_endpoint_t ep, LCI_mbuffer_t buffer, int rank,
"buffer is too large %lu (maximum: %d)\n", buffer.length,
LCI_MEDIUM_SIZE);
LCI_error_t ret = LCI_OK;
bool is_user_provided_packet = LCII_is_packet(buffer.address);
bool is_user_provided_packet =
LCII_is_packet(ep->device->heap, buffer.address);
if (completion == NULL && buffer.length <= LCI_SHORT_SIZE) {
/* if data is this short, we will be able to inline it
* no reason to get a packet, allocate a ctx, etc */
Expand Down
8 changes: 5 additions & 3 deletions lci/runtime/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ LCI_error_t LCI_device_init(LCI_device_t* device_ptr)
LCII_bq_init(&device->bq);
LCIU_spinlock_init(&device->bq_spinlock);

LCI_memory_register(device, g_heap, g_heap_size, &device->heap_segment);
device->heap = &g_heap;
LCI_memory_register(device, device->heap->address, device->heap->length,
&device->heap_segment);

if (LCI_ENABLE_PRG_NET_ENDPOINT)
LCII_fill_rq(device->endpoint_progress, true);
Expand All @@ -80,10 +82,10 @@ LCI_error_t LCI_device_free(LCI_device_t* device_ptr)
LCI_device_t device = *device_ptr;
LCI_Log(LCI_LOG_INFO, "device", "free device %p\n", device);
LCI_barrier();
g_total_recv_posted +=
device->heap->total_recv_posted +=
LCII_endpoint_get_recv_posted(device->endpoint_worker);
if (LCI_ENABLE_PRG_NET_ENDPOINT) {
g_total_recv_posted +=
device->heap->total_recv_posted +=
LCII_endpoint_get_recv_posted(device->endpoint_progress);
}
LCII_matchtable_free(&device->mt);
Expand Down
2 changes: 1 addition & 1 deletion lci/runtime/endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ LCI_error_t LCII_endpoint_init(LCI_endpoint_t* ep_ptr, LCI_device_t device,
*ep_ptr = ep;

ep->device = device;
ep->pkpool = g_pkpool;
ep->pkpool = device->heap->pool;
ep->mt = device->mt;
ep->ctx_archive_p = &device->ctx_archive;
ep->bq_p = &device->bq;
Expand Down
82 changes: 43 additions & 39 deletions lci/runtime/lci.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,47 @@ int LCIU_nthreads = 0;
__thread int LCIU_thread_id = -1;
__thread unsigned int LCIU_rand_seed = 0;
LCIS_server_t g_server;
void *g_heap;
size_t g_heap_size;
void *g_base_packet;
LCII_pool_t* g_pkpool;
int g_total_recv_posted;
LCII_packet_heap_t g_heap;

void initialize_packet_heap(LCII_packet_heap_t* heap)
{
heap->length = LCI_CACHE_LINE + (size_t)LCI_SERVER_NUM_PKTS * LCI_PACKET_SIZE;
heap->address = LCIU_memalign(LCI_PAGESIZE, heap->length);
LCI_Assert(LCI_CACHE_LINE >= sizeof(struct LCII_packet_context),
"packet context is too large!\n");
heap->base_packet_p =
heap->address + LCI_CACHE_LINE - sizeof(struct LCII_packet_context);
LCI_Assert(LCI_PACKET_SIZE % LCI_CACHE_LINE == 0,
"The size of packets should be a multiple of cache line size\n");
LCII_pool_create(&heap->pool);
for (size_t i = 0; i < LCI_SERVER_NUM_PKTS; i++) {
LCII_packet_t* packet =
(LCII_packet_t*)(heap->base_packet_p + i * LCI_PACKET_SIZE);
LCI_Assert(((uint64_t) & (packet->data)) % LCI_CACHE_LINE == 0,
"packet.data is not well-aligned\n");
LCI_Assert(LCII_is_packet(heap, packet->data.address),
"Not a packet. The computation is wrong!\n");
packet->context.pkpool = heap->pool;
packet->context.poolid = 0;
#ifdef LCI_DEBUG
packet->context.isInPool = true;
#endif
LCII_pool_put(heap->pool, packet);
}
LCI_Assert(LCI_SERVER_NUM_PKTS > 2 * LCI_SERVER_MAX_RECVS,
"The packet number is too small!\n");
heap->total_recv_posted = 0;
}

void finalize_packet_heap(LCII_packet_heap_t* heap)
{
int total_num = LCII_pool_count(heap->pool) + heap->total_recv_posted;
if (total_num != LCI_SERVER_NUM_PKTS)
LCI_Warn("Potentially losing packets %d != %d\n", total_num,
LCI_SERVER_NUM_PKTS);
LCII_pool_destroy(heap->pool);
LCIU_free(heap->address);
}

LCI_error_t LCI_initialize()
{
Expand Down Expand Up @@ -43,34 +79,7 @@ LCI_error_t LCI_initialize()
}
// initialize global data structure
LCIS_server_init(&g_server);
g_heap_size =
LCI_CACHE_LINE + (size_t)LCI_SERVER_NUM_PKTS * LCI_PACKET_SIZE;
g_heap = LCIU_memalign(LCI_PAGESIZE, g_heap_size);
LCI_Assert(LCI_CACHE_LINE >= sizeof(struct LCII_packet_context),
"packet context is too large!\n");
g_base_packet =
g_heap + LCI_CACHE_LINE - sizeof(struct LCII_packet_context);
LCI_Assert(LCI_PACKET_SIZE % LCI_CACHE_LINE == 0,
"The size of packets should be a multiple of cache line size\n");

LCII_pool_create(&g_pkpool);
for (size_t i = 0; i < LCI_SERVER_NUM_PKTS; i++) {
LCII_packet_t* packet =
(LCII_packet_t*)(g_base_packet + i * LCI_PACKET_SIZE);
LCI_Assert(((uint64_t) & (packet->data)) % LCI_CACHE_LINE == 0,
"packet.data is not well-aligned\n");
LCI_Assert(LCII_is_packet(packet->data.address),
"Not a packet. The computation is wrong!\n");
packet->context.pkpool = g_pkpool;
packet->context.poolid = 0;
#ifdef LCI_DEBUG
packet->context.isInPool = true;
#endif
LCII_pool_put(g_pkpool, packet);
}
LCI_Assert(LCI_SERVER_NUM_PKTS > 2 * LCI_SERVER_MAX_RECVS,
"The packet number is too small!\n");
g_total_recv_posted = 0;
initialize_packet_heap(&g_heap);
// UR objects
LCI_device_init(&LCI_UR_DEVICE);
LCI_queue_create(LCI_UR_DEVICE, &LCI_UR_CQ);
Expand Down Expand Up @@ -101,13 +110,8 @@ LCI_error_t LCI_finalize()
LCI_endpoint_free(&LCI_UR_ENDPOINT);
LCI_queue_free(&LCI_UR_CQ);
LCI_device_free(&LCI_UR_DEVICE);
int total_num = LCII_pool_count(g_pkpool) + g_total_recv_posted;
if (total_num != LCI_SERVER_NUM_PKTS)
LCI_Warn("Potentially losing packets %d != %d\n", total_num,
LCI_SERVER_NUM_PKTS);
LCII_pool_destroy(g_pkpool);
LCIU_free(g_heap);
LCIS_server_fina(g_server);
finalize_packet_heap(&g_heap);
if (LCI_USE_DREG) {
#ifdef LCI_COMPILE_DREG
LCII_ucs_cleanup();
Expand Down
19 changes: 13 additions & 6 deletions lci/runtime/lcii.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,29 @@ struct LCII_endpoint_t {
};
typedef struct LCII_endpoint_t LCII_endpoint_t;

struct LCII_packet_heap_t {
void* address;
size_t length;
void* base_packet_p;
LCII_pool_t* pool;
int total_recv_posted; // for debugging purpose
};
typedef struct LCII_packet_heap_t LCII_packet_heap_t;

extern LCIS_server_t g_server;
extern void *g_heap;
extern size_t g_heap_size;
extern void *g_base_packet;
extern LCII_pool_t* g_pkpool;
extern int g_total_recv_posted;
extern LCII_packet_heap_t g_heap;

struct __attribute__((aligned(LCI_CACHE_LINE))) LCI_device_s {
// the following will not be changed after initialization
LCII_endpoint_t* endpoint_worker; // 8B
LCII_endpoint_t* endpoint_progress; // 8B
LCI_matchtable_t mt; // 8B
LCII_packet_heap_t* heap; // 8B
LCII_rcache_t rcache; // 8B
LCI_segment_t heap_segment; // 8B
LCIU_CACHE_PADDING(2 * sizeof(LCIS_endpoint_t) + sizeof(LCI_matchtable_t) +
sizeof(LCII_rcache_t) + sizeof(LCI_segment_t));
sizeof(LCII_packet_heap_t*) + sizeof(LCII_rcache_t) +
sizeof(LCI_segment_t));
// the following is shared by both progress threads and worker threads
LCM_archive_t ctx_archive; // used for long message protocol
LCIU_CACHE_PADDING(sizeof(LCM_archive_t));
Expand Down
4 changes: 2 additions & 2 deletions lci/runtime/memory_registration.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ LCI_error_t LCI_memory_deregister(LCI_segment_t* segment)

LCI_error_t LCI_mbuffer_alloc(LCI_device_t device, LCI_mbuffer_t* mbuffer)
{
LCII_packet_t* packet = LCII_alloc_packet_nb(g_pkpool);
LCII_packet_t* packet = LCII_alloc_packet_nb(device->heap->pool);
if (packet == NULL)
// no packet is available
return LCI_ERR_RETRY;
packet->context.poolid = LCII_POOLID_LOCAL;

mbuffer->address = packet->data.address;
mbuffer->length = LCI_MEDIUM_SIZE;
LCI_DBG_Assert(LCII_is_packet(mbuffer->address), "");
LCI_DBG_Assert(LCII_is_packet(device->heap, mbuffer->address), "");
return LCI_OK;
}

Expand Down
6 changes: 3 additions & 3 deletions lci/runtime/packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ static inline LCII_packet_t* LCII_mbuffer2packet(LCI_mbuffer_t mbuffer)
return (LCII_packet_t*)(mbuffer.address - offsetof(LCII_packet_t, data));
}

static inline bool LCII_is_packet(void* address)
static inline bool LCII_is_packet(LCII_packet_heap_t* heap, void* address)
{
void* packet_address =
(LCII_packet_t*)(address - offsetof(LCII_packet_t, data));
uintptr_t offset = (uintptr_t)packet_address - (uintptr_t)g_base_packet;
return (uintptr_t)packet_address >= (uintptr_t)g_base_packet &&
uintptr_t offset = (uintptr_t)packet_address - (uintptr_t)heap->base_packet_p;
return (uintptr_t)packet_address >= (uintptr_t)heap->base_packet_p &&
offset % LCI_PACKET_SIZE == 0 &&
offset / LCI_PACKET_SIZE < LCI_SERVER_NUM_PKTS;
}
Expand Down
2 changes: 1 addition & 1 deletion lci/runtime/progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ LCI_error_t LCII_fill_rq(LCII_endpoint_t* endpoint, bool block)

// First, get a packet.
LCII_PCOUNTER_START(get_recv_packet_timer);
LCII_packet_t* packet = LCII_alloc_packet_nb(g_pkpool);
LCII_packet_t* packet = LCII_alloc_packet_nb(endpoint->device->heap->pool);
LCII_PCOUNTER_END(get_recv_packet_timer);
if (packet == NULL) {
LCII_PCOUNTER_ADD(net_recv_failed_nopacket, 1);
Expand Down

0 comments on commit b47b07f

Please sign in to comment.