From 2531692235af1bd17c784e1ec949367b5a87fa7a Mon Sep 17 00:00:00 2001
From: George Pollard <gpollard@microsoft.com>
Date: Tue, 24 Oct 2023 23:56:54 +0000
Subject: [PATCH 1/2] Parallelize some timer work

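The timer functions currently update tasks, jobs, nodes, pools, and
scalesets one entity at a time. This change runs those updates
concurrently: entity types that may be numerous (tasks, jobs, nodes) are
bounded via Parallel.ForEachAsync with MaxDegreeOfParallelism = 10, while
types expected to be few (pools, scalesets) are materialized into a list
and fanned out with Task.WhenAll.

A rough sketch of the two patterns (items and ProcessAsync are
placeholder names, not identifiers from this repo):

    // bounded: at most 10 updates in flight at once
    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
    await Parallel.ForEachAsync(items, options, async (item, cancellationToken) => {
        await ProcessAsync(item);
    });

    // unbounded fan-out: acceptable when the set is known to be small
    var all = await items.ToListAsync();
    await Task.WhenAll(all.Select(item => ProcessAsync(item)));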
---
 .../ApiService/Functions/TimerTasks.cs        | 28 +++++++++----------
 .../ApiService/Functions/TimerWorkers.cs      | 23 +++++++++------
 .../ApiService/onefuzzlib/NodeOperations.cs   | 14 ++++++----
 3 files changed, 37 insertions(+), 28 deletions(-)

diff --git a/src/ApiService/ApiService/Functions/TimerTasks.cs b/src/ApiService/ApiService/Functions/TimerTasks.cs
index a2f08af277..f63a82f071 100644
--- a/src/ApiService/ApiService/Functions/TimerTasks.cs
+++ b/src/ApiService/ApiService/Functions/TimerTasks.cs
@@ -1,4 +1,5 @@
-using Microsoft.Azure.Functions.Worker;
+using System.Threading.Tasks;
+using Microsoft.Azure.Functions.Worker;
 using Microsoft.Extensions.Logging;
 namespace Microsoft.OneFuzz.Service.Functions;
 
@@ -22,35 +23,34 @@ public TimerTasks(ILogger<TimerTasks> logger, ITaskOperations taskOperations, IJ
 
     [Function("TimerTasks")]
     public async Async.Task Run([TimerTrigger("00:00:15")] TimerInfo myTimer) {
-        var expriredTasks = _taskOperations.SearchExpired();
-        await foreach (var task in expriredTasks) {
+        // perform up to 10 updates in parallel for each entity type
+        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
+
+        var expiredTasks = _taskOperations.SearchExpired();
+        await Parallel.ForEachAsync(expiredTasks, parallelOptions, async (task, _cancel) => {
             _logger.LogInformation("stopping expired task. job_id:{JobId} task_id:{TaskId}", task.JobId, task.TaskId);
             await _taskOperations.MarkStopping(task, "task is expired");
-        }
-
+        });
 
         var expiredJobs = _jobOperations.SearchExpired();
-
-        await foreach (var job in expiredJobs) {
+        await Parallel.ForEachAsync(expiredJobs, parallelOptions, async (job, _cancel) => {
             _logger.LogInformation("stopping expired job. job_id:{JobId}", job.JobId);
             _ = await _jobOperations.Stopping(job);
-        }
+        });
 
         var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);
-
-        await foreach (var job in jobs) {
+        await Parallel.ForEachAsync(jobs, parallelOptions, async (job, _cancel) => {
             _logger.LogInformation("update job: {JobId}", job.JobId);
             _ = await _jobOperations.ProcessStateUpdates(job);
-        }
+        });
 
         var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWorkStates);
-        await foreach (var task in tasks) {
+        await Parallel.ForEachAsync(tasks, parallelOptions, async (task, _cancel) => {
             _logger.LogInformation("update task: {TaskId}", task.TaskId);
             _ = await _taskOperations.ProcessStateUpdate(task);
-        }
+        });
 
         await _scheduler.ScheduleTasks();
-
         await _jobOperations.StopNeverStartedJobs();
     }
 }
diff --git a/src/ApiService/ApiService/Functions/TimerWorkers.cs b/src/ApiService/ApiService/Functions/TimerWorkers.cs
index 136dbc33da..8d26dfd76e 100644
--- a/src/ApiService/ApiService/Functions/TimerWorkers.cs
+++ b/src/ApiService/ApiService/Functions/TimerWorkers.cs
@@ -1,4 +1,5 @@
-using Microsoft.Azure.Functions.Worker;
+using System.Threading.Tasks;
+using Microsoft.Azure.Functions.Worker;
 using Microsoft.Extensions.Logging;
 namespace Microsoft.OneFuzz.Service.Functions;
 
@@ -43,8 +44,9 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
         // (such as shutdown or resize) happen during this iteration `timer_worker`
         // rather than the following iteration.
 
-        var pools = _poolOps.SearchStates(states: PoolStateHelper.NeedsWork);
-        await foreach (var pool in pools) {
+        // we do not expect there to be many pools that need work, so process them all in parallel
+        var pools = await _poolOps.SearchStates(states: PoolStateHelper.NeedsWork).ToListAsync();
+        await Async.Task.WhenAll(pools.Select(async pool => {
             try {
                 _log.LogInformation("updating pool: {PoolId} ({PoolName}) - state: {PoolState}", pool.PoolId, pool.Name, pool.State);
                 var newPool = await _poolOps.ProcessStateUpdate(pool);
@@ -52,7 +54,7 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
             } catch (Exception ex) {
                 _log.LogError(ex, "failed to process pool");
             }
-        }
+        }));
 
         // NOTE: Nodes, and Scalesets should be processed in a consistent order such
         // during 'pool scale down' operations. This means that pools that are
@@ -63,8 +65,10 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
         await _nodeOps.MarkOutdatedNodes();
         await _nodeOps.CleanupBusyNodesWithoutWork();
 
+        // process up to 10 nodes in parallel
         var nodes = _nodeOps.SearchStates(states: NodeStateHelper.NeedsWorkStates);
-        await foreach (var node in nodes) {
+        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
+        await Parallel.ForEachAsync(nodes, parallelOptions, async (node, _cancel) => {
             try {
                 _log.LogInformation("updating node: {MachineId} - state: {NodeState}", node.MachineId, node.State);
                 var newNode = await _nodeOps.ProcessStateUpdate(node);
@@ -72,10 +76,11 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
             } catch (Exception ex) {
                 _log.LogError(ex, "failed to process node");
             }
-        }
+        });
 
-        var scalesets = _scaleSetOps.SearchAll();
-        await foreach (var scaleset in scalesets) {
+        // we do not expect there to be many scalesets, so process them all in parallel
+        var scalesets = await _scaleSetOps.SearchAll().ToListAsync();
+        await Async.Task.WhenAll(scalesets.Select(async scaleset => {
             try {
                 _log.LogInformation("updating scaleset: {ScalesetId} - state: {ScalesetState}", scaleset.ScalesetId, scaleset.State);
                 var newScaleset = await ProcessScalesets(scaleset);
@@ -83,6 +88,6 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
             } catch (Exception ex) {
                 _log.LogError(ex, "failed to process scaleset");
             }
-        }
+        }));
     }
 }
diff --git a/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs b/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs
index 83d06a5df1..f798934de8 100644
--- a/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs
+++ b/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs
@@ -287,7 +287,9 @@ public async Async.Task MarkOutdatedNodes() {
         }
 
         var outdated = SearchOutdated(excludeUpdateScheduled: true);
-        await foreach (var node in outdated) {
+        // update up to 10 nodes in parallel
+        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
+        await Parallel.ForEachAsync(outdated, parallelOptions, async (node, _cancel) => {
             _logTracer.LogInformation("node is outdated: {MachineId} - {NodeVersion}", node.MachineId, node.Version);
 
             if (node.Version == "1.0.0") {
@@ -295,7 +297,7 @@ public async Async.Task MarkOutdatedNodes() {
             } else {
                 _ = await ToReimage(node);
             }
-        }
+        });
     }
 
 
@@ -352,7 +354,7 @@ IAsyncEnumerable<Node> SearchOutdated(
         if (numResults is null) {
             return QueryAsync(query);
         } else {
-            return QueryAsync(query).Take(numResults.Value!);
+            return QueryAsync(query).Take(numResults.Value);
         }
     }
 
@@ -362,9 +364,11 @@ public async Async.Task CleanupBusyNodesWithoutWork() {
         //# that hit this race condition will get cleaned up.
         var nodes = _context.NodeOperations.SearchStates(states: NodeStateHelper.BusyStates);
 
-        await foreach (var node in nodes) {
+        // update up to 10 nodes in parallel
+        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
+        await Parallel.ForEachAsync(nodes, parallelOptions, async (node, _cancel) => {
             _ = await StopIfComplete(node, true);
-        }
+        });
     }
 
     public async Async.Task<Node> ToReimage(Node node, bool done = false) {

From 54a06802b9091c47d1e3407c4ed4425864222a88 Mon Sep 17 00:00:00 2001
From: George Pollard <gpollard@microsoft.com>
Date: Wed, 25 Oct 2023 00:04:22 +0000
Subject: [PATCH 2/2] Undo some parallelism

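Marking one task as stopping can also mark sibling tasks in the same job
as failed, and a task state transition can feed back into its job, so
running those updates concurrently could make them stomp on each other's
writes. Revert the expired-task and task-update loops to sequential
iteration; job updates only touch entities owned by that job and stay
parallel.

The shape kept for the task loops, roughly (ProcessAsync is a placeholder
name):

    // sequential: each update observes the effects of the previous one
    await foreach (var task in tasks) {
        await ProcessAsync(task);
    }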
---
 src/ApiService/ApiService/Functions/TimerTasks.cs   | 13 +++++++++----
 .../ApiService/onefuzzlib/JobOperations.cs          |  2 +-
 2 files changed, 10 insertions(+), 5 deletions(-)

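Note: the JobOperations.Stopping change below replaces the inline
partition-key query with a GetByJobId helper. Assuming the helper simply
wraps the same query it replaces (the helper's body is not shown in this
patch), it would look roughly like:

    // assumed shape of the helper; Task here is the OneFuzz task entity,
    // not System.Threading.Tasks.Task
    public IAsyncEnumerable<Task> GetByJobId(Guid jobId)
        => QueryAsync(Query.PartitionKey(jobId.ToString()));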
diff --git a/src/ApiService/ApiService/Functions/TimerTasks.cs b/src/ApiService/ApiService/Functions/TimerTasks.cs
index f63a82f071..441a70ea3b 100644
--- a/src/ApiService/ApiService/Functions/TimerTasks.cs
+++ b/src/ApiService/ApiService/Functions/TimerTasks.cs
@@ -27,28 +27,33 @@ public async Async.Task Run([TimerTrigger("00:00:15")] TimerInfo myTimer) {
         var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
 
         var expiredTasks = _taskOperations.SearchExpired();
-        await Parallel.ForEachAsync(expiredTasks, parallelOptions, async (task, _cancel) => {
+        // marking one task as stopping can also mark other tasks in the same job as failed
+        // this could make parallel updates stomp on each other, so don't do it in parallel
+        await foreach (var task in expiredTasks) {
             _logger.LogInformation("stopping expired task. job_id:{JobId} task_id:{TaskId}", task.JobId, task.TaskId);
             await _taskOperations.MarkStopping(task, "task is expired");
-        });
+        }
 
         var expiredJobs = _jobOperations.SearchExpired();
+        // job updates are all distinct and only update tasks owned by that job, so they can be performed in parallel
         await Parallel.ForEachAsync(expiredJobs, parallelOptions, async (job, _cancel) => {
             _logger.LogInformation("stopping expired job. job_id:{JobId}", job.JobId);
             _ = await _jobOperations.Stopping(job);
         });
 
         var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);
+        // job updates are okay to do in parallel
         await Parallel.ForEachAsync(jobs, parallelOptions, async (job, _cancel) => {
             _logger.LogInformation("update job: {JobId}", job.JobId);
             _ = await _jobOperations.ProcessStateUpdates(job);
         });
 
         var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWorkStates);
-        await Parallel.ForEachAsync(tasks, parallelOptions, async (task, _cancel) => {
+        // task state transitions might affect the job, so parallel updates could stomp on each other
+        await foreach (var task in tasks) {
             _logger.LogInformation("update task: {TaskId}", task.TaskId);
             _ = await _taskOperations.ProcessStateUpdate(task);
-        });
+        }
 
         await _scheduler.ScheduleTasks();
         await _jobOperations.StopNeverStartedJobs();
diff --git a/src/ApiService/ApiService/onefuzzlib/JobOperations.cs b/src/ApiService/ApiService/onefuzzlib/JobOperations.cs
index 781df97943..1c4ca17601 100644
--- a/src/ApiService/ApiService/onefuzzlib/JobOperations.cs
+++ b/src/ApiService/ApiService/onefuzzlib/JobOperations.cs
@@ -105,7 +105,7 @@ public async Async.Task<Job> Init(Job job) {
 
     public async Async.Task<Job> Stopping(Job job) {
         job = job with { State = JobState.Stopping };
-        var tasks = await _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString())).ToListAsync();
+        var tasks = await _context.TaskOperations.GetByJobId(job.JobId).ToListAsync();
         var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
 
         var notStopped = taskNotStopped[true];