diff --git a/CHANGELOG.md b/CHANGELOG.md index 77cd606..b8b0031 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 0.1.8 + +* Add `Future cancel()` method to `DriverConnection`, which in the case of a + `StdDriverConnection` closes the input stream. + * The `terminateWorkers` method on `BazelWorkerDriver` now calls `cancel` on + all worker connections to ensure the vm can exit correctly. + ## 0.1.7 * Update the `BazelWorkerDriver` class to handle worker crashes, and retry work diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart index 93caf49..3044bbe 100644 --- a/lib/src/async_message_grouper.dart +++ b/lib/src/async_message_grouper.dart @@ -19,6 +19,9 @@ class AsyncMessageGrouper implements MessageGrouper { /// The input stream. final StreamQueue> _inputQueue; + // Whether or not the input queue has already been cancelled. + bool _inputQueueCancelled = false; + /// The current buffer. final Queue _buffer = new Queue(); @@ -38,7 +41,7 @@ class AsyncMessageGrouper implements MessageGrouper { } // If there is nothing left in the queue then cancel the subscription. - if (message == null) _inputQueue.cancel(); + if (message == null) _cancel(); return message; } catch (e) { @@ -49,6 +52,14 @@ class AsyncMessageGrouper implements MessageGrouper { } } + Future _cancel() { + if (!_inputQueueCancelled) { + _inputQueueCancelled = true; + return _inputQueue.cancel(); + } + return new Future.value(null); + } + /// Stop listening to the stream for further updates. - Future cancel() => _inputQueue.cancel(); + Future cancel() => _cancel(); } diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart index 78bec3b..80d82a9 100644 --- a/lib/src/driver/driver.dart +++ b/lib/src/driver/driver.dart @@ -59,11 +59,11 @@ class BazelWorkerDriver { /// Calls `kill` on all worker processes. Future terminateWorkers() async { - for (var worker in _readyWorkers) { - worker.kill(); + for (var worker in _readyWorkers.toList()) { + _killWorker(worker); } await Future.wait(_spawningWorkers.map((worker) async { - (await worker).kill(); + _killWorker(await worker); })); } @@ -108,7 +108,6 @@ class BazelWorkerDriver { worker.exitCode.then((exitCode) { _idleWorkers.remove(worker); _readyWorkers.remove(worker); - _spawningWorkers.remove(worker); _runWorkQueue(); }); }); @@ -178,8 +177,7 @@ class BazelWorkerDriver { // Note that whenever we spawn a worker we listen for its exit code // and clean it up so we don't need to do that here. var worker = _idleWorkers.removeLast(); - _readyWorkers.remove(worker); - worker.kill(); + _killWorker(worker); } } @@ -194,6 +192,13 @@ class BazelWorkerDriver { _runWorkQueue(); return true; } + + void _killWorker(Process worker) { + _workerConnections[worker].cancel(); + _readyWorkers.remove(worker); + _idleWorkers.remove(worker); + worker.kill(); + } } /// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart index b170586..29a00a1 100644 --- a/lib/src/driver/driver_connection.dart +++ b/lib/src/driver/driver_connection.dart @@ -20,6 +20,8 @@ abstract class DriverConnection { Future readResponse(); void writeRequest(WorkRequest request); + + Future cancel(); } /// Default implementation of [DriverConnection] that works with [Stdin] @@ -72,4 +74,7 @@ class StdDriverConnection implements DriverConnection { void writeRequest(WorkRequest request) { _outputStream.add(protoToDelimitedBuffer(request)); } + + @override + Future cancel() => _messageGrouper.cancel(); } diff --git a/pubspec.yaml b/pubspec.yaml index 397ee9b..d2007e5 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: bazel_worker -version: 0.1.7 +version: 0.1.8 description: Tools for creating a bazel persistent worker. author: Dart Team homepage: https://github.com/dart-lang/bazel_worker