Skip to content

Commit

Permalink
Time limit implementation
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed May 21, 2022
1 parent a51fcd5 commit c502e66
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ class SMStateMachine(
// TODO error notification
break
}
is ExecutionResult.TimeLimitExceed -> {
log.warn("${result.workflowType} has exceeded the time limit.")
val metadataToSave = SMMetadata.Builder(metadata)
.reset(result.workflowType)
.build()
updateMetadata(metadataToSave)
}
}
}
if (result !is ExecutionResult.Next) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.Workfl
import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName
import org.opensearch.snapshots.SnapshotInfo
import java.time.Instant
import java.time.Instant.now

object CreatingState : State {

Expand Down Expand Up @@ -60,7 +61,12 @@ object CreatingState : State {

val metadataToSave = SMMetadata.Builder(metadata)
.currentState(SMState.CREATING)
.startedCreation(SMMetadata.SnapshotInfo(name = snapshotName))
.startedCreation(
SMMetadata.SnapshotInfo(
name = snapshotName,
startTime = now(),
)
)
.build()
return ExecutionResult.Next(metadataToSave)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ object DeletingState : State {
}

val metadataToSave = SMMetadata.Builder(metadata)
.deletionStartTime(now())
.startedDeletion(snapshotToDelete)
.currentState(SMState.DELETING)
.deletionStart(
startTime = now(),
snapshotInfo = snapshotToDelete
)
.build()
return ExecutionResult.Next(metadataToSave)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.snapshotmanagement.engine.states

import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMStateMachine
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.State.ExecutionResult
import org.opensearch.indexmanagement.snapshotmanagement.getSnapshots
Expand All @@ -14,6 +15,8 @@ import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata.Compan
import org.opensearch.indexmanagement.snapshotmanagement.smJobIdToPolicyName
import org.opensearch.snapshots.SnapshotMissingException
import org.opensearch.snapshots.SnapshotState
import java.time.Instant
import java.time.Instant.now

object FinishedState : State {

Expand All @@ -30,37 +33,50 @@ object FinishedState : State {
var info = metadata.info
when {
metadata.creation.started != null -> {
try {
val snapshots = client.getSnapshots(
val snapshots = try {
client.getSnapshots(
metadata.creation.started.name,
job.snapshotConfig["repository"] as String
)
when (snapshots.firstOrNull()?.state()) {
SnapshotState.SUCCESS -> {
creationStarted = null
info = info.upsert(
"last_success" to "${metadata.creation.started} has been created."
)
}
else -> {
// IN_PROGRESS, FAILED, PARTIAL, INCOMPATIBLE
log.info("Creating snapshot [${metadata.creation.started}] has not succeed")
// TODO SM record the snapshot in progress state in info
}
}
} catch (ex: SnapshotMissingException) {
// User may manually delete the creating snapshot
return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = true)
} catch (ex: Exception) {
// TODO SM need to implement retry mechanism so we don't stuck forever
return ExecutionResult.Failure(ex, WorkflowType.CREATION, reset = false)
}

when (snapshots.firstOrNull()?.state()) {
SnapshotState.SUCCESS -> {
creationStarted = null
info = info.upsert(
"last_success" to "${metadata.creation.started} has been created."
)
}
else -> {
// IN_PROGRESS, FAILED, PARTIAL, INCOMPATIBLE
log.info("Creating snapshot [${metadata.creation.started}] has not succeed")
// TODO SM record the snapshot in progress state in info
}
}

val timeLimit = job.creation.timeLimit
val startTime = metadata.creation.started.startTime
timeLimit?.let {
if (timeLimitExceed(startTime, timeLimit))
return ExecutionResult.TimeLimitExceed(WorkflowType.CREATION)
}
}
metadata.deletion.started != null -> {
val snapshots = client.getSnapshots(
"${smJobIdToPolicyName(job.id)}*",
job.snapshotConfig["repository"] as String
)
val snapshots = try {
client.getSnapshots(
"${smJobIdToPolicyName(job.id)}*",
job.snapshotConfig["repository"] as String
)
} catch (ex: Exception) {
// TODO SM need to implement retry mechanism so we don't stuck forever
return ExecutionResult.Failure(ex, WorkflowType.DELETION, reset = false)
}
val existingSnapshots = snapshots.map { it.snapshotId().name }
val startedDeleteSnapshots = metadata.deletion.started
val remainingSnapshotsName = startedDeleteSnapshots.map { it.name }.toSet() - existingSnapshots.toSet()
Expand All @@ -72,14 +88,21 @@ object FinishedState : State {
it.name in remainingSnapshotsName
}.toList()
}

val timeLimit = job.deletion.timeLimit
val startTime = metadata.deletion.startedTime
startTime?.let {
timeLimit?.let {
if (timeLimitExceed(startTime, timeLimit))
return ExecutionResult.TimeLimitExceed(WorkflowType.DELETION)
}
}
}
else -> {
log.info("No ongoing creating or deleting snapshots, will go to next execution schedule.")
}
}

// TODO SM deal with time limitation

val metadataToSave = SMMetadata.Builder(metadata)
.startedCreation(creationStarted)
.startedDeletion(deletionStarted)
Expand All @@ -91,4 +114,8 @@ object FinishedState : State {
}
return ExecutionResult.Next(metadataToSave)
}

private fun timeLimitExceed(startTime: Instant, timeLimit: TimeValue): Boolean {
return (now().toEpochMilli() - startTime.toEpochMilli()) > timeLimit.millis
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ interface State {
data class Next(val metadataToSave: SMMetadata) : ExecutionResult()
data class Stay(val metadataToSave: SMMetadata? = null) : ExecutionResult()
data class Failure(val ex: Exception, val workflowType: SMMetadata.WorkflowType, val reset: Boolean) : ExecutionResult()
data class TimeLimitExceed(val workflowType: SMMetadata.WorkflowType) : ExecutionResult()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ data class SMMetadata(

data class SnapshotInfo(
val name: String,
val startTime: Instant? = null,
val startTime: Instant,
val endTime: Instant? = null,
) : Writeable, ToXContent {

Expand Down Expand Up @@ -343,22 +343,22 @@ data class SMMetadata(
}

return SnapshotInfo(
name = requireNotNull(name) { "snapshot info name must not be null." },
startTime = startTime,
name = requireNotNull(name) { "name in snapshot info must not be null." },
startTime = requireNotNull(startTime) { "start time in snapshot info must not be null." },
endTime = endTime,
)
}
}

constructor(sin: StreamInput) : this(
name = sin.readString(),
startTime = sin.readOptionalInstant(),
startTime = sin.readInstant(),
endTime = sin.readOptionalInstant(),
)

override fun writeTo(out: StreamOutput) {
out.writeString(name)
out.writeOptionalInstant(startTime)
out.writeInstant(startTime)
out.writeOptionalInstant(endTime)
}
}
Expand Down Expand Up @@ -470,10 +470,11 @@ data class SMMetadata(
return this
}

fun deletionStartTime(time: Instant?): Builder {
fun deletionStart(startTime: Instant?, snapshotInfo: List<SnapshotInfo>?): Builder {
metadata = metadata.copy(
deletion = metadata.deletion.copy(
startedTime = time
started = snapshotInfo,
startedTime = startTime,
)
)
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ data class SMPolicy(

data class Creation(
val schedule: Schedule,
val timeout: TimeValue? = null,
val timeLimit: TimeValue? = null,
) : Writeable, ToXContent {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(SCHEDULE_FIELD, schedule)
.optionalField(TIME_LIMIT_FIELD, timeout)
.optionalField(TIME_LIMIT_FIELD, timeLimit)
.endObject()
}

Expand All @@ -224,33 +224,33 @@ data class SMPolicy(

return Creation(
schedule = requireNotNull(schedule) { "schedule field must not be null" },
timeout = timeout
timeLimit = timeout
)
}
}

constructor(sin: StreamInput) : this(
schedule = CronSchedule(sin),
timeout = sin.readOptionalTimeValue(),
timeLimit = sin.readOptionalTimeValue(),
)

override fun writeTo(out: StreamOutput) {
schedule.writeTo(out)
out.writeOptionalTimeValue(timeout)
out.writeOptionalTimeValue(timeLimit)
}
}

data class Deletion(
val schedule: Schedule,
val timeout: TimeValue? = null,
val timeLimit: TimeValue? = null,
val condition: DeleteCondition,
) : Writeable, ToXContent {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(SCHEDULE_FIELD, schedule)
.field(CONDITION_FIELD, condition)
.optionalField(TIME_LIMIT_FIELD, timeout)
.optionalField(TIME_LIMIT_FIELD, timeLimit)
.endObject()
}

Expand Down Expand Up @@ -282,21 +282,21 @@ data class SMPolicy(

return Deletion(
schedule = schedule,
timeout = timeout,
timeLimit = timeout,
condition = requireNotNull(condition) { "condition field must not be null" },
)
}
}

constructor(sin: StreamInput) : this(
schedule = CronSchedule(sin),
timeout = sin.readOptionalTimeValue(),
timeLimit = sin.readOptionalTimeValue(),
condition = DeleteCondition(sin),
)

override fun writeTo(out: StreamOutput) {
schedule.writeTo(out)
out.writeOptionalTimeValue(timeout)
out.writeOptionalTimeValue(timeLimit)
condition.writeTo(out)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.opensearch.indexmanagement.randomIntervalSchedule
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
import org.opensearch.jobscheduler.spi.schedule.CronSchedule
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.rest.RestStatus
Expand Down Expand Up @@ -75,11 +74,11 @@ fun randomSMPolicy(
jobLastUpdateTime = jobLastUpdateTime,
creation = SMPolicy.Creation(
schedule = creationSchedule,
timeout = creationTimeout,
timeLimit = creationTimeout,
),
deletion = SMPolicy.Deletion(
schedule = deletionSchedule,
timeout = deletionTimeout,
timeLimit = deletionTimeout,
condition = SMPolicy.DeleteCondition(
maxCount = deletionMaxCount,
maxAge = deletionMaxAge,
Expand Down

0 comments on commit c502e66

Please sign in to comment.