Convert Java operator descriptors to Scala (#3179)
There are a few operator descriptors still written in Java, which makes them
difficult to use and maintain. This PR converts all such descriptors to Scala
to streamline the migration to new APIs and to facilitate future work, such as
operator offloading.

Changed Operators:
- PythonUDFSourceOpDescV2
- RUDFSourceOpDesc
- SentimentAnalysisOpDesc
- SpecializedFilterOpDesc
- TypeCastingOpDesc
Yicong-Huang authored Dec 30, 2024
1 parent 5b622e3 commit 0414544
Showing 17 changed files with 312 additions and 450 deletions.
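
The recurring change across the diffs below is replacing Java collection types on descriptor fields and call sites with plain Scala collections. The following minimal, runnable sketch shows that pattern; the Item type and the ConversionSketch object are illustrative names, not code from this repository.

import scala.jdk.CollectionConverters._

object ConversionSketch extends App {
  final case class Item(name: String)

  // Old style: a java.util.List surfaces at Scala call sites and needs interop.
  val javaItems: java.util.List[Item] = java.util.Arrays.asList(Item("a"), Item("b"))
  val viaInterop: List[String] = javaItems.asScala.toList.map(_.name)

  // New style: a plain Scala List is used directly, with no conversion layer.
  val scalaItems: List[Item] = List(Item("a"), Item("b"))
  val direct: List[String] = scalaItems.map(_.name)

  assert(viaInterop == direct)
}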
@@ -70,7 +70,7 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec with BeforeAndAfterAll

// utility function to create a filter op
private def getFilterOpDesc(
- filterPredicates: java.util.List[FilterPredicate]
+ filterPredicates: List[FilterPredicate]
): FilterOpDesc = {
val filterOpDesc = new SpecializedFilterOpDesc
filterOpDesc.predicates = filterPredicates
@@ -116,11 +116,11 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec with BeforeAndAfterAll

// Create the filter predicate for TotalProfit > 10000
val filterPredicate1 = new FilterPredicate("Total Profit", ComparisonType.GREATER_THAN, "10000")
- val filterOpDesc1 = getFilterOpDesc(java.util.Arrays.asList(filterPredicate1))
+ val filterOpDesc1 = getFilterOpDesc(List(filterPredicate1))

// Create the filter predicate for Region != "JPN"
val filterPredicate2 = new FilterPredicate("Region", ComparisonType.NOT_EQUAL_TO, "JPN")
- val filterOpDesc2 = getFilterOpDesc(java.util.Arrays.asList(filterPredicate2))
+ val filterOpDesc2 = getFilterOpDesc(List(filterPredicate2))

// Add a second limit operation
val limitOpDesc2 = getLimitOpDesc(5)
@@ -7,8 +7,8 @@ abstract class FilterOpExec extends OperatorExecutor with Serializable {

var filterFunc: Tuple => Boolean = _

- def setFilterFunc(func: Tuple => java.lang.Boolean): Unit =
-   filterFunc = (tuple: Tuple) => func.apply(tuple).booleanValue()
+ def setFilterFunc(func: Tuple => Boolean): Unit =
+   filterFunc = func

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] =
if (filterFunc(tuple)) Iterator.single(tuple) else Iterator.empty
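
With setFilterFunc now taking a plain Tuple => Boolean, subclasses can pass Scala functions directly instead of boxing results into java.lang.Boolean. The standalone sketch below mirrors that pattern with a simplified Row stand-in for Tuple; all names here are illustrative, not from the codebase.

object FilterFuncSketch extends App {
  // Simplified stand-in for the engine's Tuple type.
  final case class Row(fields: Map[String, String])

  // Mirrors the reworked FilterOpExec: stores a Row => Boolean predicate set by subclasses.
  abstract class RowFilter {
    var filterFunc: Row => Boolean = _
    def setFilterFunc(func: Row => Boolean): Unit = filterFunc = func
    def process(row: Row): Iterator[Row] =
      if (filterFunc(row)) Iterator.single(row) else Iterator.empty
  }

  // A concrete filter that keeps rows with a non-empty "name" field.
  class NonEmptyNameFilter extends RowFilter {
    setFilterFunc(row => row.fields.get("name").exists(_.nonEmpty))
  }

  val filter = new NonEmptyNameFilter
  println(filter.process(Row(Map("name" -> "alice"))).toList) // one row kept
  println(filter.process(Row(Map("name" -> ""))).toList)      // filtered out
}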

This file was deleted.

@@ -0,0 +1,41 @@
package edu.uci.ics.amber.operator.filter

import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.workflow.PhysicalOp
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.{InputPort, OutputPort}

class SpecializedFilterOpDesc extends FilterOpDesc {

@JsonProperty(value = "predicates", required = true)
@JsonPropertyDescription("multiple predicates in OR")
var predicates: List[FilterPredicate] = List.empty

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecInitInfo((_, _) => new SpecializedFilterOpExec(predicates))
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
}

override def operatorInfo: OperatorInfo = {
OperatorInfo(
"Filter",
"Performs a filter operation",
OperatorGroupConstants.CLEANING_GROUP,
List(InputPort()),
List(OutputPort()),
supportReconfiguration = true
)
}
}

This file was deleted.

@@ -0,0 +1,8 @@
package edu.uci.ics.amber.operator.filter

import edu.uci.ics.amber.core.tuple.Tuple

class SpecializedFilterOpExec(predicates: List[FilterPredicate]) extends FilterOpExec {

setFilterFunc((tuple: Tuple) => predicates.exists(_.evaluate(tuple)))
}
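
Taken together, the new SpecializedFilterOpDesc and SpecializedFilterOpExec keep a tuple whenever any configured predicate matches (predicates in OR), via predicates.exists(_.evaluate(tuple)). The sketch below reproduces that semantics with hypothetical stand-ins for Tuple and FilterPredicate.

object OrPredicateSketch extends App {
  // Hypothetical stand-ins for the engine's Tuple and FilterPredicate types.
  final case class Row(fields: Map[String, Int])
  final case class GreaterThan(attribute: String, threshold: Int) {
    def evaluate(row: Row): Boolean = row.fields.get(attribute).exists(_ > threshold)
  }

  // "multiple predicates in OR": a row passes if any predicate matches,
  // the same predicates.exists(_.evaluate(tuple)) shape used by the exec.
  val predicates = List(GreaterThan("Total Profit", 10000), GreaterThan("Units Sold", 500))
  def keep(row: Row): Boolean = predicates.exists(_.evaluate(row))

  println(keep(Row(Map("Total Profit" -> 20000, "Units Sold" -> 10)))) // true
  println(keep(Row(Map("Total Profit" -> 5, "Units Sold" -> 10))))     // false
}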
@@ -17,13 +17,13 @@

public class SentimentAnalysisOpExec extends MapOpExec {
private final String attributeName;
- private final edu.uci.ics.amber.operator.sentiment.StanfordCoreNLPWrapper coreNlp;
+ private final StanfordCoreNLPWrapper coreNlp;

public SentimentAnalysisOpExec(String attributeName) {
this.attributeName = attributeName;
Properties props = new Properties();
props.setProperty("annotators", "tokenize, ssplit, parse, sentiment");
- coreNlp = new edu.uci.ics.amber.operator.sentiment.StanfordCoreNLPWrapper(props);
+ coreNlp = new StanfordCoreNLPWrapper(props);
this.setMapFunc((Function1<Tuple, TupleLike> & Serializable) this::sentimentAnalysis);
}

@@ -5,7 +5,7 @@ import edu.uci.ics.amber.core.storage.model.BufferedItemWriter
import edu.uci.ics.amber.core.storage.result.ResultStorage
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.operator.sink.ProgressiveUtils
- import edu.uci.ics.amber.virtualidentity.{OperatorIdentity, WorkflowIdentity}
+ import edu.uci.ics.amber.virtualidentity.WorkflowIdentity
import edu.uci.ics.amber.workflow.OutputPort.OutputMode
import edu.uci.ics.amber.workflow.PortIdentity

@@ -43,9 +43,12 @@ class ProgressiveSinkOpExec(
}

override def onFinishMultiPort(port: Int): Iterator[(TupleLike, Option[PortIdentity])] = {
writer.close()
Iterator.empty
}

override def close(): Unit = {
writer.close()
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator.empty
}
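
In the updated sink executor, close() releases the buffered result writer when the executor shuts down. The sketch below illustrates that lifecycle hook with hypothetical stand-in types; it is not the real BufferedItemWriter API.

object SinkCloseSketch extends App {
  // Hypothetical stand-in for a buffered result writer.
  final class WriterStub {
    private var closed = false
    def putOne(record: String): Unit = { require(!closed, "write after close"); println(s"wrote $record") }
    def close(): Unit = { closed = true; println("writer closed") }
  }

  // A sink-like executor that releases its writer in close(), the shutdown hook.
  final class SinkLikeExec(writer: WriterStub) {
    def processTuple(record: String): Unit = writer.putOne(record)
    def close(): Unit = writer.close()
  }

  val exec = new SinkLikeExec(new WriterStub)
  exec.processTuple("a")
  exec.processTuple("b")
  exec.close() // writer released exactly once, at executor shutdown
}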

This file was deleted.

@@ -0,0 +1,59 @@
package edu.uci.ics.amber.operator.typecasting

import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import edu.uci.ics.amber.core.executor.OpExecInitInfo
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Schema}
import edu.uci.ics.amber.core.workflow.{PhysicalOp, SchemaPropagationFunc}
import edu.uci.ics.amber.operator.map.MapOpDesc
import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import edu.uci.ics.amber.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import edu.uci.ics.amber.workflow.{InputPort, OutputPort, PortIdentity}

class TypeCastingOpDesc extends MapOpDesc {

@JsonProperty(required = true)
@JsonSchemaTitle("TypeCasting Units")
@JsonPropertyDescription("Multiple type castings")
var typeCastingUnits: List[TypeCastingUnit] = List.empty

override def getPhysicalOp(
workflowId: WorkflowIdentity,
executionId: ExecutionIdentity
): PhysicalOp = {
if (typeCastingUnits == null) typeCastingUnits = List.empty
PhysicalOp
.oneToOnePhysicalOp(
workflowId,
executionId,
operatorIdentifier,
OpExecInitInfo((_, _) => new TypeCastingOpExec(typeCastingUnits))
)
.withInputPorts(operatorInfo.inputPorts)
.withOutputPorts(operatorInfo.outputPorts)
.withPropagateSchema(
SchemaPropagationFunc { inputSchemas: Map[PortIdentity, Schema] =>
val outputSchema = typeCastingUnits.foldLeft(inputSchemas.values.head) { (schema, unit) =>
AttributeTypeUtils.SchemaCasting(schema, unit.attribute, unit.resultType)
}
Map(operatorInfo.outputPorts.head.id -> outputSchema)
}
)
}

override def operatorInfo: OperatorInfo = {
OperatorInfo(
"Type Casting",
"Cast between types",
OperatorGroupConstants.CLEANING_GROUP,
List(InputPort()),
List(OutputPort())
)
}

override def getOutputSchema(schemas: Array[Schema]): Schema = {
typeCastingUnits.foldLeft(schemas.head) { (schema, unit) =>
AttributeTypeUtils.SchemaCasting(schema, unit.attribute, unit.resultType)
}
}
}
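
The new descriptor derives its output schema by folding the configured casting units over the input schema, applying one SchemaCasting step per unit in order. The sketch below shows the same foldLeft shape on simplified stand-ins; Schema and CastUnit here are illustrative, not the engine's types.

object SchemaCastSketch extends App {
  // Stand-ins: a schema maps attribute names to type names;
  // a casting unit names one attribute and its target type.
  type Schema = Map[String, String]
  final case class CastUnit(attribute: String, resultType: String)

  def schemaCast(schema: Schema, unit: CastUnit): Schema =
    schema.updated(unit.attribute, unit.resultType)

  val input: Schema = Map("price" -> "string", "count" -> "string", "name" -> "string")
  val units = List(CastUnit("price", "double"), CastUnit("count", "integer"))

  // Same foldLeft shape as the descriptor's SchemaPropagationFunc and getOutputSchema.
  val output = units.foldLeft(input)(schemaCast)
  println(output) // price and count are cast; name keeps its original type
}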
@@ -3,15 +3,13 @@ package edu.uci.ics.amber.operator.typecasting
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Tuple, TupleLike}
import edu.uci.ics.amber.operator.map.MapOpExec

- import scala.jdk.CollectionConverters.CollectionHasAsScala

- class TypeCastingOpExec(typeCastingUnits: java.util.List[TypeCastingUnit]) extends MapOpExec {
+ class TypeCastingOpExec(typeCastingUnits: List[TypeCastingUnit]) extends MapOpExec {
this.setMapFunc(castTuple)

private def castTuple(tuple: Tuple): TupleLike =
AttributeTypeUtils.tupleCasting(
tuple,
- typeCastingUnits.asScala
+ typeCastingUnits
.map(typeCastingUnit => typeCastingUnit.attribute -> typeCastingUnit.resultType)
.toMap
)
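
At runtime the exec builds an attribute-to-target-type map from the same casting units and passes it to tupleCasting; with a Scala List the pairs can be mapped and collected directly, with no asScala conversion. A tiny sketch of that construction, reusing the illustrative CastUnit stand-in:

object CastMapSketch extends App {
  final case class CastUnit(attribute: String, resultType: String)
  val units = List(CastUnit("price", "double"), CastUnit("count", "integer"))

  // Same pair-to-Map construction as castTuple, on a plain Scala List.
  val targetTypes: Map[String, String] =
    units.map(unit => unit.attribute -> unit.resultType).toMap

  println(targetTypes) // Map(price -> double, count -> integer)
}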