Skip to content

Commit

Permalink
Merge branch 'grand_dispatch_queue' of github.com:awslabs/aws-c-io in…
Browse files Browse the repository at this point in the history
…to nw_socket
  • Loading branch information
xiazhvera committed Jan 9, 2025
2 parents 0b7ba13 + cce6210 commit 535d92b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 19 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ jobs:
fail-fast: false
matrix:
eventloop: ["kqueue", "dispatch_queue"]
sanitizers: [",thread", ",address,undefined"]
steps:
- uses: aws-actions/configure-aws-credentials@v4
with:
Expand All @@ -242,7 +243,7 @@ jobs:
run: |
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
chmod a+x builder
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }}
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}"
macos-x64:
runs-on: macos-14-large # latest
Expand All @@ -263,6 +264,7 @@ jobs:
fail-fast: false
matrix:
eventloop: ["kqueue", "dispatch_queue"]
sanitizers: [",thread", ",address,undefined"]
steps:
- uses: aws-actions/configure-aws-credentials@v4
with:
Expand All @@ -272,7 +274,7 @@ jobs:
run: |
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
chmod a+x builder
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --config Debug
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=-DAWS_USE_APPLE_NETWORK_FRAMEWORK=${{ matrix.eventloop == 'dispatch_queue' && 'ON' || 'OFF' }} --cmake-extra=-DENABLE_SANITIZERS=ON --cmake-extra=-DSANITIZERS="${{ matrix.sanitizers }}" --config Debug
freebsd:
runs-on: ubuntu-24.04 # latest
Expand Down
90 changes: 73 additions & 17 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ struct dispatch_scheduling_state {
*/
struct aws_mutex services_lock;
/**
* List<scheduled_service_entry> in sorted order by timestamp. Each scheduled_service_entry represents a block
* ALREADY SCHEDULED on apple dispatch queue.
* priority queue of <scheduled_service_entry> in sorted order by timestamp. Each scheduled_service_entry represents
* a block ALREADY SCHEDULED on apple dispatch queue.
*
* When we go to schedule a new iteration, we check here first to see if our scheduling attempt is redundant.
*/
struct aws_linked_list scheduled_services;
struct aws_priority_queue scheduled_services;
};

