diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMStateMachine.kt index bd6bdedda..66ca3162f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMStateMachine.kt @@ -77,6 +77,13 @@ class SMStateMachine( // TODO error notification break } + is ExecutionResult.TimeLimitExceed -> { + log.warn("${result.workflowType} has exceeded the time limit.") + val metadataToSave = SMMetadata.Builder(metadata) + .reset(result.workflowType) + .build() + updateMetadata(metadataToSave) + } } } if (result !is ExecutionResult.Next) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/CreatingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/CreatingState.kt index 956fddf27..bea3fe7d0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/CreatingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/CreatingState.kt @@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.Workfl import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName import org.opensearch.snapshots.SnapshotInfo import java.time.Instant +import java.time.Instant.now object CreatingState : State { @@ -60,7 +61,12 @@ object CreatingState : State { val metadataToSave = SMMetadata.Builder(metadata) .currentState(SMState.CREATING) - .startedCreation(SMMetadata.SnapshotInfo(name = snapshotName)) + .startedCreation( + SMMetadata.SnapshotInfo( + name = snapshotName, + startTime = now(), + ) + ) .build() return ExecutionResult.Next(metadataToSave) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/DeletingState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/DeletingState.kt index 0f6ae1c9b..64dc42a06 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/DeletingState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/DeletingState.kt @@ -53,8 +53,11 @@ object DeletingState : State { } val metadataToSave = SMMetadata.Builder(metadata) - .deletionStartTime(now()) - .startedDeletion(snapshotToDelete) + .currentState(SMState.DELETING) + .deletionStart( + startTime = now(), + snapshotInfo = snapshotToDelete + ) .build() return ExecutionResult.Next(metadataToSave) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/FinishedState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/FinishedState.kt index 406af213b..3076914e1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/FinishedState.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/FinishedState.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.engine.states +import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMStateMachine import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State.ExecutionResult import org.opensearch.indexmanagement.snapshotmanagement.getSnapshots @@ -14,6 +15,8 @@ import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.Compan import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName import org.opensearch.snapshots.SnapshotMissingException import org.opensearch.snapshots.SnapshotState +import java.time.Instant +import java.time.Instant.now object FinishedState : State { @@ -30,24 +33,11 @@ object FinishedState : State { var info = metadata.info when { metadata.creation.started != null -> { - try { - val snapshots = client.getSnapshots( + val snapshots = try { + client.getSnapshots( metadata.creation.started.name, job.snapshotConfig["repository"] as String ) - when (snapshots.firstOrNull()?.state()) { - SnapshotState.SUCCESS -> { - creationStarted = null - info = info.upsert( - "last_success" to "${metadata.creation.started} has been created." - ) - } - else -> { - // IN_PROGRESS, FAILED, PARTIAL, INCOMPATIBLE - log.info("Creating snapshot [${metadata.creation.started}] has not succeed") - // TODO SM record the snapshot in progress state in info - } - } } catch (ex: SnapshotMissingException) { // User may manually delete the creating snapshot return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = true) @@ -55,12 +45,38 @@ object FinishedState : State { // TODO SM need to implement retry mechanism so we don't stuck forever return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = false) } + + when (snapshots.firstOrNull()?.state()) { + SnapshotState.SUCCESS -> { + creationStarted = null + info = info.upsert( + "last_success" to "${metadata.creation.started} has been created." + ) + } + else -> { + // IN_PROGRESS, FAILED, PARTIAL, INCOMPATIBLE + log.info("Creating snapshot [${metadata.creation.started}] has not succeed") + // TODO SM record the snapshot in progress state in info + } + } + + val timeLimit = job.creation.timeLimit + val startTime = metadata.creation.started.startTime + timeLimit?.let { + if (timeLimitExceed(startTime, timeLimit)) + return ExecutionResult.TimeLimitExceed(WorkflowType.CREATION) + } } metadata.deletion.started != null -> { - val snapshots = client.getSnapshots( - "${smJobIdToPolicyName(job.id)}*", - job.snapshotConfig["repository"] as String - ) + val snapshots = try { + client.getSnapshots( + "${smJobIdToPolicyName(job.id)}*", + job.snapshotConfig["repository"] as String + ) + } catch (ex: Exception) { + // TODO SM need to implement retry mechanism so we don't stuck forever + return ExecutionResult.Failure(ex, WorkflowType.DELETION, reset = false) + } val existingSnapshots = snapshots.map { it.snapshotId().name } val startedDeleteSnapshots = metadata.deletion.started val remainingSnapshotsName = startedDeleteSnapshots.map { it.name }.toSet() - existingSnapshots.toSet() @@ -72,14 +88,21 @@ object FinishedState : State { it.name in remainingSnapshotsName }.toList() } + + val timeLimit = job.deletion.timeLimit + val startTime = metadata.deletion.startedTime + startTime?.let { + timeLimit?.let { + if (timeLimitExceed(startTime, timeLimit)) + return ExecutionResult.TimeLimitExceed(WorkflowType.DELETION) + } + } } else -> { log.info("No ongoing creating or deleting snapshots, will go to next execution schedule.") } } - // TODO SM deal with time limitation - val metadataToSave = SMMetadata.Builder(metadata) .startedCreation(creationStarted) .startedDeletion(deletionStarted) @@ -91,4 +114,8 @@ object FinishedState : State { } return ExecutionResult.Next(metadataToSave) } + + private fun timeLimitExceed(startTime: Instant, timeLimit: TimeValue): Boolean { + return (now().toEpochMilli() - startTime.toEpochMilli()) > timeLimit.millis + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt index 515fd104d..1df8b808b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/states/State.kt @@ -34,5 +34,6 @@ interface State { data class Next(val metadataToSave: SMMetadata) : ExecutionResult() data class Stay(val metadataToSave: SMMetadata? = null) : ExecutionResult() data class Failure(val ex: Exception, val workflowType: SMMetadata.WorkflowType, val reset: Boolean) : ExecutionResult() + data class TimeLimitExceed(val workflowType: SMMetadata.WorkflowType) : ExecutionResult() } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt index 127e138d7..d70c19642 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt @@ -308,7 +308,7 @@ data class SMMetadata( data class SnapshotInfo( val name: String, - val startTime: Instant? = null, + val startTime: Instant, val endTime: Instant? = null, ) : Writeable, ToXContent { @@ -343,8 +343,8 @@ data class SMMetadata( } return SnapshotInfo( - name = requireNotNull(name) { "snapshot info name must not be null." }, - startTime = startTime, + name = requireNotNull(name) { "name in snapshot info must not be null." }, + startTime = requireNotNull(startTime) { "start time in snapshot info must not be null." }, endTime = endTime, ) } @@ -352,13 +352,13 @@ data class SMMetadata( constructor(sin: StreamInput) : this( name = sin.readString(), - startTime = sin.readOptionalInstant(), + startTime = sin.readInstant(), endTime = sin.readOptionalInstant(), ) override fun writeTo(out: StreamOutput) { out.writeString(name) - out.writeOptionalInstant(startTime) + out.writeInstant(startTime) out.writeOptionalInstant(endTime) } } @@ -470,10 +470,11 @@ data class SMMetadata( return this } - fun deletionStartTime(time: Instant?): Builder { + fun deletionStart(startTime: Instant?, snapshotInfo: List?): Builder { metadata = metadata.copy( deletion = metadata.deletion.copy( - startedTime = time + started = snapshotInfo, + startedTime = startTime, ) ) return this diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt index bdf76cc9f..899786754 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -194,13 +194,13 @@ data class SMPolicy( data class Creation( val schedule: Schedule, - val timeout: TimeValue? = null, + val timeLimit: TimeValue? = null, ) : Writeable, ToXContent { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() .field(SCHEDULE_FIELD, schedule) - .optionalField(TIME_LIMIT_FIELD, timeout) + .optionalField(TIME_LIMIT_FIELD, timeLimit) .endObject() } @@ -224,25 +224,25 @@ data class SMPolicy( return Creation( schedule = requireNotNull(schedule) { "schedule field must not be null" }, - timeout = timeout + timeLimit = timeout ) } } constructor(sin: StreamInput) : this( schedule = CronSchedule(sin), - timeout = sin.readOptionalTimeValue(), + timeLimit = sin.readOptionalTimeValue(), ) override fun writeTo(out: StreamOutput) { schedule.writeTo(out) - out.writeOptionalTimeValue(timeout) + out.writeOptionalTimeValue(timeLimit) } } data class Deletion( val schedule: Schedule, - val timeout: TimeValue? = null, + val timeLimit: TimeValue? = null, val condition: DeleteCondition, ) : Writeable, ToXContent { @@ -250,7 +250,7 @@ data class SMPolicy( return builder.startObject() .field(SCHEDULE_FIELD, schedule) .field(CONDITION_FIELD, condition) - .optionalField(TIME_LIMIT_FIELD, timeout) + .optionalField(TIME_LIMIT_FIELD, timeLimit) .endObject() } @@ -282,7 +282,7 @@ data class SMPolicy( return Deletion( schedule = schedule, - timeout = timeout, + timeLimit = timeout, condition = requireNotNull(condition) { "condition field must not be null" }, ) } @@ -290,13 +290,13 @@ data class SMPolicy( constructor(sin: StreamInput) : this( schedule = CronSchedule(sin), - timeout = sin.readOptionalTimeValue(), + timeLimit = sin.readOptionalTimeValue(), condition = DeleteCondition(sin), ) override fun writeTo(out: StreamOutput) { schedule.writeTo(out) - out.writeOptionalTimeValue(timeout) + out.writeOptionalTimeValue(timeLimit) condition.writeTo(out) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt index 48dd3a99f..39d91db86 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt @@ -19,7 +19,6 @@ import org.opensearch.indexmanagement.randomIntervalSchedule import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestStatus @@ -75,11 +74,11 @@ fun randomSMPolicy( jobLastUpdateTime = jobLastUpdateTime, creation = SMPolicy.Creation( schedule = creationSchedule, - timeout = creationTimeout, + timeLimit = creationTimeout, ), deletion = SMPolicy.Deletion( schedule = deletionSchedule, - timeout = deletionTimeout, + timeLimit = deletionTimeout, condition = SMPolicy.DeleteCondition( maxCount = deletionMaxCount, maxAge = deletionMaxAge,