From 2ed92e7eaf07dde318ffa6340f6d31ebca4b3833 Mon Sep 17 00:00:00 2001 From: Jacob MacDonald Date: Tue, 21 Nov 2017 09:06:00 -0800 Subject: [PATCH] Add retry logic for failed workers (#15) Also updated changelog/pubspec for 0.1.7 release --- CHANGELOG.md | 6 ++ lib/src/driver/driver.dart | 129 +++++++++++++++++++++++++++---------- pubspec.yaml | 2 +- test/driver_test.dart | 86 +++++++++++++++++++++++-- 4 files changed, 181 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9c373a..77cd606 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.1.7 + +* Update the `BazelWorkerDriver` class to handle worker crashes, and retry work + requests. The number of retries is configurable with the new `int maxRetries` + optional arg to the `BazelWorkerDriver` constructor. + ## 0.1.6 * Update the worker_protocol.pb.dart file with the latest proto generator. diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart index 9d0bb85..78bec3b 100644 --- a/lib/src/driver/driver.dart +++ b/lib/src/driver/driver.dart @@ -17,18 +17,21 @@ typedef Future SpawnWorker(); /// This allows you to use any binary that supports the bazel worker protocol in /// the same way that bazel would, but from another dart process instead. class BazelWorkerDriver { + /// Idle worker processes. + final _idleWorkers = []; + /// The maximum number of idle workers at any given time. final int _maxIdleWorkers; + /// The maximum number of times to retry a [WorkAttempt] if there is an error. + final int _maxRetries; + /// The maximum number of concurrent workers to run at any given time. final int _maxWorkers; /// The number of currently active workers. int get _numWorkers => _readyWorkers.length + _spawningWorkers.length; - /// Idle worker processes. - final _idleWorkers = []; - /// All workers that are fully spawned and ready to handle work. 
final _readyWorkers = []; @@ -36,21 +39,22 @@ class BazelWorkerDriver { final _spawningWorkers = >[]; /// Work requests that haven't been started yet. - final _workQueue = new Queue(); + final _workQueue = new Queue<_WorkAttempt>(); /// Factory method that spawns a worker process. final SpawnWorker _spawnWorker; - BazelWorkerDriver(this._spawnWorker, {int maxIdleWorkers, int maxWorkers}) + BazelWorkerDriver(this._spawnWorker, + {int maxIdleWorkers, int maxWorkers, int maxRetries}) : this._maxIdleWorkers = maxIdleWorkers ?? 4, - this._maxWorkers = maxWorkers ?? 4; + this._maxWorkers = maxWorkers ?? 4, + this._maxRetries = maxRetries ?? 4; Future doWork(WorkRequest request) { - var responseCompleter = new Completer(); - _responseCompleters[request] = responseCompleter; - _workQueue.add(request); + var attempt = new _WorkAttempt(request); + _workQueue.add(attempt); _runWorkQueue(); - return responseCompleter.future; + return attempt.response; } /// Calls `kill` on all worker processes. @@ -83,9 +87,9 @@ class BazelWorkerDriver { // At this point we definitely want to run a task, we just need to decide // whether or not we need to start up a new worker. - var request = _workQueue.removeFirst(); + var attempt = _workQueue.removeFirst(); if (_idleWorkers.isNotEmpty) { - _runWorker(_idleWorkers.removeLast(), request); + _runWorker(_idleWorkers.removeLast(), attempt); } else { // No need to block here, we want to continue to synchronously drain the // work queue. @@ -96,13 +100,15 @@ class BazelWorkerDriver { _readyWorkers.add(worker); _workerConnections[worker] = new StdDriverConnection.forWorker(worker); - _runWorker(worker, request); + _runWorker(worker, attempt); // When the worker exits we should retry running the work queue in case // there is more work to be done. This is primarily just a defensive // thing but is cheap to do. 
- worker.exitCode.then((_) { + worker.exitCode.then((exitCode) { + _idleWorkers.remove(worker); _readyWorkers.remove(worker); + _spawningWorkers.remove(worker); _runWorkQueue(); }); }); @@ -115,39 +121,92 @@ class BazelWorkerDriver { /// /// Once the worker responds then it will be added back to the pool of idle /// workers. - Future _runWorker(Process worker, WorkRequest request) async { - try { + void _runWorker(Process worker, _WorkAttempt attempt) { + bool rescheduled = false; + + runZoned(() async { var connection = _workerConnections[worker]; - connection.writeRequest(request); + connection.writeRequest(attempt.request); var response = await connection.readResponse(); - _responseCompleters[request].complete(response); - // Do additional work if available. - _idleWorkers.add(worker); - _runWorkQueue(); - - // If the worker wasn't immediately used we might have to many idle - // workers now, kill one if necessary. - if (_idleWorkers.length > _maxIdleWorkers) { - // Note that whenever we spawn a worker we listen for its exit code - // and clean it up so we don't need to do that here. - var worker = _idleWorkers.removeLast(); - _readyWorkers.remove(worker); - worker.kill(); + // It is possible for us to complete with an error response due to an + // unhandled async error before we get here. + if (!attempt.responseCompleter.isCompleted) { + if (response == null) { + rescheduled = _tryReschedule(attempt); + if (rescheduled) return; + stderr.writeln('Failed to run request ${attempt.request}'); + response = new WorkResponse() + ..exitCode = EXIT_CODE_ERROR + ..output = + 'Invalid response from worker, this probably means it wrote ' + 'invalid output or died.'; + } + attempt.responseCompleter.complete(response); + _cleanUp(worker); } - } catch (e, s) { + }, onError: (e, s) { // Note that we don't need to do additional cleanup here on failures. 
If // the worker dies that is already handled in a generic fashion, we just // need to make sure we complete with a valid response. - if (!_responseCompleters[request].isCompleted) { + if (!attempt.responseCompleter.isCompleted) { + rescheduled = _tryReschedule(attempt); + if (rescheduled) return; var response = new WorkResponse() ..exitCode = EXIT_CODE_ERROR ..output = 'Error running worker:\n$e\n$s'; - _responseCompleters[request].complete(response); + attempt.responseCompleter.complete(response); + _cleanUp(worker); } + }); + } + + /// Performs post-work cleanup for [worker]. + void _cleanUp(Process worker) { + // If the worker crashes, it won't be in `_readyWorkers` any more, and + // we don't want to add it to _idleWorkers. + if (_readyWorkers.contains(worker)) { + _idleWorkers.add(worker); + } + + // Do additional work if available. + _runWorkQueue(); + + // If the worker wasn't immediately used we might have too many idle + // workers now, kill one if necessary. + if (_idleWorkers.length > _maxIdleWorkers) { + // Note that whenever we spawn a worker we listen for its exit code + // and clean it up so we don't need to do that here. + var worker = _idleWorkers.removeLast(); + _readyWorkers.remove(worker); + worker.kill(); } } + + /// Attempts to reschedule a failed [attempt]. + /// + /// Returns whether or not the job was successfully rescheduled. + bool _tryReschedule(_WorkAttempt attempt) { + if (attempt.timesRetried >= _maxRetries) return false; + stderr.writeln('Rescheduling failed request...'); + attempt.timesRetried++; + _workQueue.add(attempt); + _runWorkQueue(); + return true; + } +} + +/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the +/// [WorkResponse], and the number of times it has been retried.
+class _WorkAttempt { + final WorkRequest request; + final responseCompleter = new Completer(); + + Future get response => responseCompleter.future; + + int timesRetried = 0; + + _WorkAttempt(this.request); } -final _responseCompleters = new Expando>('response'); -final _workerConnections = new Expando('connectin'); +final _workerConnections = new Expando('connection'); diff --git a/pubspec.yaml b/pubspec.yaml index f4e6514..397ee9b 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: bazel_worker -version: 0.1.6 +version: 0.1.7 description: Tools for creating a bazel persistent worker. author: Dart Team homepage: https://github.com/dart-lang/bazel_worker diff --git a/test/driver_test.dart b/test/driver_test.dart index 0191c3a..ede5f99 100644 --- a/test/driver_test.dart +++ b/test/driver_test.dart @@ -69,10 +69,61 @@ void main() { } }); + group('failing workers', () { + /// A driver which spawns [numBadWorkers] failing workers and then good + /// ones after that, and which will retry [maxRetries] times. + void createDriver({int maxRetries = 2, int numBadWorkers = 2}) { + int numSpawned = 0; + driver = new BazelWorkerDriver( + () async => new MockWorker(workerLoopFactory: (MockWorker worker) { + var connection = new StdAsyncWorkerConnection( + inputStream: worker._stdinController.stream, + outputStream: worker._stdoutController.sink); + if (numSpawned < numBadWorkers) { + numSpawned++; + return new ThrowingMockWorkerLoop( + worker, MockWorker.responseQueue, connection); + } else { + return new MockWorkerLoop(MockWorker.responseQueue, + connection: connection); + } + }), + maxRetries: maxRetries); + } + + test('should retry up to maxRetries times', () async { + createDriver(); + var expectedResponse = new WorkResponse(); + MockWorker.responseQueue.addAll([null, null, expectedResponse]); + var actualResponse = await driver.doWork(new WorkRequest()); + // The first 2 null responses are thrown away, and we should get the + // third one. 
+ expect(actualResponse, expectedResponse); + + expect(MockWorker.deadWorkers.length, 2); + expect(MockWorker.liveWorkers.length, 1); + }); + + test('should fail if it exceeds maxRetries failures', () async { + createDriver(maxRetries: 2, numBadWorkers: 3); + MockWorker.responseQueue.addAll([null, null, new WorkResponse()]); + var actualResponse = await driver.doWork(new WorkRequest()); + // Should actually get a bad response. + expect(actualResponse.exitCode, 15); + expect( + actualResponse.output, + 'Invalid response from worker, this probably means it wrote ' + 'invalid output or died.'); + + expect(MockWorker.deadWorkers.length, 3); + }); + }); + tearDown(() async { await driver?.terminateWorkers(); expect(MockWorker.liveWorkers, isEmpty); MockWorker.deadWorkers.clear(); + MockWorker.responseQueue.clear(); }); }); } @@ -108,6 +159,27 @@ class MockWorkerLoop extends AsyncWorkerLoop { } } +/// A mock worker loop with a custom `run` function that throws. +class ThrowingMockWorkerLoop extends MockWorkerLoop { + final MockWorker _mockWorker; + + ThrowingMockWorkerLoop(this._mockWorker, Queue responseQueue, + AsyncWorkerConnection connection) + : super(responseQueue, connection: connection); + + /// Run the worker loop. The returned [Future] doesn't complete until + /// [connection#readRequest] returns `null`. + @override + Future run() async { + while (true) { + var request = await connection.readRequest(); + if (request == null) break; + await performRequest(request); + _mockWorker.kill(); + } + } +} + /// A mock worker process. /// /// Items in [responseQueue] will be returned in order based on requests. @@ -132,13 +204,15 @@ class MockWorker implements Process { static final deadWorkers = []; /// Standard constructor, creates the [_workerLoop]. 
- MockWorker() { + MockWorker({WorkerLoop workerLoopFactory(MockWorker mockWorker)}) { liveWorkers.add(this); - _workerLoop = new MockWorkerLoop(responseQueue, - connection: new StdAsyncWorkerConnection( - inputStream: this._stdinController.stream, - outputStream: this._stdoutController.sink)) - ..run(); + var workerLoop = workerLoopFactory != null + ? workerLoopFactory(this) + : new MockWorkerLoop(responseQueue, + connection: new StdAsyncWorkerConnection( + inputStream: this._stdinController.stream, + outputStream: this._stdoutController.sink)); + _workerLoop = workerLoop..run(); } Future get exitCode => _exitCodeCompleter.future;