Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 pkpool #20

Open
wants to merge 7 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 43 additions & 26 deletions include/lc/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct dequeue;
extern "C" {
#endif

LC_INLINE int lc_worker_id()
LC_INLINE int lc_worker_id(void)
{
if (unlikely(lcg_core_id == -1)) {
lcg_core_id = sched_getcpu();
Expand Down Expand Up @@ -69,7 +69,7 @@ void* lc_pool_get_nb(lc_pool* pool);

#define POOL_UNINIT ((int32_t)-1)

LC_INLINE int32_t lc_pool_get_local(struct lc_pool* pool)
LC_INLINE int32_t lc_pool_get_local_id(struct lc_pool* pool)
{
int wid = lc_worker_id();
int32_t pid = tls_pool_struct[wid][pool->key];
Expand All @@ -90,47 +90,64 @@ LC_INLINE int32_t lc_pool_get_local(struct lc_pool* pool)
return pid;
}

LC_INLINE void* lc_pool_get_slow(struct lc_pool* pool) {
/*
 * Choose the index of a pool to steal from.
 *
 * npools: total number of local pools.
 * pid:    index of the caller's own pool; expected to lie in [0, npools).
 *
 * Returns a pseudo-random index in [0, npools) that differs from pid, or -1
 * when there is no other pool to steal from.
 */
LC_INLINE int32_t lc_pool_get_steal_id(int32_t npools, int32_t pid)
{
  /* "<= 1" rather than "== 1": with npools == 0 the "% npools" below would
   * divide by zero. */
  if (npools <= 1)
    return -1; /* no one else to steal from */
  /* r is in [0, npools - 2]; adding pid + 1 steps over the caller's own slot */
  int32_t r = rand() % (npools - 1);
  return (r + pid + 1) % npools;
}

/*
 * Make a single attempt to take an element from the bottom of the pool at
 * index pid.
 *
 * Returns the element, or NULL when that slot holds no dequeue or
 * dq_pop_bot() returned NULL. Never loops: retry policy is the caller's.
 */
LC_INLINE void* lc_pool_steal_from(struct lc_pool* pool, int32_t pid)
{
  void* elm = NULL;
  if (likely(pool->lpools[pid] != NULL))
    elm = dq_pop_bot(pool->lpools[pid]);
  return elm;
}

LC_INLINE void lc_pool_put(struct lc_pool* pool, void* elm) {
int32_t pid = lc_pool_get_local(pool);
struct dequeue* lpool = pool->lpools[pid];
dq_push_top(lpool, elm);
/*
 * Attempt one steal on behalf of the worker whose own pool index is pid.
 *
 * The victim index comes from lc_pool_get_steal_id(); when that reports -1
 * nothing is attempted. Returns the stolen element, or NULL if no victim was
 * available or the victim yielded nothing.
 */
LC_INLINE void* lc_pool_steal(struct lc_pool* pool, int32_t pid)
{
  const int32_t victim = lc_pool_get_steal_id(pool->npools, pid);
  if (victim == -1)
    return NULL;
  return lc_pool_steal_from(pool, victim);
}

LC_INLINE void lc_pool_put_to(struct lc_pool* pool, void* elm, int32_t pid) {
/*
 * Push elm onto the top of the dequeue stored at pool->lpools[pid].
 * pid is used as given; no range or NULL check is performed here.
 */
LC_INLINE void lc_pool_put_to(struct lc_pool* pool, void* elm, int32_t pid)
{
  dq_push_top(pool->lpools[pid], elm);
}

LC_INLINE void* lc_pool_get(struct lc_pool* pool) {
int32_t pid = lc_pool_get_local(pool);
/*
 * Return elm to the pool slot that lc_pool_get_local_id() reports for the
 * calling thread.
 */
LC_INLINE void lc_pool_put(struct lc_pool* pool, void* elm)
{
  lc_pool_put_to(pool, elm, lc_pool_get_local_id(pool));
}

/*
 * Non-blocking get: try the caller's own pool first, then make exactly one
 * steal attempt from another pool.
 *
 * Returns an element, or NULL if neither attempt produced one.
 */
LC_INLINE void* lc_pool_get_nb(struct lc_pool* pool)
{
  int32_t pid = lc_pool_get_local_id(pool);
  struct dequeue* lpool = pool->lpools[pid];
  void* elm = dq_pop_top(lpool);
  if (elm == NULL)
    elm = lc_pool_steal(pool, pid);
  return elm;
}

LC_INLINE void* lc_pool_get_nb(struct lc_pool* pool) {
int32_t pid = lc_pool_get_local(pool);
/*
 * Blocking get: spin until an element is obtained, alternating between the
 * caller's own pool and one steal attempt per iteration.
 *
 * Never returns NULL; does not return until some pool yields an element.
 */
LC_INLINE void* lc_pool_get(struct lc_pool* pool)
{
  int32_t pid = lc_pool_get_local_id(pool);
  struct dequeue* lpool = pool->lpools[pid];
  void* elm = NULL;
  do {
    /* must try self every iteration since we never steal from self */
    elm = dq_pop_top(lpool);
    if (elm == NULL)
      elm = lc_pool_steal(pool, pid);
  } while (elm == NULL);
  return elm;
}

Expand Down
2 changes: 1 addition & 1 deletion src/cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ lc_status lc_cq_pop(lc_ep ep, lc_req** req_ptr)
/*
 * Release the packet behind a completed request.
 *
 * req->parent is taken to be the lc_packet that carries the request. The
 * packet is released exactly once, through lci_pk_free_data(); it must not
 * also be pushed back with lc_pool_put(), or it would be handed back twice.
 *
 * Always returns LC_OK.
 */
lc_status lc_cq_reqfree(lc_ep ep, lc_req* req)
{
  lc_packet* packet = (lc_packet*) req->parent;
  lci_pk_free_data(ep, packet);
  return LC_OK;
}
3 changes: 3 additions & 0 deletions src/include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@
#define LC_SERVER_NUM_PKTS 1024
#define LC_CACHE_LINE 64

/* Size threshold used by LC_MED_POOL_ID (src/medium.c): a medium-message
 * packet is tagged with the sender's local pool id only when its size is
 * strictly greater than this value; otherwise the pool id is -1. */
#define LC_PKT_RET_MED_SIZE 1024
/* Opt-in switch read by LC_LONG_POOL_ID (src/long.c): when defined, long
 * protocol packets are tagged with the sender's local pool id instead of -1.
 * Left disabled here. */
// #define LC_PKT_RET_LONG

#endif
8 changes: 4 additions & 4 deletions src/include/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static inline void lci_ce_queue(lc_ep ep, lc_packet* p)
static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTR %p\n", p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, p->context.poolid, LC_PROTO_LONG, p);
// dprintf("%d] rma %p --> %p %.4x via %d\n", lcg_rank, p->data.rts.src_addr, p->data.rtr.tgt_addr, crc32c((char*) p->data.rts.src_addr, p->data.rts.size), p->data.rtr.rkey);

lc_server_rma_rtr(ep->server, p->context.req->rhandle,
Expand All @@ -121,7 +121,7 @@ static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
static inline void lci_handle_rts(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTS: %p\n", p);
lci_pk_init(ep, -1, LC_PROTO_RTR, p);
lci_pk_init(ep, p->context.poolid, LC_PROTO_RTR, p);
lc_proto proto = MAKE_PROTO(ep->gid, LC_PROTO_RTR, 0);
lci_prepare_rtr(ep, p->context.req->buffer, p->data.rts.size, p);
lc_server_sendm(ep->server, p->context.req->rhandle,
Expand Down Expand Up @@ -264,10 +264,10 @@ static inline void lci_serve_send(lc_packet* p)
} else if (proto == LC_PROTO_LONG) {
dprintf("SENT LONG: %p\n", p);
p->data.rts.cb((void*) p->data.rts.ce);
lci_pk_free(ep, p);
lci_pk_free_data(ep, p);
} else if (proto == LC_PROTO_RTS) {
dprintf("SENT RTS: %p\n", p);
lci_pk_free(ep, p);
lci_pk_free_data(ep, p);
} else {
dprintf("SENT UNKNOWN: %p\n", p);
lci_pk_free_data(ep, p);
Expand Down
2 changes: 1 addition & 1 deletion src/include/server/server_ibv_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ static inline void ibv_post_recv_(lc_server* s, lc_packet* p)
.num_sge = 1,
};

p->context.poolid = lc_pool_get_local(s->pkpool);
p->context.poolid = lc_pool_get_local_id(s->pkpool);

struct ibv_recv_wr* bad_wr = 0;
IBV_SAFECALL(ibv_post_srq_recv(s->dev_srq, &wr, &bad_wr));
Expand Down
6 changes: 5 additions & 1 deletion src/include/server/server_psm2.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,10 @@ static inline int lc_server_progress(lc_server* s)
lci_serve_recv_rdma(p, status.msg_tag.tag1);
} else {
p->context.req = &p->context.req_s;
lc_pool_put(s->pkpool, p);
if (p->context.poolid != -1)
lc_pool_put_to(s->pkpool, p, p->context.poolid);
else
lc_pool_put(s->pkpool, p);
}
} else if (ctx & PSM_SEND) {
lc_packet* p = (lc_packet*) (ctx ^ PSM_SEND);
Expand All @@ -318,6 +321,7 @@ static inline void lc_server_post_recv(lc_server* s, lc_packet* p)
}

psm2_mq_tag_t rtag = PSM_TAG_TRECV_DATA();
p->context.poolid = lc_pool_get_local_id(s->pkpool);

PSM_SAFECALL(psm2_mq_irecv2(
s->mq, PSM2_MQ_ANY_ADDR, &rtag, /* message tag */
Expand Down
13 changes: 10 additions & 3 deletions src/long.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
#include "lc.h"
#include "config.h"

#include "lc_priv.h"
#include "lc/pool.h"

/*
 * Pool id to record in a long-protocol packet sent on endpoint `ep`.
 * With LC_PKT_RET_LONG defined this is the caller's local pool id; otherwise
 * it is -1 and `ep` is not evaluated at all.
 * The argument is parenthesized so any pointer expression may be passed.
 */
#ifdef LC_PKT_RET_LONG
#define LC_LONG_POOL_ID(ep) (lc_pool_get_local_id((ep)->pkpool))
#else
#define LC_LONG_POOL_ID(ep) (-1)
#endif

lc_status lc_sendl(void* src, size_t size, int rank, int tag, lc_ep ep,
lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_RTS, p);
lci_pk_init(ep, LC_LONG_POOL_ID(ep), LC_PROTO_RTS, p);
struct lci_rep* rep = &(ep->rep[rank]);
lci_prepare_rts(src, size, cb, ce, p);
lc_server_sendm(ep->server, rep->handle,
Expand All @@ -20,7 +27,7 @@ lc_status lc_putl(void* src, size_t size, int rank, uintptr_t addr,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, LC_LONG_POOL_ID(ep), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;

Expand All @@ -34,7 +41,7 @@ lc_status lc_putls(void* src, size_t size, int rank, uintptr_t addr, int meta,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, -1, LC_PROTO_LONG, p);
lci_pk_init(ep, LC_LONG_POOL_ID(ep), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;
struct lci_rep* rep = &(ep->rep[rank]);
Expand Down
12 changes: 6 additions & 6 deletions src/medium.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include "lc.h"
#include "config.h"

#include "lc_priv.h"
#include "lc/pool.h"

/*
 * Pool id to record in a medium-protocol packet of `size` bytes sent on
 * endpoint `ep`: the caller's local pool id when size is strictly greater
 * than LC_PKT_RET_MED_SIZE, otherwise -1.
 * Both arguments are parenthesized so that expressions such as `a & b` or
 * `&endpoint` expand with the intended precedence.
 */
#define LC_MED_POOL_ID(ep, size) \
  (((size) > LC_PKT_RET_MED_SIZE) ? lc_pool_get_local_id((ep)->pkpool) : -1)

lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
lci_pk_init(ep, LC_MED_POOL_ID(ep, size), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(p->data.buffer, src, size);
lc_server_sendm(ep->server, rep->handle, size, p,
Expand All @@ -18,8 +20,7 @@ lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
lci_pk_init(ep, LC_MED_POOL_ID(ep, size), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
lc_server_putm(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),
Expand All @@ -30,8 +31,7 @@ lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
lc_status lc_putms(void* src, size_t size, int rank, uintptr_t addr, int meta, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
LC_PROTO_DATA, p);
lci_pk_init(ep, LC_MED_POOL_ID(ep, size), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
lc_server_putms(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),
Expand Down