Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Parallelize some timer work #3600

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/ApiService/ApiService/Functions/TimerTasks.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Azure.Functions.Worker;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace Microsoft.OneFuzz.Service.Functions;

Expand All @@ -22,35 +23,39 @@ public TimerTasks(ILogger<TimerTasks> logger, ITaskOperations taskOperations, IJ

[Function("TimerTasks")]
public async Async.Task Run([TimerTrigger("00:00:15")] TimerInfo myTimer) {
var expriredTasks = _taskOperations.SearchExpired();
await foreach (var task in expriredTasks) {
// perform up to 10 updates in parallel for each entity type
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };

var expiredTasks = _taskOperations.SearchExpired();
// marking one task stopping can also mark other tasks in the same job as failed
// this could make parallel updates stomp on each other, so don't do it in parallel
await foreach (var task in expiredTasks) {
_logger.LogInformation("stopping expired task. job_id:{JobId} task_id:{TaskId}", task.JobId, task.TaskId);
await _taskOperations.MarkStopping(task, "task is expired");
}


var expiredJobs = _jobOperations.SearchExpired();

await foreach (var job in expiredJobs) {
// job updates are all distinct and only update tasks owned by that job, can be performed in parallel
await Parallel.ForEachAsync(expiredJobs, parallelOptions, async (job, _cancel) => {
Porges marked this conversation as resolved.
Show resolved Hide resolved
_logger.LogInformation("stopping expired job. job_id:{JobId}", job.JobId);
_ = await _jobOperations.Stopping(job);
}
});

var jobs = _jobOperations.SearchState(states: JobStateHelper.NeedsWork);

await foreach (var job in jobs) {
// job updates are okay to do in parallel
await Parallel.ForEachAsync(jobs, parallelOptions, async (job, _cancel) => {
_logger.LogInformation("update job: {JobId}", job.JobId);
_ = await _jobOperations.ProcessStateUpdates(job);
}
});

var tasks = _taskOperations.SearchStates(states: TaskStateHelper.NeedsWorkStates);
// task state transitions might affect the job, so parallel updates could stomp on each other
await foreach (var task in tasks) {
_logger.LogInformation("update task: {TaskId}", task.TaskId);
_ = await _taskOperations.ProcessStateUpdate(task);
}

await _scheduler.ScheduleTasks();

await _jobOperations.StopNeverStartedJobs();
}
}
23 changes: 14 additions & 9 deletions src/ApiService/ApiService/Functions/TimerWorkers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Azure.Functions.Worker;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace Microsoft.OneFuzz.Service.Functions;

Expand Down Expand Up @@ -43,16 +44,17 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
// (such as shutdown or resize) happen during this iteration `timer_worker`
// rather than the following iteration.

var pools = _poolOps.SearchStates(states: PoolStateHelper.NeedsWork);
await foreach (var pool in pools) {
// we do not expect there to be many pools that need work, process them all in parallel
var pools = await _poolOps.SearchStates(states: PoolStateHelper.NeedsWork).ToListAsync();
await Async.Task.WhenAll(pools.Select(async pool => {
try {
_log.LogInformation("updating pool: {PoolId} ({PoolName}) - state: {PoolState}", pool.PoolId, pool.Name, pool.State);
var newPool = await _poolOps.ProcessStateUpdate(pool);
_log.LogInformation("completed updating pool: {PoolId} ({PoolName}) - now in state {PoolState}", pool.PoolId, pool.Name, newPool.State);
} catch (Exception ex) {
_log.LogError(ex, "failed to process pool");
}
}
}));

// NOTE: Nodes, and Scalesets should be processed in a consistent order such
// during 'pool scale down' operations. This means that pools that are
Expand All @@ -63,26 +65,29 @@ public async Async.Task Run([TimerTrigger("00:01:30")] TimerInfo t) {
await _nodeOps.MarkOutdatedNodes();
await _nodeOps.CleanupBusyNodesWithoutWork();

// process up to 10 nodes in parallel
var nodes = _nodeOps.SearchStates(states: NodeStateHelper.NeedsWorkStates);
await foreach (var node in nodes) {
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(nodes, parallelOptions, async (node, _cancel) => {
try {
_log.LogInformation("updating node: {MachineId} - state: {NodeState}", node.MachineId, node.State);
var newNode = await _nodeOps.ProcessStateUpdate(node);
_log.LogInformation("completed updating node: {MachineId} - now in state {NodeState}", node.MachineId, newNode.State);
} catch (Exception ex) {
_log.LogError(ex, "failed to process node");
}
}
});

var scalesets = _scaleSetOps.SearchAll();
await foreach (var scaleset in scalesets) {
// we do not expect there to be many scalesets, process them all in parallel
var scalesets = await _scaleSetOps.SearchAll().ToListAsync();
await Async.Task.WhenAll(scalesets.Select(async scaleset => {
try {
_log.LogInformation("updating scaleset: {ScalesetId} - state: {ScalesetState}", scaleset.ScalesetId, scaleset.State);
var newScaleset = await ProcessScalesets(scaleset);
_log.LogInformation("completed updating scaleset: {ScalesetId} - now in state {ScalesetState}", scaleset.ScalesetId, newScaleset.State);
} catch (Exception ex) {
_log.LogError(ex, "failed to process scaleset");
}
}
}));
}
}
2 changes: 1 addition & 1 deletion src/ApiService/ApiService/onefuzzlib/JobOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public async Async.Task<Job> Init(Job job) {

public async Async.Task<Job> Stopping(Job job) {
job = job with { State = JobState.Stopping };
var tasks = await _context.TaskOperations.QueryAsync(Query.PartitionKey(job.JobId.ToString())).ToListAsync();
var tasks = await _context.TaskOperations.GetByJobId(job.JobId).ToListAsync();
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);

var notStopped = taskNotStopped[true];
Expand Down
14 changes: 9 additions & 5 deletions src/ApiService/ApiService/onefuzzlib/NodeOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,17 @@ public async Async.Task MarkOutdatedNodes() {
}

var outdated = SearchOutdated(excludeUpdateScheduled: true);
await foreach (var node in outdated) {
// update up to 10 nodes in parallel
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(outdated, parallelOptions, async (node, _cancel) => {
_logTracer.LogInformation("node is outdated: {MachineId} - {NodeVersion}", node.MachineId, node.Version);

if (node.Version == "1.0.0") {
_ = await ToReimage(node, done: true);
} else {
_ = await ToReimage(node);
}
}
});
}


Expand Down Expand Up @@ -352,7 +354,7 @@ IAsyncEnumerable<Node> SearchOutdated(
if (numResults is null) {
return QueryAsync(query);
} else {
return QueryAsync(query).Take(numResults.Value!);
return QueryAsync(query).Take(numResults.Value);
}
}

Expand All @@ -362,9 +364,11 @@ public async Async.Task CleanupBusyNodesWithoutWork() {
//# that hit this race condition will get cleaned up.
var nodes = _context.NodeOperations.SearchStates(states: NodeStateHelper.BusyStates);

await foreach (var node in nodes) {
// update up to 10 nodes in parallel
var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(nodes, async (node, _cancel) => {
_ = await StopIfComplete(node, true);
}
});
}

public async Async.Task<Node> ToReimage(Node node, bool done = false) {
Expand Down