Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Commit

Permalink
Regenerate worker protocol protos to add constructors and comments (#78)
Browse files Browse the repository at this point in the history
Regenerate with `package:protoc_plugin` v21.1.1 to add doc comments and constructors with named parameters to the generated proto messages.
  • Loading branch information
parlough authored Aug 31, 2023
1 parent 9b4c6a0 commit f950bbf
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 52 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 1.1.0

* Add constructors with named parameters to
the generated worker protocol messages.
* Include comments on the generated worker protocol API.

## 1.0.3

* Require `package:protobuf` >= 3.0.0.
Expand Down
7 changes: 4 additions & 3 deletions benchmark/benchmark.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ Future<void> main() async {
var path = 'blaze-bin/some/path/to/a/file/that/is/an/input/$i';
workRequest
..arguments.add('--input=$path')
..inputs.add(Input()
..path = ''
..digest.addAll(List.filled(70, 0x11)));
..inputs.add(Input(
path: '',
digest: List.filled(70, 0x11),
));
}

// Serialize it.
Expand Down
7 changes: 4 additions & 3 deletions e2e_test/lib/async_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class ExampleAsyncWorker extends AsyncWorkerLoop {

@override
Future<WorkResponse> performRequest(WorkRequest request) async {
return WorkResponse()
..exitCode = 0
..output = request.arguments.join('\n');
return WorkResponse(
exitCode: 0,
output: request.arguments.join('\n'),
);
}
}
4 changes: 1 addition & 3 deletions e2e_test/lib/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import 'package:bazel_worker/bazel_worker.dart';
class ExampleSyncWorker extends SyncWorkerLoop {
@override
WorkResponse performRequest(WorkRequest request) {
return WorkResponse()
..exitCode = 0
..output = request.arguments.join('\n');
return WorkResponse(exitCode: 0, output: request.arguments.join('\n'));
}
}
2 changes: 1 addition & 1 deletion e2e_test/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: e2e_test
publish_to: none

environment:
sdk: '>=2.19.0 <3.0.0'
sdk: '>=2.19.0 <4.0.0'

dependencies:
bazel_worker:
Expand Down
5 changes: 3 additions & 2 deletions e2e_test/test/e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ void runE2eTestForWorker(String groupName, SpawnWorker spawnWorker) {
Future _doRequests(BazelWorkerDriver driver, {int? count}) async {
count ??= 100;
var requests = List.generate(count, (requestNum) {
var request = WorkRequest();
request.arguments.addAll(List.generate(requestNum, (argNum) => '$argNum'));
var request = WorkRequest(
arguments: List.generate(requestNum, (argNum) => '$argNum'),
);
return request;
});
var responses = await Future.wait(requests.map(driver.doWork));
Expand Down
2 changes: 1 addition & 1 deletion example/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ void main() async {
[Platform.script.resolve('worker.dart').toFilePath()],
workingDirectory: scratchSpace.path),
maxWorkers: 4);
var response = await driver.doWork(WorkRequest()..arguments.add('foo'));
var response = await driver.doWork(WorkRequest(arguments: ['foo']));
if (response.exitCode != EXIT_CODE_OK) {
print('Worker request failed');
} else {
Expand Down
2 changes: 1 addition & 1 deletion example/worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ class SyncSimpleWorker extends SyncWorkerLoop {
@override
WorkResponse performRequest(WorkRequest request) {
File('hello.txt').writeAsStringSync(request.arguments.first);
return WorkResponse()..exitCode = EXIT_CODE_OK;
return WorkResponse(exitCode: EXIT_CODE_OK);
}
}
16 changes: 9 additions & 7 deletions lib/src/driver/driver.dart
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,12 @@ class BazelWorkerDriver {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output =
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output:
'Invalid response from worker, this probably means it wrote '
'invalid output or died.';
'invalid output or died.',
);
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
Expand All @@ -166,9 +167,10 @@ class BazelWorkerDriver {
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
var response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Error running worker:\n$e\n$s';
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: 'Error running worker:\n$e\n$s',
);
attempt.responseCompleter.complete(response);
_cleanUp(worker);
}
Expand Down
21 changes: 12 additions & 9 deletions lib/src/driver/driver_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ class StdDriverConnection implements DriverConnection {
Future<WorkResponse> readResponse() async {
var buffer = await _messageGrouper.next;
if (buffer == null) {
return WorkResponse()
..exitCode = EXIT_CODE_BROKEN_PIPE
..output = 'Connection to worker closed';
return WorkResponse(
exitCode: EXIT_CODE_BROKEN_PIPE,
output: 'Connection to worker closed',
);
}

WorkResponse response;
Expand All @@ -63,9 +64,10 @@ class StdDriverConnection implements DriverConnection {
try {
// Try parsing the message as a string and set that as the output.
var output = utf8.decode(buffer);
var response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Worker sent an invalid response:\n$output';
var response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: 'Worker sent an invalid response:\n$output',
);
return response;
} catch (_) {
// Fall back to original exception and rethrow if we fail to parse as
Expand Down Expand Up @@ -108,9 +110,10 @@ class IsolateDriverConnection implements DriverConnection {
@override
Future<WorkResponse> readResponse() async {
if (!await _receivePortIterator.moveNext()) {
return WorkResponse()
..exitCode = EXIT_CODE_BROKEN_PIPE
..output = 'Connection to worker closed.';
return WorkResponse(
exitCode: EXIT_CODE_BROKEN_PIPE,
output: 'Connection to worker closed.',
);
}
return WorkResponse.fromBuffer(_receivePortIterator.current as List<int>);
}
Expand Down
7 changes: 4 additions & 3 deletions lib/src/worker/async_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ abstract class AsyncWorkerLoop implements WorkerLoop {
response.output = '${response.output}$printMessages';
}
} catch (e, s) {
response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: '$e\n$s',
);
}

connection.writeResponse(response);
Expand Down
7 changes: 4 additions & 3 deletions lib/src/worker/sync_worker_loop.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ abstract class SyncWorkerLoop implements WorkerLoop {
response.output = '${response.output}$printMessages';
}
} catch (e, s) {
response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
response = WorkResponse(
exitCode: EXIT_CODE_ERROR,
output: '$e\n$s',
);
}

connection.writeResponse(response);
Expand Down
117 changes: 113 additions & 4 deletions lib/src/worker_protocol.pb.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// @dart = 2.12

// ignore_for_file: annotate_overrides, camel_case_types
// ignore_for_file: annotate_overrides, camel_case_types, comment_references
// ignore_for_file: constant_identifier_names, library_prefixes
// ignore_for_file: non_constant_identifier_names, prefer_final_fields
// ignore_for_file: unnecessary_import, unnecessary_this, unused_import
Expand All @@ -13,8 +13,21 @@ import 'dart:core' as $core;

import 'package:protobuf/protobuf.dart' as $pb;

/// An input file.
class Input extends $pb.GeneratedMessage {
factory Input() => create();
factory Input({
$core.String? path,
$core.List<$core.int>? digest,
}) {
final $result = create();
if (path != null) {
$result.path = path;
}
if (digest != null) {
$result.digest = digest;
}
return $result;
}
Input._() : super();
factory Input.fromBuffer($core.List<$core.int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
Expand Down Expand Up @@ -53,6 +66,10 @@ class Input extends $pb.GeneratedMessage {
_defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<Input>(create);
static Input? _defaultInstance;

/// The path in the file system where to read this input artifact from. This is
/// either a path relative to the execution root (the worker process is
/// launched with the working directory set to the execution root), or an
/// absolute path.
@$pb.TagNumber(1)
$core.String get path => $_getSZ(0);
@$pb.TagNumber(1)
Expand All @@ -65,6 +82,9 @@ class Input extends $pb.GeneratedMessage {
@$pb.TagNumber(1)
void clearPath() => clearField(1);

/// A hash-value of the contents. The format of the contents is unspecified and
/// the digest should be treated as an opaque token. This can be empty in some
/// cases.
@$pb.TagNumber(2)
$core.List<$core.int> get digest => $_getN(1);
@$pb.TagNumber(2)
Expand All @@ -78,8 +98,37 @@ class Input extends $pb.GeneratedMessage {
void clearDigest() => clearField(2);
}

/// This represents a single work unit that Blaze sends to the worker.
class WorkRequest extends $pb.GeneratedMessage {
factory WorkRequest() => create();
factory WorkRequest({
$core.Iterable<$core.String>? arguments,
$core.Iterable<Input>? inputs,
$core.int? requestId,
$core.bool? cancel,
$core.int? verbosity,
$core.String? sandboxDir,
}) {
final $result = create();
if (arguments != null) {
$result.arguments.addAll(arguments);
}
if (inputs != null) {
$result.inputs.addAll(inputs);
}
if (requestId != null) {
$result.requestId = requestId;
}
if (cancel != null) {
$result.cancel = cancel;
}
if (verbosity != null) {
$result.verbosity = verbosity;
}
if (sandboxDir != null) {
$result.sandboxDir = sandboxDir;
}
return $result;
}
WorkRequest._() : super();
factory WorkRequest.fromBuffer($core.List<$core.int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
Expand Down Expand Up @@ -126,9 +175,19 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(1)
$core.List<$core.String> get arguments => $_getList(0);

/// The inputs that the worker is allowed to read during execution of this
/// request.
@$pb.TagNumber(2)
$core.List<Input> get inputs => $_getList(1);

/// Each WorkRequest must have either a unique
/// request_id or request_id = 0. If request_id is 0, this WorkRequest must be
/// processed alone (singleplex), otherwise the worker may process multiple
/// WorkRequests in parallel (multiplexing). As an exception to the above, if
/// the cancel field is true, the request_id must be the same as a previously
/// sent WorkRequest. The request_id must be attached unchanged to the
/// corresponding WorkResponse. Only one singleplex request may be sent to a
/// worker at a time.
@$pb.TagNumber(3)
$core.int get requestId => $_getIZ(2);
@$pb.TagNumber(3)
Expand All @@ -141,6 +200,9 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(3)
void clearRequestId() => clearField(3);

/// EXPERIMENTAL: When true, this is a cancel request, indicating that a
/// previously sent WorkRequest with the same request_id should be cancelled.
/// The arguments and inputs fields must be empty and should be ignored.
@$pb.TagNumber(4)
$core.bool get cancel => $_getBF(3);
@$pb.TagNumber(4)
Expand All @@ -153,6 +215,9 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(4)
void clearCancel() => clearField(4);

/// Values greater than 0 indicate that the worker may output extra debug
/// information to stderr (which will go into the worker log). Setting the
/// --worker_verbose flag for Bazel makes this flag default to 10.
@$pb.TagNumber(5)
$core.int get verbosity => $_getIZ(4);
@$pb.TagNumber(5)
Expand All @@ -165,6 +230,15 @@ class WorkRequest extends $pb.GeneratedMessage {
@$pb.TagNumber(5)
void clearVerbosity() => clearField(5);

/// The relative directory inside the workers working directory where the
/// inputs and outputs are placed, for sandboxing purposes. For singleplex
/// workers, this is unset, as they can use their working directory as sandbox.
/// For multiplex workers, this will be set when the
/// --experimental_worker_multiplex_sandbox flag is set _and_ the execution
/// requirements for the worker includes 'supports-multiplex-sandbox'.
/// The paths in `inputs` will not contain this prefix, but the actual files
/// will be placed/must be written relative to this directory. The worker
/// implementation is responsible for resolving the file paths.
@$pb.TagNumber(6)
$core.String get sandboxDir => $_getSZ(5);
@$pb.TagNumber(6)
Expand All @@ -178,8 +252,30 @@ class WorkRequest extends $pb.GeneratedMessage {
void clearSandboxDir() => clearField(6);
}

/// The worker sends this message to Blaze when it finished its work on the
/// WorkRequest message.
class WorkResponse extends $pb.GeneratedMessage {
factory WorkResponse() => create();
factory WorkResponse({
$core.int? exitCode,
$core.String? output,
$core.int? requestId,
$core.bool? wasCancelled,
}) {
final $result = create();
if (exitCode != null) {
$result.exitCode = exitCode;
}
if (output != null) {
$result.output = output;
}
if (requestId != null) {
$result.requestId = requestId;
}
if (wasCancelled != null) {
$result.wasCancelled = wasCancelled;
}
return $result;
}
WorkResponse._() : super();
factory WorkResponse.fromBuffer($core.List<$core.int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
Expand Down Expand Up @@ -233,6 +329,9 @@ class WorkResponse extends $pb.GeneratedMessage {
@$pb.TagNumber(1)
void clearExitCode() => clearField(1);

/// This is printed to the user after the WorkResponse has been received and is
/// supposed to contain compiler warnings / errors etc. - thus we'll use a
/// string type here, which gives us UTF-8 encoding.
@$pb.TagNumber(2)
$core.String get output => $_getSZ(1);
@$pb.TagNumber(2)
Expand All @@ -245,6 +344,10 @@ class WorkResponse extends $pb.GeneratedMessage {
@$pb.TagNumber(2)
void clearOutput() => clearField(2);

/// This field must be set to the same request_id as the WorkRequest it is a
/// response to. Since worker processes which support multiplex worker will
/// handle multiple WorkRequests in parallel, this ID will be used to
/// determined which WorkerProxy does this WorkResponse belong to.
@$pb.TagNumber(3)
$core.int get requestId => $_getIZ(2);
@$pb.TagNumber(3)
Expand All @@ -257,6 +360,12 @@ class WorkResponse extends $pb.GeneratedMessage {
@$pb.TagNumber(3)
void clearRequestId() => clearField(3);

/// EXPERIMENTAL When true, indicates that this response was sent due to
/// receiving a cancel request. The exit_code and output fields should be empty
/// and will be ignored. Exactly one WorkResponse must be sent for each
/// non-cancelling WorkRequest received by the worker, but if the worker
/// received a cancel request, it doesn't matter if it replies with a regular
/// WorkResponse or with one where was_cancelled = true.
@$pb.TagNumber(4)
$core.bool get wasCancelled => $_getBF(3);
@$pb.TagNumber(4)
Expand Down
Loading

0 comments on commit f950bbf

Please sign in to comment.