Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Commit

Permalink
Add retry logic for failed workers (#15)
Browse files Browse the repository at this point in the history
Also updated changelog/pubspec for 0.1.7 release
  • Loading branch information
jakemac53 authored Nov 21, 2017
1 parent 088e91d commit 2ed92e7
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 42 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.1.7

* Update the `BazelWorkerDriver` class to handle worker crashes, and retry work
requests. The number of retries is configurable with the new `int maxRetries`
optional arg to the `BazelWorkerDriver` constructor.

## 0.1.6

* Update the worker_protocol.pb.dart file with the latest proto generator.
Expand Down
129 changes: 94 additions & 35 deletions lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,44 @@ typedef Future<Process> SpawnWorker();
/// This allows you to use any binary that supports the bazel worker protocol in
/// the same way that bazel would, but from another dart process instead.
class BazelWorkerDriver {
/// Idle worker processes.
final _idleWorkers = <Process>[];

/// The maximum number of idle workers at any given time.
final int _maxIdleWorkers;

/// The maximum number of times to retry a [WorkAttempt] if there is an error.
final int _maxRetries;

/// The maximum number of concurrent workers to run at any given time.
final int _maxWorkers;

/// The number of currently active workers.
int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;

/// Idle worker processes.
final _idleWorkers = <Process>[];

/// All workers that are fully spawned and ready to handle work.
final _readyWorkers = <Process>[];

/// All workers that are in the process of being spawned.
final _spawningWorkers = <Future<Process>>[];

/// Work requests that haven't been started yet.
final _workQueue = new Queue<WorkRequest>();
final _workQueue = new Queue<_WorkAttempt>();

/// Factory method that spawns a worker process.
final SpawnWorker _spawnWorker;

BazelWorkerDriver(this._spawnWorker, {int maxIdleWorkers, int maxWorkers})
BazelWorkerDriver(this._spawnWorker,
{int maxIdleWorkers, int maxWorkers, int maxRetries})
: this._maxIdleWorkers = maxIdleWorkers ?? 4,
this._maxWorkers = maxWorkers ?? 4;
this._maxWorkers = maxWorkers ?? 4,
this._maxRetries = maxRetries ?? 4;

Future<WorkResponse> doWork(WorkRequest request) {
var responseCompleter = new Completer<WorkResponse>();
_responseCompleters[request] = responseCompleter;
_workQueue.add(request);
var attempt = new _WorkAttempt(request);
_workQueue.add(attempt);
_runWorkQueue();
return responseCompleter.future;
return attempt.response;
}

/// Calls `kill` on all worker processes.
Expand Down Expand Up @@ -83,9 +87,9 @@ class BazelWorkerDriver {

// At this point we definitely want to run a task, we just need to decide
// whether or not we need to start up a new worker.
var request = _workQueue.removeFirst();
var attempt = _workQueue.removeFirst();
if (_idleWorkers.isNotEmpty) {
_runWorker(_idleWorkers.removeLast(), request);
_runWorker(_idleWorkers.removeLast(), attempt);
} else {
// No need to block here, we want to continue to synchronously drain the
// work queue.
Expand All @@ -96,13 +100,15 @@ class BazelWorkerDriver {
_readyWorkers.add(worker);

_workerConnections[worker] = new StdDriverConnection.forWorker(worker);
_runWorker(worker, request);
_runWorker(worker, attempt);

// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
worker.exitCode.then((_) {
worker.exitCode.then((exitCode) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_spawningWorkers.remove(worker);
_runWorkQueue();
});
});
Expand All @@ -115,39 +121,92 @@ class BazelWorkerDriver {
///
/// Once the worker responds then it will be added back to the pool of idle
/// workers.
Future _runWorker(Process worker, WorkRequest request) async {
try {
void _runWorker(Process worker, _WorkAttempt attempt) {
bool rescheduled = false;

runZoned(() async {
var connection = _workerConnections[worker];
connection.writeRequest(request);
connection.writeRequest(attempt.request);
var response = await connection.readResponse();
_responseCompleters[request].complete(response);

// Do additional work if available.
_idleWorkers.add(worker);
_runWorkQueue();

// If the worker wasn't immediately used we might have to many idle
// workers now, kill one if necessary.
if (_idleWorkers.length > _maxIdleWorkers) {
// Note that whenever we spawn a worker we listen for its exit code
// and clean it up so we don't need to do that here.
var worker = _idleWorkers.removeLast();
_readyWorkers.remove(worker);
worker.kill();
// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
if (!attempt.responseCompleter.isCompleted) {
if (response == null) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output =
'Invalid response from worker, this probably means it wrote '
'invalid output or died.';
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
} catch (e, s) {
}, onError: (e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
if (!_responseCompleters[request].isCompleted) {
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
var response = new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Error running worker:\n$e\n$s';
_responseCompleters[request].complete(response);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
});
}

/// Performs post-work cleanup for [worker].
void _cleanUp(Process worker) {
// If the worker crashes, it won't be in `_readyWorkers` any more, and
// we don't want to add it to _idleWorkers.
if (_readyWorkers.contains(worker)) {
_idleWorkers.add(worker);
}

// Do additional work if available.
_runWorkQueue();

// If the worker wasn't immediately used we might have to many idle
// workers now, kill one if necessary.
if (_idleWorkers.length > _maxIdleWorkers) {
// Note that whenever we spawn a worker we listen for its exit code
// and clean it up so we don't need to do that here.
var worker = _idleWorkers.removeLast();
_readyWorkers.remove(worker);
worker.kill();
}
}

/// Attempts to reschedule a failed [attempt].
///
/// Returns whether or not the job was successfully rescheduled.
bool _tryReschedule(_WorkAttempt attempt) {
if (attempt.timesRetried >= _maxRetries) return false;
stderr.writeln('Rescheduling failed request...');
attempt.timesRetried++;
_workQueue.add(attempt);
_runWorkQueue();
return true;
}
}

/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
/// [WorkResponse], and the number of times it has been retried.
class _WorkAttempt {
final WorkRequest request;
final responseCompleter = new Completer<WorkResponse>();

Future<WorkResponse> get response => responseCompleter.future;

int timesRetried = 0;

_WorkAttempt(this.request);
}

final _responseCompleters = new Expando<Completer<WorkResponse>>('response');
final _workerConnections = new Expando<DriverConnection>('connectin');
final _workerConnections = new Expando<DriverConnection>('connection');
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: bazel_worker
version: 0.1.6
version: 0.1.7
description: Tools for creating a bazel persistent worker.
author: Dart Team <[email protected]>
homepage: https://github.com/dart-lang/bazel_worker
Expand Down
86 changes: 80 additions & 6 deletions test/driver_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,61 @@ void main() {
}
});

group('failing workers', () {
/// A driver which spawns [numBadWorkers] failing workers and then good
/// ones after that, and which will retry [maxRetries] times.
void createDriver({int maxRetries = 2, int numBadWorkers = 2}) {
int numSpawned = 0;
driver = new BazelWorkerDriver(
() async => new MockWorker(workerLoopFactory: (MockWorker worker) {
var connection = new StdAsyncWorkerConnection(
inputStream: worker._stdinController.stream,
outputStream: worker._stdoutController.sink);
if (numSpawned < numBadWorkers) {
numSpawned++;
return new ThrowingMockWorkerLoop(
worker, MockWorker.responseQueue, connection);
} else {
return new MockWorkerLoop(MockWorker.responseQueue,
connection: connection);
}
}),
maxRetries: maxRetries);
}

test('should retry up to maxRetries times', () async {
createDriver();
var expectedResponse = new WorkResponse();
MockWorker.responseQueue.addAll([null, null, expectedResponse]);
var actualResponse = await driver.doWork(new WorkRequest());
// The first 2 null responses are thrown away, and we should get the
// third one.
expect(actualResponse, expectedResponse);

expect(MockWorker.deadWorkers.length, 2);
expect(MockWorker.liveWorkers.length, 1);
});

test('should fail if it exceeds maxRetries failures', () async {
createDriver(maxRetries: 2, numBadWorkers: 3);
MockWorker.responseQueue.addAll([null, null, new WorkResponse()]);
var actualResponse = await driver.doWork(new WorkRequest());
// Should actually get a bad response.
expect(actualResponse.exitCode, 15);
expect(
actualResponse.output,
'Invalid response from worker, this probably means it wrote '
'invalid output or died.');

expect(MockWorker.deadWorkers.length, 3);
});
});

tearDown(() async {
await driver?.terminateWorkers();
expect(MockWorker.liveWorkers, isEmpty);
MockWorker.deadWorkers.clear();
MockWorker.responseQueue.clear();
});
});
}
Expand Down Expand Up @@ -108,6 +159,27 @@ class MockWorkerLoop extends AsyncWorkerLoop {
}
}

/// A mock worker loop with a custom `run` function that throws.
class ThrowingMockWorkerLoop extends MockWorkerLoop {
final MockWorker _mockWorker;

ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
AsyncWorkerConnection connection)
: super(responseQueue, connection: connection);

/// Run the worker loop. The returned [Future] doesn't complete until
/// [connection#readRequest] returns `null`.
@override
Future run() async {
while (true) {
var request = await connection.readRequest();
if (request == null) break;
await performRequest(request);
_mockWorker.kill();
}
}
}

/// A mock worker process.
///
/// Items in [responseQueue] will be returned in order based on requests.
Expand All @@ -132,13 +204,15 @@ class MockWorker implements Process {
static final deadWorkers = <MockWorker>[];

/// Standard constructor, creates the [_workerLoop].
MockWorker() {
MockWorker({WorkerLoop workerLoopFactory(MockWorker mockWorker)}) {
liveWorkers.add(this);
_workerLoop = new MockWorkerLoop(responseQueue,
connection: new StdAsyncWorkerConnection(
inputStream: this._stdinController.stream,
outputStream: this._stdoutController.sink))
..run();
var workerLoop = workerLoopFactory != null
? workerLoopFactory(this)
: new MockWorkerLoop(responseQueue,
connection: new StdAsyncWorkerConnection(
inputStream: this._stdinController.stream,
outputStream: this._stdoutController.sink));
_workerLoop = workerLoop..run();
}

Future<int> get exitCode => _exitCodeCompleter.future;
Expand Down

0 comments on commit 2ed92e7

Please sign in to comment.