v2 pkpool #20

Open

wants to merge 7 commits into base: v2
Changes from 2 commits
38 changes: 19 additions & 19 deletions include/lc/pool.h
@@ -90,14 +90,21 @@ LC_INLINE int32_t lc_pool_get_local(struct lc_pool* pool)
  return pid;
}

-LC_INLINE void* lc_pool_get_slow(struct lc_pool* pool) {
-  void* elm = NULL;
-  while (!elm) {
-    int steal = rand() % (pool->npools);
+LC_INLINE int32_t lc_pool_get_steal(struct lc_pool* pool)
+{
+  int32_t pid = lc_pool_get_local(pool);
+  int32_t npools = pool->npools;
+  int32_t r = rand() % (npools - 1);
+  return (r + pid + 1) % npools;
+}
+
+LC_INLINE void* lc_pool_steal(struct lc_pool* pool)
+{
+  void* elm = NULL;
+  int32_t steal = lc_pool_get_steal(pool);
  if (likely(pool->lpools[steal] != NULL))
    elm = dq_pop_bot(pool->lpools[steal]);
-  }
-  return elm;
+  return elm;
}

LC_INLINE void lc_pool_put(struct lc_pool* pool, void* elm) {
@@ -111,26 +118,19 @@ LC_INLINE void lc_pool_put_to(struct lc_pool* pool, void* elm, int32_t pid) {
  dq_push_top(lpool, elm);
}

-LC_INLINE void* lc_pool_get(struct lc_pool* pool) {
+LC_INLINE void* lc_pool_get_nb(struct lc_pool* pool) {
  int32_t pid = lc_pool_get_local(pool);
  struct dequeue* lpool = pool->lpools[pid];
-  void *elm = NULL;
-  elm = dq_pop_top(lpool);
+  void* elm = dq_pop_top(lpool);
  if (elm == NULL)
-    elm = lc_pool_get_slow(pool);
+    elm = lc_pool_steal(pool);
  return elm;
}

-LC_INLINE void* lc_pool_get_nb(struct lc_pool* pool) {
-  int32_t pid = lc_pool_get_local(pool);
-  struct dequeue* lpool = pool->lpools[pid];
+LC_INLINE void* lc_pool_get(struct lc_pool* pool) {
  void* elm = NULL;
-  elm = dq_pop_top(lpool);
-  if (elm == NULL) {
-    int steal = rand() % (pool->npools);
-    if (likely(pool->lpools[steal] != NULL))
-      elm = dq_pop_bot(pool->lpools[steal]);
-  }
+  while (elm == NULL)
+    elm = lc_pool_get_nb(pool);
  return elm;
}
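The interesting part of this hunk is the new victim selection in lc_pool_get_steal: drawing r from [0, npools - 2] and offsetting it by pid + 1 means the stealing thread never targets its own (already empty) pool, which the old rand() % npools could do. A minimal standalone sketch (hypothetical driver, not part of the LCI sources, and assuming npools > 1) that demonstrates this:

```c
#include <stdio.h>
#include <stdlib.h>

/* Same arithmetic as lc_pool_get_steal: r lies in [0, npools - 2], so the
 * result ranges over every pool index except pid itself. */
static int pick_victim(int pid, int npools)
{
  int r = rand() % (npools - 1);
  return (r + pid + 1) % npools;
}

int main(void)
{
  int npools = 4;
  for (int pid = 0; pid < npools; pid++)
    for (int i = 0; i < 8; i++)
      /* never prints a line where the victim equals pid */
      printf("pid %d steals from %d\n", pid, pick_victim(pid, npools));
  return 0;
}
```

Note also that the old lc_pool_get_slow spun until it found an element; that loop now lives in lc_pool_get, which retries lc_pool_get_nb, while lc_pool_steal attempts a single steal and may return NULL.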
2 changes: 1 addition & 1 deletion src/cq.c
@@ -14,6 +14,6 @@ lc_status lc_cq_pop(lc_ep ep, lc_req** req_ptr)
lc_status lc_cq_reqfree(lc_ep ep, lc_req* req)
{
lc_packet* packet = (lc_packet*) req->parent;
-  lc_pool_put(ep->pkpool, packet);
+  lci_pk_free_data(ep, packet);
return LC_OK;
}
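lc_cq_reqfree now goes through lci_pk_free_data instead of pushing the packet directly onto the caller's pool. Judging from the server_psm2.h hunk further down, the intent is to honor the packet's recorded origin pool; a plausible sketch of that behavior (hypothetical, the real lci_pk_free_data in the LCI sources may differ):

```c
/* Hypothetical sketch only; assumes the LCI internal packet and endpoint types. */
static inline void pk_free_data_sketch(lc_ep ep, lc_packet* p)
{
  if (p->context.poolid != -1)
    /* return the packet to the pool recorded when it was initialized */
    lc_pool_put_to(ep->pkpool, p, p->context.poolid);
  else
    /* no origin recorded: fall back to the caller's local pool */
    lc_pool_put(ep->pkpool, p);
}
```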
8 changes: 4 additions & 4 deletions src/include/proto.h
@@ -109,7 +109,7 @@ static inline void lci_ce_queue(lc_ep ep, lc_packet* p)
static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTR %p\n", p);
-  lci_pk_init(ep, -1, LC_PROTO_LONG, p);
+  lci_pk_init(ep, p->context.poolid, LC_PROTO_LONG, p);
// dprintf("%d] rma %p --> %p %.4x via %d\n", lcg_rank, p->data.rts.src_addr, p->data.rtr.tgt_addr, crc32c((char*) p->data.rts.src_addr, p->data.rts.size), p->data.rtr.rkey);

lc_server_rma_rtr(ep->server, p->context.req->rhandle,
@@ -121,7 +121,7 @@ static inline void lci_handle_rtr(struct lci_ep* ep, lc_packet* p)
static inline void lci_handle_rts(struct lci_ep* ep, lc_packet* p)
{
dprintf("Recv RTS: %p\n", p);
-  lci_pk_init(ep, -1, LC_PROTO_RTR, p);
+  lci_pk_init(ep, p->context.poolid, LC_PROTO_RTR, p);
lc_proto proto = MAKE_PROTO(ep->gid, LC_PROTO_RTR, 0);
lci_prepare_rtr(ep, p->context.req->buffer, p->data.rts.size, p);
lc_server_sendm(ep->server, p->context.req->rhandle,
@@ -264,10 +264,10 @@ static inline void lci_serve_send(lc_packet* p)
} else if (proto == LC_PROTO_LONG) {
dprintf("SENT LONG: %p\n", p);
p->data.rts.cb((void*) p->data.rts.ce);
-    lci_pk_free(ep, p);
+    lci_pk_free_data(ep, p);
} else if (proto == LC_PROTO_RTS) {
dprintf("SENT RTS: %p\n", p);
-    lci_pk_free(ep, p);
+    lci_pk_free_data(ep, p);
} else {
dprintf("SENT UNKNOWN: %p\n", p);
lci_pk_free_data(ep, p);
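The two lci_pk_init call sites above now pass p->context.poolid instead of the -1 sentinel ("no preferred pool"), and the send-completion path frees through lci_pk_free_data. The per-packet state this relies on, as far as it can be read from the hunks in this PR (field names come from the diff; the real context struct in the LCI headers has more members):

```c
/* Sketch of the assumed packet context; only fields visible in this PR are
 * shown, and members such as req_s are omitted. */
struct packet_context_sketch {
  lc_req* req;     /* request bound to this packet (p->context.req) */
  int32_t poolid;  /* pool to return the packet to, or -1 for "any pool" */
};
```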
6 changes: 5 additions & 1 deletion src/include/server/server_psm2.h
@@ -292,7 +292,10 @@ static inline int lc_server_progress(lc_server* s)
lci_serve_recv_rdma(p, status.msg_tag.tag1);
} else {
p->context.req = &p->context.req_s;
-    lc_pool_put(s->pkpool, p);
+    if (p->context.poolid != -1)
+      lc_pool_put_to(s->pkpool, p, p->context.poolid);
+    else
+      lc_pool_put(s->pkpool, p);
}
} else if (ctx & PSM_SEND) {
lc_packet* p = (lc_packet*) (ctx ^ PSM_SEND);
@@ -318,6 +321,7 @@ static inline void lc_server_post_recv(lc_server* s, lc_packet* p)
}

psm2_mq_tag_t rtag = PSM_TAG_TRECV_DATA();
+  p->context.poolid = lc_pool_get_local(s->pkpool);

PSM_SAFECALL(psm2_mq_irecv2(
s->mq, PSM2_MQ_ANY_ADDR, &rtag, /* message tag */
6 changes: 3 additions & 3 deletions src/long.c
@@ -7,7 +7,7 @@ lc_status lc_sendl(void* src, size_t size, int rank, int tag, lc_ep ep,
lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
-  lci_pk_init(ep, -1, LC_PROTO_RTS, p);
+  lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_RTS, p);
struct lci_rep* rep = &(ep->rep[rank]);
lci_prepare_rts(src, size, cb, ce, p);
lc_server_sendm(ep->server, rep->handle,
@@ -20,7 +20,7 @@ lc_status lc_putl(void* src, size_t size, int rank, uintptr_t addr,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
-  lci_pk_init(ep, -1, LC_PROTO_LONG, p);
+  lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;

@@ -34,7 +34,7 @@ lc_status lc_putls(void* src, size_t size, int rank, uintptr_t addr, int meta,
lc_ep ep, lc_send_cb cb, void* ce)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
-  lci_pk_init(ep, -1, LC_PROTO_LONG, p);
+  lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_LONG, p);
p->data.rts.cb = cb;
p->data.rts.ce = (uintptr_t) ce;
struct lci_rep* rep = &(ep->rep[rank]);
9 changes: 3 additions & 6 deletions src/medium.c
@@ -6,8 +6,7 @@
lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
-  lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
-              LC_PROTO_DATA, p);
+  lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p);
Member:

Note that in some benchmarks this can be an issue, since the cost of putting the data on a remote pool can be significant: it always guarantees a cache miss, vs. returning it to its own pool, which is very cheap.

Member Author:

That's a fair point. In our use case, however, there is only ever one thread (per device, I guess) that can return packets. Is it then better to cause a cache miss now, or to allow a pool to empty, guaranteeing a cache miss later?

Member:

Agreed, there is no right or wrong here. Things like this need to be tuned and measured against application behavior. Hence giving some flexibility for tuning is better: I would change 1024 to a constant, and if you set it to 0 the compiler will get rid of it.

Btw, note also that a cache miss on the progress thread trying to return to the original pool can be worse than one on a sender stealing. Firstly, it delays progress; secondly, assuming we have a large number of threads doing sends, the cost is amortized since other threads may be doing useful work. Again, this may not matter in your case right now, where you have only one sender thread.
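A sketch of the constant suggested here; the macro name is hypothetical and this is not part of the PR. With the value set to 0 the comparison is always true for non-empty sends, so the compiler folds the branch away and every medium send records its origin pool; a large value restores the old behavior in which small-message packets go back to the freeing thread's own pool:

```c
/* Hypothetical tuning knob, not part of this PR. */
#ifndef LC_MED_RET_POOL_THRESHOLD
#define LC_MED_RET_POOL_THRESHOLD 1024
#endif

/* in lc_sendm() / lc_putm() / lc_putms() */
lci_pk_init(ep,
            (size > LC_MED_RET_POOL_THRESHOLD) ? lc_pool_get_local(ep->pkpool) : -1,
            LC_PROTO_DATA, p);
```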

Member Author:

Right, I see your point.

The branch of PaRSEC I'm currently on is old and funnels all sends through the communication thread, but more recent versions can send from other threads as well, so this will become more relevant once I rebase.

Member Author:

I've remembered that, actually, the communication and progress threads are supposed to run on the same core (they're bound), which means they should end up using the same pool. In PaRSEC/LCI, the user's main thread is the one that initializes LCI and thus fills the initial pool; the communication and progress threads are started later (and presumably run on a different core) and therefore start with an empty pool.

I'll do some more tests that discard the changes for medium and long sends but keep the remaining changes (most notably those in server_psm2.h).

Long-term, I'll probably introduce compile-time parameters for a) the size limit for medium-send packet return and b) whether to do long-send packet return.

Member:

Strange if they are using the same pool; please verify.

Member Author (@omor1, Jun 9, 2020):

I'll verify tomorrow, but my intuition is that my statement above is correct. Both threads should be bound to the same core, so sched_getcpu should return the same value, leading to the same pool being used. This is PaRSEC-specific though; in the future, if/when we enable using more than one device, we'll need better logic for thread placement (in PaRSEC).

Collaborator:

Depending on the activation mode, the PaRSEC communication thread might not be bound to a core, but instead be allowed to move around freely. You can be more restrictive and force a binding for the communication thread, in which case the resource will be assumed reserved and no computation thread will be allowed to use it.

Member Author:

I think the default (which I've been testing with) is to have it bound. When we have a single progress thread, it makes sense to bind it to the communication thread, but if we have more (multiple NICs/hardware queues/"LCI devices") then the decision is less obvious. At that point, we'll want to introduce an MCA parameter to control the binding.

struct lci_rep* rep = &(ep->rep[rank]);
memcpy(p->data.buffer, src, size);
lc_server_sendm(ep->server, rep->handle, size, p,
@@ -18,8 +17,7 @@ lc_status lc_sendm(void* src, size_t size, int rank, int tag, lc_ep ep)
lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
-  lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
-              LC_PROTO_DATA, p);
+  lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
lc_server_putm(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),
@@ -30,8 +28,7 @@ lc_status lc_putm(void* src, size_t size, int rank, uintptr_t addr, lc_ep ep)
lc_status lc_putms(void* src, size_t size, int rank, uintptr_t addr, int meta, lc_ep ep)
{
LC_POOL_GET_OR_RETN(ep->pkpool, p);
-  lci_pk_init(ep, (size > 1024) ? lc_pool_get_local(ep->pkpool) : -1,
-              LC_PROTO_DATA, p);
+  lci_pk_init(ep, lc_pool_get_local(ep->pkpool), LC_PROTO_DATA, p);
struct lci_rep* rep = &(ep->rep[rank]);
memcpy(&p->data, src, size);
lc_server_putms(ep->server, rep->handle, rep->base, (uint32_t) (addr - rep->base),