From 5eb5330431cd95746be5ddc3021e17c7b5bf8b5a Mon Sep 17 00:00:00 2001 From: Omri Mor Date: Thu, 4 Jun 2020 19:58:47 -0500 Subject: [PATCH] pkpool: return packets to allocating pool Populate p->context.poolid when allocating a packet and use lc_pool_put_to when returning it so that packets don't get stuck in the same pool. This reduces the number of retries needed, but may have other effects - needs testing. --- src/cq.c | 2 +- src/include/proto.h | 8 ++++---- src/include/server/server_psm2.h | 6 +++++- src/long.c | 6 +++--- src/medium.c | 9 +++------ 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/cq.c b/src/cq.c index 50a8c114..ec78e60b 100644 --- a/src/cq.c +++ b/src/cq.c @@ -14,6 +14,6 @@ lc_status lc_cq_pop(lc_ep ep, lc_req** req_ptr) lc_status lc_cq_reqfree(lc_ep ep, lc_req* req) { lc_packet* packet = (lc_packet*) req->parent; - lc_pool_put(ep->pkpool, packet); + lci_pk_free_data(ep, packet); return LC_OK; } diff --git a/src/include/proto.h b/src/include/proto.h index fe9d41a4..21df1445 100644 --- a/src/include/proto.h +++ b/src/include/proto.h @@ -109,7 +109,7 @@ static inline void lci_ce_queue(lc_ep ep, lc_packet* p) static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p) { dprintf("Recv RTR %p\n", p); - lci_pk_init(ep, -1, LC_PROTO_LONG, p); + lci_pk_init(ep, p->context.poolid, LC_PROTO_LONG, p); // dprintf("%d] rma %p --> %p %.4x via %d\n", lcg_rank, p->data.rts.src_addr, p->data.rtr.tgt_addr, crc32c((char*) p->data.rts.src_addr, p->data.rts.size), p->data.rtr.rkey); lc_server_rma_rtr(ep->server, p->context.req->rhandle, @@ -121,7 +121,7 @@ static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p) static inline void lci_handle_rts(struct lci_ep* ep, lc_packet* p) { dprintf("Recv RTS: %p\n", p); - lci_pk_init(ep, -1, LC_PROTO_RTR, p); + lci_pk_init(ep, p->context.poolid, LC_PROTO_RTR, p); lc_proto proto = MAKE_PROTO(ep->gid, LC_PROTO_RTR, 0); lci_prepare_rtr(ep, p->context.req->buffer, p->data.rts.size, p); lc_server_sendm(ep->server, p->context.req->rhandle, @@ -264,10 +264,10 @@ static inline void lci_serve_send(lc_packet* p) } else if (proto == LC_PROTO_LONG) { dprintf("SENT LONG: %p\n", p); p->data.rts.cb((void*) p->data.rts.ce); - lci_pk_free(ep, p); + lci_pk_free_data(ep, p); } else if (proto == LC_PROTO_RTS) { dprintf("SENT RTS: %p\n", p); - lci_pk_free(ep, p); + lci_pk_free_data(ep, p); } else { dprintf("SENT UNKNOWN: %p\n", p); lci_pk_free_data(ep, p); diff --git a/src/include/server/server_psm2.h b/src/include/server/server_psm2.h index ca47927c..1d9c512d 100644 --- a/src/include/server/server_psm2.h +++ b/src/include/server/server_psm2.h @@ -292,7 +292,10 @@ static inline int lc_server_progress(lc_server* s) lci_serve_recv_rdma(p, status.msg_tag.tag1); } else { p->context.req = &p->context.req_s; - lc_pool_put(s->pkpool, p); + if (p->context.poolid != -1) + lc_pool_put_to(s->pkpool, p, p->context.poolid); + else + lc_pool_put(s->pkpool, p); } } else if (ctx & PSM_SEND) { lc_packet* p = (lc_packet*) (ctx ^ PSM_SEND); @@ -318,6 +321,7 @@ static inline void lc_server_post_recv(lc_server* s, lc_packet* p) } psm2_mq_tag_t rtag = PSM_TAG_TRECV_DATA(); + p->context.poolid = lc_pool_get_local(s->pkpool); PSM_SAFECALL(psm2_mq_irecv2( s->mq, PSM2_MQ_ANY_ADDR, &rtag, /* message tag */ diff --git a/src/long.c b/src/long.c index 5be2670c..0e02009e 100644 --- a/src/long.c +++ b/src/long.c @@ -7,7 +7,7 @@ lc_status lc_sendl(void* src, size_t size, int rank, int tag, lc_ep ep, lc_send_cb cb, void* ce) { LC_POOL_GET_OR_RETN(ep->pkpool, p); - lci_pk_init(ep, -1, LC_PROTO_RTS, p); + lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_RTS, p); struct lci_rep* rep = &(ep->rep[rank]); lci_prepare_rts(src, size, cb, ce, p); lc_server_sendm(ep->server, rep->handle, @@ -20,7 +20,7 @@ lc_status lc_putl(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep, lc_send_cb cb, void* ce) { LC_POOL_GET_OR_RETN(ep->pkpool, p); - lci_pk_init(ep, -1, LC_PROTO_LONG, p); + lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_LONG, p); p->data.rts.cb = cb; p->data.rts.ce = (uintptr_t) ce; @@ -34,7 +34,7 @@ lc_status lc_putls(void* src, size_t size, int rank, uintptr_t addr, int meta, lc_ep ep, lc_send_cb cb, void* ce) { LC_POOL_GET_OR_RETN(ep->pkpool, p); - lci_pk_init(ep, -1, LC_PROTO_LONG, p); + lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_LONG, p); p->data.rts.cb = cb; p->data.rts.ce = (uintptr_t) ce; struct lci_rep* rep = &(ep->rep[rank]); diff --git a/src/medium.c b/src/medium.c index e758575c..3a0f6753 100644 --- a/src/medium.c +++ b/src/medium.c @@ -6,8 +6,7 @@ lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep) { LC_POOL_GET_OR_RETN(ep->pkpool, p); - lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1, - LC_PROTO_DATA, p); + lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p); struct lci_rep* rep = &(ep->rep[rank]); memcpy(p->data.buffer, src, size); lc_server_sendm(ep->server, rep->handle, size, p, @@ -18,8 +17,7 @@ lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep) lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep) { LC_POOL_GET_OR_RETN(ep->pkpool, p); - lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1, - LC_PROTO_DATA, p); + lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p); struct lci_rep* rep = &(ep->rep[rank]); memcpy(&p->data, src, size); lc_server_putm(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base), @@ -30,8 +28,7 @@ lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep) lc_status lc_putms(void* src, size_t size, int rank, uintptr_t addr, int meta, lc_ep ep) { LC_POOL_GET_OR_RETN(ep->pkpool, p); - lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1, - LC_PROTO_DATA, p); + lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p); struct lci_rep* rep = &(ep->rep[rank]); memcpy(&p->data, src, size); lc_server_putms(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),