Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce IF operator #3090

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0245900
init
aglinxinyuan Nov 20, 2024
2f9f1a7
update
aglinxinyuan Nov 20, 2024
247e43e
fix fmt
aglinxinyuan Nov 20, 2024
a5f1e01
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 21, 2024
b961031
rename op
aglinxinyuan Nov 21, 2024
ce9a777
rename op
aglinxinyuan Nov 21, 2024
d080335
rename op
aglinxinyuan Nov 21, 2024
8ba8e95
Remove "Data" the name
aglinxinyuan Nov 22, 2024
f573953
Remove (State) in the port name
aglinxinyuan Nov 22, 2024
09a4a0d
update
aglinxinyuan Nov 22, 2024
c44bb31
update
aglinxinyuan Nov 22, 2024
112965c
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 22, 2024
29d4975
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 22, 2024
3bb3d04
fix fmt
aglinxinyuan Nov 22, 2024
d052a43
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 23, 2024
70c749a
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 23, 2024
c140c83
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 25, 2024
b87d37e
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Nov 27, 2024
2c95f98
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 11, 2024
f4f9eb9
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 12, 2024
a6b0a26
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 15, 2024
cff5998
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 16, 2024
c601b0d
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 17, 2024
3dfa0f9
update
aglinxinyuan Dec 17, 2024
58b1650
update
aglinxinyuan Dec 17, 2024
412e728
update
aglinxinyuan Dec 17, 2024
adda4d6
update
aglinxinyuan Dec 17, 2024
e5a7a52
update
aglinxinyuan Dec 17, 2024
2749a1d
update
aglinxinyuan Dec 17, 2024
f229ae5
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 17, 2024
da4abc9
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Dec 17, 2024
3b2958d
Merge branch 'master' into xinyuan-if-statement
aglinxinyuan Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added core/gui/src/assets/operator_images/If.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import edu.uci.ics.amber.operator.huggingFace.{
HuggingFaceSpamSMSDetectionOpDesc,
HuggingFaceTextSummarizationOpDesc
}
import edu.uci.ics.amber.operator.ifStatement.IfOpDesc
import edu.uci.ics.amber.operator.intersect.IntersectOpDesc
import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc
import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
@@ -116,6 +117,7 @@ trait StateTransferFunc
)
@JsonSubTypes(
Array(
new Type(value = classOf[IfOpDesc], name = "If"),
new Type(value = classOf[SankeyDiagramOpDesc], name = "SankeyDiagram"),
new Type(value = classOf[IcicleChartOpDesc], name = "IcicleChart"),
new Type(value = classOf[CSVScanSourceOpDesc], name = "CSVFileScan"),
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package edu.uci.ics.amber.operator.ifStatement

import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc}
import edu.uci.ics.amber.operator.LogicalOp
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity}

class IfOpDesc extends LogicalOp {
@JsonProperty(required = true)
@JsonSchemaTitle("Condition State")
@JsonPropertyDescription("name of the state variable to evaluate")
var conditionName: String = _

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecInitInfo((_, _) => {
new IfOpExec(conditionName)
})
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withParallelizable(false)
.withPropagateSchema(
SchemaPropagationFunc(inputSchemas =>
operatorInfo.outputPorts
.map(_.id)
.map(id => id -> inputSchemas(operatorInfo.inputPorts.last.id))
.toMap
)
)
}

override def operatorInfo: OperatorInfo =
OperatorInfo(
"If",
"If",
OperatorGroupConstants.CONTROL_GROUP,
inputPorts = List(
InputPort(PortIdentity(), "Condition"),
InputPort(PortIdentity(1), dependencies = List(PortIdentity()))
),
outputPorts = List(OutputPort(PortIdentity(), "False"), OutputPort(PortIdentity(1), "True"))
)

override def getOutputSchema(schemas: Array[Schema]): Schema = throw new NotImplementedError()

override def getOutputSchemas(schemas: Array[Schema]): Array[Schema] =
Array(schemas(1), schemas(1))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package edu.uci.ics.amber.operator.ifStatement

import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.marker.State
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.workflow.PortIdentity

class IfOpExec(conditionName: String) extends OperatorExecutor {
private var outputPort: PortIdentity = PortIdentity(1) // by default, it should be the true port.

//This function can handle one or more states.
//The state can have mutiple key-value pairs. Keys are not identified by conditionName will be ignored.
//It can accept any value that can be converted to a boolean. For example, Int 1 will be converted to true.
override def processState(state: State, port: Int): Option[State] = {
outputPort =
if (state.get(conditionName).asInstanceOf[Boolean]) PortIdentity(1) else PortIdentity()
Some(state)
}

override def processTupleMultiPort(
tuple: Tuple,
port: Int
): Iterator[(TupleLike, Option[PortIdentity])] =
Iterator((tuple, Some(outputPort)))

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = ???
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ object OperatorGroupConstants {
final val JAVA_GROUP = "Java"
final val R_GROUP = "R"
final val MACHINE_LEARNING_GENERAL_GROUP = "Machine Learning General"
final val CONTROL_GROUP = "Control Block"

/**
* The order of the groups to show up in the frontend operator panel.
@@ -46,6 +47,7 @@ object OperatorGroupConstants {
GroupInfo(UTILITY_GROUP),
GroupInfo(API_GROUP),
GroupInfo(UDF_GROUP, List(GroupInfo(PYTHON_GROUP), GroupInfo(JAVA_GROUP), GroupInfo(R_GROUP))),
GroupInfo(VISUALIZATION_GROUP)
GroupInfo(VISUALIZATION_GROUP),
GroupInfo(CONTROL_GROUP)
)
}
Loading