From afbb767c8496f3bcabb8b4c0a3dec6f0552e6c5c Mon Sep 17 00:00:00 2001 From: Drew Baugher <46505179+dbbaughe@users.noreply.github.com> Date: Mon, 30 Mar 2020 10:29:50 -0700 Subject: [PATCH] Adds isIdempotent method to each step and updates ManagedIndexRunner to use it --- .../indexstatemanagement/ManagedIndexRunner.kt | 12 ++++++++---- .../indexstatemanagement/step/Step.kt | 15 +++++++++++++++ .../step/close/AttemptCloseStep.kt | 2 ++ .../step/delete/AttemptDeleteStep.kt | 2 ++ .../step/forcemerge/AttemptCallForceMergeStep.kt | 2 ++ .../step/forcemerge/AttemptSetReadOnlyStep.kt | 2 ++ .../step/forcemerge/WaitForForceMergeStep.kt | 2 ++ .../step/notification/AttemptNotificationStep.kt | 2 ++ .../step/open/AttemptOpenStep.kt | 2 ++ .../step/readonly/SetReadOnlyStep.kt | 2 ++ .../step/readwrite/SetReadWriteStep.kt | 2 ++ .../replicacount/AttemptSetReplicaCountStep.kt | 2 ++ .../step/rollover/AttemptRolloverStep.kt | 2 ++ .../step/transition/AttemptTransitionStep.kt | 2 ++ 14 files changed, 47 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt index bb3f8a297..d6532ae71 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt @@ -265,10 +265,14 @@ object ManagedIndexRunner : ScheduledJobRunner, } if (managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING) { - val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") - val updated = updateManagedIndexMetaData(managedIndexMetaData.copy(policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info)) - if (updated) disableManagedIndexConfig(managedIndexConfig) - return + val isIdempotent = step?.isIdempotent() + logger.info("Previous execution failed to update step status, isIdempotent=$isIdempotent") + if (isIdempotent != true) { + val info = mapOf("message" to "Previous action was not able to update IndexMetaData.") + val updated = updateManagedIndexMetaData(managedIndexMetaData.copy(policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info)) + if (updated) disableManagedIndexConfig(managedIndexConfig) + return + } } // If this action is not allowed and the step to be executed is the first step in the action then we will fail diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt index 82d81697e..35cd30051 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt @@ -29,6 +29,21 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData + /** + * Before every execution of a step, we first update the step_status in cluster state to [StepStatus.STARTING] + * to signal that work is about to be done for the managed index. The step then attempts to do work by + * calling execute, and finally updates the step_status with the results of that work ([StepStatus]). + * + * If we ever start an execution with a step_status of [StepStatus.STARTING] it means we failed to update the step_status + * after calling the execute function. Since we do not know if the execution was a noop, failed, or completed then + * we can't always assume it's safe to just retry it (e.g. calling force merge multiple times in a row). This means + * that final update is a failure point that can't be retried and when multiplied by # of executions it leads to a lot of + * chances over time for random network failures, timeouts, etc. + * + * To get around this every step should have an [isIdempotent] method to signal if it's safe to retry this step for such failures. + */ + abstract fun isIdempotent(): Boolean + fun getStartingStepMetaData(): StepMetaData { return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING) } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt index 407ae38f4..830e14694 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt @@ -37,6 +37,8 @@ class AttemptCloseStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt index 3f99a425d..2eea5d201 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt @@ -38,6 +38,8 @@ class AttemptDeleteStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt index d10920679..66e4baae1 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt @@ -39,6 +39,8 @@ class AttemptCallForceMergeStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = false + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt index e9576103e..5894d2fca 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/AttemptSetReadOnlyStep.kt @@ -39,6 +39,8 @@ class AttemptSetReadOnlyStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + override suspend fun execute() { val indexName = managedIndexMetaData.index diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt index 0a55983c0..1656a3046 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/forcemerge/WaitForForceMergeStep.kt @@ -41,6 +41,8 @@ class WaitForForceMergeStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val indexName = managedIndexMetaData.index diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt index 2bdeb4bd3..75b8c7bd7 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -41,6 +41,8 @@ class AttemptNotificationStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = false + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt index 545918b9c..950357ad1 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/open/AttemptOpenStep.kt @@ -37,6 +37,8 @@ class AttemptOpenStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt index 70276e88f..9d217515c 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readonly/SetReadOnlyStep.kt @@ -38,6 +38,8 @@ class SetReadOnlyStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt index 90dd465ba..f38d6ce3f 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/readwrite/SetReadWriteStep.kt @@ -38,6 +38,8 @@ class SetReadWriteStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt index 9bed79c38..e452736b0 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/replicacount/AttemptSetReplicaCountStep.kt @@ -38,6 +38,8 @@ class AttemptSetReplicaCountStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { val numOfReplicas = config.numOfReplicas diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt index b995312b0..f22b20145 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt @@ -45,6 +45,8 @@ class AttemptRolloverStep( private var stepStatus = StepStatus.STARTING private var info: Map? = null + override fun isIdempotent() = false + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { // If we have already rolled over this index then fail as we only allow an index to be rolled over once diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt index 70d00cf35..4f7bd34fa 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/transition/AttemptTransitionStep.kt @@ -51,6 +51,8 @@ class AttemptTransitionStep( private var policyCompleted: Boolean = false private var info: Map? = null + override fun isIdempotent() = true + @Suppress("TooGenericExceptionCaught") override suspend fun execute() { try {