Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cost Estimator Using Past Statistics for Schedule Generator #3156

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class CostBasedScheduleGenerator(
numStatesExplored: Int = 0
)

private val costEstimator =
new DefaultCostEstimator(workflowContext = workflowContext, actorId = actorId)

def generate(): (Schedule, PhysicalPlan) = {
val startTime = System.nanoTime()
val regionDAG = createRegionDAG()
Expand Down Expand Up @@ -281,7 +284,9 @@ class CostBasedScheduleGenerator(
if (oEarlyStop) schedulableStates.add(currentState)
// Calculate the current state's cost and update the bestResult if it's lower
val cost =
evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
evaluate(
RegionPlan(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
)
if (cost < bestResult.cost) {
bestResult = SearchResult(currentState, regionDAG, cost)
}
Expand Down Expand Up @@ -334,7 +339,12 @@ class CostBasedScheduleGenerator(
physicalPlan.getNonMaterializedBlockingAndDependeeLinks ++ neighborState
) match {
case Left(regionDAG) =>
evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
evaluate(
RegionPlan(
regionDAG.vertexSet().asScala.toSet,
regionDAG.edgeSet().asScala.toSet
)
)
case Right(_) =>
Double.MaxValue
}
Expand Down Expand Up @@ -423,7 +433,9 @@ class CostBasedScheduleGenerator(
def updateOptimumIfApplicable(regionDAG: DirectedAcyclicGraph[Region, RegionLink]): Unit = {
// Calculate the current state's cost and update the bestResult if it's lower
val cost =
evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
evaluate(
RegionPlan(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
)
if (cost < bestResult.cost) {
bestResult = SearchResult(currentState, regionDAG, cost)
}
Expand Down Expand Up @@ -453,7 +465,12 @@ class CostBasedScheduleGenerator(
physicalPlan.getNonMaterializedBlockingAndDependeeLinks ++ neighborState
) match {
case Left(regionDAG) =>
evaluate(regionDAG.vertexSet().asScala.toSet, regionDAG.edgeSet().asScala.toSet)
evaluate(
RegionPlan(
regionDAG.vertexSet().asScala.toSet,
regionDAG.edgeSet().asScala.toSet
)
)
case Right(_) =>
Double.MaxValue
}
Expand All @@ -472,17 +489,16 @@ class CostBasedScheduleGenerator(
}

/**
* The cost function used by the search. Takes in a region graph represented as set of regions and links.
* The cost function used by the search. Takes a region plan, generates one or more (to be done in the future)
* schedules based on the region plan, and calculates the cost of the schedule(s) using Cost Estimator. Uses the cost
* of the best schedule (currently only considers one schedule) as the cost of the region plan.
*
* @param regions A set of regions created based on a search state.
* @param regionLinks A set of links to indicate dependencies between regions, based on the materialization edges.
* @return A cost determined by the resource allocator.
* @return A cost determined by the cost estimator.
*/
private def evaluate(regions: Set[Region], regionLinks: Set[RegionLink]): Double = {
// Using number of materialized ports as the cost.
// This is independent of the schedule / resource allocator.
// In the future we may need to use the ResourceAllocator to get the cost.
regions.flatMap(_.materializedPortIds).size
private def evaluate(regionPlan: RegionPlan): Double = {
val schedule = generateScheduleFromRegionPlan(regionPlan)
// In the future we may allow multiple regions in a level and split the resources.
schedule.map(level => level.map(region => costEstimator.estimate(region, 1)).sum).sum
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.StorageConfig
import edu.uci.ics.amber.core.workflow.WorkflowContext
import edu.uci.ics.amber.engine.common.AmberLogging
import edu.uci.ics.amber.virtualidentity.ActorVirtualIdentity
import edu.uci.ics.texera.dao.SqlServer
import edu.uci.ics.texera.dao.SqlServer.withTransaction
import edu.uci.ics.texera.dao.jooq.generated.Tables.{
USER,
WORKFLOW_EXECUTIONS,
WORKFLOW_RUNTIME_STATISTICS,
WORKFLOW_VERSION
}
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.WorkflowRuntimeStatistics
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.WorkflowExecutionEntry
import org.jooq.impl.DSL.field
import org.jooq.types.UInteger

import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.{Failure, Success, Try}

/**
* A cost estimator should estimate a cost of running a region under the given resource constraints as units.
*/
trait CostEstimator {
def estimate(region: Region, resourceUnits: Int): Double
}

/**
* A default cost estimator using past statistics. If past statistics of a workflow are available, the cost of a region
* is the execution time of its longest-running operator. Otherwise the cost is the number of materialized ports in the
* region.
*/
class DefaultCostEstimator(
workflowContext: WorkflowContext,
val actorId: ActorVirtualIdentity
) extends CostEstimator
with AmberLogging {

// Requires mysql database to retrieve execution statistics, otherwise use number of materialized ports as a default.
private val operatorEstimatedTimeOption = Try(
this.getOperatorExecutionTimeInSeconds(
this.workflowContext.workflowId.id
)
) match {
case Failure(_) => None
case Success(result) => result
}

operatorEstimatedTimeOption match {
case None =>
logger.info(
s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, " +
s"no past execution statistics available. Using number of materialized output ports as the cost. "
)
case Some(_) =>
}

override def estimate(region: Region, resourceUnits: Int): Double = {
this.operatorEstimatedTimeOption match {
case Some(operatorEstimatedTime) =>
// Use past statistics (wall-clock runtime). We use the execution time of the longest-running
// operator in each region to represent the region's execution time, and use the sum of all the regions'
// execution time as the wall-clock runtime of the workflow.
// This assumes a schedule is a total-order of the regions.
val opExecutionTimes = region.getOperators.map(op => {
operatorEstimatedTime.getOrElse(op.id.logicalOpId.id, 1.0)
})
val longestRunningOpExecutionTime = opExecutionTimes.max
longestRunningOpExecutionTime
case None =>
// Without past statistics (e.g., first execution), we use number of materialized ports as the cost.
// This is independent of the schedule / resource allocator.
region.materializedPortIds.size
}
}

/**
* Retrieve the latest successful execution to get statistics to calculate costs in DefaultCostEstimator.
* Using the total control processing time plus data processing time of an operator as its cost.
* If no past statistics are available (e.g., first execution), return None.
*/
private def getOperatorExecutionTimeInSeconds(
wid: Long
): Option[Map[String, Double]] = {

val operatorEstimatedTimeOption = withTransaction(
SqlServer
.getInstance(
StorageConfig.jdbcUrl,
StorageConfig.jdbcUsername,
StorageConfig.jdbcPassword
)
.createDSLContext()
Comment on lines +89 to +95
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bobbai00 is this the correct way to use connection pool?

) { context =>
val widAsUInteger = UInteger.valueOf(wid)
val latestSuccessfulExecution = context
.select(
WORKFLOW_EXECUTIONS.EID,
WORKFLOW_EXECUTIONS.VID,
field(
context
.select(USER.NAME)
.from(USER)
.where(WORKFLOW_EXECUTIONS.UID.eq(USER.UID))
),
WORKFLOW_EXECUTIONS.STATUS,
WORKFLOW_EXECUTIONS.RESULT,
WORKFLOW_EXECUTIONS.STARTING_TIME,
WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME,
WORKFLOW_EXECUTIONS.BOOKMARKED,
WORKFLOW_EXECUTIONS.NAME,
WORKFLOW_EXECUTIONS.LOG_LOCATION
)
.from(WORKFLOW_EXECUTIONS)
.join(WORKFLOW_VERSION)
.on(WORKFLOW_VERSION.VID.eq(WORKFLOW_EXECUTIONS.VID))
.where(WORKFLOW_VERSION.WID.eq(widAsUInteger).and(WORKFLOW_EXECUTIONS.STATUS.eq(3.toByte)))
.orderBy(WORKFLOW_EXECUTIONS.STARTING_TIME.desc())
.limit(1)
.fetchInto(classOf[WorkflowExecutionEntry])
.asScala
.toList
.headOption

if (latestSuccessfulExecution.isDefined) {
val eid = latestSuccessfulExecution.get.eId
val rawStats = context
.select(
WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID,
WORKFLOW_RUNTIME_STATISTICS.TIME,
WORKFLOW_RUNTIME_STATISTICS.DATA_PROCESSING_TIME,
WORKFLOW_RUNTIME_STATISTICS.CONTROL_PROCESSING_TIME
)
.from(WORKFLOW_RUNTIME_STATISTICS)
.where(
WORKFLOW_RUNTIME_STATISTICS.WORKFLOW_ID
.eq(widAsUInteger)
.and(WORKFLOW_RUNTIME_STATISTICS.EXECUTION_ID.eq(eid))
)
.orderBy(WORKFLOW_RUNTIME_STATISTICS.TIME, WORKFLOW_RUNTIME_STATISTICS.OPERATOR_ID)
.fetchInto(classOf[WorkflowRuntimeStatistics])
.asScala
.toList
Comment on lines +128 to +145
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if possible, can we use one query to get the latest successful execution's stats? we can avoid the transaction.

if (rawStats.isEmpty) {
None
} else {
val cumulatedStats = rawStats.foldLeft(Map.empty[String, Double]) { (acc, stat) =>
val opTotalExecutionTime = acc.getOrElse(stat.getOperatorId, 0.0)
acc + (stat.getOperatorId -> (opTotalExecutionTime + (stat.getDataProcessingTime
.doubleValue() + stat.getControlProcessingTime.doubleValue()) / 1e9))
}
Some(cumulatedStats)
}
} else {
None
}
Comment on lines +146 to +158
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try to use early return or other techniques (e.g., pattern matching) to reduce branches.

}
operatorEstimatedTimeOption
}
}
Loading
Loading