SM dev - state machine #370

Merged
merged 71 commits into from
Jun 16, 2022
71 commits
414784e
State machine base setup
bowenlan-amzn May 3, 2022
61d64d1
Rename SM_ACTION to SM_POLICY_ACTION
bowenlan-amzn May 17, 2022
61aa482
Add locking mechanism in runner
bowenlan-amzn May 18, 2022
9f2bc83
Metadata builder helper
bowenlan-amzn May 18, 2022
b285af5
Revise exception handling
bowenlan-amzn May 18, 2022
291a3b6
Move metadata builder to SMMetadata
bowenlan-amzn May 19, 2022
d3c9da5
Failure handling, separate creation, deletion workflow
bowenlan-amzn May 19, 2022
668191d
Check already created snapshots in CREATING state
bowenlan-amzn May 20, 2022
6c1df05
Revise DELETING state
bowenlan-amzn May 20, 2022
5e7496e
Revise FINISHED state
bowenlan-amzn May 21, 2022
a51fcd5
Working on time limit, refine the exception handling
bowenlan-amzn May 21, 2022
c502e66
Time limit implementation
bowenlan-amzn May 21, 2022
54b2c9e
Refactor execute result
bowenlan-amzn May 22, 2022
011384c
Retry implementation
bowenlan-amzn May 22, 2022
cdfcf17
Refactor
bowenlan-amzn May 22, 2022
eb42dc8
UT Create condition met
bowenlan-amzn May 22, 2022
8805116
UT creating
bowenlan-amzn May 23, 2022
58dc8e2
UT deleting
bowenlan-amzn May 23, 2022
426ad46
UT finished state
bowenlan-amzn May 23, 2022
63e8fe4
SMPolicy shouldn't have NO_ID
bowenlan-amzn May 23, 2022
7cbb20b
UT finished state deleting
bowenlan-amzn May 24, 2022
9661cf5
UT finished time limit exceed
bowenlan-amzn May 24, 2022
14ae449
UT state machine
bowenlan-amzn May 24, 2022
9f0c971
testingConventions
bowenlan-amzn May 24, 2022
3097bc2
additional apis implementation
downsrob May 24, 2022
8c73008
UT state machine left Retry
bowenlan-amzn May 24, 2022
09e3842
UT deleting findSnapshotsToDelete
bowenlan-amzn May 25, 2022
6cd27ee
After merge
bowenlan-amzn May 25, 2022
0a8d0cd
Update system index mapping
bowenlan-amzn May 25, 2022
f1664db
Manual testing and fix issues
bowenlan-amzn May 26, 2022
940d621
Use 'policy' as the start object field for index and get sm policy re…
bowenlan-amzn May 26, 2022
686b0d4
Refactor the metadata update to be at place
bowenlan-amzn May 27, 2022
70e1169
Adds get all policies and explain policy
downsrob May 27, 2022
5cac9a2
snapshot name cannot contain upper case letter
bowenlan-amzn May 31, 2022
406da87
Adds explain and get all apis
downsrob Jun 1, 2022
e8d3c83
Fixes detekt issues, adds additional tests
downsrob Jun 1, 2022
bed1acf
deletion condition becomes optional to user with default max_count an…
bowenlan-amzn Jun 2, 2022
381f9e8
Use snapshot metadata to save sm policy as identifier
bowenlan-amzn Jun 2, 2022
94bd5df
Fix tests
bowenlan-amzn Jun 2, 2022
7f2f093
Refactor to latest execution metadata, remove info
bowenlan-amzn Jun 4, 2022
02dfc4e
use info in latest execution
bowenlan-amzn Jun 5, 2022
da5436f
Refactor state machine Failure path
bowenlan-amzn Jun 5, 2022
e4249a5
Refactor get snapshots section
bowenlan-amzn Jun 5, 2022
e186c5c
Refactor latest execution update
bowenlan-amzn Jun 5, 2022
d8d6cf5
Refactor out the TimeLimitExceed SMResult
bowenlan-amzn Jun 5, 2022
613d2c1
Reduce confliction
bowenlan-amzn Jun 5, 2022
94704ab
Refactor latest execution update
bowenlan-amzn Jun 6, 2022
dfb88ae
Refactor update next execution time
bowenlan-amzn Jun 7, 2022
c1db46a
Improve UT
bowenlan-amzn Jun 7, 2022
cac72cb
Refactor delete config in SM policy to be optional
bowenlan-amzn Jun 8, 2022
bd0abca
Manual testing and fixing
bowenlan-amzn Jun 9, 2022
1bc4c71
Revise how state handle null policy deletion config and add UT
bowenlan-amzn Jun 9, 2022
f23bc17
Revises based on PR comments
downsrob Jun 10, 2022
197d01f
Fixed detekt issues
downsrob Jun 10, 2022
59fcc79
Removes unused import
downsrob Jun 10, 2022
a5c8667
Changes stop and start to POST. Fixes metadata writing
downsrob Jun 11, 2022
c36a83e
Merge branch 'apis' into smapis
bowenlan-amzn Jun 11, 2022
194013c
Fix after merge
bowenlan-amzn Jun 11, 2022
f060b5c
Add policy in rest handler name
bowenlan-amzn Jun 11, 2022
5db5dc3
Add a IT
bowenlan-amzn Jun 13, 2022
85e6998
Merge branch 'sm-dev' into sm-statemachine
bowenlan-amzn Jun 13, 2022
baf4fab
Fixing explain IT
bowenlan-amzn Jun 13, 2022
2caa004
One IT for overall workflow
bowenlan-amzn Jun 13, 2022
fa23380
Handle remote transport exception and remove double exclamation
bowenlan-amzn Jun 14, 2022
6c9b7f2
Refactor separate creation and deletion
bowenlan-amzn Jun 15, 2022
0e0cf32
Remove assert, update warning message for edge cases.
bowenlan-amzn Jun 15, 2022
617e37a
Fix UTs
bowenlan-amzn Jun 15, 2022
c9710eb
Address comments
bowenlan-amzn Jun 16, 2022
32a4f25
Comment out delete action IT flaky part
bowenlan-amzn Jun 16, 2022
312c8e8
Fixed cron schedule expression for flaky explain IT
bowenlan-amzn Jun 16, 2022
5ea28cd
Address partial comments
bowenlan-amzn Jun 16, 2022
4 changes: 3 additions & 1 deletion .gitignore
@@ -11,4 +11,6 @@ http
.project
.settings
src/test/resources/job-scheduler/
-src/test/resources/bwc/
+src/test/resources/bwc/
+bin/
+spi/bin/
@@ -126,7 +126,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.Trans
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.start.TransportStartSMAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.stop.TransportStopSMAction
-import org.opensearch.indexmanagement.snapshotmanagement.engine.SMRunner
+import org.opensearch.indexmanagement.snapshotmanagement.SMRunner
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.spi.IndexManagementExtension
@@ -231,7 +231,6 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
@Suppress("ComplexMethod")
override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
-log.info("sm dev: plugin job parser")
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
@@ -10,7 +10,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import org.opensearch.indexmanagement.rollup.RollupRunner
import org.opensearch.indexmanagement.rollup.model.Rollup
-import org.opensearch.indexmanagement.snapshotmanagement.engine.SMRunner
+import org.opensearch.indexmanagement.snapshotmanagement.SMRunner
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.transform.TransformRunner
import org.opensearch.indexmanagement.transform.model.Transform
@@ -3,25 +3,25 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.indexmanagement.snapshotmanagement.engine
+package org.opensearch.indexmanagement.snapshotmanagement

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.snapshotmanagement.engine.SMStateMachine
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.SMState
import org.opensearch.indexmanagement.snapshotmanagement.getSMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.indexMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.smDocIdToPolicyName
import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToMetadataId
import org.opensearch.OpenSearchStatusException
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.creationTransitions
import org.opensearch.indexmanagement.snapshotmanagement.engine.states.deletionTransitions
import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
import org.opensearch.jobscheduler.spi.JobExecutionContext
@@ -61,16 +61,28 @@ object SMRunner :
return@launch
}

-var metadata = try {
+val metadata = try {
client.getSMMetadata(job.id)
} catch (e: OpenSearchStatusException) {
initMetadata(job) ?: return@launch
}

-// TODO SM state machine logic
+// creation, deletion workflow have to be executed sequentially,
+// because they are sharing the same metadata document.
+SMStateMachine(client, job, metadata)
+.handlePolicyChange()
+.currentState(metadata.creation.currentState)
+.next(creationTransitions)
+.apply {
+val deleteMetadata = metadata.deletion
+if (deleteMetadata != null) {
+this.currentState(deleteMetadata.currentState)
+.next(deletionTransitions)
+}
+}

if (!releaseLockForScheduledJob(context, lock)) {
-log.debug("Could not release lock [${lock.lockId}] for ${job.id}.")
+log.error("Could not release lock [${lock.lockId}] for ${job.id}.")
}
}
}
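The chained `currentState(...).next(...)` calls in this hunk can be illustrated with a minimal, self-contained sketch. Everything in it is a hypothetical stand-in (`State`, `Workflow`, `Metadata`, `StateMachine`, `runWorkflows`, and the two transition maps); the real `SMStateMachine` takes a client and a policy, and its transition tables may well be shaped differently. The sketch only shows why one machine instance runs the creation workflow first and then, if deletion metadata exists, the deletion workflow.

```kotlin
// Hypothetical, simplified stand-ins; these are NOT the plugin's real classes.
enum class State {
    CREATION_START, CREATING, CREATION_FINISHED,
    DELETION_START, DELETING, DELETION_FINISHED
}

data class Workflow(val currentState: State)
data class Metadata(val creation: Workflow, val deletion: Workflow?)

// Assumed shape: each state has at most one successor.
val creationTransitions = mapOf(
    State.CREATION_START to State.CREATING,
    State.CREATING to State.CREATION_FINISHED
)
val deletionTransitions = mapOf(
    State.DELETION_START to State.DELETING,
    State.DELETING to State.DELETION_FINISHED
)

class StateMachine(private var current: State) {
    // States entered so far, in order, across every workflow that was run.
    val visited = mutableListOf<State>()

    // Repositions the machine, e.g. to switch from creation to deletion.
    fun currentState(state: State): StateMachine = apply { current = state }

    // Follows transitions until the current state has no successor in the map.
    fun next(transitions: Map<State, State>): StateMachine = apply {
        var target = transitions[current]
        while (target != null) {
            visited += target
            current = target
            target = transitions[target]
        }
    }
}

// Runs creation, then deletion only when deletion metadata is present.
// Both workflows go through the same machine, one after the other.
fun runWorkflows(metadata: Metadata): List<State> {
    val machine = StateMachine(metadata.creation.currentState).next(creationTransitions)
    metadata.deletion?.let { machine.currentState(it.currentState).next(deletionTransitions) }
    return machine.visited
}
```

Calling `runWorkflows` with `deletion = null` walks only the creation states. Running both workflows through one instance, rather than in parallel, mirrors the comment in the diff: they share a single metadata document, so concurrent updates would conflict.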
@@ -85,7 +97,7 @@ object SMRunner :
log.info("Initializing metadata [$initMetadata] for job [${job.id}].")
try {
// TODO SM more granular error checking
-val res = client.indexMetadata(initMetadata, job.id, create = true)
+val res = client.indexMetadata(initMetadata, job.id, create = true, seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
if (res.status() != RestStatus.CREATED) {
log.error("Metadata initialization response status is ${res.status()}, expecting CREATED 201.")
return null
@@ -98,21 +110,25 @@ object SMRunner :
}

private fun getInitialMetadata(job: SMPolicy): SMMetadata {
+val now = now()
return SMMetadata(
id = smPolicyNameToMetadataId(smDocIdToPolicyName(job.id)),
policySeqNo = job.seqNo,
policyPrimaryTerm = job.primaryTerm,
-currentState = SMState.START,
-creation = SMMetadata.Creation(
+creation = SMMetadata.WorkflowMetadata(
+SMState.CREATION_START,
SMMetadata.Trigger(
-time = job.creation.schedule.getNextExecutionTime(now())
+time = job.creation.schedule.getNextExecutionTime(now)
)
),
-deletion = SMMetadata.Deletion(
-SMMetadata.Trigger(
-time = job.deletion.schedule.getNextExecutionTime(now())
+deletion = job.deletion?.let {
+SMMetadata.WorkflowMetadata(
+SMState.DELETION_START,
+SMMetadata.Trigger(
+time = job.deletion.schedule.getNextExecutionTime(now)
+)
)
-),
+},
)
}
}
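The optional deletion block in `getInitialMetadata` can be sketched in isolation. This is a minimal illustration under stated assumptions: `Policy`, `DeletionConfig`, `WorkflowMetadata`, `Trigger`, `Metadata`, and `initialMetadata` are hypothetical stand-ins, and a fixed interval in seconds replaces the real schedule's `getNextExecutionTime`. It shows two things from the hunk: `now` is captured once so both triggers are computed from the same instant, and `?.let` yields `null` deletion metadata when the policy has no deletion config.

```kotlin
import java.time.Instant

// Hypothetical, simplified stand-ins; these are NOT the plugin's real model classes.
data class Trigger(val time: Instant)
data class WorkflowMetadata(val state: String, val trigger: Trigger)
data class DeletionConfig(val intervalSeconds: Long)
data class Policy(val creationIntervalSeconds: Long, val deletion: DeletionConfig?)
data class Metadata(val creation: WorkflowMetadata, val deletion: WorkflowMetadata?)

// `now` is passed in once so that both triggers share the same reference time.
fun initialMetadata(policy: Policy, now: Instant): Metadata = Metadata(
    creation = WorkflowMetadata(
        "CREATION_START",
        Trigger(now.plusSeconds(policy.creationIntervalSeconds))
    ),
    // Deletion metadata exists only when the policy configures deletion.
    deletion = policy.deletion?.let {
        WorkflowMetadata("DELETION_START", Trigger(now.plusSeconds(it.intervalSeconds)))
    }
)
```

The sketch reads the config through `it` inside the `let` block. The diff instead reads `job.deletion` again, which relies on the compiler smart-casting that property to non-null; both forms give the same result when the property is a stable `val`.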