Skip to content

Commit

Permalink
Working on time limit, refine the exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed May 21, 2022
1 parent 5e7496e commit a51fcd5
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,4 @@ suspend fun Client.getSnapshots(name: String, repo: String): List<SnapshotInfo>
val res: GetSnapshotsResponse = admin().cluster().suspendUntil { getSnapshots(req, it) }
log.info("Get snapshot response: ${res.snapshots}")
return res.snapshots
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import kotlin.reflect.KClass
* Each exception key should have a corresponding user facing message
*/
class SnapshotManagementException(
private val exKey: ExceptionKey? = null,
val exKey: ExceptionKey? = null,
cause: Throwable? = null,
message: String? = null,
) : OpenSearchException(message, cause) {

enum class ExceptionKey {
GENERAL,
ATOMIC,
METADATA_INDEXING_FAILURE,
REPO_MISSING,
}

Expand All @@ -45,8 +45,8 @@ class SnapshotManagementException(

// User facing exception messages
private val exceptionMsgMap: Map<ExceptionKey, String> = mapOf(
ExceptionKey.GENERAL to "Caught exception while snapshot management is running. Please check the error log.",
ExceptionKey.ATOMIC to "Undetermined about whether the last snapshot operation has fully finished.",
ExceptionKey.GENERAL to "Caught exception while snapshot management runs. Please check the error log.",
ExceptionKey.METADATA_INDEXING_FAILURE to "Failed to update metadata while snapshot management runs.",
ExceptionKey.REPO_MISSING to "The repository provided is missing."
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.client.Client
import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementException
import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementException.ExceptionKey
import org.opensearch.indexmanagement.snapshotmanagement.preFixTimeStamp
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State.ExecutionResult
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState
Expand Down Expand Up @@ -52,29 +53,26 @@ class SMStateMachine(
when (result) {
is ExecutionResult.Next -> {
log.info("State [$currentState]'s execution finished, will execute its next state.")
updateMetadata(result.metadataToSave.copy(currentState=currentState))
updateMetadata(result.metadataToSave.copy(currentState = currentState))
// break out of the nextStates loop to avoid executing other lateral states
break
}
is ExecutionResult.Stay -> {
log.info("State [$currentState]'s execution not finished, will stay.")
val metadataToSave = result.metadataToSave
metadataToSave?.let { updateMetadata(metadataToSave.copy(currentState=prevState)) }
metadataToSave?.let { updateMetadata(metadataToSave.copy(currentState = prevState)) }
// can still execute other lateral states, if any exist
}
is ExecutionResult.Failure -> {
val ex = result.ex
log.error("Caught exception while executing state [$currentState], will skip to the START state.", ex)

val userMessage = preFixTimeStamp(SnapshotManagementException(ex).message)
val info = metadata.info.upsert(
"exception" to userMessage
)
val metadataToSave = SMMetadata.Builder(metadata)
.reset(result.resetType)
.info(info)
.build()
updateMetadata(metadataToSave)
val info = metadata.info.upsert("exception" to userMessage)
val metadataToSave = SMMetadata.Builder(metadata).info(info)
if (result.reset)
metadataToSave.reset(result.workflowType)
updateMetadata(metadataToSave.build())

// TODO error notification
break
Expand All @@ -87,8 +85,14 @@ class SMStateMachine(
}
} while (currentState.instance.continuous)
} catch (ex: Exception) {
// For update metadata exception, we won't try to update metadata again
if (ex is SnapshotManagementException &&
ex.exKey == ExceptionKey.METADATA_INDEXING_FAILURE
) {
// If the metadata update itself failed, we cannot update metadata again
return
}
log.error("Snapshot management uncaught runtime exception.", ex)
// TODO SM retry mechanism
}
}

Expand All @@ -107,12 +111,17 @@ class SMStateMachine(
// TODO SM before updating metadata, check whether the next execution time is earlier than now();
// if so, we should update the next execution time to keep it up to date

// TODO SM retry policy for update metadata
val res = client.indexMetadata(md, job.id, metadataSeqNo, metadataPrimaryTerm)

metadataSeqNo = res.seqNo
metadataPrimaryTerm = res.primaryTerm
metadata = md
try {
// TODO SM retry policy for update metadata
val res = client.indexMetadata(md, job.id, metadataSeqNo, metadataPrimaryTerm)
metadataSeqNo = res.seqNo
metadataPrimaryTerm = res.primaryTerm
metadata = md
} catch (ex: Exception) {
val smEx = SnapshotManagementException(ExceptionKey.METADATA_INDEXING_FAILURE, ex)
log.error(smEx.message, ex)
throw smEx
}

// TODO SM save a copy to history
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State.Exe
import org.opensearch.indexmanagement.snapshotmanagement.generateSnapshotName
import org.opensearch.indexmanagement.snapshotmanagement.getSnapshots
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.ResetType
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.WorkflowType
import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName
import org.opensearch.snapshots.SnapshotInfo
import java.time.Instant
Expand Down Expand Up @@ -52,7 +52,7 @@ object CreatingState : State {
// return ExecutionResult.Failure(SnapshotManagementException(ex), ActionType.CREATION)
// }
catch (ex: Exception) {
return ExecutionResult.Failure(ex, ResetType.CREATION)
return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = true)
}

log.info("Create snapshot response: $res.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMS
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State.ExecutionResult
import org.opensearch.indexmanagement.snapshotmanagement.getSnapshots
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.ResetType
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.WorkflowType
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName
import org.opensearch.snapshots.SnapshotInfo
Expand Down Expand Up @@ -49,16 +49,13 @@ object DeletingState : State {
log.info("sm dev: Delete snapshot acknowledged: ${res.isAcknowledged}.")
}
} catch (ex: Exception) {
return ExecutionResult.Failure(ex, ResetType.DELETION)
return ExecutionResult.Failure(ex, WorkflowType.DELETION, reset = true)
}

val metadataToSave = metadata.copy(
currentState = SMState.DELETING,
deletion = metadata.deletion.copy(
started = snapshotToDelete,
startedTime = now(),
),
)
val metadataToSave = SMMetadata.Builder(metadata)
.deletionStartTime(now())
.startedDeletion(snapshotToDelete)
.build()
return ExecutionResult.Next(metadataToSave)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@

package org.opensearch.indexmanagement.snapshotmanagement.engine.states

import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMStateMachine
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State.ExecutionResult
import org.opensearch.indexmanagement.snapshotmanagement.getSnapshots
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.ResetType
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.WorkflowType
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.Companion.upsert
import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName
import org.opensearch.snapshots.SnapshotMissingException
import org.opensearch.snapshots.SnapshotState

object FinishedState : State {
Expand Down Expand Up @@ -50,9 +48,12 @@ object FinishedState : State {
// TODO SM record the snapshot in progress state in info
}
}
} catch (ex: SnapshotMissingException) {
// User may manually delete the creating snapshot
return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = true)
} catch (ex: Exception) {
// If someone deletes the creating snapshot, we could face the SnapshotMissingException
return ExecutionResult.Failure(ex, ResetType.CREATION)
// TODO SM need to implement a retry mechanism so we don't get stuck forever
return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = false)
}
}
metadata.deletion.started != null -> {
Expand All @@ -77,7 +78,7 @@ object FinishedState : State {
}
}

