
Commit

Move the remaining limits / thresholds which were held in companion object constants, into the execution profile.
Andrew Hogg authored and mieslep committed Jun 11, 2024
1 parent ea1c41c commit 30d71ae
Showing 14 changed files with 101 additions and 71 deletions.
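
For orientation, here is a minimal, self-contained sketch of the pattern this commit applies across the report sections: a hard-coded companion-object constant becomes a value read from the execution profile, so thresholds can be tuned in executionProfile.yaml without recompiling. The Limits and ExecutionProfile class names and the droppedMessagesPerHourThreshold value mirror the real code in this diff; the function and entry point below are simplified for illustration only.

// Simplified stand-ins for the real data classes (see the profiles file below).
data class Limits(val droppedMessagesPerHourThreshold: Int = 25)
data class ExecutionProfile(val limits: Limits = Limits())

// Previously: dropsPerHour > DROP_PER_HOUR_LIMIT (a private companion object constant).
// Now the same check reads the threshold from the execution profile.
fun exceedsDropThreshold(dropsPerHour: Double, profile: ExecutionProfile): Boolean =
    dropsPerHour > profile.limits.droppedMessagesPerHourThreshold

fun main() {
    val profile = ExecutionProfile()               // defaults mirror the old constants
    println(exceedsDropThreshold(30.0, profile))   // true
    println(exceedsDropThreshold(10.0, profile))   // false
}
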
44 changes: 43 additions & 1 deletion montecristo/executionProfile.yaml
@@ -5,4 +5,46 @@ limits:
numberOfLogDays: 90

# Aggregation Warnings to check for
aggregationWarnings: 1000000

# Batch Size Warnings to check for
batchSizeWarnings: 1000000

# Dropped Hint messages to check for
droppedHints: 100000

# Dropped Messages to check for
droppedMessages: 1000000

# Dropped Messages per Hour before warning
droppedMessagesPerHourThreshold: 25

# Gossip Pause Warning Messages to check for
gossipPauseWarnings: 1000000

# Minimum percentage of time paused to trigger recommendation
gossipPauseTimePercentageThreshold: 5.0

# Hinted Handoff messages to check for
hintedHandoffMessages: 1000000

# Hints per hour recommendation threshold
hintedHandoffPerHourThreshold: 25

# Prepared Statements discarded messages to check for
preparedStatementWarnings: 1000000

# Prepared Statements Discarded Messages Per Hour Threshold
preparedStatementMessagesPerHourThreshold: 1

# Repair Errors to search for
repairErrorMessages: 10000

# Number of days of repair errors to report on
repairErrorMessagesDisplayedInReport: 14

# Number of tombstone warnings per day permitted threshold
tombstoneWarningsPerDayThreshold: 100

# Token balance threshold - % difference allowed from largest to smallest owners
tokenOwnershipPercentageImbalanceThreshold: 0.2
@@ -19,7 +19,21 @@ package com.datastax.montecristo.model.profiles

data class Limits(val numberOfLogDays : Long = 90,
val aggregationWarnings: Int = 1000000,
val preparedStatementWarnings: Int = 1000000)
val batchSizeWarnings: Int = 1000000,
val droppedHints: Int = 100000,
val droppedMessages: Int = 1000000,
val droppedMessagesPerHourThreshold: Int = 25,
val gossipPauseWarnings: Int = 1000000,
val gossipPauseTimePercentageThreshold: Double = 5.0,
val hintedHandoffMessages: Int = 1000000,
val hintedHandoffPerHourThreshold: Int = 25,
val preparedStatementWarnings: Int = 1000000,
val preparedStatementMessagesPerHourThreshold: Int = 1,
val repairErrorMessages: Int = 10000,
val repairErrorMessagesDisplayedInReport: Int = 14,
val tombstoneWarningsPerDayThreshold: Int = 100,
val tokenOwnershipPercentageImbalanceThreshold: Double = 0.2
)

data class ExecutionProfile(val limits : Limits) {

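The new YAML keys map one-to-one onto the Limits constructor parameters above, with the data-class defaults mirroring the former constants. As an illustration only, the snippet below sketches how executionProfile.yaml could be deserialized into these classes, assuming Jackson's YAML and Kotlin modules are on the classpath; the repository's actual loading code is not part of this diff and may use a different mechanism.

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.kotlin.readValue
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.File

fun loadExecutionProfile(path: String): ExecutionProfile {
    val mapper = ObjectMapper(YAMLFactory())
        .registerKotlinModule()
        // Unknown keys in the YAML should not break report generation.
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    return mapper.readValue(File(path))
}

// Usage sketch: with jackson-module-kotlin, keys omitted from the YAML fall back to
// the Kotlin default values declared on Limits.
// val profile = loadExecutionProfile("montecristo/executionProfile.yaml")
// profile.limits.droppedHints  // 100000 unless overridden
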
@@ -47,7 +47,7 @@ class Ring : DocumentSection {
val partitioner = if (cluster.nodes.first().cassandraYaml.partitioner == Partitioner.RANDOM.yamlSetting) { Partitioner.RANDOM } else { Partitioner.MURMUR }
val tokenResults = calculateTokenPercentage(partitioner, nodeRingTokens)

val balancedResults = isTokenRingBalanced(tokenResults, MAX_PERCENTAGE_DIFFERENCE)
val balancedResults = isTokenRingBalanced(tokenResults, executionProfile.limits.tokenOwnershipPercentageImbalanceThreshold)
val unbalancedDcCount = balancedResults.count { d -> !d.value }
if (unbalancedDcCount > 0) {
val dcNameList = balancedResults.filter { d -> !d.value }.toList().joinToString { it.first }
@@ -125,8 +125,4 @@
}
return results
}

companion object {
private const val MAX_PERCENTAGE_DIFFERENCE = 0.20
}
}
@@ -38,8 +38,8 @@ class Batches : DocumentSection {

val batchWarnings = cluster.databaseVersion.searchLogForBatches(logSearcher, QUERY_LIMIT)

val numWarnings = if (batchWarnings.size > 1000000) {
"over a million"
val numWarnings = if (batchWarnings.size > executionProfile.limits.batchSizeWarnings) {
"over ${executionProfile.limits.batchSizeWarnings}"
} else {
batchWarnings.size.toString()
}
@@ -37,7 +37,7 @@ class DroppedHints : DocumentSection {

val args = super.createDocArgs(cluster)

val droppedHintsResult = logSearcher.search("dropped AND hints", LogLevel.WARN, limit = 100000)
val droppedHintsResult = logSearcher.search("dropped AND hints", LogLevel.WARN, limit = executionProfile.limits.droppedHints)

val droppedMessagedRegex = """([^\s]*) has ([\d]*) dropped""".toRegex()

@@ -25,6 +25,7 @@ import com.datastax.montecristo.sections.structure.Recommendation
import com.datastax.montecristo.sections.structure.RecommendationType
import com.datastax.montecristo.sections.structure.immediate
import com.datastax.montecristo.utils.MarkdownTable
import com.github.andrewoma.kommon.script.exec

/**
* Reports on the various dropped messages. Currently we only have mutations and reads but it's easy to add more.
@@ -55,9 +56,9 @@ class DroppedMessages : DocumentSection {
// The log files can have far more warnings potentially
val logDroppedMessageWarnings = logSearcher.search(
"+MUTATION +messages +were +dropped +in +the +last",
limit = LIMIT
limit = executionProfile.limits.droppedMessages
).mapNotNull { DroppedOperationMessage.fromLogEntry(it) }
val hitMessageLimit = logDroppedMessageWarnings.size == LIMIT // did we hit the search limit?
val hitMessageLimit = logDroppedMessageWarnings.size == executionProfile.limits.droppedMessages // did we hit the search limit?
// map of node to mutation values, (count of messages, sum of internal drops and sum of cross node drops)
val sortedResults = processDroppedMutationLogMessages(logDroppedMessageWarnings)
// map of node to log duration
@@ -67,7 +68,7 @@
val logDropMd = MarkdownTable("Node", "Log Duration (Days)", "Total Internal Mutation Drops", "Total Cross-Node Mutations Dropped", "Number of Distinct Hours with Mutations Dropped", "Drops per Hour").orMessage("There were no dropped mutations within the logs.")
for (node in sortedResults) {
val droppedMutationsPerHour = (node.value.sumOfInternalDrops + node.value.sumOfCrossNodeDrops) / (mapOfDurations.getValue(node.key))
if (droppedMutationsPerHour > DROP_PER_HOUR_LIMIT) {
if (droppedMutationsPerHour > executionProfile.limits.droppedMessagesPerHourThreshold) {
dropRecommendationExceeded = true
}
logDropMd.addRow()
@@ -88,10 +89,10 @@
}
if (hitMessageLimit) {
// this scenario is incredibly unlikely, but included just in case.
recs.immediate(RecommendationType.OPERATIONS,"There are over $hitMessageLimit separate dropped mutation warnings within the logs. We recommend further investigation into the extremely high level of dropped mutations.")
recs.immediate(RecommendationType.OPERATIONS,"There are over ${executionProfile.limits.droppedMessages} separate dropped mutation warnings within the logs. We recommend further investigation into the extremely high level of dropped mutations.")
} else if ( dropRecommendationExceeded) {
// trigger a recommendation if the number of dropped messages per hour exceeds the trigger limit
recs.immediate(RecommendationType.OPERATIONS,"There are over $DROP_PER_HOUR_LIMIT dropped mutations occurring on average every hour. We recommend further investigation into this elevated level of dropped mutations.")
recs.immediate(RecommendationType.OPERATIONS,"There are over ${executionProfile.limits.droppedMessagesPerHourThreshold} dropped mutations occurring on average every hour. We recommend further investigation into this elevated level of dropped mutations.")
}
return compileAndExecute("operations/operations_dropped_messages.md", args)
}
@@ -116,8 +117,4 @@

// data class just to hold the results of the maps together
private data class DroppedMutationMessageCounts( val countOfMessages : Int, val sumOfInternalDrops : Long, val sumOfCrossNodeDrops : Long, val numberOfHoursWithDrops : Int)
companion object {
private const val LIMIT = 1000000
private const val DROP_PER_HOUR_LIMIT = 25
}
}
@@ -40,11 +40,11 @@ class GossipLogPausesWarnings : DocumentSection {
// search and parse the gossip failure detector messages
val pauseWarnings = logSearcher.search(
"FailureDetector.java",
limit = LIMIT
limit = executionProfile.limits.gossipPauseWarnings
).mapNotNull { GossipLogPauseMessage.fromLogEntry(it) }

// first - did we hit the limit (and thus need to caveat any figures)
val hitMessageLimit = pauseWarnings.size == LIMIT
val hitMessageLimit = pauseWarnings.size == executionProfile.limits.gossipPauseWarnings

// Generate some statistical information, count, total, average duration, average duration, average per hour and %
val countOfMessagesPerNode = pauseWarnings.groupingBy { it.host }.eachCount()
@@ -77,14 +77,14 @@
}

val countOfWarningsMessage = if (hitMessageLimit) {
"More than $LIMIT local pause warnings were discovered within the logs"
"More than ${executionProfile.limits.gossipPauseWarnings} local pause warnings were discovered within the logs"
} else {
"A total of ${pauseWarnings.size} local pause warnings were discovered within the logs."
}

// TODO - what triggers the recommendation, % of time? max of a duration? avg duration?
if ((percentTimePause.maxByOrNull
{ it.value }?.value ?: 0.0) > MIN_PERCENTAGE_PAUSED_TO_RECOMMEND
{ it.value }?.value ?: 0.0) > executionProfile.limits.gossipPauseTimePercentageThreshold
) {
recs.near(RecommendationType.OPERATIONS,"We recommend further investigation into the elevated level of local pause warnings.")
}
@@ -93,9 +93,4 @@
args["localPauseMessagesTable"] = countPerNodeTable.toString()
return compileAndExecute("operations/operations_gossip_pause.md", args)
}

companion object {
private const val LIMIT = 1000000
private const val MIN_PERCENTAGE_PAUSED_TO_RECOMMEND = 5.0
}
}
@@ -60,9 +60,9 @@ class HintedHandoffTasks : DocumentSection {

// Hinted Handoffs from the log files can have far more / less than JMX potentially
val logHintedHandoffMessagesPerNode = logSearcher.search("+Finished +hinted +handoff +to +endpoint",
limit = MESSAGE_SEARCH_LIMIT
limit = executionProfile.limits.hintedHandoffMessages
).groupingBy { it.host }.eachCount()
val hitLogMessageLimit = logHintedHandoffMessagesPerNode.size == MESSAGE_SEARCH_LIMIT // did we hit the search limit?
val hitLogMessageLimit = logHintedHandoffMessagesPerNode.size == executionProfile.limits.hintedHandoffMessages // did we hit the search limit?

val mapOfDurations: Map<String, Double> = cluster.getLogDurationsInHours(executionProfile.limits.numberOfLogDays)

@@ -83,15 +83,10 @@
recs.immediate(RecommendationType.OPERATIONS,"There are over $hitLogMessageLimit separate hint dispatch messages within the logs. We recommend further investigation into the high level of hint activity.")
}
val maxHintsPerHour = hintsPerHourSet.maxOf { it }
if (maxHintsPerHour > HINTS_PER_HOUR_LIMIT) {
if (maxHintsPerHour > executionProfile.limits.hintedHandoffPerHourThreshold) {
recs.immediate(RecommendationType.OPERATIONS,"The highest average hints per hours for a node is ${Utils.round(maxHintsPerHour)}. A value this high indicates that the cluster is facing stability issues and should be investigated further.")
}

return compileAndExecute("operations/operations_hinted_handoff_tasks.md", args)
}

companion object {
private const val MESSAGE_SEARCH_LIMIT = 1000000
private const val HINTS_PER_HOUR_LIMIT = 25
}
}
@@ -64,7 +64,7 @@ class PreparedStatements : DocumentSection {
(countOfMessagesPerNode.getOrDefault(
node.key,
0
) / mapOfDurations.getValue(node.key) > PREPARED_STATEMENT_DISCARDED_PER_HOUR_LIMIT)
) / mapOfDurations.getValue(node.key) > executionProfile.limits.preparedStatementMessagesPerHourThreshold)
}

// Warnings aggregated by Date - for the last 7 days
@@ -94,14 +94,10 @@
// trigger a recommendation if the number of warnings exceeds the trigger limit
recs.immediate(
RecommendationType.OPERATIONS,
"There are over $PREPARED_STATEMENT_DISCARDED_PER_HOUR_LIMIT prepared statement discard warnings on average every hour. $remediationMessage"
"There are over ${executionProfile.limits.preparedStatementMessagesPerHourThreshold} prepared statement discard warnings on average every hour. $remediationMessage"
)
}
return compileAndExecute("operations/operations_prepared_statements_discarded.md", args)
}

companion object {
private const val PREPARED_STATEMENT_DISCARDED_PER_HOUR_LIMIT = 1
}
}

@@ -42,14 +42,14 @@ class RepairSessions : DocumentSection {
val args = super.createDocArgs(cluster)

// replace with search
val searchErrors = logSearcher.search("repair", LogLevel.ERROR, LIMIT)
if (searchErrors.count() > 0) {
val searchErrors = logSearcher.search("repair", LogLevel.ERROR, executionProfile.limits.repairErrorMessages)
if (searchErrors.isNotEmpty()) {
args["numWarnings"] = searchErrors.count()
}

if (searchErrors.size >= LIMIT) {
if (searchErrors.size >= executionProfile.limits.repairErrorMessages) {
// hit the search limit
args["hitLimit"] = LIMIT
args["hitLimit"] = executionProfile.limits.repairErrorMessages
}

val repairErrorCountByNode = searchErrors.groupingBy { it.host }.eachCount()
@@ -64,7 +64,7 @@
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val repairErrorCountByDay = searchErrors.groupingBy { it.getDate().truncatedTo(ChronoUnit.DAYS) }.eachCount()
val repairFailuresPerDateTable = MarkdownTable("Date", "Number of Repair Failures")
repairErrorCountByDay.toList().sortedByDescending { (value, count) -> value }.take(NUMBER_OF_DATES_TO_REPORT).toMap().forEach {
repairErrorCountByDay.toList().sortedByDescending { (value, count) -> value }.take(executionProfile.limits.repairErrorMessagesDisplayedInReport).toMap().forEach {
repairFailuresPerDateTable.addRow()
.addField(it.key.format(formatter) ?: "Unknown Date" )
.addField(it.value)
@@ -73,15 +73,15 @@
if (searchErrors.size > 250) {
recs.immediate(RecommendationType.OPERATIONS,"Repairs on the cluster are failing in sufficient numbers to be a cause of concern. We recommend investigating the root cause of the failures and impact to data consistency at rest and when served for reads.")
}
args["numberOfDatesToReport"] = NUMBER_OF_DATES_TO_REPORT
args["numberOfDatesToReport"] = executionProfile.limits.repairErrorMessagesDisplayedInReport
args["warningsByDate"] = repairFailuresPerDateTable.toString()

// are repairs running?
val logDurations = cluster.metricServer.getLogDurations()
// calculate the latest log date, minus 10 days
val maxLogDate = logDurations.map { Utils.tryParseDate (it.value.second ) }.maxByOrNull { it } ?: LocalDateTime.now()

val repairEntries = logSearcher.search("RepairSession.java", LogLevel.INFO, LIMIT)
val repairEntries = logSearcher.search("RepairSession.java", LogLevel.INFO, executionProfile.limits.repairErrorMessages)
val isRepairRunning = repairEntries.any { it.getDate().isAfter(maxLogDate.minusDays(10)) }
if (isRepairRunning) {
args["repairsFound"] = "Repairs are being run on the cluster."
@@ -115,10 +115,4 @@
}
return compileAndExecute("operations/operations_logs_failed_repair.md", args)
}

companion object {
private const val LIMIT = 100000
private const val NUMBER_OF_DATES_TO_REPORT = 14
}

}
@@ -73,7 +73,7 @@ class TombstoneWarnings : DocumentSection {
}
// Recommendation - trigger if more than 100 warnings in a single day.
// This value is not normalized out based on the uptime per node, the complexity involved is too high vs the benefit to the trigger.
if (byDay.filter { it.value >= NUMBER_OF_WARNINGS_PER_DAY_TO_RECOMMEND }.size > 1) {
if (byDay.filter { it.value >= executionProfile.limits.tombstoneWarningsPerDayThreshold }.size > 1) {
recs.immediate(RecommendationType.DATAMODEL ,"We recommend reviewing the data model of ${byTableCounts.size} table(s), to remediate the number of tombstone warnings. DataStax Services can provide assistance in this activity.")
}

Expand All @@ -86,9 +86,4 @@ class TombstoneWarnings : DocumentSection {
}
return compileAndExecute("operations/operations_tombstone_warnings.md", args)
}

companion object {
const val NUMBER_OF_WARNINGS_PER_DAY_TO_RECOMMEND = 100
}

}
@@ -52,7 +52,7 @@ internal class BatchesTest {
"\n" +
"Single partition batches do not have the overhead of multi-partition batches. They do not require the batch log and are executed atomically and in isolation as a single mutation.\n" +
"\n" +
"There are over a million warnings in the logs regarding large batches.\n" +
"There are over 1000000 warnings in the logs regarding large batches.\n" +
"The largest batch we have seen was 100 Kb with an average size of 100.0 Kb.\n"

@Test