Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NO NOT REVIEW]ONLY FOR CI #236

Open
wants to merge 2 commits into
base: integration3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions buildlib/pr/io_demo/az-stage-io-demo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ steps:
--tasks-per-node 1 \
--duration ${{ parameters.duration }} \
-v \
--bind \
--num-clients 1 \
--num-servers 1 \
--map-by slot \
Expand Down
217 changes: 143 additions & 74 deletions src/ucs/datastruct/arbiter.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,28 @@ static inline void ucs_arbiter_group_head_reset(ucs_arbiter_elem_t *head)
head->list.next = NULL; /* Not scheduled yet */
}

static inline void ucs_arbiter_elem_set_scheduled(ucs_arbiter_elem_t *elem,
ucs_arbiter_group_t *group)
{
elem->group = group;
}

void ucs_arbiter_group_push_elem_always(ucs_arbiter_group_t *group,
ucs_arbiter_elem_t *elem)
{
ucs_arbiter_elem_t *tail = group->tail;

UCS_ARBITER_GROUP_GUARD_CHECK(group);

if (tail == NULL) {
/* group is empty */
ucs_arbiter_group_head_reset(elem);
elem->next = elem; /* Connect to itself */
} else {
elem->next = tail->next; /* Point to first element */
tail->next = elem; /* Point previous element to new one */
}

elem->group = group; /* Always point to group */
group->tail = elem; /* Update group tail */
ucs_arbiter_elem_set_scheduled(elem, group);
}

void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,
Expand All @@ -74,8 +79,8 @@ void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,

UCS_ARBITER_GROUP_GUARD_CHECK(group);

elem->group = group; /* Always point to group */
ucs_arbiter_group_head_reset(elem);
ucs_arbiter_elem_set_scheduled(elem, group);

if (tail == NULL) {
elem->next = elem; /* Connect to itself */
Expand All @@ -94,14 +99,6 @@ void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,
ucs_list_replace(&head->list, &elem->list);
}

void ucs_arbiter_group_head_desched(ucs_arbiter_t *arbiter,
ucs_arbiter_elem_t *head)
{
if (ucs_arbiter_group_head_is_scheduled(head)) {
ucs_list_del(&head->list);
}
}

void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
ucs_arbiter_group_t *group,
ucs_arbiter_callback_t cb, void *cb_arg)
Expand Down Expand Up @@ -133,8 +130,8 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
ptr = next;
next = ptr->next;
/* Can't touch the element after cb is called if it gets removed. But it
* can be reused later as well, so it's next should be NULL. */
ptr->next = NULL;
* can be reused later as well, so it's group should be NULL. */
ucs_arbiter_elem_init(ptr);
result = cb(arbiter, group, ptr, cb_arg);

if (result == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
Expand All @@ -158,8 +155,8 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
prev->next = next;
} else {
/* keep the element */
ptr->next = next; /* Restore next pointer */
prev = ptr;
ucs_arbiter_elem_set_scheduled(ptr, group);
prev = ptr;
}
} while (ptr != tail);

Expand All @@ -174,6 +171,25 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
}
}

size_t ucs_arbiter_group_num_elems(ucs_arbiter_group_t *group)
{
ucs_arbiter_elem_t *elem = group->tail;
size_t num_elems;

if (elem == NULL) {
return 0;
}

num_elems = 0;
do {
++num_elems;
elem = elem->next;
} while (elem != group->tail);

return num_elems;
}


