From 46cd2fc9e8ae2d004974779e098ea354d7369f5b Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sat, 14 Dec 2024 16:02:59 -0800 Subject: [PATCH 1/9] Add cost estimator using past statistics. --- .../CostBasedScheduleGenerator.scala | 42 +++++++---- .../scheduling/CostEstimator.scala | 65 +++++++++++++++++ .../workflow/WorkflowExecutionsResource.scala | 72 +++++++++++++++++++ 3 files changed, 166 insertions(+), 13 deletions(-) create mode 100644 core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala index ca2c5553633..bf0ae0bd7e9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala @@ -34,6 +34,9 @@ class CostBasedScheduleGenerator( numStatesExplored: Int = 0 ) + private val costEstimator = + new DefaultCostEstimator(workflowContext = workflowContext, actorId = actorId) + def generate(): (Schedule, PhysicalPlan) = { val startTime = System.nanoTime() val regionDAG = createRegionDAG() @@ -281,7 +284,9 @@ class CostBasedScheduleGenerator( if (oEarlyStop) schedulableStates.add(currentState) // Calculate the current state's cost and update the bestResult if it's lower val cost = - evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet) + evaluate( + RegionPlan(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet) + ) if (cost < bestResult.cost) { bestResult = SearchResult(currentState, regionDAG, cost) } @@ -334,7 +339,12 @@ class CostBasedScheduleGenerator( physicalPlan.getNonMaterializedBlockingAndDependeeLinks ++ neighborState ) match { case Left(regionDAG) => - evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet) + evaluate( + RegionPlan( + regionDAG.vertexSet().asScala.toSet, + regionDAG.edgeSet().asScala.toSet + ) + ) case Right(_) => Double.MaxValue } @@ -423,7 +433,9 @@ class CostBasedScheduleGenerator( def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region, RegionLink]): Unit = { // Calculate the current state's cost and update the bestResult if it's lower val cost = - evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet) + evaluate( + RegionPlan(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet) + ) if (cost < bestResult.cost) { bestResult = SearchResult(currentState, regionDAG, cost) } @@ -453,7 +465,12 @@ class CostBasedScheduleGenerator( physicalPlan.getNonMaterializedBlockingAndDependeeLinks ++ neighborState ) match { case Left(regionDAG) => - evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet) + evaluate( + RegionPlan( + regionDAG.vertexSet().asScala.toSet, + regionDAG.edgeSet().asScala.toSet + ) + ) case Right(_) => Double.MaxValue } @@ -472,17 +489,16 @@ class CostBasedScheduleGenerator( } /** - * The cost function used by the search. Takes in a region graph represented as set of regions and links. + * The cost function used by the search. Takes a region plan, generates one or more (to be done in the future) + * schedules based on the region plan, and calculates the cost of the schedule(s) using Cost Estimator. Uses the cost + * of the best schedule (currently only considers one schedule) as the cost of the region plan. * - * @param regions A set of regions created based on a search state. - * @param regionLinks A set of links to indicate dependencies between regions, based on the materialization edges. - * @return A cost determined by the resource allocator. + * @return A cost determined by the cost estimator. */ - private def evaluate(regions: Set[Region], regionLinks: Set[RegionLink]): Double = { - // Using number of materialized ports as the cost. - // This is independent of the schedule / resource allocator. - // In the future we may need to use the ResourceAllocator to get the cost. - regions.flatMap(_.materializedPortIds).size + private def evaluate(regionPlan: RegionPlan): Double = { + val schedule = generateScheduleFromRegionPlan(regionPlan) + // In the future we may allow multiple regions in a level and split the resources. + schedule.map(level => level.map(region => costEstimator.estimate(region, 1)).sum).sum } } diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala new file mode 100644 index 00000000000..0dfaa7d3356 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -0,0 +1,65 @@ +package edu.uci.ics.amber.engine.architecture.scheduling + +import edu.uci.ics.amber.core.storage.StorageConfig +import edu.uci.ics.amber.core.workflow.WorkflowContext +import edu.uci.ics.amber.engine.common.AmberLogging +import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource + +/** + * A cost estimator should estimate a cost of running a region under the given resource constraints as units. + */ +trait CostEstimator { + def estimate(region: Region, resourceUnits: Int): Double +} + +/** + * A default cost estimator using past statistics. If past statistics of a workflow are available, the cost of a region + * is the execution time of its longest-running operator. Otherwise the cost is the number of materialized ports in the + * region. + */ +class DefaultCostEstimator( + workflowContext: WorkflowContext, + val actorId: ActorVirtualIdentity +) extends CostEstimator + with AmberLogging { + + // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default. + private val operatorEstimatedTimeOption = { + if (StorageConfig.jdbcUsername.isEmpty) { + None + } else { + WorkflowExecutionsResource.getOperatorExecutionTimeInSeconds( + this.workflowContext.workflowId.id + ) + } + } + + operatorEstimatedTimeOption match { + case None => + logger.info( + s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, " + + s"no past execution statistics available. Using number of materialized output ports as the cost. " + ) + case Some(_) => + } + + override def estimate(region: Region, resourceUnits: Int): Double = { + this.operatorEstimatedTimeOption match { + case Some(operatorEstimatedTime) => + // Use past statistics (wall-clock runtime). We use the execution time of the longest-running + // operator in each region to represent the region's execution time, and use the sum of all the regions' + // execution time as the wall-clock runtime of the workflow. + // This assumes a schedule is a total-order of the regions. + val opExecutionTimes = region.getOperators.map(op => { + operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, 1.0) + }) + val longestRunningOpExecutionTime = opExecutionTimes.max + longestRunningOpExecutionTime + case None => + // Without past statistics (e.g., first execution), we use number of materialized ports as the cost. + // This is independent of the schedule / resource allocator. + region.materializedPortIds.size + } + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 0f5d4e55e2b..731b87ef703 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -120,6 +120,78 @@ object WorkflowExecutionsResource { idleTime: ULong, numWorkers: UInteger ) + + /** + * Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator. + * Using the total control processing time plus data processing time of an operator as its cost. + * If no past statistics are available (e.g., first execution), return None. + */ + def getOperatorExecutionTimeInSeconds( + wid: Long + ): Option[Map[String, Double]] = { + val widAsUInteger = UInteger.valueOf(wid) + val latestSuccessfulExecution = context + .select( + WORKFLOW_EXECUTIONS.EID, + WORKFLOW_EXECUTIONS.VID, + field( + context + .select(USER.NAME) + .from(USER) + .where(WORKFLOW_EXECUTIONS.UID.eq(USER.UID)) + ), + WORKFLOW_EXECUTIONS.STATUS, + WORKFLOW_EXECUTIONS.RESULT, + WORKFLOW_EXECUTIONS.STARTING_TIME, + WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME, + WORKFLOW_EXECUTIONS.BOOKMARKED, + WORKFLOW_EXECUTIONS.NAME, + WORKFLOW_EXECUTIONS.LOG_LOCATION + ) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) + .where(WORKFLOW_VERSION.WID.eq(widAsUInteger).and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))) + .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc()) + .limit(1) + .fetchInto(classOf[WorkflowExecutionEntry]) + .asScala + .toList + .headOption + + if (latestSuccessfulExecution.isDefined) { + val eid = latestSuccessfulExecution.get.eId + val rawStats = context + .select( + WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID, + WORKFLOW_RUNTIME_STATISTICS.TIME, + WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME, + WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME + ) + .from(WORKFLOW_RUNTIME_STATISTICS) + .where( + WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID + .eq(widAsUInteger) + .and(WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(eid)) + ) + .orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID) + .fetchInto(classOf[WorkflowRuntimeStatistics]) + .asScala + .toList + if (rawStats.isEmpty) { + None + } else { + val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) => + val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0) + acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime + .doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9)) + } + Some(cumulatedStats) + } + } else { + None + } + } } case class ExecutionGroupBookmarkRequest( From 9132fd0e6523a5b1ba75eef4b7c1923cc6e7a0a9 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sat, 14 Dec 2024 21:46:57 -0800 Subject: [PATCH 2/9] Add one test case. --- .../scheduling/DefaultCostEstimatorSpec.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala new file mode 100644 index 00000000000..da92d2900b2 --- /dev/null +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala @@ -0,0 +1,67 @@ +package edu.uci.ics.amber.engine.architecture.scheduling + +import edu.uci.ics.amber.core.workflow.WorkflowContext +import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER +import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow +import edu.uci.ics.amber.operator.TestOperators +import edu.uci.ics.amber.workflow.PortIdentity +import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.workflow.LogicalLink +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.flatspec.AnyFlatSpec + +class DefaultCostEstimatorSpec + extends AnyFlatSpec + with BeforeAndAfterAll + with BeforeAndAfterEach + with MockTexeraDB { + + override protected def beforeAll(): Unit = { + initializeDBAndReplaceDSLContext() + } + + it should "use fallback method when no past statistics are available" in { + + val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() + val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") + val sink = TestOperators.sinkOpDesc() + val workflow = buildWorkflow( + List(headerlessCsvOpDesc, keywordOpDesc, sink), + List( + LogicalLink( + headerlessCsvOpDesc.operatorIdentifier, + PortIdentity(0), + keywordOpDesc.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + keywordOpDesc.operatorIdentifier, + PortIdentity(0), + sink.operatorIdentifier, + PortIdentity(0) + ) + ), + new WorkflowContext() + ) + + val costEstimator = new DefaultCostEstimator( + workflow.context, + CONTROLLER + ) + + val region = Region( + id = RegionIdentity(0), + physicalOps = workflow.physicalPlan.operators, + physicalLinks = workflow.physicalPlan.links + ) + + val costOfRegion = costEstimator.estimate(region, 1) + + assert(costOfRegion == 0) + } + + override protected def afterAll(): Unit = { + shutdownDB() + } + +} From 9ec5fab3db817a1d1b32206926b00dae00ed9ef4 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sat, 14 Dec 2024 23:35:41 -0800 Subject: [PATCH 3/9] Add 2nd test case. --- .../scheduling/DefaultCostEstimatorSpec.scala | 131 +++++++++++++++++- 1 file changed, 126 insertions(+), 5 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala index da92d2900b2..c6ff6a2ea3c 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala @@ -4,9 +4,25 @@ import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow import edu.uci.ics.amber.operator.TestOperators +import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc +import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc +import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc import edu.uci.ics.amber.workflow.PortIdentity import edu.uci.ics.texera.dao.MockTexeraDB +import edu.uci.ics.texera.web.model.jooq.generated.tables.daos.{ + WorkflowDao, + WorkflowExecutionsDao, + WorkflowRuntimeStatisticsDao, + WorkflowVersionDao +} +import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.{ + Workflow, + WorkflowExecutions, + WorkflowRuntimeStatistics, + WorkflowVersion +} import edu.uci.ics.texera.workflow.LogicalLink +import org.jooq.types.{UInteger, ULong} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.flatspec.AnyFlatSpec @@ -16,15 +32,71 @@ class DefaultCostEstimatorSpec with BeforeAndAfterEach with MockTexeraDB { + private val headerlessCsvOpDesc: CSVScanSourceOpDesc = + TestOperators.headerlessSmallCsvScanOpDesc() + private val keywordOpDesc: KeywordSearchOpDesc = + TestOperators.keywordSearchOpDesc("column-1", "Asia") + private val sink: ProgressiveSinkOpDesc = TestOperators.sinkOpDesc() + + private val testWorkflowEntry: Workflow = { + val workflow = new Workflow + workflow.setName("test workflow") + workflow.setWid(UInteger.valueOf(1)) + workflow + } + + private val testWorkflowVersionEntry: WorkflowVersion = { + val workflowVersion = new WorkflowVersion + workflowVersion.setWid(UInteger.valueOf(1)) + workflowVersion.setVid(UInteger.valueOf(1)) + workflowVersion + } + + private val testWorkflowExecutionEntry: WorkflowExecutions = { + val workflowExecution = new WorkflowExecutions + workflowExecution.setEid(UInteger.valueOf(1)) + workflowExecution.setVid(UInteger.valueOf(1)) + workflowExecution.setUid(UInteger.valueOf(1)) + workflowExecution.setStatus(3.toByte) + workflowExecution.setEnvironmentVersion("test engine") + workflowExecution + } + + private val headerlessCsvOpStatisticsEntry: WorkflowRuntimeStatistics = { + val workflowRuntimeStatistics = new WorkflowRuntimeStatistics + workflowRuntimeStatistics.setOperatorId(headerlessCsvOpDesc.operatorIdentifier.id) + workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1)) + workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1)) + workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(100)) + workflowRuntimeStatistics.setControlProcessingTime(ULong.valueOf(100)) + workflowRuntimeStatistics + } + + private val keywordOpDescStatisticsEntry: WorkflowRuntimeStatistics = { + val workflowRuntimeStatistics = new WorkflowRuntimeStatistics + workflowRuntimeStatistics.setOperatorId(headerlessCsvOpDesc.operatorIdentifier.id) + workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1)) + workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1)) + workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(300)) + workflowRuntimeStatistics.setControlProcessingTime(ULong.valueOf(300)) + workflowRuntimeStatistics + } + + private val sinkOpDescStatisticsEntry: WorkflowRuntimeStatistics = { + val workflowRuntimeStatistics = new WorkflowRuntimeStatistics + workflowRuntimeStatistics.setOperatorId(headerlessCsvOpDesc.operatorIdentifier.id) + workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1)) + workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1)) + workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(200)) + workflowRuntimeStatistics.setControlProcessingTime(ULong.valueOf(200)) + workflowRuntimeStatistics + } + override protected def beforeAll(): Unit = { initializeDBAndReplaceDSLContext() } - it should "use fallback method when no past statistics are available" in { - - val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() - val keywordOpDesc = TestOperators.keywordSearchOpDesc("column-1", "Asia") - val sink = TestOperators.sinkOpDesc() + "DefaultCostEstimator" should "use fallback method when no past statistics are available" in { val workflow = buildWorkflow( List(headerlessCsvOpDesc, keywordOpDesc, sink), List( @@ -60,6 +132,55 @@ class DefaultCostEstimatorSpec assert(costOfRegion == 0) } + "DefaultCostEstimator" should "use the latest successful execution to estimate cost when available" in { + val workflow = buildWorkflow( + List(headerlessCsvOpDesc, keywordOpDesc, sink), + List( + LogicalLink( + headerlessCsvOpDesc.operatorIdentifier, + PortIdentity(0), + keywordOpDesc.operatorIdentifier, + PortIdentity(0) + ), + LogicalLink( + keywordOpDesc.operatorIdentifier, + PortIdentity(0), + sink.operatorIdentifier, + PortIdentity(0) + ) + ), + new WorkflowContext() + ) + + val workflowDao = new WorkflowDao(getDSLContext.configuration()) + val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) + val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) + val workflowRuntimeStatisticsDao = + new WorkflowRuntimeStatisticsDao(getDSLContext.configuration()) + + workflowDao.insert(testWorkflowEntry) + workflowVersionDao.insert(testWorkflowVersionEntry) + workflowExecutionsDao.insert(testWorkflowExecutionEntry) + workflowRuntimeStatisticsDao.insert(headerlessCsvOpStatisticsEntry) + workflowRuntimeStatisticsDao.insert(keywordOpDescStatisticsEntry) + workflowRuntimeStatisticsDao.insert(sinkOpDescStatisticsEntry) + + val costEstimator = new DefaultCostEstimator( + workflow.context, + CONTROLLER + ) + + val region = Region( + id = RegionIdentity(0), + physicalOps = workflow.physicalPlan.operators, + physicalLinks = workflow.physicalPlan.links + ) + + val costOfRegion = costEstimator.estimate(region, 1) + + assert(costOfRegion != 0) + } + override protected def afterAll(): Unit = { shutdownDB() } From 98a212346d8c862a275d604a39bea69699433b6b Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sat, 14 Dec 2024 23:49:46 -0800 Subject: [PATCH 4/9] fix test. --- .../architecture/scheduling/DefaultCostEstimatorSpec.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala index c6ff6a2ea3c..5810e71e6e2 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala @@ -42,6 +42,8 @@ class DefaultCostEstimatorSpec val workflow = new Workflow workflow.setName("test workflow") workflow.setWid(UInteger.valueOf(1)) + workflow.setContent("test workflow content") + workflow.setDescription("test description") workflow } @@ -49,6 +51,7 @@ class DefaultCostEstimatorSpec val workflowVersion = new WorkflowVersion workflowVersion.setWid(UInteger.valueOf(1)) workflowVersion.setVid(UInteger.valueOf(1)) + workflowVersion.setContent("test version content") workflowVersion } @@ -74,7 +77,7 @@ class DefaultCostEstimatorSpec private val keywordOpDescStatisticsEntry: WorkflowRuntimeStatistics = { val workflowRuntimeStatistics = new WorkflowRuntimeStatistics - workflowRuntimeStatistics.setOperatorId(headerlessCsvOpDesc.operatorIdentifier.id) + workflowRuntimeStatistics.setOperatorId(keywordOpDesc.operatorIdentifier.id) workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1)) workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1)) workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(300)) @@ -84,7 +87,7 @@ class DefaultCostEstimatorSpec private val sinkOpDescStatisticsEntry: WorkflowRuntimeStatistics = { val workflowRuntimeStatistics = new WorkflowRuntimeStatistics - workflowRuntimeStatistics.setOperatorId(headerlessCsvOpDesc.operatorIdentifier.id) + workflowRuntimeStatistics.setOperatorId(sink.operatorIdentifier.id) workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1)) workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1)) workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(200)) From 43b790d47a536524a884efe2b7c3912f7937c498 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sat, 14 Dec 2024 23:58:38 -0800 Subject: [PATCH 5/9] fix test. --- .../scheduling/DefaultCostEstimatorSpec.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala index 5810e71e6e2..88fe7e8c827 100644 --- a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala +++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala @@ -9,13 +9,16 @@ import edu.uci.ics.amber.operator.sink.managed.ProgressiveSinkOpDesc import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc import edu.uci.ics.amber.workflow.PortIdentity import edu.uci.ics.texera.dao.MockTexeraDB -import edu.uci.ics.texera.web.model.jooq.generated.tables.daos.{ +import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole +import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{ + UserDao, WorkflowDao, WorkflowExecutionsDao, WorkflowRuntimeStatisticsDao, WorkflowVersionDao } -import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.{ +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{ + User, Workflow, WorkflowExecutions, WorkflowRuntimeStatistics, @@ -38,6 +41,16 @@ class DefaultCostEstimatorSpec TestOperators.keywordSearchOpDesc("column-1", "Asia") private val sink: ProgressiveSinkOpDesc = TestOperators.sinkOpDesc() + private val testUser: User = { + val user = new User + user.setUid(UInteger.valueOf(1)) + user.setName("test_user") + user.setRole(UserRole.ADMIN) + user.setPassword("123") + user.setEmail("test_user@test.com") + user + } + private val testWorkflowEntry: Workflow = { val workflow = new Workflow workflow.setName("test workflow") @@ -155,12 +168,14 @@ class DefaultCostEstimatorSpec new WorkflowContext() ) + val userDao = new UserDao(getDSLContext.configuration()) val workflowDao = new WorkflowDao(getDSLContext.configuration()) val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration()) val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration()) val workflowRuntimeStatisticsDao = new WorkflowRuntimeStatisticsDao(getDSLContext.configuration()) + userDao.insert(testUser) workflowDao.insert(testWorkflowEntry) workflowVersionDao.insert(testWorkflowVersionEntry) workflowExecutionsDao.insert(testWorkflowExecutionEntry) From 3e9acdc86eb29a423cc5630b6feeec78b4123ce1 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sun, 15 Dec 2024 00:23:57 -0800 Subject: [PATCH 6/9] fix test. --- .../architecture/scheduling/CostEstimator.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala index 0dfaa7d3356..3b521686bb5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -25,15 +25,10 @@ class DefaultCostEstimator( with AmberLogging { // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default. - private val operatorEstimatedTimeOption = { - if (StorageConfig.jdbcUsername.isEmpty) { - None - } else { - WorkflowExecutionsResource.getOperatorExecutionTimeInSeconds( - this.workflowContext.workflowId.id - ) - } - } + private val operatorEstimatedTimeOption = + WorkflowExecutionsResource.getOperatorExecutionTimeInSeconds( + this.workflowContext.workflowId.id + ) operatorEstimatedTimeOption match { case None => From 9acbde41415947b445f47ff1c0ba8a1f875a5a73 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sun, 15 Dec 2024 00:47:21 -0800 Subject: [PATCH 7/9] Move stats inside CostEstimator --- .../scheduling/CostEstimator.scala | 106 +++++++++++++++++- 1 file changed, 101 insertions(+), 5 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala index 3b521686bb5..e22dd5d48a4 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -4,7 +4,20 @@ import edu.uci.ics.amber.core.storage.StorageConfig import edu.uci.ics.amber.core.workflow.WorkflowContext import edu.uci.ics.amber.engine.common.AmberLogging import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity -import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource +import edu.uci.ics.texera.dao.SqlServer +import edu.uci.ics.texera.dao.SqlServer.withTransaction +import edu.uci.ics.texera.web.model.jooq.generated.Tables.{ + USER, + WORKFLOW_EXECUTIONS, + WORKFLOW_RUNTIME_STATISTICS, + WORKFLOW_VERSION +} +import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.WorkflowRuntimeStatistics +import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.WorkflowExecutionEntry +import org.jooq.impl.DSL.field +import org.jooq.types.UInteger + +import scala.jdk.CollectionConverters.ListHasAsScala /** * A cost estimator should estimate a cost of running a region under the given resource constraints as units. @@ -25,10 +38,9 @@ class DefaultCostEstimator( with AmberLogging { // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default. - private val operatorEstimatedTimeOption = - WorkflowExecutionsResource.getOperatorExecutionTimeInSeconds( - this.workflowContext.workflowId.id - ) + private val operatorEstimatedTimeOption = getOperatorExecutionTimeInSeconds( + this.workflowContext.workflowId.id + ) operatorEstimatedTimeOption match { case None => @@ -57,4 +69,88 @@ class DefaultCostEstimator( region.materializedPortIds.size } } + + /** + * Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator. + * Using the total control processing time plus data processing time of an operator as its cost. + * If no past statistics are available (e.g., first execution), return None. + */ + private def getOperatorExecutionTimeInSeconds( + wid: Long + ): Option[Map[String, Double]] = { + + val operatorEstimatedTimeOption = withTransaction( + SqlServer + .getInstance( + StorageConfig.jdbcUrl, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) + .createDSLContext() + ) { context => + val widAsUInteger = UInteger.valueOf(wid) + val latestSuccessfulExecution = context + .select( + WORKFLOW_EXECUTIONS.EID, + WORKFLOW_EXECUTIONS.VID, + field( + context + .select(USER.NAME) + .from(USER) + .where(WORKFLOW_EXECUTIONS.UID.eq(USER.UID)) + ), + WORKFLOW_EXECUTIONS.STATUS, + WORKFLOW_EXECUTIONS.RESULT, + WORKFLOW_EXECUTIONS.STARTING_TIME, + WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME, + WORKFLOW_EXECUTIONS.BOOKMARKED, + WORKFLOW_EXECUTIONS.NAME, + WORKFLOW_EXECUTIONS.LOG_LOCATION + ) + .from(WORKFLOW_EXECUTIONS) + .join(WORKFLOW_VERSION) + .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) + .where(WORKFLOW_VERSION.WID.eq(widAsUInteger).and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))) + .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc()) + .limit(1) + .fetchInto(classOf[WorkflowExecutionEntry]) + .asScala + .toList + .headOption + + if (latestSuccessfulExecution.isDefined) { + val eid = latestSuccessfulExecution.get.eId + val rawStats = context + .select( + WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID, + WORKFLOW_RUNTIME_STATISTICS.TIME, + WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME, + WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME + ) + .from(WORKFLOW_RUNTIME_STATISTICS) + .where( + WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID + .eq(widAsUInteger) + .and(WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(eid)) + ) + .orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID) + .fetchInto(classOf[WorkflowRuntimeStatistics]) + .asScala + .toList + if (rawStats.isEmpty) { + None + } else { + val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) => + val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0) + acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime + .doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9)) + } + Some(cumulatedStats) + } + } else { + None + } + } + operatorEstimatedTimeOption + } } From 577eedb6be8c14352a8c7e05d7eb69434190bbb6 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Sun, 15 Dec 2024 01:26:14 -0800 Subject: [PATCH 8/9] Fix; revert WorkflowExecutionsResource --- .../scheduling/CostEstimator.scala | 12 +++- .../workflow/WorkflowExecutionsResource.scala | 72 ------------------- 2 files changed, 9 insertions(+), 75 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala index e22dd5d48a4..4a63173e693 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -18,6 +18,7 @@ import org.jooq.impl.DSL.field import org.jooq.types.UInteger import scala.jdk.CollectionConverters.ListHasAsScala +import scala.util.{Failure, Success, Try} /** * A cost estimator should estimate a cost of running a region under the given resource constraints as units. @@ -38,9 +39,14 @@ class DefaultCostEstimator( with AmberLogging { // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default. - private val operatorEstimatedTimeOption = getOperatorExecutionTimeInSeconds( - this.workflowContext.workflowId.id - ) + private val operatorEstimatedTimeOption = Try( + this.getOperatorExecutionTimeInSeconds( + this.workflowContext.workflowId.id + ) + ) match { + case Failure(_) => None + case Success(result) => result + } operatorEstimatedTimeOption match { case None => diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 731b87ef703..0f5d4e55e2b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -120,78 +120,6 @@ object WorkflowExecutionsResource { idleTime: ULong, numWorkers: UInteger ) - - /** - * Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator. - * Using the total control processing time plus data processing time of an operator as its cost. - * If no past statistics are available (e.g., first execution), return None. - */ - def getOperatorExecutionTimeInSeconds( - wid: Long - ): Option[Map[String, Double]] = { - val widAsUInteger = UInteger.valueOf(wid) - val latestSuccessfulExecution = context - .select( - WORKFLOW_EXECUTIONS.EID, - WORKFLOW_EXECUTIONS.VID, - field( - context - .select(USER.NAME) - .from(USER) - .where(WORKFLOW_EXECUTIONS.UID.eq(USER.UID)) - ), - WORKFLOW_EXECUTIONS.STATUS, - WORKFLOW_EXECUTIONS.RESULT, - WORKFLOW_EXECUTIONS.STARTING_TIME, - WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME, - WORKFLOW_EXECUTIONS.BOOKMARKED, - WORKFLOW_EXECUTIONS.NAME, - WORKFLOW_EXECUTIONS.LOG_LOCATION - ) - .from(WORKFLOW_EXECUTIONS) - .join(WORKFLOW_VERSION) - .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID)) - .where(WORKFLOW_VERSION.WID.eq(widAsUInteger).and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))) - .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc()) - .limit(1) - .fetchInto(classOf[WorkflowExecutionEntry]) - .asScala - .toList - .headOption - - if (latestSuccessfulExecution.isDefined) { - val eid = latestSuccessfulExecution.get.eId - val rawStats = context - .select( - WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID, - WORKFLOW_RUNTIME_STATISTICS.TIME, - WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME, - WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME - ) - .from(WORKFLOW_RUNTIME_STATISTICS) - .where( - WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID - .eq(widAsUInteger) - .and(WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(eid)) - ) - .orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID) - .fetchInto(classOf[WorkflowRuntimeStatistics]) - .asScala - .toList - if (rawStats.isEmpty) { - None - } else { - val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) => - val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0) - acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime - .doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9)) - } - Some(cumulatedStats) - } - } else { - None - } - } } case class ExecutionGroupBookmarkRequest( From 2114d5f45b7582f5239fa69d59311229f491e358 Mon Sep 17 00:00:00 2001 From: Xiao-zhen-Liu Date: Thu, 19 Dec 2024 07:15:10 +0800 Subject: [PATCH 9/9] fix import. --- .../amber/engine/architecture/scheduling/CostEstimator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala index 4a63173e693..233df4a8fa0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala @@ -6,13 +6,13 @@ import edu.uci.ics.amber.engine.common.AmberLogging import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity import edu.uci.ics.texera.dao.SqlServer import edu.uci.ics.texera.dao.SqlServer.withTransaction -import edu.uci.ics.texera.web.model.jooq.generated.Tables.{ +import edu.uci.ics.texera.dao.jooq.generated.Tables.{ USER, WORKFLOW_EXECUTIONS, WORKFLOW_RUNTIME_STATISTICS, WORKFLOW_VERSION } -import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.WorkflowRuntimeStatistics +import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowRuntimeStatistics import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.WorkflowExecutionEntry import org.jooq.impl.DSL.field import org.jooq.types.UInteger