Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proto message renaming and package refactoring #2983

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
syntax = "proto3";

package edu.uci.ics.amber.engine.architecture.python;

import "edu/uci/ics/amber/engine/architecture/rpc/control_commands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/control_returns.proto";
import "edu/uci/ics/amber/engine/common/virtual_identity.proto";
import "edu/uci/ics/amber/engine/common/actor_message.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
scope: FILE,
preserve_unknown_fields: false
no_default_values_in_constructor: true
flat_package: true
};

message ControlPayloadV2 {
oneof value {
architecture.rpc.ControlInvocation control_invocation = 1;
architecture.rpc.ReturnInvocation return_invocation = 2;
}
}

message PythonDataHeader {
common.ActorVirtualIdentity tag = 1 [(scalapb.field).no_box = true];
string payload_type = 2;
}

message PythonControlMessage {
common.ActorVirtualIdentity tag = 1 [(scalapb.field).no_box = true];
ControlPayloadV2 payload = 2 [(scalapb.field).no_box = true];
}

message PythonActorMessage {
common.ActorCommand payload = 1 [(scalapb.field).no_box = true];
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
syntax = "proto3";
package edu.uci.ics.amber.engine.architecture.rpc;

import "edu/uci/ics/amber/engine/common/virtualidentity.proto";
import "edu/uci/ics/amber/engine/common/virtual_identity.proto";
import "edu/uci/ics/amber/engine/common/workflow.proto";
import "edu/uci/ics/amber/engine/architecture/worker/statistics.proto";
import "edu/uci/ics/amber/engine/architecture/sendsemantics/partitionings.proto";
Expand All @@ -13,47 +13,49 @@ option (scalapb.options) = {
scope: FILE,
preserve_unknown_fields: false
no_default_values_in_constructor: true
flat_package: true
};

message ControlRequest {
oneof sealed_value {
// request for controller
PropagateChannelMarkerRequest propagateChannelMarkerRequest = 1;
TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2;
DebugCommandRequest debugCommandRequest = 3;
EvaluatePythonExpressionRequest evaluatePythonExpressionRequest = 4;
ModifyLogicRequest modifyLogicRequest = 5;
RetryWorkflowRequest retryWorkflowRequest = 6;
ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 8;
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
PropagateChannelMarkerRequest propagate_channel_marker_request = 1;
TakeGlobalCheckpointRequest take_global_checkpoint_request = 2;
DebugCommandRequest debug_command_request = 3;
EvaluatePythonExpressionRequest evaluate_python_expression_request = 4;
ModifyLogicRequest modify_logic_request = 5;
RetryWorkflowRequest retry_workflow_request = 6;
ConsoleMessageTriggeredRequest console_message_triggered_request = 8;
PortCompletedRequest port_completed_request = 9;
WorkerStateUpdatedRequest worker_state_updated_request = 10;
LinkWorkersRequest link_workers_request = 11;

// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
AddPartitioningRequest addPartitioningRequest = 51;
AssignPortRequest assignPortRequest = 52;
FinalizeCheckpointRequest finalizeCheckpointRequest = 53;
InitializeExecutorRequest initializeExecutorRequest = 54;
UpdateExecutorRequest updateExecutorRequest = 55;
EmptyRequest emptyRequest = 56;
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;
AddInputChannelRequest add_input_channel_request = 50;
AddPartitioningRequest add_partitioning_request = 51;
AssignPortRequest assign_port_request = 52;
FinalizeCheckpointRequest finalize_checkpoint_request = 53;
InitializeExecutorRequest initialize_executor_request = 54;
UpdateExecutorRequest update_executor_request = 55;
EmptyRequest empty_request = 56;
PrepareCheckpointRequest prepare_checkpoint_request = 57;
QueryStatisticsRequest query_statistics_request = 58;

// request for testing
Ping ping = 100;
Pong pong = 101;
Nested nested = 102;
Pass pass = 103;
ErrorCommand errorCommand = 104;
ErrorCommand error_command = 104;
Recursion recursion = 105;
Collect collect = 106;
GenerateNumber generateNumber = 107;
MultiCall multiCall = 108;
GenerateNumber generate_number = 107;
MultiCall multi_call = 108;
Chain chain = 109;
}
}


message EmptyRequest{}

message AsyncRPCContext {
Expand All @@ -64,10 +66,10 @@ message AsyncRPCContext {

message ControlInvocation {
option (scalapb.message).extends = "edu.uci.ics.amber.engine.common.ambermessage.ControlPayload";
string methodName = 1;
string method_name = 1;
ControlRequest command = 2 [(scalapb.field).no_box = true];
AsyncRPCContext context = 3;
int64 commandId = 4;
int64 command_id = 4;
}

// Enum for ChannelMarkerType
Expand All @@ -80,45 +82,45 @@ enum ChannelMarkerType {
message ChannelMarkerPayload {
option (scalapb.message).extends = "edu.uci.ics.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload";
common.ChannelMarkerIdentity id = 1 [(scalapb.field).no_box = true];
ChannelMarkerType markerType = 2;
ChannelMarkerType marker_type = 2;
repeated common.ChannelIdentity scope = 3;
map<string, ControlInvocation> commandMapping = 4;
map<string, ControlInvocation> command_mapping = 4;
}

message PropagateChannelMarkerRequest {
repeated common.PhysicalOpIdentity sourceOpToStartProp = 1;
repeated common.PhysicalOpIdentity source_op_to_start_prop = 1;
common.ChannelMarkerIdentity id = 2 [(scalapb.field).no_box = true];
ChannelMarkerType markerType = 3;
ChannelMarkerType marker_type = 3;
repeated common.PhysicalOpIdentity scope = 4;
repeated common.PhysicalOpIdentity targetOps = 5;
ControlRequest markerCommand = 6;
string markerMethodName = 7;
repeated common.PhysicalOpIdentity target_ops = 5;
ControlRequest marker_command = 6;
string marker_method_name = 7;
}

message TakeGlobalCheckpointRequest {
bool estimationOnly = 1;
common.ChannelMarkerIdentity checkpointId = 2 [(scalapb.field).no_box = true];
bool estimation_only = 1;
common.ChannelMarkerIdentity checkpoint_id = 2 [(scalapb.field).no_box = true];
string destination = 3;
}

message WorkflowReconfigureRequest{
ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true];
string reconfigurationId = 2;
string reconfiguration_id = 2;
}


message DebugCommandRequest {
string workerId = 1;
string worker_id = 1;
string cmd = 2;
}

message EvaluatePythonExpressionRequest {
string expression = 1;
string operatorId = 2;
string operator_id = 2;
}

message ModifyLogicRequest {
repeated UpdateExecutorRequest updateRequest = 1;
repeated UpdateExecutorRequest update_request = 1;
}

message RetryWorkflowRequest {
Expand All @@ -143,11 +145,11 @@ message ConsoleMessage {
}

message ConsoleMessageTriggeredRequest {
ConsoleMessage consoleMessage = 1 [(scalapb.field).no_box = true];
ConsoleMessage console_message = 1 [(scalapb.field).no_box = true];
}

message PortCompletedRequest {
common.PortIdentity portId = 1 [(scalapb.field).no_box = true];
common.PortIdentity port_id = 1 [(scalapb.field).no_box = true];
bool input = 2;
}

Expand Down Expand Up @@ -213,8 +215,8 @@ message Recursion {

// Messages for the commands
message AddInputChannelRequest {
common.ChannelIdentity channelId = 1 [(scalapb.field).no_box = true];
common.PortIdentity portId = 2 [(scalapb.field).no_box = true];
common.ChannelIdentity channel_id = 1 [(scalapb.field).no_box = true];
common.PortIdentity port_id = 2 [(scalapb.field).no_box = true];
}

message AddPartitioningRequest {
Expand All @@ -223,34 +225,34 @@ message AddPartitioningRequest {
}

message AssignPortRequest {
common.PortIdentity portId = 1 [(scalapb.field).no_box = true];
common.PortIdentity port_id = 1 [(scalapb.field).no_box = true];
bool input = 2;
map<string, string> schema = 3;
}

message FinalizeCheckpointRequest {
common.ChannelMarkerIdentity checkpointId = 1 [(scalapb.field).no_box = true];
string writeTo = 2;
common.ChannelMarkerIdentity checkpoint_id = 1 [(scalapb.field).no_box = true];
string write_to = 2;
}

message InitializeExecutorRequest {
int32 totalWorkerCount = 1;
google.protobuf.Any opExecInitInfo = 2 [(scalapb.field).no_box = true];
bool isSource = 3;
int32 total_worker_count = 1;
google.protobuf.Any op_exec_init_info = 2 [(scalapb.field).no_box = true];
bool is_source = 3;
string language = 4;
}

message UpdateExecutorRequest {
common.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true];
google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true];
google.protobuf.Any stateTransferFunc = 3;
common.PhysicalOpIdentity target_op_id = 1 [(scalapb.field).no_box = true];
google.protobuf.Any new_executor = 2 [(scalapb.field).no_box = true];
google.protobuf.Any state_transfer_func = 3;
}

message PrepareCheckpointRequest{
common.ChannelMarkerIdentity checkpointId = 1 [(scalapb.field).no_box = true];
bool estimationOnly = 2;
common.ChannelMarkerIdentity checkpoint_id = 1 [(scalapb.field).no_box = true];
bool estimation_only = 2;
}

message QueryStatisticsRequest{
repeated common.ActorVirtualIdentity filterByWorkers = 1;
repeated common.ActorVirtualIdentity filter_by_workers = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ syntax = "proto3";
package edu.uci.ics.amber.engine.architecture.rpc;

import "edu/uci/ics/amber/engine/architecture/worker/statistics.proto";
import "edu/uci/ics/amber/engine/common/workflow_metrics.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
scope: FILE,
preserve_unknown_fields: false
no_default_values_in_constructor: true
flat_package: true
};


Expand All @@ -16,22 +18,22 @@ message ControlReturn {
// Oneof block for various return types
oneof sealed_value {
// controller responses
RetrieveWorkflowStateResponse retrieveWorkflowStateResponse = 1;
PropagateChannelMarkerResponse propagateChannelMarkerResponse = 2;
TakeGlobalCheckpointResponse takeGlobalCheckpointResponse = 3;
EvaluatePythonExpressionResponse evaluatePythonExpressionResponse = 4;
StartWorkflowResponse startWorkflowResponse = 5;
RetrieveWorkflowStateResponse retrieve_workflow_state_response = 1;
PropagateChannelMarkerResponse propagate_channel_marker_response = 2;
TakeGlobalCheckpointResponse take_global_checkpoint_response = 3;
EvaluatePythonExpressionResponse evaluate_python_expression_response = 4;
StartWorkflowResponse start_workflow_response = 5;

// worker responses
WorkerStateResponse workerStateResponse = 50;
WorkerMetricsResponse workerMetricsResponse = 51;
FinalizeCheckpointResponse finalizeCheckpointResponse = 52;
WorkerStateResponse worker_state_response = 50;
WorkerMetricsResponse worker_metrics_response = 51;
FinalizeCheckpointResponse finalize_checkpoint_response = 52;

// common responses
ControlError error = 101;
EmptyReturn emptyReturn = 102;
StringResponse stringResponse = 103;
IntResponse intResponse = 104;
ControlException error = 101;
EmptyReturn empty_return = 102;
StringResponse string_response = 103;
IntResponse int_response = 104;
}
}

Expand All @@ -42,7 +44,7 @@ enum ErrorLanguage {
SCALA = 1;
}

message ControlError {
message ControlException {
string errorMessage = 1;
string errorDetails = 2;
string stackTrace = 3;
Expand Down Expand Up @@ -97,21 +99,8 @@ message EvaluatePythonExpressionResponse {
repeated EvaluatedValue values = 1;
}

enum WorkflowAggregatedState {
UNINITIALIZED = 0;
READY = 1;
RUNNING = 2;
PAUSING = 3;
PAUSED = 4;
RESUMING = 5;
COMPLETED = 6;
FAILED = 7;
UNKNOWN = 8;
KILLED = 9;
}

message StartWorkflowResponse {
WorkflowAggregatedState workflowState = 1 [(scalapb.field).no_box = true];
common.WorkflowAggregatedState workflow_state = 1 [(scalapb.field).no_box = true];
}

message WorkerStateResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
syntax = "proto3";
package edu.uci.ics.amber.engine.architecture.rpc;

import "edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/control_commands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/control_returns.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
scope: FILE,
preserve_unknown_fields: false
no_default_values_in_constructor: true
flat_package: true
};


Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
syntax = "proto3";
package edu.uci.ics.amber.engine.architecture.rpc;

import "edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/control_commands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/control_returns.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
scope: FILE,
preserve_unknown_fields: false
no_default_values_in_constructor: true
flat_package: true
};


Expand Down
Loading
Loading