diff --git a/src/ApiService/ApiService/Functions/TimerTasks.cs b/src/ApiService/ApiService/Functions/TimerTasks.cs index a2f08af277..441a70ea3b 100644 --- a/src/ApiService/ApiService/Functions/TimerTasks.cs +++ b/src/ApiService/ApiService/Functions/TimerTasks.cs @@ -1,4 +1,5 @@ -using Microsoft.Azure.Functions.Worker; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; namespace Microsoft.OneFuzz.Service.Functions; @@ -22,35 +23,39 @@ public TimerTasks(ILogger<TimerTasks> logger, ITaskOperations taskOperations, IJ [Function("TimerTasks")] public async Async.Task Run([TimerTrigger("00:00:15")] TimerInfo myTimer) { - var expriredTasks = _taskOperations.SearchExpired(); - await foreach (var task in expriredTasks) { + // perform up to 10 updates in parallel for each entity type + var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 }; + + var expiredTasks = _taskOperations.SearchExpired(); + // marking one task stopping can also mark other tasks in the same job as failed + // this could make parallel updates stomp on each other, so don't do it in parallel + await foreach (var task in expiredTasks) { _logger.LogInformation("stopping expired task. job_id:{JobId} task_id:{TaskId}", task.JobId, task.TaskId); await _taskOperations.MarkStopping(task, "task is expired"); } - var expiredJobs = _jobOperations.SearchExpired(); - - await foreach (var job in expiredJobs) { + // job updates are all distinct and only update tasks owned by that job, so they can be performed in parallel (bounded by parallelOptions) + await Parallel.ForEachAsync(expiredJobs, parallelOptions, async (job, _cancel) => { _logger.LogInformation("stopping expired job. 
job_id:{JobId}", job.JobId); _ = await _jobOperations.Stopping(job); - } + }); var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork); - - await foreach (var job in jobs) { + // job updates are okay to do in parallel + await Parallel.ForEachAsync(jobs, parallelOptions, async (job, _cancel) => { _logger.LogInformation("update job: {JobId}", job.JobId); _ = await _jobOperations.ProcessStateUpdates(job); - } + }); var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWorkStates); + // task state transitions might affect the job, so parallel updates could stomp on each other await foreach (var task in tasks) { _logger.LogInformation("update task: {TaskId}", task.TaskId); _ = await _taskOperations.ProcessStateUpdate(task); } await _scheduler.ScheduleTasks(); - await _jobOperations.StopNeverStartedJobs(); } } diff --git a/src/ApiService/ApiService/Functions/TimerWorkers.cs b/src/ApiService/ApiService/Functions/TimerWorkers.cs index 136dbc33da..8d26dfd76e 100644 --- a/src/ApiService/ApiService/Functions/TimerWorkers.cs +++ b/src/ApiService/ApiService/Functions/TimerWorkers.cs @@ -1,4 +1,5 @@ -using Microsoft.Azure.Functions.Worker; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; namespace Microsoft.OneFuzz.Service.Functions; @@ -43,8 +44,9 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) { // (such as shutdown or resize) happen during this iteration `timer_worker` // rather than the following iteration. 
- var pools = _poolOps.SearchStates(states: PoolStateHelper.NeedsWork); - await foreach (var pool in pools) { + // we do not expect there to be many pools that need work, so materialize the list and process them all in parallel (no concurrency cap) + var pools = await _poolOps.SearchStates(states: PoolStateHelper.NeedsWork).ToListAsync(); + await Async.Task.WhenAll(pools.Select(async pool => { try { _log.LogInformation("updating pool: {PoolId} ({PoolName}) - state: {PoolState}", pool.PoolId, pool.Name, pool.State); var newPool = await _poolOps.ProcessStateUpdate(pool); @@ -52,7 +54,7 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) { } catch (Exception ex) { _log.LogError(ex, "failed to process pool"); } - } + })); // NOTE: Nodes, and Scalesets should be processed in a consistent order such // during 'pool scale down' operations. This means that pools that are @@ -63,8 +65,10 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) { await _nodeOps.MarkOutdatedNodes(); await _nodeOps.CleanupBusyNodesWithoutWork(); + // process up to 10 nodes in parallel var nodes = _nodeOps.SearchStates(states: NodeStateHelper.NeedsWorkStates); - await foreach (var node in nodes) { + var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 }; + await Parallel.ForEachAsync(nodes, parallelOptions, async (node, _cancel) => { try { _log.LogInformation("updating node: {MachineId} - state: {NodeState}", node.MachineId, node.State); var newNode = await _nodeOps.ProcessStateUpdate(node); @@ -72,10 +76,11 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) { } catch (Exception ex) { _log.LogError(ex, "failed to process node"); } - } + }); - var scalesets = _scaleSetOps.SearchAll(); - await foreach (var scaleset in scalesets) { + // we do not expect there to be many scalesets, so materialize the list and process them all in parallel (no concurrency cap) + var scalesets = await _scaleSetOps.SearchAll().ToListAsync(); + await Async.Task.WhenAll(scalesets.Select(async scaleset => { try { _log.LogInformation("updating 
scaleset: {ScalesetId} - state: {ScalesetState}", scaleset.ScalesetId, scaleset.State); var newScaleset = await ProcessScalesets(scaleset); @@ -83,6 +88,6 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) { } catch (Exception ex) { _log.LogError(ex, "failed to process scaleset"); } - } + })); } } diff --git a/src/ApiService/ApiService/onefuzzlib/JobOperations.cs b/src/ApiService/ApiService/onefuzzlib/JobOperations.cs index 781df97943..1c4ca17601 100644 --- a/src/ApiService/ApiService/onefuzzlib/JobOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/JobOperations.cs @@ -105,7 +105,7 @@ public async Async.Task<Job> Init(Job job) { public async Async.Task<Job> Stopping(Job job) { job = job with { State = JobState.Stopping }; - var tasks = await _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString())).ToListAsync(); + var tasks = await _context.TaskOperations.GetByJobId(job.JobId).ToListAsync(); var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped); var notStopped = taskNotStopped[true]; diff --git a/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs b/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs index 83d06a5df1..f798934de8 100644 --- a/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs +++ b/src/ApiService/ApiService/onefuzzlib/NodeOperations.cs @@ -287,7 +287,9 @@ public async Async.Task MarkOutdatedNodes() { } var outdated = SearchOutdated(excludeUpdateScheduled: true); - await foreach (var node in outdated) { + // update up to 10 nodes in parallel + var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 }; + await Parallel.ForEachAsync(outdated, parallelOptions, async (node, _cancel) => { _logTracer.LogInformation("node is outdated: {MachineId} - {NodeVersion}", node.MachineId, node.Version); if (node.Version == "1.0.0") { @@ -295,7 +297,7 @@ public async Async.Task MarkOutdatedNodes() { } else { _ = await ToReimage(node); } - } + }); } @@ -352,7 +354,7 @@ 
IAsyncEnumerable<Node> SearchOutdated( if (numResults is null) { return QueryAsync(query); } else { - return QueryAsync(query).Take(numResults.Value!); + return QueryAsync(query).Take(numResults.Value); } } @@ -362,9 +364,11 @@ public async Async.Task CleanupBusyNodesWithoutWork() { //# that hit this race condition will get cleaned up. var nodes = _context.NodeOperations.SearchStates(states: NodeStateHelper.BusyStates); - await foreach (var node in nodes) { + // update up to 10 nodes in parallel; parallelOptions must be passed or the cap is not applied + var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 }; + await Parallel.ForEachAsync(nodes, parallelOptions, async (node, _cancel) => { _ = await StopIfComplete(node, true); - } + }); } public async Async.Task<Node> ToReimage(Node node, bool done = false) {