// TODO SM deal with time limitation,
// TODO SM deal with time limitation

val metadataToSave = SMMetadata.Builder(metadata)
.startedCreation(creationStarted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ interface State {
sealed class ExecutionResult {
data class Next(val metadataToSave: SMMetadata) : ExecutionResult()
data class Stay(val metadataToSave: SMMetadata? = null) : ExecutionResult()
data class Failure(val ex: Exception, val resetType: SMMetadata.ResetType) : ExecutionResult()
data class Failure(val ex: Exception, val workflowType: SMMetadata.WorkflowType, val reset: Boolean) : ExecutionResult()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ data class SMMetadata(
}
}

enum class ResetType {
enum class WorkflowType {
CREATION,
DELETION,
}
Expand All @@ -376,22 +376,23 @@ data class SMMetadata(

fun build() = metadata

fun reset(resetType: ResetType): Builder {
fun reset(workflowType: WorkflowType): Builder {
var currentState = metadata.currentState
var startedCreation = metadata.creation.started
var startedDeletion = metadata.deletion.started
var deletionStartedTime = metadata.deletion.startedTime
when(resetType) {
ResetType.CREATION -> {
when (workflowType) {
WorkflowType.CREATION -> {
currentState = SMState.CREATING
startedCreation = null
}
ResetType.DELETION -> {
WorkflowType.DELETION -> {
currentState = SMState.DELETING
startedDeletion = null
deletionStartedTime = null
}
}

metadata = metadata.copy(
currentState = currentState,
creation = metadata.creation.copy(
Expand Down
Loading

0 comments on commit a51fcd5

Please sign in to comment.