Skip to content

Commit

Permalink
initial commit for remote monitor support (#1547)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored May 31, 2024
1 parent 0b2dc76 commit 5ed4db9
Show file tree
Hide file tree
Showing 98 changed files with 2,222 additions and 1,947 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
java-version: ${{ matrix.java }}
- name: Build and run with Gradle
working-directory: ${{ env.WORKING_DIR }}
run: ./gradlew assemble integTest ${{ env.BUILD_ARGS }}
run: ./gradlew assemble ${{ env.BUILD_ARGS }}
env:
_JAVA_OPTIONS: ${{ matrix.os_java_options }}
- name: Create Artifact Path
Expand Down
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin
import java.util.concurrent.Callable
import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask
Expand Down Expand Up @@ -114,6 +115,7 @@ dependencies {

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation project(path: ":alerting-spi", configuration: 'shadow')

testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
10 changes: 5 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand All @@ -31,7 +27,6 @@ import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
Expand All @@ -40,14 +35,19 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.commons.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.bytes.BytesReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.alerting

import org.opensearch.action.ActionRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
Expand All @@ -25,6 +24,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
Expand Down Expand Up @@ -52,6 +52,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
Expand Down Expand Up @@ -85,6 +86,7 @@ import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsFilter
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
Expand All @@ -96,6 +98,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.common.io.stream.StreamInput
Expand All @@ -110,6 +113,7 @@ import org.opensearch.painless.spi.AllowlistLoader
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand Down Expand Up @@ -162,6 +166,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
settings: Settings,
Expand Down Expand Up @@ -236,6 +241,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down Expand Up @@ -277,6 +283,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
Expand Down Expand Up @@ -409,4 +416,20 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
)
)
}

override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) {
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) {
val monitorTypesToMonitorRunners = monitorExtension.getMonitorTypesToMonitorRunners()

for (monitorTypeToMonitorRunner in monitorTypesToMonitorRunners) {
val monitorType = monitorTypeToMonitorRunner.key
val monitorRunner = monitorTypeToMonitorRunner.value

if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) {
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner)
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.retry
Expand All @@ -29,16 +25,20 @@ import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ import org.opensearch.ExceptionsHelper
import org.opensearch.Version
import org.opensearch.action.ActionListenerResponseHandler
import org.opensearch.action.support.GroupedActionListener
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.breaker.CircuitBreakingException
import org.opensearch.core.common.io.stream.Writeable
Expand Down Expand Up @@ -436,7 +436,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (res.exception == null) {
return null
} else {
exceptions.add(res.exception)
exceptions.add(res.exception!!)
}
}
return AlertingException.merge(*exceptions.toTypedArray())
Expand Down Expand Up @@ -501,9 +501,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (response.exception == null) {
if (response.inputResults.error != null) {
if (response.inputResults.error is AlertingException) {
errors.add(response.inputResults.error)
errors.add(response.inputResults.error as AlertingException)
} else {
errors.add(AlertingException.wrap(response.inputResults.error) as AlertingException)
errors.add(AlertingException.wrap(response.inputResults.error as Exception) as AlertingException)
}
}
val partialResult = response.inputResults.results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
Expand All @@ -22,7 +20,6 @@ import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTranspor
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
Expand All @@ -31,8 +28,11 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.TriggerAfterKey
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand All @@ -40,17 +38,16 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.MonitorType
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.RemoteTransportException
import java.util.Locale
import kotlin.collections.HashMap

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

Expand Down Expand Up @@ -188,10 +185,10 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
else null
return if (runContext != null) {
Expand All @@ -211,13 +208,12 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext =
if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
id = MonitorMetadata.getId(monitor, workflowMetadataId),
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand Down
Loading

0 comments on commit 5ed4db9

Please sign in to comment.