Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/jiadong-add-file-result-storage'…
Browse files Browse the repository at this point in the history
… into jiadong-add-file-result-storage
  • Loading branch information
bobbai00 committed Dec 30, 2024
2 parents 9b69f59 + 4cf144b commit 60445e6
Show file tree
Hide file tree
Showing 247 changed files with 425 additions and 410 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
syntax = "proto3";
package edu.uci.ics.amber.engine.architecture.rpc;

import "edu/uci/ics/amber/virtualidentity.proto";
import "edu/uci/ics/amber/workflow.proto";
import "edu/uci/ics/amber/core/virtualidentity.proto";
import "edu/uci/ics/amber/core/workflow.proto";
import "edu/uci/ics/amber/engine/architecture/worker/statistics.proto";
import "edu/uci/ics/amber/engine/architecture/sendsemantics/partitionings.proto";
import "scalapb/scalapb.proto";
Expand Down Expand Up @@ -58,8 +58,8 @@ message EmptyRequest{}

message AsyncRPCContext {
option (scalapb.message).no_box = true;
ActorVirtualIdentity sender = 1 [(scalapb.field).no_box = true];
ActorVirtualIdentity receiver = 2 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity sender = 1 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity receiver = 2 [(scalapb.field).no_box = true];
}

message ControlInvocation {
Expand All @@ -79,25 +79,25 @@ enum ChannelMarkerType {
// Message for ChannelMarkerPayload
message ChannelMarkerPayload {
option (scalapb.message).extends = "edu.uci.ics.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload";
ChannelMarkerIdentity id = 1 [(scalapb.field).no_box = true];
core.ChannelMarkerIdentity id = 1 [(scalapb.field).no_box = true];
ChannelMarkerType markerType = 2;
repeated ChannelIdentity scope = 3;
repeated core.ChannelIdentity scope = 3;
map<string, ControlInvocation> commandMapping = 4;
}

message PropagateChannelMarkerRequest {
repeated PhysicalOpIdentity sourceOpToStartProp = 1;
ChannelMarkerIdentity id = 2 [(scalapb.field).no_box = true];
repeated core.PhysicalOpIdentity sourceOpToStartProp = 1;
core.ChannelMarkerIdentity id = 2 [(scalapb.field).no_box = true];
ChannelMarkerType markerType = 3;
repeated PhysicalOpIdentity scope = 4;
repeated PhysicalOpIdentity targetOps = 5;
repeated core.PhysicalOpIdentity scope = 4;
repeated core.PhysicalOpIdentity targetOps = 5;
ControlRequest markerCommand = 6;
string markerMethodName = 7;
}

message TakeGlobalCheckpointRequest {
bool estimationOnly = 1;
ChannelMarkerIdentity checkpointId = 2 [(scalapb.field).no_box = true];
core.ChannelMarkerIdentity checkpointId = 2 [(scalapb.field).no_box = true];
string destination = 3;
}

Expand All @@ -122,7 +122,7 @@ message ModifyLogicRequest {
}

message RetryWorkflowRequest {
repeated ActorVirtualIdentity workers = 1;
repeated core.ActorVirtualIdentity workers = 1;
}

enum ConsoleMessageType{
Expand All @@ -147,7 +147,7 @@ message ConsoleMessageTriggeredRequest {
}

message PortCompletedRequest {
PortIdentity portId = 1 [(scalapb.field).no_box = true];
core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
bool input = 2;
}

Expand All @@ -156,21 +156,21 @@ message WorkerStateUpdatedRequest {
}

message LinkWorkersRequest {
PhysicalLink link = 1 [(scalapb.field).no_box = true];
core.PhysicalLink link = 1 [(scalapb.field).no_box = true];
}

// Ping message
message Ping {
int32 i = 1;
int32 end = 2;
ActorVirtualIdentity to = 3 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity to = 3 [(scalapb.field).no_box = true];
}

// Pong message
message Pong {
int32 i = 1;
int32 end = 2;
ActorVirtualIdentity to = 3 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity to = 3 [(scalapb.field).no_box = true];
}

// Pass message
Expand All @@ -185,7 +185,7 @@ message Nested {

// MultiCall message
message MultiCall {
repeated ActorVirtualIdentity seq = 1;
repeated core.ActorVirtualIdentity seq = 1;
}

// ErrorCommand message
Expand All @@ -194,7 +194,7 @@ message ErrorCommand {

// Collect message
message Collect {
repeated ActorVirtualIdentity workers = 1;
repeated core.ActorVirtualIdentity workers = 1;
}

// GenerateNumber message
Expand All @@ -203,7 +203,7 @@ message GenerateNumber {

// Chain message
message Chain {
repeated ActorVirtualIdentity nexts = 1;
repeated core.ActorVirtualIdentity nexts = 1;
}

// Recursion message
Expand All @@ -213,23 +213,23 @@ message Recursion {

// Messages for the commands
message AddInputChannelRequest {
ChannelIdentity channelId = 1 [(scalapb.field).no_box = true];
PortIdentity portId = 2 [(scalapb.field).no_box = true];
core.ChannelIdentity channelId = 1 [(scalapb.field).no_box = true];
core.PortIdentity portId = 2 [(scalapb.field).no_box = true];
}

message AddPartitioningRequest {
PhysicalLink tag = 1 [(scalapb.field).no_box = true];
core.PhysicalLink tag = 1 [(scalapb.field).no_box = true];
sendsemantics.Partitioning partitioning = 2 [(scalapb.field).no_box = true];
}

message AssignPortRequest {
PortIdentity portId = 1 [(scalapb.field).no_box = true];
core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
bool input = 2;
map<string, string> schema = 3;
}

message FinalizeCheckpointRequest {
ChannelMarkerIdentity checkpointId = 1 [(scalapb.field).no_box = true];
core.ChannelMarkerIdentity checkpointId = 1 [(scalapb.field).no_box = true];
string writeTo = 2;
}

Expand All @@ -241,16 +241,16 @@ message InitializeExecutorRequest {
}

message UpdateExecutorRequest {
PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true];
core.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true];
google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true];
google.protobuf.Any stateTransferFunc = 3;
}

message PrepareCheckpointRequest{
ChannelMarkerIdentity checkpointId = 1 [(scalapb.field).no_box = true];
core.ChannelMarkerIdentity checkpointId = 1 [(scalapb.field).no_box = true];
bool estimationOnly = 2;
}

message QueryStatisticsRequest{
repeated ActorVirtualIdentity filterByWorkers = 1;
repeated core.ActorVirtualIdentity filterByWorkers = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package edu.uci.ics.amber.engine.architecture.sendsemantics;

import "edu/uci/ics/amber/core/virtualidentity.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
Expand All @@ -10,8 +11,6 @@ option (scalapb.options) = {
no_default_values_in_constructor: true
};

import "edu/uci/ics/amber/virtualidentity.proto";

message Partitioning{
oneof sealed_value{
OneToOnePartitioning oneToOnePartitioning = 1;
Expand All @@ -24,29 +23,29 @@ message Partitioning{

message OneToOnePartitioning{
int32 batchSize = 1;
repeated ChannelIdentity channels = 2;
repeated core.ChannelIdentity channels = 2;
}

message RoundRobinPartitioning{
int32 batchSize = 1;
repeated ChannelIdentity channels = 2;
repeated core.ChannelIdentity channels = 2;
}

message HashBasedShufflePartitioning{
int32 batchSize = 1;
repeated ChannelIdentity channels = 2;
repeated core.ChannelIdentity channels = 2;
repeated string hashAttributeNames = 3;
}

message RangeBasedShufflePartitioning {
int32 batchSize = 1;
repeated ChannelIdentity channels = 2;
repeated core.ChannelIdentity channels = 2;
repeated string rangeAttributeNames = 3;
int64 rangeMin = 4;
int64 rangeMax = 5;
}

message BroadcastPartitioning{
int32 batchSize = 1;
repeated ChannelIdentity channels = 2;
repeated core.ChannelIdentity channels = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

package edu.uci.ics.amber.engine.architecture.worker;

import "edu/uci/ics/amber/workflow.proto";
import "edu/uci/ics/amber/core/workflow.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
Expand All @@ -22,7 +22,7 @@ enum WorkerState {
}

message PortTupleCountMapping {
PortIdentity port_id = 1 [(scalapb.field).no_box = true];
core.PortIdentity port_id = 1 [(scalapb.field).no_box = true];
int64 tuple_count = 2;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package edu.uci.ics.amber.engine.common;

import "edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto";
import "edu/uci/ics/amber/virtualidentity.proto";
import "edu/uci/ics/amber/core/virtualidentity.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
Expand All @@ -21,11 +21,11 @@ message ControlPayloadV2 {
}

message PythonDataHeader {
ActorVirtualIdentity tag = 1 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity tag = 1 [(scalapb.field).no_box = true];
string payload_type = 2;
}

message PythonControlMessage {
ActorVirtualIdentity tag = 1 [(scalapb.field).no_box = true];
core.ActorVirtualIdentity tag = 1 [(scalapb.field).no_box = true];
ControlPayloadV2 payload = 2 [(scalapb.field).no_box = true];
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package edu.uci.ics.amber.engine.common;
import "edu/uci/ics/amber/engine/architecture/rpc/controlcommands.proto";
import "edu/uci/ics/amber/engine/architecture/rpc/controlreturns.proto";
import "edu/uci/ics/amber/engine/architecture/worker/statistics.proto";
import "edu/uci/ics/amber/virtualidentity.proto";
import "edu/uci/ics/amber/workflowruntimestate.proto";
import "edu/uci/ics/amber/core/virtualidentity.proto";
import "edu/uci/ics/amber/core/workflowruntimestate.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
Expand Down Expand Up @@ -36,11 +36,11 @@ message ExecutionBreakpointStore{
}

message EvaluatedValueList{
repeated amber.engine.architecture.rpc.EvaluatedValue values = 1;
repeated architecture.rpc.EvaluatedValue values = 1;
}

message OperatorConsole{
repeated edu.uci.ics.amber.engine.architecture.rpc.ConsoleMessage console_messages = 1;
repeated architecture.rpc.ConsoleMessage console_messages = 1;
map<string, EvaluatedValueList> evaluate_expr_results = 2;
}

Expand All @@ -54,16 +54,16 @@ message OperatorWorkerMapping{
}

message OperatorStatistics{
repeated amber.engine.architecture.worker.PortTupleCountMapping input_count = 1;
repeated amber.engine.architecture.worker.PortTupleCountMapping output_count = 2;
repeated architecture.worker.PortTupleCountMapping input_count = 1;
repeated architecture.worker.PortTupleCountMapping output_count = 2;
int32 num_workers = 3;
int64 data_processing_time = 4;
int64 control_processing_time = 5;
int64 idle_time = 6;
}

message OperatorMetrics{
edu.uci.ics.amber.engine.architecture.rpc.WorkflowAggregatedState operator_state = 1 [(scalapb.field).no_box = true];
architecture.rpc.WorkflowAggregatedState operator_state = 1 [(scalapb.field).no_box = true];
OperatorStatistics operator_statistics = 2 [(scalapb.field).no_box = true];
}

Expand All @@ -76,8 +76,8 @@ message ExecutionStatsStore {


message ExecutionMetadataStore{
edu.uci.ics.amber.engine.architecture.rpc.WorkflowAggregatedState state = 1;
repeated WorkflowFatalError fatal_errors = 2;
ExecutionIdentity executionId = 3 [(scalapb.field).no_box = true];
architecture.rpc.WorkflowAggregatedState state = 1;
repeated core.WorkflowFatalError fatal_errors = 2;
core.ExecutionIdentity executionId = 3 [(scalapb.field).no_box = true];
bool is_recovering = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import edu.uci.ics.amber.engine.architecture.rpc.controlreturns.WorkflowAggregat
}
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging}
import edu.uci.ics.amber.error.ErrorUtils.getStackTraceWithAllCauses
import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import edu.uci.ics.amber.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE
import edu.uci.ics.amber.core.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.web.SessionState
import edu.uci.ics.texera.web.model.websocket.response.ClusterStatusUpdateEvent
import edu.uci.ics.texera.web.service.{WorkflowExecutionService, WorkflowService}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import edu.uci.ics.amber.engine.architecture.common.WorkflowActor.{
import edu.uci.ics.amber.engine.common.AmberLogging
import edu.uci.ics.amber.engine.common.virtualidentity.util.{CONTROLLER, SELF}
import edu.uci.ics.amber.util.VirtualIdentityUtils
import edu.uci.ics.amber.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}

import scala.collection.mutable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package edu.uci.ics.amber.engine.architecture.common
import akka.actor.{ActorContext, ActorRef, Address, Cancellable, Props}
import akka.util.Timeout
import edu.uci.ics.amber.engine.common.FutureBijection._
import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.amber.core.virtualidentity.ActorVirtualIdentity

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{DurationInt, FiniteDuration}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import edu.uci.ics.amber.engine.architecture.common.WorkflowActor.NetworkMessage
import edu.uci.ics.amber.engine.architecture.messaginglayer.{CongestionControl, FlowControl}
import edu.uci.ics.amber.engine.common.ambermessage.WorkflowFIFOMessage
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging}
import edu.uci.ics.amber.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}

import scala.collection.mutable
import scala.concurrent.duration.DurationInt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import edu.uci.ics.amber.engine.architecture.worker.managers.StatisticsManager
import edu.uci.ics.amber.engine.common.AmberLogging
import edu.uci.ics.amber.engine.common.ambermessage.{ControlPayload, WorkflowFIFOMessage}
import edu.uci.ics.amber.engine.common.rpc.{AsyncRPCClient, AsyncRPCServer}
import edu.uci.ics.amber.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}

abstract class AmberProcessor(
val actorId: ActorVirtualIdentity,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package edu.uci.ics.amber.engine.architecture.common

import edu.uci.ics.amber.engine.architecture.common.ProcessingStepCursor.INIT_STEP
import edu.uci.ics.amber.virtualidentity.ChannelIdentity
import edu.uci.ics.amber.core.virtualidentity.ChannelIdentity

object ProcessingStepCursor {
// step value before processing any incoming message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import edu.uci.ics.amber.engine.architecture.worker.WorkflowWorker.{
import edu.uci.ics.amber.engine.common.ambermessage.WorkflowFIFOMessage
import edu.uci.ics.amber.engine.common.storage.SequentialRecordStorage
import edu.uci.ics.amber.engine.common.{AmberLogging, CheckpointState}
import edu.uci.ics.amber.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
Expand Down
Loading

0 comments on commit 60445e6

Please sign in to comment.