/* Internal ref-counted dispatch loop context to processing Apple Dispatch Queue Resources */
Expand All @@ -118,10 +118,12 @@ struct dispatch_loop_context {
struct scheduled_service_entry {
struct aws_allocator *allocator;
uint64_t timestamp;
struct aws_linked_list_node node;
struct aws_priority_queue_node priority_queue_node;
struct dispatch_loop_context *dispatch_queue_context;
};

/** Help functions to track context ref-count */

static void *s_acquire_dispatch_loop_context(struct dispatch_loop_context *context) {
return aws_ref_count_acquire(&context->ref_count);
}
Expand All @@ -130,6 +132,7 @@ static size_t s_release_dispatch_loop_context(struct dispatch_loop_context *cont
return aws_ref_count_release(&context->ref_count);
}

/** Help functions to lock status */
static int s_rlock_dispatch_loop_context(struct dispatch_loop_context *context) {
return aws_rw_lock_rlock(&context->lock);
}
Expand Down Expand Up @@ -162,6 +165,37 @@ static int s_unlock_service_entries(struct dispatch_loop_context *context) {
return aws_mutex_unlock(&context->scheduling_state.services_lock);
}

// Not sure why use 7 as the default queue size. Just follow what we used in task_scheduler.c
static const size_t DEFAULT_QUEUE_SIZE = 7;
static int s_compare_timestamps(const void *a, const void *b) {
uint64_t a_time = (*(struct scheduled_service_entry **)a)->timestamp;
uint64_t b_time = (*(struct scheduled_service_entry **)b)->timestamp;
return a_time > b_time; /* min-heap */
}

// /** Help function to insert the service entry in the order of timestamp
// * The function should always be wrapped with lock scheduling_state.lock.
// */
// static int s_sorted_insert_service_entry(
// struct dispatch_scheduling_state *service_entry,
// struct scheduled_service_entry *entry) {

// size_t time_to_run = entry->timestamp;

// /* Perform a sorted insertion into timed_list. We didn't directly use a O(log(n))*/
// struct aws_linked_list_node *node_i;
// for (node_i = aws_linked_list_begin(&service_entry->scheduled_services);
// node_i != aws_linked_list_end(&service_entry->scheduled_services);
// node_i = aws_linked_list_next(node_i)) {

// struct scheduled_service_entry *entry_i = AWS_CONTAINER_OF(node_i, struct aws_task, node);
// if (entry_i->timestamp > time_to_run) {
// break;
// }
// }
// aws_linked_list_insert_before(node_i, &entry->node);
// }

static struct scheduled_service_entry *s_scheduled_service_entry_new(
struct dispatch_loop_context *context,
uint64_t timestamp) {
Expand All @@ -171,13 +205,19 @@ static struct scheduled_service_entry *s_scheduled_service_entry_new(
entry->allocator = context->allocator;
entry->timestamp = timestamp;
entry->dispatch_queue_context = s_acquire_dispatch_loop_context(context);
aws_priority_queue_node_init(&entry->priority_queue_node);

return entry;
}

static void s_scheduled_service_entry_destroy(struct scheduled_service_entry *entry) {
if (aws_linked_list_node_is_in_list(&entry->node)) {
aws_linked_list_remove(&entry->node);
/**
* The function should be wrapped around scheduling_status->lock
*/
static void s_scheduled_service_entry_destroy(
struct dispatch_scheduling_state scheduling_status,
struct scheduled_service_entry *entry) {
if (aws_priority_queue_node_is_in_queue(&entry->priority_queue_node)) {
aws_priority_queue_remove(&scheduling_status.scheduled_services, entry, &entry->priority_queue_node);
}
struct dispatch_loop_context *dispatch_queue_context = entry->dispatch_queue_context;
s_release_dispatch_loop_context(dispatch_queue_context);
Expand All @@ -191,16 +231,18 @@ static void s_scheduled_service_entry_destroy(struct scheduled_service_entry *en
* The function should be wrapped with the following locks:
* scheduled_services lock: To safely access the scheduled_services list
*/
static bool s_should_schedule_iteration(struct aws_linked_list *scheduled_services, uint64_t proposed_iteration_time) {
if (aws_linked_list_empty(scheduled_services)) {
static bool s_should_schedule_iteration(
struct aws_priority_queue *scheduled_services,
uint64_t proposed_iteration_time) {
if (aws_priority_queue_size(scheduled_services) == 0) {
return true;
}

struct aws_linked_list_node *head_node = aws_linked_list_front(scheduled_services);
struct scheduled_service_entry *entry = AWS_CONTAINER_OF(head_node, struct scheduled_service_entry, node);
struct scheduled_service_entry **entry = NULL;
aws_priority_queue_top(scheduled_services, (void **)&entry);

// is the next scheduled iteration later than what we require?
return entry->timestamp > proposed_iteration_time;
return (*entry)->timestamp > proposed_iteration_time;
}

/* On dispatch event loop context ref-count reaches 0 */
Expand Down Expand Up @@ -308,7 +350,16 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
aws_ref_count_init(&context->ref_count, context, s_dispatch_loop_context_destroy);
context->allocator = alloc;
aws_mutex_init(&context->scheduling_state.services_lock);
aws_linked_list_init(&context->scheduling_state.scheduled_services);

if (aws_priority_queue_init_dynamic(
&context->scheduling_state.scheduled_services,
alloc,
DEFAULT_QUEUE_SIZE,
sizeof(struct scheduled_service_entry *),
&s_compare_timestamps)) {
goto clean_up;
};

aws_rw_lock_init(&context->lock);
context->io_dispatch_loop = dispatch_loop;
dispatch_loop->context = context;
Expand Down Expand Up @@ -443,7 +494,9 @@ static void end_iteration(struct scheduled_service_entry *entry) {
dispatch_loop->synced_data.is_executing = false;

// Remove the node before do scheduling so we didnt consider the entry itself
aws_linked_list_remove(&entry->node);
s_lock_service_entries(context);
aws_priority_queue_remove(&context->scheduling_state.scheduled_services, entry, &entry->priority_queue_node);
s_unlock_service_entries(context);

bool should_schedule = false;
uint64_t should_schedule_at_time = 0;
Expand Down Expand Up @@ -508,7 +561,9 @@ static void s_run_iteration(void *context) {
end_iteration(entry);

iteration_done:
s_scheduled_service_entry_destroy(entry);
s_lock_service_entries(dispatch_queue_context);
s_scheduled_service_entry_destroy(dispatch_queue_context->scheduling_state, entry);
s_unlock_service_entries(dispatch_queue_context);
s_runlock_dispatch_loop_context(dispatch_queue_context);
}

Expand All @@ -532,7 +587,8 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_
return;
}
struct scheduled_service_entry *entry = s_scheduled_service_entry_new(dispatch_loop_context, timestamp);
aws_linked_list_push_front(&dispatch_loop_context->scheduling_state.scheduled_services, &entry->node);
aws_priority_queue_push_ref(
&dispatch_loop_context->scheduling_state.scheduled_services, entry, &entry->priority_queue_node);

uint64_t now_ns = 0;
aws_event_loop_current_clock_time(dispatch_loop->base_loop, &now_ns);
Expand All @@ -543,7 +599,7 @@ static void s_try_schedule_new_iteration(struct dispatch_loop_context *dispatch_
* unnecessarily, even if the app has shutdown. To avoid this, Ensure an iteration is scheduled within a
* 1-second interval to prevent it from remaining in the Apple dispatch queue indefinitely.
*/
delta = MIN(delta, AWS_TIMESTAMP_NANOS);
delta = aws_min_u64(delta, AWS_TIMESTAMP_NANOS);

if (delta == 0) {
// dispatch_after_f(0 , ...) is equivclient to dispatch_async_f(...) functionality wise, while
Expand Down

0 comments on commit 535d92b

Please sign in to comment.