eval: add explicit data volume limits (#1397)
Adds an `atlas.eval.limits` config block that can
be used to configure the max number of input or
intermediate datapoints for a given interval.

Fixes #1355

Co-authored-by: lavanya chennupati <[email protected]>
lavanyachennupati and lavanya chennupati authored Jan 24, 2022
1 parent 1516f98 commit fdfa1dd
Showing 11 changed files with 342 additions and 51 deletions.
7 changes: 7 additions & 0 deletions atlas-eval/src/main/resources/reference.conf
@@ -15,6 +15,13 @@ atlas.eval {
// is too old
num-buffers = 1

limits {
// Maximum number of raw input datapoints for a data expr. Defaults to Integer.MaxValue
max-input-datapoints = 2147483647
// Maximum number of datapoints resulting from a group by. Defaults to Integer.MaxValue
max-intermediate-datapoints = 2147483647
}

// Broad tag keys that should be ignored for the purposes of dropping expensive queries
// that will be prohibitive to match. These are typically tags that will be applied to
// everything within a given deployment scope.
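
The limits block above defaults both values to Integer.MaxValue, which effectively disables the checks out of the box. A deployment that wants to enforce them could override the block in its own configuration; this is only a sketch, and the values below are hypothetical rather than recommendations:

atlas.eval {
  limits {
    // hypothetical thresholds, tuned to the expected query cardinality
    max-input-datapoints = 100000
    max-intermediate-datapoints = 10000
  }
}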
atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala
@@ -24,6 +24,8 @@ import com.netflix.atlas.core.model.Datapoint
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.Math
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spectator.api.Registry

/**
* Datapoint for an aggregate data expression. This type is used for the intermediate
@@ -91,7 +93,26 @@ object AggrDatapoint
*/
trait Aggregator {
protected[this] var rawDatapointCounter = 0
protected[this] var exceedsMaxInputOrIntermediateDatapoints = false
protected[this] var inputDatapointsLimit = Integer.MAX_VALUE
protected[this] var intermediateDatapointsLimit = Integer.MAX_VALUE

protected[this] var counter =
new NoopRegistry().counter("atlas.eval.datapoints", "id", "dropped-datapoints-limit-exceeded")
def maxInputOrIntermediateDatapointsExceeded: Boolean = exceedsMaxInputOrIntermediateDatapoints

protected[this] def inputOrIntermediateDatapointsAtLimitOrExceeded: Boolean = {
val datapointsLimitExceeded =
numRawDatapoints >= inputDatapointsLimit || datapoints.size >= intermediateDatapointsLimit
if (datapointsLimitExceeded) {
counter.increment()
exceedsMaxInputOrIntermediateDatapoints = true
}
datapointsLimitExceeded
}
def numRawDatapoints: Int = rawDatapointCounter
// Drop the datapoints if the number of input/intermediate datapoints exceeds the configured
// limit for an aggregator
def aggregate(datapoint: AggrDatapoint): Aggregator
def datapoints: List[AggrDatapoint]
}
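
Each time a datapoint is dropped because a limit was exceeded, the aggregator increments an atlas.eval.datapoints counter tagged with id=dropped-datapoints-limit-exceeded. A minimal sketch of reading that metric from a Spectator registry, assuming a DefaultRegistry is the registry handed to the aggregators (instead of the NoopRegistry default above):

import com.netflix.spectator.api.DefaultRegistry

val registry = new DefaultRegistry()
// ... drive aggregation with `registry` passed to AggrDatapoint.newAggregator/aggregate ...
val dropped = registry
  .counter("atlas.eval.datapoints", "id", "dropped-datapoints-limit-exceeded")
  .count()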
@@ -101,15 +122,24 @@ object AggrDatapoint
* the values need to be transformed to NaN or 1 prior to using the default operation
* on DataExpr.Count of sum.
*/
private class SimpleAggregator(init: AggrDatapoint, op: (Double, Double) => Double)
extends Aggregator {

private class SimpleAggregator(
init: AggrDatapoint,
op: (Double, Double) => Double,
maxInputDatapoints: Int,
maxIntermediateDatapoints: Int,
registry: Registry
) extends Aggregator {
private var value = init.value
rawDatapointCounter += 1
inputDatapointsLimit = maxInputDatapoints
intermediateDatapointsLimit = maxIntermediateDatapoints
counter = registry.counter("atlas.eval.datapoints", "id", "dropped-datapoints-limit-exceeded")

override def aggregate(datapoint: AggrDatapoint): Aggregator = {
value = op(value, datapoint.value)
rawDatapointCounter += 1
if (!inputOrIntermediateDatapointsAtLimitOrExceeded) {
value = op(value, datapoint.value)
rawDatapointCounter += 1
}
this
}

@@ -122,27 +152,43 @@ object AggrDatapoint
* Group the datapoints by the tags and maintain a simple aggregator per distinct tag
* set.
*/
private class GroupByAggregator extends Aggregator {
private class GroupByAggregator(
maxInputDatapoints: Int,
maxIntermediateDatapoints: Int,
registry: Registry
) extends Aggregator {

private val aggregators =
scala.collection.mutable.AnyRefMap.empty[Map[String, String], SimpleAggregator]
inputDatapointsLimit = maxInputDatapoints
intermediateDatapointsLimit = maxIntermediateDatapoints
counter = registry.counter("atlas.eval.datapoints", "id", "dropped-datapoints-limit-exceeded")

private def newAggregator(datapoint: AggrDatapoint): SimpleAggregator = {
datapoint.expr match {
case GroupBy(af: AggregateFunction, _) =>
val aggregator = new SimpleAggregator(
datapoint,
aggrOp(af),
maxInputDatapoints,
maxIntermediateDatapoints,
registry
)
rawDatapointCounter += 1
new SimpleAggregator(datapoint, aggrOp(af))
aggregator
case _ =>
throw new IllegalArgumentException("datapoint is not for a grouped expression")
}
}

override def aggregate(datapoint: AggrDatapoint): Aggregator = {
aggregators.get(datapoint.tags) match {
case Some(aggr) =>
rawDatapointCounter += 1
aggr.aggregate(datapoint)
case None => aggregators.put(datapoint.tags, newAggregator(datapoint))
if (!inputOrIntermediateDatapointsAtLimitOrExceeded) {
aggregators.get(datapoint.tags) match {
case Some(aggr) =>
rawDatapointCounter += 1
aggr.aggregate(datapoint)
case None => aggregators.put(datapoint.tags, newAggregator(datapoint))
}
}
this
}
@@ -155,12 +201,21 @@ object AggrDatapoint
/**
* Do not perform aggregation. Keep track of all datapoints that have been received.
*/
private class AllAggregator extends Aggregator {
private class AllAggregator(
maxInputDatapoints: Int,
maxIntermediateDatapoints: Int,
registry: Registry
) extends Aggregator {
private var values = List.empty[AggrDatapoint]
inputDatapointsLimit = maxInputDatapoints
intermediateDatapointsLimit = maxIntermediateDatapoints
counter = registry.counter("atlas.eval.datapoints", "id", "dropped-datapoints-limit-exceeded")

override def aggregate(datapoint: AggrDatapoint): Aggregator = {
values = datapoint :: values
rawDatapointCounter += 1
if (!inputOrIntermediateDatapointsAtLimitOrExceeded) {
values = datapoint :: values
rawDatapointCounter += 1
}
this
}

@@ -171,11 +226,27 @@ object AggrDatapoint
* Create a new aggregator instance initialized with the specified datapoint. The
* datapoint will already be applied and should not get re-added to the aggregation.
*/
def newAggregator(datapoint: AggrDatapoint): Aggregator = {
def newAggregator(
datapoint: AggrDatapoint,
maxInputDatapoints: Int,
maxIntermediateDatapoints: Int,
registry: Registry
): Aggregator = {
datapoint.expr match {
case af: AggregateFunction => new SimpleAggregator(datapoint, aggrOp(af))
case _: GroupBy => (new GroupByAggregator).aggregate(datapoint)
case _: All => (new AllAggregator).aggregate(datapoint)
case af: AggregateFunction =>
new SimpleAggregator(
datapoint,
aggrOp(af),
maxInputDatapoints,
maxIntermediateDatapoints,
registry
)
case _: GroupBy =>
new GroupByAggregator(maxInputDatapoints, maxIntermediateDatapoints, registry)
.aggregate(datapoint)
case _: All =>
new AllAggregator(maxInputDatapoints, maxIntermediateDatapoints, registry)
.aggregate(datapoint)
}
}

@@ -193,16 +264,21 @@ object AggrDatapoint
* Aggregate intermediate aggregates from each source to get the final aggregate for
* a given expression. All values are expected to be for the same data expression.
*/
def aggregate(values: List[AggrDatapoint]): List[AggrDatapoint] = {
if (values.isEmpty) Nil
def aggregate(
values: List[AggrDatapoint],
maxInputDatapoints: Int,
maxIntermediateDatapoints: Int,
registry: Registry
): Option[Aggregator] = {
if (values.isEmpty) Option.empty
else {
val vs = dedup(values)
val aggr = newAggregator(vs.head)
vs.tail
val aggr = newAggregator(vs.head, maxInputDatapoints, maxIntermediateDatapoints, registry)
val aggregator = vs.tail
.foldLeft(aggr) { (acc, d) =>
acc.aggregate(d)
}
.datapoints
Some(aggregator)
}
}
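
Note the signature change here: aggregate now returns an Option[Aggregator] rather than a List[AggrDatapoint], so the caller decides what to do when a limit was exceeded. A minimal sketch of the intended call pattern, mirroring what EvaluatorImpl.toTimeGroup does below (the limit values are hypothetical, and `values` is assumed to hold the intermediate aggregates for a single data expression and time step):

import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.spectator.api.NoopRegistry

def finalDatapoints(values: List[AggrDatapoint]): List[AggrDatapoint] = {
  AggrDatapoint.aggregate(values, 100000, 10000, new NoopRegistry) match {
    case Some(aggr) if aggr.maxInputOrIntermediateDatapointsExceeded =>
      Nil // a limit was hit: drop the interval (and log a diagnostic message)
    case Some(aggr) => aggr.datapoints
    case None       => Nil // no datapoints for this interval
  }
}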

atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
@@ -48,6 +48,7 @@ import com.netflix.atlas.akka.StreamOps
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.AggrDatapoint.Aggregator
import com.netflix.atlas.eval.model.AggrValuesInfo
import com.netflix.atlas.eval.model.LwcExpression
import com.netflix.atlas.eval.model.LwcMessages
@@ -294,26 +295,32 @@ private[stream] abstract class EvaluatorImpl(
val context = newStreamContext(dsLogger)
context.validate(sources)
context.setDataSources(sources)
val interpreter = context.interpreter

// Extract data expressions to reuse for creating time groups
val exprs = sources.getSources.asScala
.flatMap(ds => context.interpreter.eval(Uri(ds.getUri)))
.flatMap(ds => interpreter.eval(Uri(ds.getUri)))
.flatMap(_.expr.dataExprs)
.toList
.distinct

Flow[DatapointGroup]
.map(g => toTimeGroup(stepSize, exprs, g))
.map(g => toTimeGroup(stepSize, exprs, g, context))
.merge(Source.single(sources), eagerComplete = false)
.via(new FinalExprEval(context.interpreter))
.via(new FinalExprEval(interpreter))
.flatMapConcat(s => s)
.via(new OnUpstreamFinish[MessageEnvelope](queue.complete()))
.merge(logSrc, eagerComplete = false)
.toProcessor
.run()
}

private def toTimeGroup(step: Long, exprs: List[DataExpr], group: DatapointGroup): TimeGroup = {
private def toTimeGroup(
step: Long,
exprs: List[DataExpr],
group: DatapointGroup,
context: StreamContext
): TimeGroup = {
val valuesInfo = group.getDatapoints.asScala.zipWithIndex
.flatMap {
case (d, i) =>
@@ -332,7 +339,28 @@
}
}
.groupBy(_.expr)
.map(t => t._1 -> AggrValuesInfo(AggrDatapoint.aggregate(t._2.toList), t._2.size))
.map(t =>
t._1 -> {
val aggregator = AggrDatapoint.aggregate(
t._2.toList,
context.maxInputDatapointsPerExpression,
context.maxIntermediateDatapointsPerExpression,
context.registry
)

aggregator match {
case aggr: Some[Aggregator] =>
val maxInputOrIntermediateDatapointsExceeded =
aggr.get.maxInputOrIntermediateDatapointsExceeded

if (maxInputOrIntermediateDatapointsExceeded) {
context.logDatapointsExceeded(group.getTimestamp, t._1)
AggrValuesInfo(List(), t._2.size)
} else AggrValuesInfo(aggr.get.datapoints, t._2.size)
case _ => AggrValuesInfo(List(), t._2.size)
}
}
)
TimeGroup(group.getTimestamp, step, valuesInfo)
}

atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala
@@ -33,7 +33,6 @@ import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.IdentityMap
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.model.TimeSeriesMessage
import com.netflix.atlas.eval.stream.Evaluator.DataSources
@@ -170,7 +169,7 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)

val dataExprToDatapoints = noData ++ groupedDatapoints.map {
case (k, vs) =>
k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries)
k -> vs.values.map(_.toTimeSeries)
}

// Collect input and intermediate data size per DataSource
atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala
@@ -78,6 +78,13 @@ private[stream] class StreamContext(

def numBuffers: Int = config.getInt("num-buffers")

// Maximum number of raw input data points for a data expr.
def maxInputDatapointsPerExpression: Int = config.getInt("limits.max-input-datapoints")

// Maximum number of datapoints resulting from a group by for a data expr.
def maxIntermediateDatapointsPerExpression: Int =
config.getInt("limits.max-intermediate-datapoints")

val interpreter = new ExprInterpreter(rootConfig)

def findBackendForUri(uri: Uri): Backend = {
@@ -172,6 +179,20 @@ private[stream] class StreamContext(
}
}

/**
* Emit an error to the sources for which the number of input or intermediate
* datapoints exceeds the configured limit for an expression.
*/
def logDatapointsExceeded(timestamp: Long, dataExpr: DataExpr) = {
val diagnosticMessage = DiagnosticMessage.error(
s"expression: $dataExpr exceeded the configured max input datapoints limit" +
s" '$maxInputDatapointsPerExpression' or max intermediate" +
s" datapoints limit '$maxIntermediateDatapointsPerExpression'" +
s" for timestamp '$timestamp}"
)
log(dataExpr, diagnosticMessage)
}
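
For a hypothetical group-by expression and the hypothetical limits used in the earlier config sketch, the emitted diagnostic would read roughly:

expression: name,sps,:eq,:sum,(,nf.cluster,),:by exceeded the configured max input datapoints limit '100000' or max intermediate datapoints limit '10000' for timestamp '1642982400000'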

/**
* Send a diagnostic message to all data sources that use a particular data expression.
*/
(Diff for the remaining changed files not shown.)
