diff --git a/core/gui/src/assets/operator_images/If.png b/core/gui/src/assets/operator_images/If.png new file mode 100644 index 00000000000..5808af116b0 Binary files /dev/null and b/core/gui/src/assets/operator_images/If.png differ diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index ac88f098cd3..6d37d51ef35 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -20,6 +20,7 @@ import edu.uci.ics.amber.operator.huggingFace.{ HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc } +import edu.uci.ics.amber.operator.ifStatement.IfOpDesc import edu.uci.ics.amber.operator.intersect.IntersectOpDesc import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc @@ -142,6 +143,7 @@ trait StateTransferFunc ) @JsonSubTypes( Array( + new Type(value = classOf[IfOpDesc], name = "If"), new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"), new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"), new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/ifStatement/IfOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/ifStatement/IfOpDesc.scala new file mode 100644 index 00000000000..f4c0d7db2b6 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/ifStatement/IfOpDesc.scala @@ -0,0 +1,61 @@ +package edu.uci.ics.amber.operator.ifStatement + +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import edu.uci.ics.amber.core.executor.OpExecInitInfo +import edu.uci.ics.amber.core.tuple.Schema +import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc} +import edu.uci.ics.amber.operator.LogicalOp +import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity} + +class IfOpDesc extends LogicalOp { + @JsonProperty(required = true) + @JsonSchemaTitle("Condition State") + @JsonPropertyDescription("name of the state variable to evaluate") + var conditionName: String = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecInitInfo((_, _) => { + new IfOpExec(conditionName) + }) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withParallelizable(false) + .withPropagateSchema( + SchemaPropagationFunc(inputSchemas => + operatorInfo.outputPorts + .map(_.id) + .map(id => id -> inputSchemas(operatorInfo.inputPorts.last.id)) + .toMap + ) + ) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "If", + "If", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List( + InputPort(PortIdentity(), "Condition"), + InputPort(PortIdentity(1), dependencies = List(PortIdentity())) + ), + outputPorts = List(OutputPort(PortIdentity(), "False"), OutputPort(PortIdentity(1), "True")) + ) + + override def getOutputSchema(schemas: Array[Schema]): Schema = throw new NotImplementedError() + + override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] = + Array(schemas(1), schemas(1)) +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/ifStatement/IfOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/ifStatement/IfOpExec.scala new file mode 100644 index 00000000000..c96bd7457d3 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/ifStatement/IfOpExec.scala @@ -0,0 +1,27 @@ +package edu.uci.ics.amber.operator.ifStatement + +import edu.uci.ics.amber.core.executor.OperatorExecutor +import edu.uci.ics.amber.core.marker.State +import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} +import edu.uci.ics.amber.workflow.PortIdentity + +class IfOpExec(conditionName: String) extends OperatorExecutor { + private var outputPort: PortIdentity = PortIdentity(1) // by default, it should be the true port. + + //This function can handle one or more states. + //The state can have mutiple key-value pairs. Keys are not identified by conditionName will be ignored. + //It can accept any value that can be converted to a boolean. For example, Int 1 will be converted to true. + override def processState(state: State, port: Int): Option[State] = { + outputPort = + if (state.get(conditionName).asInstanceOf[Boolean]) PortIdentity(1) else PortIdentity() + Some(state) + } + + override def processTupleMultiPort( + tuple: Tuple, + port: Int + ): Iterator[(TupleLike, Option[PortIdentity])] = + Iterator((tuple, Some(outputPort))) + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = ??? +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/metadata/OperatorGroupConstants.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/metadata/OperatorGroupConstants.scala index 9ea299a55d4..4bfb59f3047 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/metadata/OperatorGroupConstants.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/metadata/OperatorGroupConstants.scala @@ -21,6 +21,7 @@ object OperatorGroupConstants { final val JAVA_GROUP = "Java" final val R_GROUP = "R" final val MACHINE_LEARNING_GENERAL_GROUP = "Machine Learning General" + final val CONTROL_GROUP = "Control Block" /** * The order of the groups to show up in the frontend operator panel. @@ -46,6 +47,7 @@ object OperatorGroupConstants { GroupInfo(UTILITY_GROUP), GroupInfo(API_GROUP), GroupInfo(UDF_GROUP, List(GroupInfo(PYTHON_GROUP), GroupInfo(JAVA_GROUP), GroupInfo(R_GROUP))), - GroupInfo(VISUALIZATION_GROUP) + GroupInfo(VISUALIZATION_GROUP), + GroupInfo(CONTROL_GROUP) ) }