diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index ca2c555363..bf0ae0bd7e 100644
--- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -34,6 +34,9 @@ class CostBasedScheduleGenerator(
       numStatesExplored: Int = 0
   )
 
+  private val costEstimator =
+    new DefaultCostEstimator(workflowContext = workflowContext, actorId = actorId)
+
   def generate(): (Schedule, PhysicalPlan) = {
     val startTime = System.nanoTime()
     val regionDAG = createRegionDAG()
@@ -281,7 +284,9 @@ class CostBasedScheduleGenerator(
         if (oEarlyStop) schedulableStates.add(currentState)
         // Calculate the current state's cost and update the bestResult if it's lower
         val cost =
-          evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
+          evaluate(
+            RegionPlan(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
+          )
         if (cost < bestResult.cost) {
           bestResult = SearchResult(currentState, regionDAG, cost)
         }
@@ -334,7 +339,12 @@ class CostBasedScheduleGenerator(
               physicalPlan.getNonMaterializedBlockingAndDependeeLinks ++ neighborState
             ) match {
               case Left(regionDAG) =>
-                evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
+                evaluate(
+                  RegionPlan(
+                    regionDAG.vertexSet().asScala.toSet,
+                    regionDAG.edgeSet().asScala.toSet
+                  )
+                )
               case Right(_) =>
                 Double.MaxValue
             }
@@ -423,7 +433,9 @@ class CostBasedScheduleGenerator(
       def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region, RegionLink]): Unit = {
         // Calculate the current state's cost and update the bestResult if it's lower
         val cost =
-          evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
+          evaluate(
+            RegionPlan(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
+          )
         if (cost < bestResult.cost) {
           bestResult = SearchResult(currentState, regionDAG, cost)
         }
@@ -453,7 +465,12 @@ class CostBasedScheduleGenerator(
               physicalPlan.getNonMaterializedBlockingAndDependeeLinks ++ neighborState
             ) match {
               case Left(regionDAG) =>
-                evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
+                evaluate(
+                  RegionPlan(
+                    regionDAG.vertexSet().asScala.toSet,
+                    regionDAG.edgeSet().asScala.toSet
+                  )
+                )
               case Right(_) =>
                 Double.MaxValue
             }
@@ -472,17 +489,17 @@ class CostBasedScheduleGenerator(
   }
 
   /**
-    * The cost function used by the search. Takes in a region graph represented as set of regions and links.
+    * The cost function used by the search. Takes a region plan, generates one or more (to be done in the future)
+    * schedules based on the region plan, and calculates the cost of the schedule(s) using Cost Estimator. Uses the cost
+    * of the best schedule (currently only considers one schedule) as the cost of the region plan.
     *
-    * @param regions A set of regions created based on a search state.
-    * @param regionLinks A set of links to indicate dependencies between regions, based on the materialization edges.
-    * @return A cost determined by the resource allocator.
+    * @param regionPlan The regions and region links created based on a search state.
+    * @return A cost determined by the cost estimator.
     */
-  private def evaluate(regions: Set[Region], regionLinks: Set[RegionLink]): Double = {
-    // Using number of materialized ports as the cost.
-    // This is independent of the schedule / resource allocator.
-    // In the future we may need to use the ResourceAllocator to get the cost.
-    regions.flatMap(_.materializedPortIds).size
+  private def evaluate(regionPlan: RegionPlan): Double = {
+    val schedule = generateScheduleFromRegionPlan(regionPlan)
+    // In the future we may allow multiple regions in a level and split the resources.
+    schedule.map(level => level.map(region => costEstimator.estimate(region, 1)).sum).sum
   }
 
 }
diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
new file mode 100644
index 0000000000..ac23284305
--- /dev/null
+++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/CostEstimator.scala
@@ -0,0 +1,158 @@
+package edu.uci.ics.amber.engine.architecture.scheduling
+
+import edu.uci.ics.amber.core.storage.StorageConfig
+import edu.uci.ics.amber.core.workflow.WorkflowContext
+import edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
+import edu.uci.ics.amber.engine.common.AmberLogging
+import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity
+import edu.uci.ics.texera.dao.SqlServer
+import edu.uci.ics.texera.dao.SqlServer.withTransaction
+import edu.uci.ics.texera.dao.jooq.generated.Tables.{
+  WORKFLOW_EXECUTIONS,
+  WORKFLOW_RUNTIME_STATISTICS,
+  WORKFLOW_VERSION
+}
+import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowRuntimeStatistics
+import org.jooq.types.UInteger
+
+import scala.jdk.CollectionConverters.ListHasAsScala
+import scala.util.{Failure, Success, Try}
+
+/**
+  * A cost estimator should estimate a cost of running a region under the given resource constraints as units.
+  */
+trait CostEstimator {
+  def estimate(region: Region, resourceUnits: Int): Double
+}
+
+object DefaultCostEstimator {
+  val DEFAULT_OPERATOR_COST: Double = 1.0
+}
+
+/**
+  * A default cost estimator using past statistics. If past statistics of a workflow are available, the cost of a region
+  * is the execution time of its longest-running operator. Otherwise the cost is the number of materialized ports in the
+  * region.
+  */
+class DefaultCostEstimator(
+    workflowContext: WorkflowContext,
+    val actorId: ActorVirtualIdentity
+) extends CostEstimator
+    with AmberLogging {
+
+  // Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default.
+  private val operatorEstimatedTimeOption = Try(
+    this.getOperatorExecutionTimeInSeconds(
+      this.workflowContext.workflowId.id
+    )
+  ) match {
+    case Failure(error) =>
+      // Falling back is deliberate (e.g. no database configured), but log the cause so that a
+      // real connection or query error is not mistaken for "no statistics yet".
+      logger.warn(
+        s"WID: ${workflowContext.workflowId.id}, could not load past execution statistics: $error"
+      )
+      None
+    case Success(result) => result
+  }
+
+  operatorEstimatedTimeOption match {
+    case None =>
+      logger.info(
+        s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, " +
+          s"no past execution statistics available. Using number of materialized output ports as the cost. "
+      )
+    case Some(_) =>
+  }
+
+  override def estimate(region: Region, resourceUnits: Int): Double = {
+    this.operatorEstimatedTimeOption match {
+      case Some(operatorEstimatedTime) =>
+        // Use past statistics (wall-clock runtime). We use the execution time of the longest-running
+        // operator in each region to represent the region's execution time, and use the sum of all the regions'
+        // execution time as the wall-clock runtime of the workflow.
+        // This assumes a schedule is a total-order of the regions.
+        val opExecutionTimes = region.getOperators.map(op => {
+          operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, DEFAULT_OPERATOR_COST)
+        })
+        // maxOption (rather than max) so that a region without operators costs 0 instead of throwing.
+        opExecutionTimes.maxOption.getOrElse(0.0)
+      case None =>
+        // Without past statistics (e.g., first execution), we use number of materialized ports as the cost.
+        // This is independent of the schedule / resource allocator.
+        region.materializedPortIds.size
+    }
+  }
+
+  /**
+    * Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator.
+    * Using the total control processing time plus data processing time of an operator as its cost.
+    * If no past statistics are available (e.g., first execution), return None.
+    *
+    * @param wid id of the workflow whose past statistics are looked up.
+    * @return accumulated execution time in seconds per operator id, or None without statistics.
+    */
+  private def getOperatorExecutionTimeInSeconds(
+      wid: Long
+  ): Option[Map[String, Double]] = {
+
+    val operatorEstimatedTimeOption = withTransaction(
+      SqlServer
+        .getInstance(
+          StorageConfig.jdbcUrl,
+          StorageConfig.jdbcUsername,
+          StorageConfig.jdbcPassword
+        )
+        .createDSLContext()
+    ) { context =>
+      val widAsUInteger = UInteger.valueOf(wid)
+      val rawStats = context
+        .select(
+          WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID,
+          WORKFLOW_RUNTIME_STATISTICS.TIME,
+          WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
+          WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME,
+          WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID
+        )
+        .from(WORKFLOW_RUNTIME_STATISTICS)
+        .where(
+          WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID
+            .eq(widAsUInteger)
+            .and(
+              WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(
+                context
+                  .select(
+                    WORKFLOW_EXECUTIONS.EID
+                  )
+                  .from(WORKFLOW_EXECUTIONS)
+                  .join(WORKFLOW_VERSION)
+                  .on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
+                  .where(
+                    WORKFLOW_VERSION.WID
+                      .eq(widAsUInteger)
+                      // NOTE(review): status 3 is presumably the "successful/completed" code - confirm.
+                      .and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte))
+                  )
+                  .orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
+                  .limit(1)
+              )
+            )
+        )
+        .orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID)
+        .fetchInto(classOf[WorkflowRuntimeStatistics])
+        .asScala
+        .toList
+      if (rawStats.isEmpty) {
+        None
+      } else {
+        val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) =>
+          val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0)
+          acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime
+            .doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9))
+        }
+        Some(cumulatedStats)
+      }
+    }
+    operatorEstimatedTimeOption
+  }
+}
diff --git a/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
new file mode 100644
index 0000000000..da6bcfd1e9
--- /dev/null
+++ b/core/amber/src/test/scala/edu/uci/ics/amber/engine/architecture/scheduling/DefaultCostEstimatorSpec.scala
@@ -0,0 +1,272 @@
+package edu.uci.ics.amber.engine.architecture.scheduling
+
+import edu.uci.ics.amber.core.workflow.WorkflowContext
+import edu.uci.ics.amber.engine.architecture.scheduling.DefaultCostEstimator.DEFAULT_OPERATOR_COST
+import edu.uci.ics.amber.engine.common.virtualidentity.util.CONTROLLER
+import edu.uci.ics.amber.engine.e2e.TestUtils.buildWorkflow
+import edu.uci.ics.amber.operator.TestOperators
+import edu.uci.ics.amber.operator.aggregate.{AggregateOpDesc, AggregationFunction}
+import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
+import edu.uci.ics.amber.operator.source.scan.csv.CSVScanSourceOpDesc
+import edu.uci.ics.amber.workflow.PortIdentity
+import edu.uci.ics.texera.dao.MockTexeraDB
+import edu.uci.ics.texera.dao.jooq.generated.enums.UserRole
+import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
+  UserDao,
+  WorkflowDao,
+  WorkflowExecutionsDao,
+  WorkflowRuntimeStatisticsDao,
+  WorkflowVersionDao
+}
+import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
+  User,
+  Workflow,
+  WorkflowExecutions,
+  WorkflowRuntimeStatistics,
+  WorkflowVersion
+}
+import edu.uci.ics.texera.workflow.LogicalLink
+import org.jooq.types.{UInteger, ULong}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.jdk.CollectionConverters.CollectionHasAsScala
+
+class DefaultCostEstimatorSpec
+    extends AnyFlatSpec
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with MockTexeraDB {
+
+  private val headerlessCsvOpDesc: CSVScanSourceOpDesc =
+    TestOperators.headerlessSmallCsvScanOpDesc()
+  private val keywordOpDesc: KeywordSearchOpDesc =
+    TestOperators.keywordSearchOpDesc("column-1", "Asia")
+  private val groupByOpDesc: AggregateOpDesc =
+    TestOperators.aggregateAndGroupByDesc("column-1", AggregationFunction.COUNT, List[String]())
+
+  private val testUser: User = {
+    val user = new User
+    user.setUid(UInteger.valueOf(1))
+    user.setName("test_user")
+    user.setRole(UserRole.ADMIN)
+    user.setPassword("123")
+    user.setEmail("test_user@test.com")
+    user
+  }
+
+  private val testWorkflowEntry: Workflow = {
+    val workflow = new Workflow
+    workflow.setName("test workflow")
+    workflow.setWid(UInteger.valueOf(1))
+    workflow.setContent("test workflow content")
+    workflow.setDescription("test description")
+    workflow
+  }
+
+  private val testWorkflowVersionEntry: WorkflowVersion = {
+    val workflowVersion = new WorkflowVersion
+    workflowVersion.setWid(UInteger.valueOf(1))
+    workflowVersion.setVid(UInteger.valueOf(1))
+    workflowVersion.setContent("test version content")
+    workflowVersion
+  }
+
+  private val testWorkflowExecutionEntry: WorkflowExecutions = {
+    val workflowExecution = new WorkflowExecutions
+    workflowExecution.setEid(UInteger.valueOf(1))
+    workflowExecution.setVid(UInteger.valueOf(1))
+    workflowExecution.setUid(UInteger.valueOf(1))
+    workflowExecution.setStatus(3.toByte)
+    workflowExecution.setEnvironmentVersion("test engine")
+    workflowExecution
+  }
+
+  private val headerlessCsvOpStatisticsEntry: WorkflowRuntimeStatistics = {
+    val workflowRuntimeStatistics = new WorkflowRuntimeStatistics
+    workflowRuntimeStatistics.setOperatorId(headerlessCsvOpDesc.operatorIdentifier.id)
+    workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1))
+    workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1))
+    workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(100))
+    workflowRuntimeStatistics.setControlProcessingTime(ULong.valueOf(100))
+    workflowRuntimeStatistics
+  }
+
+  private val keywordOpDescStatisticsEntry: WorkflowRuntimeStatistics = {
+    val workflowRuntimeStatistics = new WorkflowRuntimeStatistics
+    workflowRuntimeStatistics.setOperatorId(keywordOpDesc.operatorIdentifier.id)
+    workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1))
+    workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1))
+    workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(300))
+    workflowRuntimeStatistics.setControlProcessingTime(ULong.valueOf(300))
+    workflowRuntimeStatistics
+  }
+
+  private val groupByOpDescStatisticsEntry: WorkflowRuntimeStatistics = {
+    val workflowRuntimeStatistics = new WorkflowRuntimeStatistics
+    workflowRuntimeStatistics.setOperatorId(groupByOpDesc.operatorIdentifier.id)
+    workflowRuntimeStatistics.setWorkflowId(UInteger.valueOf(1))
+    workflowRuntimeStatistics.setExecutionId(UInteger.valueOf(1))
+    workflowRuntimeStatistics.setDataProcessingTime(ULong.valueOf(1000))
+    workflowRuntimeStatistics.setControlProcessingTime(ULong.valueOf(1000))
+    workflowRuntimeStatistics
+  }
+
+  override protected def beforeEach(): Unit = {
+    initializeDBAndReplaceDSLContext()
+  }
+
+  "DefaultCostEstimator" should "use fallback method when no past statistics are available" in {
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc.operatorIdentifier,
+          PortIdentity(0),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+
+    val costEstimator = new DefaultCostEstimator(
+      workflow.context,
+      CONTROLLER
+    )
+
+    val region = Region(
+      id = RegionIdentity(0),
+      physicalOps = workflow.physicalPlan.operators,
+      physicalLinks = workflow.physicalPlan.links
+    )
+
+    val costOfRegion = costEstimator.estimate(region, 1)
+
+    assert(costOfRegion == 0)
+  }
+
+  "DefaultCostEstimator" should "use the latest successful execution to estimate cost when available" in {
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc.operatorIdentifier,
+          PortIdentity(0),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+
+    val userDao = new UserDao(getDSLContext.configuration())
+    val workflowDao = new WorkflowDao(getDSLContext.configuration())
+    val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
+    val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration())
+    val workflowRuntimeStatisticsDao =
+      new WorkflowRuntimeStatisticsDao(getDSLContext.configuration())
+
+    userDao.insert(testUser)
+    workflowDao.insert(testWorkflowEntry)
+    workflowVersionDao.insert(testWorkflowVersionEntry)
+    workflowExecutionsDao.insert(testWorkflowExecutionEntry)
+    workflowRuntimeStatisticsDao.insert(headerlessCsvOpStatisticsEntry)
+    workflowRuntimeStatisticsDao.insert(keywordOpDescStatisticsEntry)
+
+    val costEstimator = new DefaultCostEstimator(
+      workflow.context,
+      CONTROLLER
+    )
+
+    val region = Region(
+      id = RegionIdentity(0),
+      physicalOps = workflow.physicalPlan.operators,
+      physicalLinks = workflow.physicalPlan.links
+    )
+
+    val costOfRegion = costEstimator.estimate(region, 1)
+
+    assert(costOfRegion != 0)
+  }
+
+  "DefaultCostEstimator" should "correctly estimate costs in a search" in {
+    val workflow = buildWorkflow(
+      List(headerlessCsvOpDesc, groupByOpDesc, keywordOpDesc),
+      List(
+        LogicalLink(
+          headerlessCsvOpDesc.operatorIdentifier,
+          PortIdentity(0),
+          groupByOpDesc.operatorIdentifier,
+          PortIdentity(0)
+        ),
+        LogicalLink(
+          groupByOpDesc.operatorIdentifier,
+          PortIdentity(0),
+          keywordOpDesc.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+
+    val userDao = new UserDao(getDSLContext.configuration())
+    val workflowDao = new WorkflowDao(getDSLContext.configuration())
+    val workflowExecutionsDao = new WorkflowExecutionsDao(getDSLContext.configuration())
+    val workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration())
+    val workflowRuntimeStatisticsDao =
+      new WorkflowRuntimeStatisticsDao(getDSLContext.configuration())
+
+    userDao.insert(testUser)
+    workflowDao.insert(testWorkflowEntry)
+    workflowVersionDao.insert(testWorkflowVersionEntry)
+    workflowExecutionsDao.insert(testWorkflowExecutionEntry)
+    workflowRuntimeStatisticsDao.insert(headerlessCsvOpStatisticsEntry)
+    workflowRuntimeStatisticsDao.insert(groupByOpDescStatisticsEntry)
+    workflowRuntimeStatisticsDao.insert(keywordOpDescStatisticsEntry)
+
+    // Should contain two regions, one with CSV->localAgg->globalAgg, another with keyword->sink
+    val searchResult = new CostBasedScheduleGenerator(
+      workflow.context,
+      workflow.physicalPlan,
+      CONTROLLER
+    ).bottomUpSearch()
+
+    val groupByRegion =
+      searchResult.regionDAG.vertexSet().asScala.filter(region => region.physicalOps.size == 3).head
+    val keywordRegion =
+      searchResult.regionDAG.vertexSet().asScala.filter(region => region.physicalOps.size == 2).head
+
+    val costEstimator = new DefaultCostEstimator(
+      workflow.context,
+      CONTROLLER
+    )
+
+    val groupByRegionCost = costEstimator.estimate(groupByRegion, 1)
+
+    val groupByOperatorCost = (groupByOpDescStatisticsEntry.getDataProcessingTime
+      .doubleValue() + groupByOpDescStatisticsEntry.getControlProcessingTime.doubleValue()) / 1e9
+
+    // The cost of the first region should be the cost of the GroupBy operator (note the two physical operators for
+    // the GroupBy logical operator have the same cost because we use logical operator in the statistics).
+    // The GroupBy operator has a longer running time.
+    assert(groupByRegionCost == groupByOperatorCost)
+
+    val keywordRegionCost = costEstimator.estimate(keywordRegion, 1)
+
+    val keywordOperatorCost = (keywordOpDescStatisticsEntry.getDataProcessingTime
+      .doubleValue() + keywordOpDescStatisticsEntry.getControlProcessingTime.doubleValue()) / 1e9
+
+    // The cost of the second region should be the cost of the keyword operator, since the sink operator has the same
+    // logical operator as the keyword operator.
+    assert(keywordRegionCost == keywordOperatorCost)
+
+    // The cost of the region plan should be the sum of region costs
+    assert(searchResult.cost == groupByRegionCost + keywordRegionCost)
+  }
+
+  override protected def afterEach(): Unit = {
+    shutdownDB()
+  }
+
+}