Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Commit

Permalink
Add cancel method to BazelWorkerConnection and use it (#16)
Browse files Browse the repository at this point in the history
Also update pubspec/changelog for release
  • Loading branch information
jakemac53 authored Nov 27, 2017
1 parent 2ed92e7 commit 60186c5
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 9 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.1.8

* Add `Future cancel()` method to `DriverConnection`, which in the case of a
`StdDriverConnection` closes the input stream.
* The `terminateWorkers` method on `BazelWorkerDriver` now calls `cancel` on
all worker connections to ensure the vm can exit correctly.

## 0.1.7

* Update the `BazelWorkerDriver` class to handle worker crashes, and retry work
Expand Down
15 changes: 13 additions & 2 deletions lib/src/async_message_grouper.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ class AsyncMessageGrouper implements MessageGrouper {
/// The input stream.
final StreamQueue<List<int>> _inputQueue;

// Whether or not the input queue has already been cancelled.
bool _inputQueueCancelled = false;

/// The current buffer.
final Queue<int> _buffer = new Queue<int>();

Expand All @@ -38,7 +41,7 @@ class AsyncMessageGrouper implements MessageGrouper {
}

// If there is nothing left in the queue then cancel the subscription.
if (message == null) _inputQueue.cancel();
if (message == null) _cancel();

return message;
} catch (e) {
Expand All @@ -49,6 +52,14 @@ class AsyncMessageGrouper implements MessageGrouper {
}
}

Future _cancel() {
if (!_inputQueueCancelled) {
_inputQueueCancelled = true;
return _inputQueue.cancel();
}
return new Future.value(null);
}

/// Stop listening to the stream for further updates.
Future cancel() => _inputQueue.cancel();
Future cancel() => _cancel();
}
17 changes: 11 additions & 6 deletions lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ class BazelWorkerDriver {

/// Calls `kill` on all worker processes.
Future terminateWorkers() async {
for (var worker in _readyWorkers) {
worker.kill();
for (var worker in _readyWorkers.toList()) {
_killWorker(worker);
}
await Future.wait(_spawningWorkers.map((worker) async {
(await worker).kill();
_killWorker(await worker);
}));
}

Expand Down Expand Up @@ -108,7 +108,6 @@ class BazelWorkerDriver {
worker.exitCode.then((exitCode) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_spawningWorkers.remove(worker);
_runWorkQueue();
});
});
Expand Down Expand Up @@ -178,8 +177,7 @@ class BazelWorkerDriver {
// Note that whenever we spawn a worker we listen for its exit code
// and clean it up so we don't need to do that here.
var worker = _idleWorkers.removeLast();
_readyWorkers.remove(worker);
worker.kill();
_killWorker(worker);
}
}

Expand All @@ -194,6 +192,13 @@ class BazelWorkerDriver {
_runWorkQueue();
return true;
}

void _killWorker(Process worker) {
_workerConnections[worker].cancel();
_readyWorkers.remove(worker);
_idleWorkers.remove(worker);
worker.kill();
}
}

/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
Expand Down
5 changes: 5 additions & 0 deletions lib/src/driver/driver_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ abstract class DriverConnection {
Future<WorkResponse> readResponse();

void writeRequest(WorkRequest request);

Future cancel();
}

/// Default implementation of [DriverConnection] that works with [Stdin]
Expand Down Expand Up @@ -72,4 +74,7 @@ class StdDriverConnection implements DriverConnection {
void writeRequest(WorkRequest request) {
_outputStream.add(protoToDelimitedBuffer(request));
}

@override
Future cancel() => _messageGrouper.cancel();
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: bazel_worker
version: 0.1.7
version: 0.1.8
description: Tools for creating a bazel persistent worker.
author: Dart Team <[email protected]>
homepage: https://github.com/dart-lang/bazel_worker
Expand Down

0 comments on commit 60186c5

Please sign in to comment.