int ucs_arbiter_group_is_scheduled(ucs_arbiter_group_t *group)
{
ucs_arbiter_elem_t *head;
Expand Down Expand Up @@ -204,13 +220,7 @@ void ucs_arbiter_group_schedule_nonempty(ucs_arbiter_t *arbiter,
ucs_assert(tail != NULL);
head = tail->next;

if (head == NULL) {
/* It means that 1 element group is scheduled during dispatch.
* Restore next pointer.
*/
head = tail;
}

ucs_assert(head != NULL);
ucs_arbiter_schedule_head_if_not_scheduled(arbiter, head);
UCS_ARBITER_GROUP_ARBITER_SET(group, arbiter);
}
Expand All @@ -230,19 +240,47 @@ void ucs_arbiter_group_desched_nonempty(ucs_arbiter_t *arbiter,
ucs_arbiter_group_head_reset(head);
}

static inline void
ucs_arbiter_remove_and_reset_if_scheduled(ucs_arbiter_elem_t *elem)
{
if (ucs_unlikely(ucs_arbiter_group_head_is_scheduled(elem))) {
ucs_list_del(&elem->list);
ucs_arbiter_group_head_reset(elem);
}
}

static inline void
ucs_arbiter_group_head_replace(ucs_arbiter_group_t *group,
ucs_arbiter_elem_t *group_head,
ucs_arbiter_elem_t *new_group_head)
{
/* check if this is really the group head */
ucs_assert(!ucs_arbiter_group_is_empty(group));
ucs_assert(group->tail->next == group_head);

if (group_head->next == group_head) {
group->tail = new_group_head;
} else {
new_group_head->next = group_head->next;
}
group->tail->next = new_group_head;
}

void ucs_arbiter_dispatch_nonempty(ucs_arbiter_t *arbiter, unsigned per_group,
ucs_arbiter_callback_t cb, void *cb_arg)
{
ucs_arbiter_elem_t *group_head, *group_tail, *next_elem;
ucs_arbiter_elem_t *group_head;
ucs_arbiter_cb_result_t result;
unsigned group_dispatch_count;
ucs_arbiter_group_t *group;
UCS_LIST_HEAD(resched_list);
int sched_group;
ucs_arbiter_elem_t dummy;

ucs_assert(!ucs_list_is_empty(&arbiter->list));

for (;;) {
ucs_arbiter_group_head_reset(&dummy);

do {
group_head = ucs_list_extract_head(&arbiter->list, ucs_arbiter_elem_t,
list);
ucs_assert(group_head != NULL);
Expand All @@ -254,80 +292,111 @@ void ucs_arbiter_dispatch_nonempty(ucs_arbiter_t *arbiter, unsigned per_group,
ucs_arbiter_group_head_reset(group_head);

group_dispatch_count = 0;
sched_group = 1;
group = group_head->group;
dummy.group = group;
UCS_ARBITER_GROUP_GUARD_CHECK(group);

do {
/* zero pointer to next elem here because:
for (;;) {
ucs_assert(group_head->group == group);
ucs_assert(dummy.group == group);
ucs_assert(group_dispatch_count < per_group);

/* reset the dispatched element here because:
* 1. if the element is removed from the arbiter it must be kept in
* initialized state otherwise push will fail
* 2. we can't zero the pointer after calling the callback because
* 2. we can't reset the element after calling the callback because
* the callback could release the element.
*/
next_elem = group_head->next;
group_head->next = NULL;
ucs_assert(group_head->group == group);
ucs_arbiter_elem_init(group_head);
ucs_assert(!ucs_arbiter_group_head_is_scheduled(group_head));

/* replace group head by a dummy element, to allow scheduling more
* elements on this group from the dispatch callback.
*/
ucs_arbiter_group_head_replace(group, group_head, &dummy);

/* dispatch the element */
ucs_trace_poll("dispatching arbiter element %p", group_head);
UCS_ARBITER_GROUP_GUARD_ENTER(group);
result = cb(arbiter, group, group_head, cb_arg);
UCS_ARBITER_GROUP_GUARD_EXIT(group);
ucs_trace_poll("dispatch result: %d", result);
++group_dispatch_count;

if (result == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
group_tail = group->tail;
if (group_head == group_tail) {
/* Last element */
group->tail = NULL; /* Group is empty now */
sched_group = 0;
group_head = NULL; /* for debugging */
/* recursive push to head (during dispatch) is not allowed */
ucs_assert(group->tail->next == &dummy);

/* element is not removed */
if (ucs_unlikely(result != UCS_ARBITER_CB_RESULT_REMOVE_ELEM)) {
/* restore group pointer */
ucs_arbiter_elem_set_scheduled(group_head, group);

/* the head should not move, since dummy replaces it */
ucs_assert(!ucs_arbiter_group_head_is_scheduled(group_head));

/* replace dummy element by group_head */
ucs_arbiter_group_head_replace(group, &dummy, group_head);

if (result == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) {
/* take over a recursively scheduled group */
if (ucs_unlikely(ucs_arbiter_group_head_is_scheduled(&dummy))) {
ucs_list_replace(&dummy.list, &group_head->list);
ucs_arbiter_group_head_reset(&dummy);
}
UCS_ARBITER_GROUP_ARBITER_SET(group, NULL);
break;
} else {
/* Not last element */
ucs_assert(group_head == group_tail->next);
ucs_assert(group_head != next_elem);
group_head = next_elem; /* Update group head */
group_tail->next = group_head; /* Tail points to new head */
ucs_arbiter_group_head_reset(group_head);
}
} else {
/* element is not removed, restore next pointer */
group_head->next = next_elem;

/* group must still be active */
ucs_assert(sched_group == 1);

if (result == UCS_ARBITER_CB_RESULT_STOP) {
/* exit the outmost loop and make sure that next dispatch()
* will continue from the current group */
ucs_list_add_head(&arbiter->list, &group_head->list);
goto out;
} else if (result != UCS_ARBITER_CB_RESULT_NEXT_GROUP) {
/* resched/desched must avoid adding the group to the arbiter */
sched_group = 0;
if (result == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) {
UCS_ARBITER_GROUP_ARBITER_SET(group, NULL);
/* remove a recursively scheduled group, give priority
* to the original order */
ucs_arbiter_remove_and_reset_if_scheduled(&dummy);

if (result == UCS_ARBITER_CB_RESULT_NEXT_GROUP) {
/* add to arbiter tail */
ucs_list_add_tail(&arbiter->list, &group_head->list);
} else if (result == UCS_ARBITER_CB_RESULT_RESCHED_GROUP) {
/* add to resched list */
ucs_list_add_tail(&resched_list, &group_head->list);
} else if (result == UCS_ARBITER_CB_RESULT_STOP) {
/* exit the outmost loop and make sure that next dispatch()
* will continue from the current group */
ucs_list_add_head(&arbiter->list, &group_head->list);
goto out;
} else {
ucs_bug("unexpected return value from arbiter callback");
}
break;
}

break;
}
} while (group_dispatch_count < per_group);

if (sched_group) {
/* the group could be scheduled again from dispatch callback */
ucs_arbiter_schedule_head_if_not_scheduled(arbiter, group_head);
ucs_assert(!ucs_list_is_empty(&arbiter->list));
} else if (ucs_list_is_empty(&arbiter->list)) {
break;
/* last element removed */
if (dummy.next == &dummy) {
group->tail = NULL; /* group is empty now */
group_head = NULL; /* for debugging */
ucs_arbiter_remove_and_reset_if_scheduled(&dummy);
UCS_ARBITER_GROUP_ARBITER_SET(group, NULL);
break;
}

/* non-last element removed */
group_head = dummy.next; /* Update group head */
group->tail->next = group_head; /* Tail points to new head */

if (ucs_unlikely(ucs_arbiter_group_head_is_scheduled(&dummy))) {
/* take over a recursively scheduled group */
ucs_list_replace(&dummy.list, &group_head->list);
ucs_arbiter_group_head_reset(&dummy);
/* the group is already scheduled, continue to next group */
break;
} else if (group_dispatch_count >= per_group) {
/* add to arbiter tail and continue to next group */
ucs_list_add_tail(&arbiter->list, &group_head->list);
break;
}

/* continue with new group head */
ucs_arbiter_group_head_reset(group_head);
}
}
} while (!ucs_list_is_empty(&arbiter->list));

out:
ucs_list_splice_tail(&arbiter->list, &resched_list);
Expand Down
15 changes: 10 additions & 5 deletions src/ucs/datastruct/arbiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void ucs_arbiter_group_cleanup(ucs_arbiter_group_t *group);
*/
static inline void ucs_arbiter_elem_init(ucs_arbiter_elem_t *elem)
{
elem->next = NULL;
elem->group = NULL;
}


Expand Down Expand Up @@ -233,6 +233,12 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group,
ucs_arbiter_callback_t cb, void *cb_arg);


/**
* @return Number of elements in the group
*/
size_t ucs_arbiter_group_num_elems(ucs_arbiter_group_t *group);


void ucs_arbiter_dump(ucs_arbiter_t *arbiter, FILE *stream);


Expand Down Expand Up @@ -306,11 +312,10 @@ static inline void ucs_arbiter_group_desched(ucs_arbiter_t *arbiter,
/**
* @return Whether the element is queued in an arbiter group.
* (an element can't be queued more than once)
*
*/
static inline int ucs_arbiter_elem_is_scheduled(ucs_arbiter_elem_t *elem)
{
return elem->next != NULL;
return elem->group != NULL;
}


Expand Down Expand Up @@ -381,9 +386,9 @@ ucs_arbiter_dispatch(ucs_arbiter_t *arbiter, unsigned per_group,
* @return true if element is the only one in the group
*/
static inline int
ucs_arbiter_elem_is_only(ucs_arbiter_group_t *group, ucs_arbiter_elem_t *elem)
ucs_arbiter_elem_is_only(ucs_arbiter_elem_t *elem)
{
return (group->tail == elem) && ((elem->next == elem) || (elem->next == NULL));
return elem->next == elem;
}

#endif
2 changes: 1 addition & 1 deletion src/uct/ib/dc/dc_mlx5_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t *arbiter,
arb_group);
uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
uct_dc_mlx5_iface_t);
int is_only = ucs_arbiter_elem_is_only(group, elem);
int is_only = ucs_arbiter_elem_is_only(elem);
ucs_arbiter_cb_result_t res;

res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);
Expand Down
Loading