Skip to content

Commit

Permalink
change Thread.sleep to waitUntil function under test files (#1242)
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Choi <[email protected]>
  • Loading branch information
JacobCho-i authored Nov 10, 2023
1 parent 2146199 commit a26f4c0
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder
import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.net.URLEncoder
import java.time.Instant
import java.time.ZonedDateTime
Expand All @@ -55,6 +56,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.DAYS
import java.time.temporal.ChronoUnit.MILLIS
import java.time.temporal.ChronoUnit.MINUTES
import java.util.concurrent.TimeUnit

class MonitorRunnerServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -138,7 +140,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(firstRunAlert, monitor)
// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// see lastNotificationTime change.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
val secondRunAlert = searchAlerts(monitor).single()
verifyAlert(secondRunAlert, monitor)
Expand Down Expand Up @@ -265,7 +269,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val response = executeMonitor(monitor.id)

val output = entityAsMap(response)
Expand Down Expand Up @@ -765,7 +771,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
verifyAlert(activeAlert1.single(), monitor, ACTIVE)
val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0)))

Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id))
executeMonitor(monitor.id)
val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single()
Expand Down Expand Up @@ -1398,7 +1406,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let lastNotificationTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)

// Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was
Expand All @@ -1418,7 +1428,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {
)

// Execute Monitor and check that both Alerts were updated
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
executeMonitor(monitor.id)
currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN)
val completedAlerts = currentAlerts.filter { it.state == COMPLETED }
Expand Down Expand Up @@ -1940,7 +1952,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() {

// Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to
// let Action executionTime change. W/o this sleep the test can result in a false negative.
Thread.sleep(200)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 200, TimeUnit.MILLISECONDS)
val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id))
verifyActionThrottleResultsForBucketLevelMonitor(
monitorRunResult = monitorRunResultThrottled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getAlertIndices().size >= 3)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3)
}

Expand All @@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor.id)

// Allow for a rollover index.
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil (getFindingIndices().size >= 2)
}, 2, TimeUnit.SECONDS)
assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.util.concurrent.TimeUnit

class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -69,8 +71,21 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
// the test execution by a lot (might have to wait for Job Scheduler plugin integration first)
// Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running
// on time
Thread.sleep(60000)
verifyMonitorStats("/_plugins/_alerting")
var passed = false
OpenSearchTestCase.waitUntil({
try {
// Run verifyMonitorStats until all assertion test passes
verifyMonitorStats("/_plugins/_alerting")
passed = true
return@waitUntil true
} catch (e: AssertionError) {
return@waitUntil false
}
}, 1, TimeUnit.MINUTES)
if (!passed) {
// if it hit the max time (1 minute), run verifyMonitorStats again to make sure all the tests pass
verifyMonitorStats("/_plugins/_alerting")
}
}
}
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alerts = searchAlerts(monitor)
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -842,7 +846,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -870,7 +876,11 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus())

// Wait 5 seconds for event to be processed and alerts moved
Thread.sleep(5000)
OpenSearchTestCase.waitUntil({
val alerts = searchAlerts(monitor)
val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX)
return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1)
}, 5, TimeUnit.SECONDS)

val alerts = searchAlerts(monitor)
assertEquals("Active alert was not deleted", 0, alerts.size)
Expand Down Expand Up @@ -956,10 +966,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() {
// Enable Monitor jobs

enableScheduledJob()
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.SECONDS)
var alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"])
Expand Down Expand Up @@ -992,7 +1005,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
enableScheduledJob()

// Sleep briefly so sweep can reschedule the Monitor
Thread.sleep(2000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.SECONDS)

alertingStats = getAlertingStats()
assertAlertingStatsSweeperEnabled(alertingStats, true)
Expand All @@ -1015,10 +1030,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {

fun `test monitor stats jobs`() {
// Enable the Monitor plugin.

enableScheduledJob()
createRandomMonitor(refresh = true)

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.SECONDS)
val responseMap = getAlertingStats()
assertAlertingStatsSweeperEnabled(responseMap, true)
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
Expand Down Expand Up @@ -1051,7 +1069,9 @@ class MonitorRestApiIT : AlertingRestTestCase() {
enableScheduledJob()
createRandomMonitor(refresh = true)

if (isMultiNode) Thread.sleep(2000)
if (isMultiNode) OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.SECONDS)
val responseMap = getAlertingStats("/jobs_info")
assertAlertingStatsSweeperEnabled(responseMap, true)
assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.script.Script
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
import java.util.UUID
import java.util.concurrent.TimeUnit

@TestLogging("level:DEBUG", reason = "Debug for tests.")
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -1180,7 +1182,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
}"""

indexDoc(index, "1", testDoc)
Thread.sleep(80000)
OpenSearchTestCase.waitUntil({
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
return@waitUntil (findings.size == 1)
}, 80, TimeUnit.SECONDS)

val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType
import org.opensearch.client.ResponseException
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.core.rest.RestStatus
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -80,7 +82,9 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() {

// Create cluster change event and wait for migration service to complete migrating data over
client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb")
Thread.sleep(120000)
OpenSearchTestCase.waitUntil({
return@waitUntil false
}, 2, TimeUnit.MINUTES)

for (id in ids) {
val response = client().makeRequest(
Expand Down

0 comments on commit a26f4c0

Please sign in to comment.