diff --git a/.github/ISSUE_TEMPLATE/documentation.md b/.github/ISSUE_TEMPLATE/documentation.md
new file mode 100644
index 000000000..9c4ca695c
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/documentation.md
@@ -0,0 +1,11 @@
+**Is your feature request related to a problem?**
+A new feature has been added.
+
+**What solution would you like?**
+Document the usage of the new feature.
+
+**What alternatives have you considered?**
+N/A
+
+**Do you have any additional context?**
+_Add any other context or screenshots about the feature request here._
diff --git a/.github/workflows/create-documentation-issue.yml b/.github/workflows/create-documentation-issue.yml
new file mode 100644
index 000000000..23bc47e6f
--- /dev/null
+++ b/.github/workflows/create-documentation-issue.yml
@@ -0,0 +1,41 @@
+name: Create Documentation Issue
+on:
+ pull_request:
+ types:
+ - labeled
+env:
+ PR_NUMBER: ${{ github.event.number }}
+
+jobs:
+ create-issue:
+ if: ${{ github.event.label.name == 'needs-documentation' }}
+ runs-on: ubuntu-latest
+ name: Create Documentation Issue
+ steps:
+ - name: GitHub App token
+ id: github_app_token
+ uses: tibdex/github-app-token@v1.5.0
+ with:
+ app_id: ${{ secrets.APP_ID }}
+ private_key: ${{ secrets.APP_PRIVATE_KEY }}
+ installation_id: 22958780
+
+ - name: Checkout code
+ uses: actions/checkout@v2
+
+ - name: Edit the issue template
+ run: |
+ echo "https://github.com/opensearch-project/index-management/pull/${{ env.PR_NUMBER }}." >> ./.github/ISSUE_TEMPLATE/documentation.md
+
+ - name: Create Issue From File
+ id: create-issue
+ uses: peter-evans/create-issue-from-file@v4
+ with:
+ title: Add documentation related to new feature
+ content-filepath: ./.github/ISSUE_TEMPLATE/documentation.md
+ labels: documentation
+ repository: opensearch-project/documentation-website
+ token: ${{ steps.github_app_token.outputs.token }}
+
+ - name: Print Issue
+ run: echo Created related documentation issue ${{ steps.create-issue.outputs.issue-number }}
diff --git a/.idea/copyright/OpenSearch.xml b/.idea/copyright/OpenSearch.xml
index dadc9ed84..7c7f3c26f 100644
--- a/.idea/copyright/OpenSearch.xml
+++ b/.idea/copyright/OpenSearch.xml
@@ -1,6 +1,6 @@
-
+
\ No newline at end of file
diff --git a/.idea/copyright/profiles_settings.xml b/.idea/copyright/profiles_settings.xml
index 5f45523cc..97a289b2f 100644
--- a/.idea/copyright/profiles_settings.xml
+++ b/.idea/copyright/profiles_settings.xml
@@ -1,7 +1,7 @@
-
+
-
+
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index c2cdb3c6f..499f13e1c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,9 +14,9 @@ import java.util.function.Predicate
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
- opensearch_version = System.getProperty("opensearch.version", "2.0.0-rc1-SNAPSHOT")
- buildVersionQualifier = System.getProperty("build.version_qualifier", "rc1")
- // 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
+ opensearch_version = System.getProperty("opensearch.version", "2.0.0-SNAPSHOT")
+ buildVersionQualifier = System.getProperty("build.version_qualifier", "")
+ // 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
@@ -99,6 +99,10 @@ configurations.all {
}
}
+idea.module {
+ excludeDirs -= file("$buildDir")
+}
+
def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
def usingMultiNode = project.properties.containsKey('numNodes')
// Only apply jacoco test coverage if we are running a local single node cluster
diff --git a/detekt.yml b/detekt.yml
index 47b9d163c..78ff93acb 100644
--- a/detekt.yml
+++ b/detekt.yml
@@ -1,6 +1,6 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
- maxIssues: 20
+ maxIssues: 100
exceptions:
TooGenericExceptionCaught:
diff --git a/spi/build.gradle b/spi/build.gradle
index f8ba3f69b..4b398668a 100644
--- a/spi/build.gradle
+++ b/spi/build.gradle
@@ -15,6 +15,7 @@ apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
+apply plugin: 'idea'
ext {
projectSubstitutions = [:]
@@ -64,6 +65,15 @@ dependencies {
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}
+idea.module {
+ excludeDirs -= file("$buildDir")
+}
+
+task sourcesJar(type: Jar, dependsOn: classes) {
+ classifier = 'sources'
+ from sourceSets.main.allSource
+}
+
test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt
index 609f87330..8463b60df 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt
@@ -111,6 +111,13 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.settings.IndexManagementSettings
+import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestSMPolicyHandler
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
@@ -164,6 +171,8 @@ import java.util.function.Supplier
@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() {
+ private val log = LogManager.getLogger(javaClass)
+
private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var clusterService: ClusterService
@@ -187,6 +196,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"
+ const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm"
+
const val OLD_PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val LEGACY_ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
@@ -208,6 +219,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
@Suppress("ComplexMethod")
override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
+ log.info("sm dev: plugin job parser")
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
@@ -235,6 +247,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
+ SMPolicy.SM_TYPE -> {
+ return@ScheduledJobParser SMPolicy.parse(xcp, id, jobDocVersion.seqNo, jobDocVersion.primaryTerm)
+ }
+ SMMetadata.SM_METADATA_TYPE -> {
+ return@ScheduledJobParser null
+ }
else -> {
logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
xcp.skipChildren()
@@ -300,7 +318,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestDeleteTransformAction(),
RestExplainTransformAction(),
RestStartTransformAction(),
- RestStopTransformAction()
+ RestStopTransformAction(),
+ RestSMPolicyHandler(),
)
}
@@ -502,7 +521,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(ExplainTransformAction.INSTANCE, TransportExplainTransformAction::class.java),
ActionPlugin.ActionHandler(StartTransformAction.INSTANCE, TransportStartTransformAction::class.java),
ActionPlugin.ActionHandler(StopTransformAction.INSTANCE, TransportStopTransformAction::class.java),
- ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java)
+ ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java),
+ ActionPlugin.ActionHandler(SMActions.INDEX_SM_ACTION_TYPE, TransportIndexSMPolicyAction::class.java),
+ ActionPlugin.ActionHandler(SMActions.GET_SM_ACTION_TYPE, TransportGetSMPolicyAction::class.java),
+ ActionPlugin.ActionHandler(SMActions.DELETE_SM_ACTION_TYPE, TransportDeleteSMPolicyAction::class.java),
)
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt
index ae43a4829..23da3b8a4 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt
@@ -633,28 +633,49 @@ class ManagedIndexCoordinator(
suspend fun sweepManagedIndexJobs(client: Client): List {
val managedIndexUuids = mutableListOf()
- val managedIndexSearchRequest = getSweptManagedIndexSearchRequest()
- var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
- var uuids = response.hits.map { it.id }
- val scrollIDsToClear = mutableSetOf()
-
- while (uuids.isNotEmpty()) {
- managedIndexUuids.addAll(uuids)
- val scrollID = response.scrollId
- scrollIDsToClear.add(scrollID)
- val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
- response = client.suspendUntil { searchScroll(scrollRequest, it) }
- uuids = response.hits.map { it.id }
- }
+ // if # of documents below 10k, don't use scroll search
+ val countReq = getSweptManagedIndexSearchRequest(size = 0)
+ val countRes: SearchResponse = client.suspendUntil { search(countReq, it) }
+ val totalHits = countRes.hits.totalHits ?: return managedIndexUuids
- if (scrollIDsToClear.isNotEmpty()) {
- val clearScrollRequest = ClearScrollRequest()
- clearScrollRequest.scrollIds(scrollIDsToClear.toList())
- val clearScrollResponse: ClearScrollResponse =
- client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
+ if (totalHits.value >= MAX_HITS) {
+ val scrollIDsToClear = mutableSetOf()
+ try {
+ val managedIndexSearchRequest = getSweptManagedIndexSearchRequest(scroll = true)
+ var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
+ var uuids = transformManagedIndexSearchRes(response)
+
+ while (uuids.isNotEmpty()) {
+ managedIndexUuids.addAll(uuids)
+ val scrollID = response.scrollId
+ scrollIDsToClear.add(scrollID)
+ val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
+ response = client.suspendUntil { searchScroll(scrollRequest, it) }
+ uuids = transformManagedIndexSearchRes(response)
+ }
+ } finally {
+ if (scrollIDsToClear.isNotEmpty()) {
+ val clearScrollRequest = ClearScrollRequest()
+ clearScrollRequest.scrollIds(scrollIDsToClear.toList())
+ val clearScrollResponse: ClearScrollResponse =
+ client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
+ }
+ }
+ return managedIndexUuids
}
- return managedIndexUuids
+ val response: SearchResponse = client.suspendUntil { search(getSweptManagedIndexSearchRequest(), it) }
+ return transformManagedIndexSearchRes(response)
+ }
+
+ fun transformManagedIndexSearchRes(response: SearchResponse): List {
+ if (response.isTimedOut || response.failedShards > 0 || response.skippedShards > 0) {
+ val errorMsg = "Sweep managed indices failed. Timed out: ${response.isTimedOut} | " +
+ "Failed shards: ${response.failedShards} | Skipped shards: ${response.skippedShards}."
+ logger.error(errorMsg)
+ throw ISMCoordinatorSearchException(message = errorMsg)
+ }
+ return response.hits.map { it.id }
}
/**
@@ -736,3 +757,5 @@ class ManagedIndexCoordinator(
const val BUFFER = 20L
}
}
+
+class ISMCoordinatorSearchException(message: String, cause: Throwable? = null) : Exception(message, cause)
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt
index ee32f50ea..acaabc864 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt
@@ -14,6 +14,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.instant
import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.Schedule
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
@@ -86,7 +87,6 @@ data class ManagedIndexConfig(
companion object {
const val MANAGED_INDEX_TYPE = "managed_index"
- const val NO_ID = ""
const val NAME_FIELD = "name"
const val ENABLED_FIELD = "enabled"
const val SCHEDULE_FIELD = "schedule"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt
index 432fa8e0d..6f5dd0377 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt
@@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.opensearchapi.optionalUserField
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.util.IndexUtils
+import org.opensearch.indexmanagement.util.NO_ID
import java.io.IOException
import java.time.Instant
@@ -147,7 +148,6 @@ data class Policy(
const val POLICY_TYPE = "policy"
const val POLICY_ID_FIELD = "policy_id"
const val DESCRIPTION_FIELD = "description"
- const val NO_ID = ""
const val LAST_UPDATED_TIME_FIELD = "last_updated_time"
const val SCHEMA_VERSION_FIELD = "schema_version"
const val ERROR_NOTIFICATION_FIELD = "error_notification"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt
index dd98093a0..77f57689c 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt
@@ -21,6 +21,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.inde
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
@@ -69,8 +70,8 @@ class RestIndexPolicyAction(
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("policyID", Policy.NO_ID)
- if (Policy.NO_ID == id) {
+ val id = request.param("policyID", NO_ID)
+ if (NO_ID == id) {
throw IllegalArgumentException("Missing policy ID")
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt
index fe1f60c74..4cc515cd6 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt
@@ -12,6 +12,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
+import org.opensearch.indexmanagement.util.NO_ID
import java.io.IOException
class IndexPolicyRequest : ActionRequest {
@@ -47,7 +48,7 @@ class IndexPolicyRequest : ActionRequest {
override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
- if (Policy.NO_ID == policyID) {
+ if (NO_ID == policyID) {
validationException = ValidateActions.addValidationError("Missing policyID", validationException)
}
return validationException
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt
index 769b2d22a..17c6fdeba 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt
@@ -188,18 +188,19 @@ fun getManagedIndicesToDelete(
}
}
-fun getSweptManagedIndexSearchRequest(): SearchRequest {
+fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = ManagedIndexCoordinator.MAX_HITS): SearchRequest {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE))
- return SearchRequest()
- .indices(INDEX_MANAGEMENT_INDEX)
- .scroll(TimeValue.timeValueMinutes(1))
+ val req = SearchRequest().indices(INDEX_MANAGEMENT_INDEX)
+ .allowPartialSearchResults(false)
.source(
SearchSourceBuilder.searchSource()
- .size(ManagedIndexCoordinator.MAX_HITS)
+ .size(size)
.seqNoAndPrimaryTerm(true)
.fetchSource(emptyArray(), emptyArray())
.query(boolQueryBuilder)
)
+ if (scroll) req.scroll(TimeValue.timeValueMinutes(1))
+ return req
}
@Suppress("ReturnCount", "ComplexCondition")
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt
index 726df1adf..834284334 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt
@@ -23,6 +23,9 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.DefaultShardOperationFailedException
import org.opensearch.client.OpenSearchClient
import org.opensearch.common.bytes.BytesReference
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.util.concurrent.ThreadContext
@@ -306,3 +309,34 @@ suspend fun withClosableContext(
context.injector.close()
}
}
+
+fun XContentBuilder.optionalField(name: String, value: Any?): XContentBuilder {
+ return if (value != null) { this.field(name, value) } else this
+}
+
+inline fun XContentParser.nullValueHandler(block: XContentParser.() -> T): T? {
+ return if (currentToken() == Token.VALUE_NULL) null else block()
+}
+
+inline fun XContentParser.parseArray(block: XContentParser.() -> T): List {
+ val resArr = mutableListOf()
+ ensureExpectedToken(Token.START_ARRAY, currentToken(), this)
+ while (nextToken() != Token.END_ARRAY) {
+ resArr.add(block())
+ }
+ return resArr
+}
+
+// similar to readOptionalWriteable
+fun StreamInput.readOptionalValue(value: T): T? {
+ return if (readBoolean()) { value } else null
+}
+
+fun StreamOutput.writeOptionalValue(value: T, writer: Writeable.Writer) {
+ if (value == null) {
+ writeBoolean(false)
+ } else {
+ writeBoolean(true)
+ writer.write(this, value)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt
index 823dee303..bfa939c55 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt
@@ -33,9 +33,9 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
-import org.opensearch.indexmanagement.rollup.model.RollupMetadata.Companion.NO_ID
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_EPOCH_MILLIS_FORMAT
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt
index dea0cfe0d..efe5c52a7 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt
@@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.opensearchapi.instant
import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.opensearchapi.optionalUserField
import org.opensearch.indexmanagement.util.IndexUtils
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util._ID
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.CronSchedule
@@ -220,7 +221,6 @@ data class Rollup(
const val ROLLUP_LOCK_DURATION_SECONDS = 1800L // 30 minutes
const val ROLLUP_TYPE = "rollup"
const val ROLLUP_ID_FIELD = "rollup_id"
- const val NO_ID = ""
const val ENABLED_FIELD = "enabled"
const val SCHEMA_VERSION_FIELD = "schema_version"
const val SCHEDULE_FIELD = "schedule"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt
index c56a1648c..5fb708719 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt
@@ -18,6 +18,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE
import org.opensearch.indexmanagement.opensearchapi.instant
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
import java.io.IOException
import java.time.Instant
@@ -228,7 +229,6 @@ data class RollupMetadata(
companion object {
const val ROLLUP_METADATA_TYPE = "rollup_metadata"
- const val NO_ID = ""
const val ROLLUP_ID_FIELD = "rollup_id"
const val AFTER_KEY_FIELD = "after_key"
const val LAST_UPDATED_FIELD = "last_updated_time"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt
index 3c4ae5aa4..4f8a8f112 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt
@@ -18,6 +18,7 @@ import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
@@ -58,8 +59,8 @@ class RestIndexRollupAction : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("rollupID", Rollup.NO_ID)
- if (Rollup.NO_ID == id) {
+ val id = request.param("rollupID", NO_ID)
+ if (NO_ID == id) {
throw IllegalArgumentException("Missing rollup ID")
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt
index 6e1289f39..ece151c24 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt
@@ -10,7 +10,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ROL
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI
import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction
import org.opensearch.indexmanagement.rollup.action.start.StartRollupRequest
-import org.opensearch.indexmanagement.rollup.model.Rollup
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestHandler.ReplacedRoute
@@ -41,7 +41,7 @@ class RestStartRollupAction : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("rollupID", Rollup.NO_ID)
+ val id = request.param("rollupID", NO_ID)
val startRequest = StartRollupRequest(id)
return RestChannelConsumer { channel ->
client.execute(StartRollupAction.INSTANCE, startRequest, RestToXContentListener(channel))
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt
index 799e0d7a2..f4de4d63b 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt
@@ -10,7 +10,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ROL
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI
import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction
import org.opensearch.indexmanagement.rollup.action.stop.StopRollupRequest
-import org.opensearch.indexmanagement.rollup.model.Rollup
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestHandler.ReplacedRoute
@@ -41,8 +41,8 @@ class RestStopRollupAction : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("rollupID", Rollup.NO_ID)
- if (Rollup.NO_ID == id) {
+ val id = request.param("rollupID", NO_ID)
+ if (NO_ID == id) {
throw IllegalArgumentException("Missing rollup ID")
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt
new file mode 100644
index 000000000..51f3bcf7d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt
@@ -0,0 +1,15 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement
+
+import org.apache.logging.log4j.LogManager
+
+private val log = LogManager.getLogger("Snapshot Management Helper")
+
+const val smSuffix = "-sm"
+fun smPolicyNameToDocId(policyName: String) = "$policyName$smSuffix"
+fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(smSuffix)
+fun getSMMetadataDocId(policyName: String) = "$policyName-sm-metadata"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt
new file mode 100644
index 000000000..7a3237b08
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt
@@ -0,0 +1,115 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.resthandler
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.client.node.NodeClient
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_BASE_URI
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyRequest
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyRequest
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyRequest
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId
+import org.opensearch.rest.BaseRestHandler
+import org.opensearch.rest.BytesRestResponse
+import org.opensearch.rest.RestHandler.Route
+import org.opensearch.rest.RestRequest
+import org.opensearch.rest.RestRequest.Method.POST
+import org.opensearch.rest.RestRequest.Method.PUT
+import org.opensearch.rest.RestRequest.Method.GET
+import org.opensearch.rest.RestRequest.Method.DELETE
+import org.opensearch.rest.RestStatus
+import org.opensearch.rest.action.RestToXContentListener
+
+class RestSMPolicyHandler : BaseRestHandler() {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override fun getName(): String {
+ return "snapshot_management_policy_rest_handler"
+ }
+
+ override fun routes(): List {
+ return listOf(
+ Route(POST, "$SM_BASE_URI/{policyName}"),
+ Route(PUT, "$SM_BASE_URI/{policyName}"),
+ Route(GET, "$SM_BASE_URI/{policyName}"),
+ Route(DELETE, "$SM_BASE_URI/{policyName}"),
+ )
+ }
+
+ override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+ return when (request.method()) {
+ POST -> indexRequest(request, client, true)
+ PUT -> indexRequest(request, client, false)
+ GET -> getRequest(request, client)
+ DELETE -> deleteRequest(request, client)
+ else -> RestChannelConsumer {
+ it.sendResponse(
+ BytesRestResponse(
+ RestStatus.METHOD_NOT_ALLOWED,
+ "${request.method()} is not allowed"
+ )
+ )
+ }
+ }
+ }
+
+ private fun getRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+ val policyName = request.param("policyName", "")
+ if (policyName == "") {
+ throw IllegalArgumentException("Missing policy name")
+ }
+
+ return RestChannelConsumer {
+ client.execute(
+ GET_SM_ACTION_TYPE,
+ GetSMPolicyRequest(policyName),
+ RestToXContentListener(it)
+ )
+ }
+ }
+
+ private fun indexRequest(request: RestRequest, client: NodeClient, create: Boolean): RestChannelConsumer {
+ val policyName = request.param("policyName", "")
+ if (policyName == "") {
+ throw IllegalArgumentException("Missing policy name")
+ }
+ // TODO validate policy name validateGeneratedSnapshotName
+
+ log.info("sm dev: receive request ${request.requiredContent().utf8ToString()}")
+
+ val xcp = request.contentParser()
+ val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName))
+ log.info("sm dev: policy parsed $policy")
+
+ return RestChannelConsumer {
+ client.execute(
+ SMActions.INDEX_SM_ACTION_TYPE,
+ IndexSMPolicyRequest(policy, create),
+ RestToXContentListener(it)
+ )
+ }
+ }
+
+ private fun deleteRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+ val policyName = request.param("policyName", "")
+ if (policyName == "") {
+ throw IllegalArgumentException("Missing policy name")
+ }
+
+ return RestChannelConsumer {
+ client.execute(
+ DELETE_SM_ACTION_TYPE,
+ DeleteSMPolicyRequest(policyName),
+ RestToXContentListener(it)
+ )
+ }
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt
new file mode 100644
index 000000000..f0d0dcc9c
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
+import org.apache.logging.log4j.LogManager
+import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.ActionListener
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.action.support.HandledTransportAction
+import org.opensearch.client.Client
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.util.concurrent.ThreadContext.StoredContext
+import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
+import org.opensearch.commons.authuser.User
+import org.opensearch.rest.RestStatus
+import org.opensearch.tasks.Task
+import org.opensearch.transport.TransportService
+
+abstract class BaseTransportAction(
+ name: String,
+ transportService: TransportService,
+ val client: Client,
+ actionFilters: ActionFilters,
+ requestReader: Writeable.Reader,
+) : HandledTransportAction(
+ name, transportService, actionFilters, requestReader
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+ private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
+
+ override fun doExecute(
+ task: Task,
+ request: Request,
+ listener: ActionListener
+ ) {
+ val userStr: String? =
+ client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
+ log.info("sm dev: user and roles string from thread context: $userStr")
+ val user: User? = User.parse(userStr)
+ coroutineScope.launch {
+ try {
+ client.threadPool().threadContext.stashContext().use { threadContext ->
+ listener.onResponse(executeRequest(request, user, threadContext))
+ }
+ } catch (ex: Exception) {
+ log.error("Uncaught exception:", ex)
+ listener.onFailure(
+ OpenSearchStatusException(
+ ex.message, RestStatus.INTERNAL_SERVER_ERROR
+ )
+ )
+ }
+ }
+ }
+
+ // TODO SM test could we get userStr from threadContext? suppose no
+ abstract suspend fun executeRequest(request: Request, user: User?, threadContext: StoredContext): Response
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt
new file mode 100644
index 000000000..753acbf57
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport
+
+import org.opensearch.action.ActionType
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyResponse
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyResponse
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction
+
+object SMActions {
+ /**
+ * [TransportIndexSMPolicyAction]
+ */
+ const val INDEX_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/write"
+ val INDEX_SM_ACTION_TYPE = ActionType(INDEX_SM_ACTION_NAME, ::IndexSMPolicyResponse)
+
+ /**
+ * [TransportGetSMPolicyAction]
+ */
+ const val GET_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/get"
+ val GET_SM_ACTION_TYPE = ActionType(GET_SM_ACTION_NAME, ::GetSMPolicyResponse)
+
+ /**
+ * [TransportDeleteSMPolicyAction]
+ */
+ const val DELETE_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/delete"
+ val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteSMPolicyResponse)
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt
new file mode 100644
index 000000000..418eefb09
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete
+
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionRequestValidationException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+
+class DeleteSMPolicyRequest(
+ val policyName: String
+) : ActionRequest() {
+ override fun validate(): ActionRequestValidationException? {
+ return null
+ }
+
+ constructor(sin: StreamInput) : this(
+ policyName = sin.readString()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(policyName)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt
new file mode 100644
index 000000000..6a0ecd524
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete
+
+import org.opensearch.action.ActionResponse
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+
+class DeleteSMPolicyResponse(
+ val status: String
+) : ActionResponse(), ToXContentObject {
+
+ constructor(sin: StreamInput) : this(
+ status = sin.readString()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(status)
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field("delete", status)
+ .endObject()
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt
new file mode 100644
index 000000000..5091f8d9d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt
@@ -0,0 +1,42 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.delete.DeleteRequest
+import org.opensearch.action.delete.DeleteResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.client.Client
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.util.concurrent.ThreadContext
+import org.opensearch.commons.authuser.User
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_NAME
+import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId
+import org.opensearch.transport.TransportService
+
+class TransportDeleteSMPolicyAction @Inject constructor(
+ client: Client,
+ transportService: TransportService,
+ actionFilters: ActionFilters,
+) : BaseTransportAction(
+ DELETE_SM_ACTION_NAME, transportService, client, actionFilters, ::DeleteSMPolicyRequest
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override suspend fun executeRequest(
+ request: DeleteSMPolicyRequest,
+ user: User?,
+ threadContext: ThreadContext.StoredContext
+ ): DeleteSMPolicyResponse {
+ val deleteReq = DeleteRequest(INDEX_MANAGEMENT_INDEX, smPolicyNameToDocId(request.policyName))
+ val deleteRes: DeleteResponse = client.suspendUntil { delete(deleteReq, it) }
+ return DeleteSMPolicyResponse(deleteRes.status().toString())
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt
new file mode 100644
index 000000000..cd70585c9
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get
+
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionRequestValidationException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+
+class GetSMPolicyRequest(
+ val policyName: String
+) : ActionRequest() {
+ override fun validate(): ActionRequestValidationException? {
+ return null
+ }
+
+ constructor(sin: StreamInput) : this(
+ policyName = sin.readString(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(policyName)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt
new file mode 100644
index 000000000..02c80bcca
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get
+
+import org.opensearch.action.ActionResponse
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+
+class GetSMPolicyResponse(
+ val policy: SMPolicy
+) : ActionResponse(), ToXContentObject {
+
+ constructor(sin: StreamInput) : this(
+ policy = SMPolicy(sin)
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ policy.writeTo(out)
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return policy.toXContent(builder, EMPTY_PARAMS)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt
new file mode 100644
index 000000000..730f727f7
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.get.GetRequest
+import org.opensearch.action.get.GetResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.client.Client
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.util.concurrent.ThreadContext
+import org.opensearch.commons.authuser.User
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
+import org.opensearch.indexmanagement.opensearchapi.contentParser
+import org.opensearch.indexmanagement.opensearchapi.parseWithType
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_NAME
+import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+import org.opensearch.transport.TransportService
+
+class TransportGetSMPolicyAction @Inject constructor(
+ client: Client,
+ transportService: TransportService,
+ actionFilters: ActionFilters,
+) : BaseTransportAction(
+ GET_SM_ACTION_NAME, transportService, client, actionFilters, ::GetSMPolicyRequest
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override suspend fun executeRequest(
+ request: GetSMPolicyRequest,
+ user: User?,
+ threadContext: ThreadContext.StoredContext
+ ): GetSMPolicyResponse {
+ val getReq = GetRequest(INDEX_MANAGEMENT_INDEX, smPolicyNameToDocId(request.policyName))
+ val getRes: GetResponse = client.suspendUntil { get(getReq, it) }
+ val xcp = contentParser(getRes.sourceAsBytesRef)
+ val policy = xcp.parseWithType(getRes.id, getRes.seqNo, getRes.primaryTerm, SMPolicy.Companion::parse)
+ return GetSMPolicyResponse(policy)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt
new file mode 100644
index 000000000..ad5b23341
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index
+
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionRequestValidationException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+
+class IndexSMPolicyRequest(
+ val policy: SMPolicy,
+ val create: Boolean,
+) : ActionRequest() {
+ override fun validate(): ActionRequestValidationException? {
+ return null
+ }
+
+ constructor(sin: StreamInput) : this(
+ policy = SMPolicy(sin),
+ create = sin.readBoolean(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ policy.writeTo(out)
+ out.writeBoolean(create)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt
new file mode 100644
index 000000000..c3c45cdd3
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index
+
+import org.opensearch.action.ActionResponse
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+
+class IndexSMPolicyResponse(val policy: SMPolicy) : ActionResponse(), ToXContentObject {
+
+ constructor(sin: StreamInput) : this(
+ policy = SMPolicy(sin)
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ policy.writeTo(out)
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(SMPolicy.SM_TYPE, policy)
+ .endObject()
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt
new file mode 100644
index 000000000..ab042379d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.index.IndexResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.client.Client
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.util.concurrent.ThreadContext
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.commons.authuser.User
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.INDEX_SM_ACTION_NAME
+import org.opensearch.transport.TransportService
+
+class TransportIndexSMPolicyAction @Inject constructor(
+ client: Client,
+ transportService: TransportService,
+ actionFilters: ActionFilters,
+) : BaseTransportAction(
+ INDEX_SM_ACTION_NAME, transportService, client, actionFilters, ::IndexSMPolicyRequest
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override suspend fun executeRequest(
+ request: IndexSMPolicyRequest,
+ user: User?,
+ threadContext: ThreadContext.StoredContext
+ ): IndexSMPolicyResponse {
+ val policy = request.policy
+ val indexReq = IndexRequest(INDEX_MANAGEMENT_INDEX)
+ .source(policy.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
+ .id(policy.id)
+ .create(request.create)
+ val indexRes: IndexResponse = client.suspendUntil { index(indexReq, it) }
+ return IndexSMPolicyResponse(policy)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt
new file mode 100644
index 000000000..99ca43705
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt
@@ -0,0 +1,8 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine
+
+enum class SMState
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt
new file mode 100644
index 000000000..59a551e8f
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt
@@ -0,0 +1,361 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.model
+
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentFragment
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentParser
+import org.opensearch.common.xcontent.XContentParser.Token
+import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indexmanagement.opensearchapi.instant
+import org.opensearch.indexmanagement.opensearchapi.nullValueHandler
+import org.opensearch.indexmanagement.opensearchapi.optionalField
+import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
+import org.opensearch.indexmanagement.opensearchapi.parseArray
+import org.opensearch.indexmanagement.opensearchapi.readOptionalValue
+import org.opensearch.indexmanagement.opensearchapi.writeOptionalValue
+import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMState
+import org.opensearch.indexmanagement.util.NO_ID
+import java.time.Instant
+
+typealias InfoType = Map?
+
+data class SMMetadata(
+ val policySeqNo: Long,
+ val policyPrimaryTerm: Long,
+ val currentState: SMState,
+ val creation: Creation,
+ val deletion: Deletion,
+ val info: InfoType = null,
+ val id: String = NO_ID,
+ val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .startObject(SM_METADATA_TYPE)
+ .field(POLICY_SEQ_NO_FIELD, policySeqNo)
+ .field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm)
+ .field(CURRENT_STATE_FIELD, currentState.toString())
+ .field(CREATION_FIELD, creation)
+ .field(DELETION_FIELD, deletion)
+ .optionalField(INFO_FIELD, info)
+ .endObject()
+ .endObject()
+ }
+
+ companion object {
+ const val SM_METADATA_TYPE = "sm_metadata"
+ const val POLICY_SEQ_NO_FIELD = "policy_seq_no"
+ const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term"
+ const val CURRENT_STATE_FIELD = "current_state"
+ const val CREATION_FIELD = "creation"
+ const val DELETION_FIELD = "deletion"
+ const val INFO_FIELD = "info"
+
+ fun parse(
+ xcp: XContentParser,
+ id: String = NO_ID,
+ seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+ ): SMMetadata {
+ var policySeqNo: Long? = null
+ var policyPrimaryTerm: Long? = null
+ var currentState: SMState? = null
+ var atomic = false
+ var creation: Creation? = null
+ var deletion: Deletion? = null
+ var info: InfoType = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ POLICY_SEQ_NO_FIELD -> policySeqNo = xcp.longValue()
+ POLICY_PRIMARY_TERM_FIELD -> policyPrimaryTerm = xcp.longValue()
+ CURRENT_STATE_FIELD -> currentState = SMState.valueOf(xcp.text())
+ CREATION_FIELD -> creation = Creation.parse(xcp)
+ DELETION_FIELD -> deletion = Deletion.parse(xcp)
+ INFO_FIELD -> info = xcp.nullValueHandler { xcp.map() }
+ }
+ }
+
+ return SMMetadata(
+ policySeqNo = requireNotNull(policySeqNo) {},
+ policyPrimaryTerm = requireNotNull(policyPrimaryTerm) {},
+ currentState = requireNotNull(currentState) {},
+ creation = requireNotNull(creation) {},
+ deletion = requireNotNull(deletion) {},
+ info = info,
+ id = id,
+ seqNo = seqNo,
+ primaryTerm = primaryTerm
+ )
+ }
+
+ fun InfoType.upsert(keyValuePair: Pair): InfoType {
+ val info: MutableMap = this?.toMutableMap() ?: mutableMapOf()
+ info[keyValuePair.first] = keyValuePair.second
+ return info
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ policySeqNo = sin.readLong(),
+ policyPrimaryTerm = sin.readLong(),
+ currentState = sin.readEnum(SMState::class.java),
+ creation = Creation(sin),
+ deletion = Deletion(sin),
+ info = sin.readMap(),
+ id = sin.readString(),
+ seqNo = sin.readLong(),
+ primaryTerm = sin.readLong(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeLong(policySeqNo)
+ out.writeLong(policyPrimaryTerm)
+ out.writeEnum(currentState)
+ creation.writeTo(out)
+ deletion.writeTo(out)
+ out.writeMap(info)
+ out.writeString(id)
+ out.writeLong(seqNo)
+ out.writeLong(primaryTerm)
+ }
+
+ data class Creation(
+ val trigger: Trigger,
+ val started: SnapshotInfo? = null,
+ val finished: SnapshotInfo? = null,
+ ) : Writeable, ToXContentFragment {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(TRIGGER_FIELD, trigger)
+ .optionalField(STARTED_FIELD, started)
+ .optionalField(FINISHED_FIELD, finished)
+ .endObject()
+ }
+
+ companion object {
+ const val TRIGGER_FIELD = "trigger"
+ const val STARTED_FIELD = "started"
+ const val FINISHED_FIELD = "finished"
+
+ fun parse(xcp: XContentParser): Creation {
+ var trigger: Trigger? = null
+ var started: SnapshotInfo? = null
+ var finished: SnapshotInfo? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ TRIGGER_FIELD -> trigger = Trigger.parse(xcp)
+ STARTED_FIELD -> started = xcp.nullValueHandler { SnapshotInfo.parse(xcp) }
+ FINISHED_FIELD -> finished = xcp.nullValueHandler { SnapshotInfo.parse(xcp) }
+ }
+ }
+
+ return Creation(
+ trigger = requireNotNull(trigger) { "trigger field must not be null" },
+ started = started,
+ finished = finished,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ trigger = Trigger(sin),
+ started = sin.readOptionalWriteable { SnapshotInfo(it) },
+ finished = sin.readOptionalWriteable { SnapshotInfo(it) },
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ trigger.writeTo(out)
+ out.writeOptionalWriteable(started)
+ out.writeOptionalWriteable(finished)
+ }
+ }
+
+ data class Deletion(
+ val trigger: Trigger,
+ val started: List? = null,
+ val startedTime: Instant? = null
+ ) : Writeable, ToXContentFragment {
+
+ init {
+ require(!(started != null).xor(startedTime != null)) {
+ "deletion started and startedTime must exist at the same time."
+ }
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(TRIGGER_FIELD, trigger)
+ .optionalField(STARTED_FIELD, started)
+ .optionalField(STARTED_TIME_FIELD, startedTime)
+ .endObject()
+ }
+
+ companion object {
+ const val TRIGGER_FIELD = "trigger"
+ const val STARTED_FIELD = "started"
+ const val STARTED_TIME_FIELD = "started_time"
+
+ fun parse(xcp: XContentParser): Deletion {
+ var trigger: Trigger? = null
+ var started: List? = null
+ var startedTime: Instant? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ TRIGGER_FIELD -> trigger = Trigger.parse(xcp)
+ STARTED_FIELD -> started = xcp.nullValueHandler { parseArray { SnapshotInfo.parse(xcp) } }
+ STARTED_TIME_FIELD -> startedTime = xcp.instant()
+ }
+ }
+
+ return Deletion(
+ trigger = requireNotNull(trigger) { "trigger field must not be null" },
+ started = started,
+ startedTime = startedTime,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ trigger = Trigger(sin),
+ started = sin.readOptionalValue(sin.readList { SnapshotInfo(it) }),
+ startedTime = sin.readOptionalInstant(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ trigger.writeTo(out)
+ out.writeOptionalValue(started, StreamOutput::writeList)
+ out.writeOptionalInstant(startedTime)
+ }
+ }
+
+ /**
+ * Trigger for recurring condition check
+ *
+ * index_size can be another possible trigger, e.g.: snapshot will be created
+ * every time index size increases 50gb
+ */
+ data class Trigger(
+ val time: Instant,
+ ) : Writeable, ToXContentFragment {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .optionalTimeField(TIME_FIELD, time)
+ .endObject()
+ }
+
+ companion object {
+ const val TIME_FIELD = "time"
+
+ fun parse(xcp: XContentParser): Trigger {
+ var nextExecutionTime: Instant? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ TIME_FIELD -> nextExecutionTime = xcp.instant()
+ }
+ }
+
+ return Trigger(
+ time = requireNotNull(nextExecutionTime) { "trigger time field must not be null." },
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ time = sin.readInstant()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeInstant(time)
+ }
+ }
+
+ data class SnapshotInfo(
+ val name: String,
+ val startTime: Instant? = null,
+ val endTime: Instant? = null,
+ ) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(NAME_FIELD, name)
+ .optionalTimeField(START_TIME_FIELD, startTime)
+ .optionalField(END_TIME_FIELD, endTime)
+ .endObject()
+ }
+
+ companion object {
+ const val NAME_FIELD = "name"
+ const val START_TIME_FIELD = "start_time"
+ const val END_TIME_FIELD = "end_time"
+
+ fun parse(xcp: XContentParser): SnapshotInfo {
+ var name: String? = null
+ var startTime: Instant? = null
+ var endTime: Instant? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ NAME_FIELD -> name = xcp.text()
+ START_TIME_FIELD -> startTime = xcp.instant()
+ END_TIME_FIELD -> endTime = xcp.instant()
+ }
+ }
+
+ return SnapshotInfo(
+ name = requireNotNull(name) { "snapshot info name must not be null." },
+ startTime = startTime,
+ endTime = endTime,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ name = sin.readString(),
+ startTime = sin.readOptionalInstant(),
+ endTime = sin.readOptionalInstant(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(name)
+ out.writeOptionalInstant(startTime)
+ out.writeOptionalInstant(endTime)
+ }
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt
new file mode 100644
index 000000000..45ebdb415
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt
@@ -0,0 +1,369 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.model
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.unit.TimeValue
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentParser
+import org.opensearch.common.xcontent.XContentParser.Token
+import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indexmanagement.opensearchapi.instant
+import org.opensearch.indexmanagement.opensearchapi.nullValueHandler
+import org.opensearch.indexmanagement.opensearchapi.optionalField
+import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
+import org.opensearch.indexmanagement.snapshotmanagement.smDocIdToPolicyName
+import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
+import org.opensearch.indexmanagement.util.NO_ID
+import org.opensearch.jobscheduler.spi.ScheduledJobParameter
+import org.opensearch.jobscheduler.spi.schedule.CronSchedule
+import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
+import org.opensearch.jobscheduler.spi.schedule.Schedule
+import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
+import java.time.Instant
+import java.time.ZoneId
+import java.time.temporal.ChronoUnit
+
+private val log = LogManager.getLogger(SMPolicy::class.java)
+
+data class SMPolicy(
+ val id: String = NO_ID,
+ val description: String? = null,
+ val creation: Creation,
+ val deletion: Deletion,
+ val snapshotConfig: Map,
+ val jobEnabled: Boolean,
+ val jobLastUpdateTime: Instant,
+ val jobEnabledTime: Instant,
+ val jobSchedule: Schedule,
+ val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+) : ScheduledJobParameter, Writeable {
+
+ init {
+ require(snapshotConfig["repository"] != null) { "Must provide the repository in snapshot config." }
+ // Other fields in snapshotConfig: date_expression, indices, partial, include_global_state, ignore_unavailable, metadata
+ // TODO SM validate date_format is of right format
+ }
+
+ override fun getName() = id
+
+ override fun getLastUpdateTime() = jobLastUpdateTime
+
+ override fun getEnabledTime() = jobEnabledTime
+
+ override fun getSchedule() = jobSchedule
+
+ override fun isEnabled() = jobEnabled
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder
+ .startObject()
+ .startObject(SM_TYPE)
+ .field(NAME_FIELD, smDocIdToPolicyName(id)) // for searching policy by name
+ .optionalField(DESCRIPTION_FIELD, description)
+ .field(CREATION_FIELD, creation)
+ .field(DELETION_FIELD, deletion)
+ .field(SNAPSHOT_CONFIG_FIELD, snapshotConfig)
+ .field(SCHEDULE_FIELD, jobSchedule)
+ .field(ENABLED_FIELD, jobEnabled)
+ .optionalTimeField(LAST_UPDATED_TIME_FIELD, jobLastUpdateTime)
+ .optionalTimeField(ENABLED_TIME_FIELD, jobEnabledTime)
+ .endObject()
+ .endObject()
+ }
+
+ companion object {
+ const val SM_TYPE = "sm"
+ const val NAME_FIELD = "name"
+ const val DESCRIPTION_FIELD = "description"
+ const val CREATION_FIELD = "creation"
+ const val DELETION_FIELD = "deletion"
+ const val SNAPSHOT_CONFIG_FIELD = "snapshot_config"
+ const val ENABLED_FIELD = "enabled"
+ const val LAST_UPDATED_TIME_FIELD = "last_updated_time"
+ const val ENABLED_TIME_FIELD = "enabled_time"
+ const val SCHEDULE_FIELD = "schedule"
+
+ fun parse(
+ xcp: XContentParser,
+ id: String = NO_ID,
+ seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+ ): SMPolicy {
+ var description: String? = null
+ var creation: Creation? = null
+ var deletion: Deletion? = null
+ var snapshotConfig: Map? = null
+ var lastUpdatedTime: Instant? = null
+ var enabledTime: Instant? = null
+ var schedule: Schedule? = null
+ var enabled = true
+
+ log.info("sm dev: first token: ${xcp.currentToken()}, ${xcp.currentName()}")
+ if (xcp.currentToken() == null) xcp.nextToken()
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ log.info("sm dev: current token loop: ${xcp.currentToken()}, ${xcp.currentName()}")
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ NAME_FIELD -> requireNotNull(xcp.text()) { "The name field of SMPolicy must not be null." }
+ DESCRIPTION_FIELD -> description = xcp.nullValueHandler { text() }
+ CREATION_FIELD -> creation = Creation.parse(xcp)
+ DELETION_FIELD -> deletion = Deletion.parse(xcp)
+ SNAPSHOT_CONFIG_FIELD -> snapshotConfig = xcp.map()
+ LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = xcp.instant()
+ ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
+ SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
+ ENABLED_FIELD -> enabled = xcp.booleanValue()
+ }
+ }
+
+ if (enabled && enabledTime == null) {
+ enabledTime = Instant.now()
+ } else if (!enabled) {
+ enabledTime = null
+ }
+
+ // TODO SM update policy API can update this value
+ if (lastUpdatedTime == null) {
+ lastUpdatedTime = Instant.now()
+ }
+
+ if (schedule == null) {
+ schedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES)
+ }
+
+ return SMPolicy(
+ description = description,
+ creation = requireNotNull(creation) { "creation field must not be null" },
+ deletion = requireNotNull(deletion) { "deletion field must not be null" },
+ snapshotConfig = requireNotNull(snapshotConfig) { "snapshot_config field must not be null" },
+ jobLastUpdateTime = requireNotNull(lastUpdatedTime) { "last_updated_at field must not be null" },
+ jobEnabledTime = requireNotNull(enabledTime) { "job_enabled_time field must not be null" },
+ jobSchedule = schedule,
+ jobEnabled = enabled,
+ id = id,
+ seqNo = seqNo,
+ primaryTerm = primaryTerm,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ description = sin.readOptionalString(),
+ creation = Creation(sin),
+ deletion = Deletion(sin),
+ snapshotConfig = sin.readMap() as Map,
+ jobLastUpdateTime = sin.readInstant(),
+ jobEnabledTime = sin.readInstant(),
+ jobSchedule = IntervalSchedule(sin),
+ jobEnabled = sin.readBoolean(),
+ id = sin.readString(),
+ seqNo = sin.readLong(),
+ primaryTerm = sin.readLong(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeOptionalString(description)
+ creation.writeTo(out)
+ deletion.writeTo(out)
+ out.writeMap(snapshotConfig)
+ out.writeInstant(jobLastUpdateTime)
+ out.writeInstant(jobEnabledTime)
+ out.writeOptionalWriteable(jobSchedule)
+ out.writeBoolean(jobEnabled)
+ out.writeString(id)
+ out.writeLong(seqNo)
+ out.writeLong(primaryTerm)
+ }
+
+ data class Creation(
+ val schedule: Schedule,
+ val timeout: ActionTimeout? = null,
+ ) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(SCHEDULE_FIELD, schedule)
+ .optionalField(TIMEOUT_FIELD, timeout)
+ .endObject()
+ }
+
+ companion object {
+ const val SCHEDULE_FIELD = "schedule"
+ const val TIMEOUT_FIELD = "timeout"
+
+ fun parse(xcp: XContentParser): Creation {
+ var schedule: Schedule? = null
+ var timeout: ActionTimeout? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
+ TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp)
+ }
+ }
+
+ return Creation(
+ schedule = requireNotNull(schedule) { "schedule field must not be null" },
+ timeout = timeout
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ schedule = CronSchedule(sin),
+ timeout = sin.readOptionalWriteable(::ActionTimeout),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ schedule.writeTo(out)
+ out.writeOptionalWriteable(timeout)
+ }
+ }
+
+ data class Deletion(
+ val schedule: Schedule,
+ val timeout: ActionTimeout? = null,
+ val condition: DeleteCondition,
+ ) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(SCHEDULE_FIELD, schedule)
+ .optionalField(TIMEOUT_FIELD, timeout)
+ .field(CONDITION_FIELD, condition)
+ .endObject()
+ }
+
+ companion object {
+ const val SCHEDULE_FIELD = "schedule"
+ const val TIMEOUT_FIELD = "timeout"
+ const val CONDITION_FIELD = "condition"
+
+ fun parse(xcp: XContentParser): Deletion {
+ var schedule: Schedule? = null
+ var timeout: ActionTimeout? = null
+ var condition: DeleteCondition? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
+ TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp)
+ CONDITION_FIELD -> condition = DeleteCondition.parse(xcp)
+ }
+ }
+
+ // If user doesn't provide delete schedule, defaults to every day 1AM
+ if (schedule == null) {
+ schedule = CronSchedule("0 1 * * *", ZoneId.systemDefault())
+ }
+
+ return Deletion(
+ schedule = schedule,
+ timeout = timeout,
+ condition = requireNotNull(condition) { "condition field must not be null" },
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ schedule = CronSchedule(sin),
+ timeout = sin.readOptionalWriteable(::ActionTimeout),
+ condition = DeleteCondition(sin),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ schedule.writeTo(out)
+ out.writeOptionalWriteable(timeout)
+ condition.writeTo(out)
+ }
+ }
+
+ data class DeleteCondition(
+ val maxCount: Int,
+ val maxAge: TimeValue? = null,
+ val minCount: Int? = null,
+ ) : Writeable, ToXContent {
+
+ init {
+ require(maxCount > 0) { "$MAX_COUNT_FIELD should be bigger than 0." }
+ require(minCount == null || maxCount >= minCount && minCount > 0) {
+ "$MIN_COUNT_FIELD should be bigger than 0 and smaller than $MAX_COUNT_FIELD."
+ }
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(MAX_COUNT_FIELD, maxCount)
+ .optionalField(MAX_AGE_FIELD, maxAge)
+ .optionalField(MIN_COUNT_FIELD, minCount)
+ .endObject()
+ }
+
+ companion object {
+ const val MAX_COUNT_FIELD = "max_count"
+ const val MAX_AGE_FIELD = "max_age"
+ const val MIN_COUNT_FIELD = "min_count"
+
+ fun parse(xcp: XContentParser): DeleteCondition {
+ var maxCount = 50
+ var maxAge: TimeValue? = null
+ var minCount: Int? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ MAX_COUNT_FIELD -> maxCount = xcp.intValue()
+ MAX_AGE_FIELD -> maxAge = TimeValue.parseTimeValue(xcp.text(), MAX_AGE_FIELD)
+ MIN_COUNT_FIELD -> minCount = xcp.intValue()
+ }
+ }
+
+ if (maxAge != null && minCount == null) {
+ minCount = minOf(5, maxCount)
+ }
+
+ return DeleteCondition(
+ maxCount = maxCount,
+ maxAge = maxAge,
+ minCount = minCount,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ maxCount = sin.readInt(),
+ maxAge = sin.readOptionalTimeValue(),
+ minCount = sin.readOptionalInt()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeInt(maxCount)
+ out.writeOptionalTimeValue(maxAge)
+ out.writeOptionalInt(minCount)
+ }
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt
index 070cf4f5a..dd9acfc2d 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt
@@ -39,6 +39,7 @@ import org.opensearch.indexmanagement.opensearchapi.optionalUserField
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.transform.TransformSearchService
import org.opensearch.indexmanagement.util.IndexUtils
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.CronSchedule
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
@@ -264,7 +265,6 @@ data class Transform(
val supportedAggregations = listOf("sum", "max", "min", "value_count", "avg", "scripted_metric", "percentiles")
const val LOCK_DURATION_SECONDS = 1800L
- const val NO_ID = ""
const val TRANSFORM_TYPE = "transform"
const val TRANSFORM_ID_FIELD = "transform_id"
const val ENABLED_FIELD = "enabled"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt
index 8609c3672..c75b50df7 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt
@@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.transform.action.index.IndexTransformRespo
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
@@ -46,8 +47,8 @@ class RestIndexTransformAction : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("transformID", Transform.NO_ID)
- if (Transform.NO_ID == id) {
+ val id = request.param("transformID", NO_ID)
+ if (NO_ID == id) {
throw IllegalArgumentException("Missing transform ID")
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt
index 9518c1f75..9cfc1d97c 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt
@@ -9,7 +9,7 @@ import org.opensearch.client.node.NodeClient
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI
import org.opensearch.indexmanagement.transform.action.start.StartTransformAction
import org.opensearch.indexmanagement.transform.action.start.StartTransformRequest
-import org.opensearch.indexmanagement.transform.model.Transform
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestHandler.Route
@@ -32,7 +32,7 @@ class RestStartTransformAction : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("transformID", Transform.NO_ID)
+ val id = request.param("transformID", NO_ID)
val startRequest = StartTransformRequest(id)
return RestChannelConsumer { channel ->
client.execute(StartTransformAction.INSTANCE, startRequest, RestToXContentListener(channel))
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt
index 515a346d1..a48034e5c 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt
@@ -9,7 +9,7 @@ import org.opensearch.client.node.NodeClient
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI
import org.opensearch.indexmanagement.transform.action.stop.StopTransformAction
import org.opensearch.indexmanagement.transform.action.stop.StopTransformRequest
-import org.opensearch.indexmanagement.transform.model.Transform
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.RestHandler.Route
@@ -32,8 +32,8 @@ class RestStopTransformAction : BaseRestHandler() {
@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
- val id = request.param("transformID", Transform.NO_ID)
- if (Transform.NO_ID == id) {
+ val id = request.param("transformID", NO_ID)
+ if (NO_ID == id) {
throw IllegalArgumentException("Missing transform ID")
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
index c075bf2b1..db1fe9e73 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
@@ -35,6 +35,7 @@ class IndexUtils {
const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#"
private const val BYTE_ARRAY_SIZE = 16
private const val DOCUMENT_ID_SEED = 72390L
+
val logger = LogManager.getLogger(IndexUtils::class.java)
var indexManagementConfigSchemaVersion: Long
diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json
index bf2b61e1c..1fe996ecd 100644
--- a/src/main/resources/mappings/opendistro-ism-config.json
+++ b/src/main/resources/mappings/opendistro-ism-config.json
@@ -1,6 +1,6 @@
{
"_meta" : {
- "schema_version": 14
+ "schema_version": 15
},
"dynamic": "strict",
"properties": {
@@ -1305,6 +1305,250 @@
"enabled": false
}
}
+ },
+ "sm": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "description": {
+ "type": "text"
+ },
+ "creation": {
+ "properties": {
+ "schedule": {
+ "properties": {
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "timeout": {
+ "type": "keyword"
+ }
+ }
+ },
+ "deletion": {
+ "properties": {
+ "schedule": {
+ "properties": {
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "timeout": {
+ "type": "keyword"
+ },
+ "condition": {
+ "properties": {
+ "max_count": {
+ "type": "integer"
+ },
+ "max_age": {
+ "type": "keyword"
+ },
+ "min_count": {
+ "type": "integer"
+ }
+ }
+ }
+ }
+ },
+ "snapshot_config": {
+ "type": "object",
+ "enabled": false
+ },
+ "enabled": {
+ "type": "boolean"
+ },
+ "enabled_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "last_updated_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "schedule": {
+ "properties": {
+ "interval": {
+ "properties": {
+ "period": {
+ "type": "integer"
+ },
+ "unit": {
+ "type": "keyword"
+ },
+ "start_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ },
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "user": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "backend_roles": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ },
+ "roles": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ },
+ "custom_attribute_names": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "sm_metadata": {
+ "properties": {
+ "policy_seq_no": {
+ "type": "long"
+ },
+ "policy_primary_term": {
+ "type": "long"
+ },
+ "current_state": {
+ "type": "keyword"
+ },
+ "info": {
+ "type": "object",
+ "enabled": false
+ },
+ "creation": {
+ "properties": {
+ "trigger": {
+ "properties": {
+ "time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "started": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "finished": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ }
+ }
+ },
+ "deletion": {
+ "properties": {
+ "trigger": {
+ "properties": {
+ "time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "started": {
+ "type": "nested",
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ }
+ }
+ }
+ }
}
}
}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt
index fbe64cf8a..63ee4181c 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt
@@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
- val configSchemaVersion = 14
+ val configSchemaVersion = 15
val historySchemaVersion = 5
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt
index 0b793be17..630fcca42 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt
@@ -13,11 +13,11 @@ import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM_BASE_URI
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_POLICY_BASE_URI
import org.opensearch.indexmanagement.IndexManagementRestTestCase
-import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_USER
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.opensearchapi.string
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
@@ -137,7 +137,7 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() {
val responseBody = createResponse.asMap()
val createdId = responseBody["_id"] as String
val createdVersion = responseBody["_version"] as Int
- assertNotEquals("Create policy response is missing id", Policy.NO_ID, createdId)
+ assertNotEquals("Create policy response is missing id", NO_ID, createdId)
assertTrue("Create policy response has incorrect version", createdVersion > 0)
Thread.sleep(10000)
}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt
index 1dd3ca7c9..a4c719537 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt
@@ -46,6 +46,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeo
import org.opensearch.test.OpenSearchTestCase
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
+import java.lang.Math.abs
import kotlin.test.assertFailsWith
class ActionTests : OpenSearchTestCase() {
@@ -191,7 +192,9 @@ class ActionTests : OpenSearchTestCase() {
val totalNodeBytes = randomByteSizeValue().bytes
val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, totalNodeBytes)
val expectedThreshold: Long = ((1 - (rawPercentage.toDouble() / 100.0)) * totalNodeBytes).toLong()
- assertEquals("Free bytes threshold not being calculated correctly for percentage setting.", thresholdBytes, expectedThreshold)
+ // To account for some rounding issues, allow an error of 1
+ val approximatelyEqual = kotlin.math.abs(thresholdBytes - expectedThreshold) <= 1
+ assertTrue("Free bytes threshold not being calculated correctly for percentage setting.", approximatelyEqual)
}
fun `test shrink disk threshold byte settings`() {
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt
index cb50d18ed..898e198bb 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt
@@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionC
import org.opensearch.indexmanagement.indexstatemanagement.randomState
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.makeRequest
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._PRIMARY_TERM
import org.opensearch.indexmanagement.util._SEQ_NO
@@ -71,7 +72,7 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() {
val createdId = responseBody["_id"] as String
val createdSeqNo = responseBody[_SEQ_NO] as Int
val createdPrimaryTerm = responseBody[_PRIMARY_TERM] as Int
- assertNotEquals("response is missing Id", Policy.NO_ID, createdId)
+ assertNotEquals("response is missing Id", NO_ID, createdId)
assertEquals("not same id", policyId, createdId)
assertEquals("incorrect seqNo", 0, createdSeqNo)
assertEquals("incorrect primaryTerm", 1, createdPrimaryTerm)
@@ -189,7 +190,7 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() {
val responseBody = updateResponse.asMap()
val updatedId = responseBody[_ID] as String
val updatedSeqNo = (responseBody[_SEQ_NO] as Int).toLong()
- assertNotEquals("response is missing Id", Policy.NO_ID, updatedId)
+ assertNotEquals("response is missing Id", NO_ID, updatedId)
assertEquals("not same id", policy.id, updatedId)
assertTrue("incorrect seqNo", policy.seqNo < updatedSeqNo)
}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt
index 066debaa6..9ac1b8af6 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt
@@ -15,7 +15,6 @@ import org.opensearch.indexmanagement.common.model.dimension.Histogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.rollup.RollupRestTestCase
-import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupMetrics
import org.opensearch.indexmanagement.rollup.model.metric.Average
import org.opensearch.indexmanagement.rollup.model.metric.Max
@@ -26,6 +25,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.ValueCount
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.indexmanagement.rollup.randomRollupDimensions
import org.opensearch.indexmanagement.rollup.randomRollupMetrics
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util._ID
import org.opensearch.indexmanagement.util._SEQ_NO
import org.opensearch.rest.RestStatus
@@ -43,7 +43,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() {
assertEquals("Create rollup failed", RestStatus.CREATED, response.restStatus())
val responseBody = response.asMap()
val createdId = responseBody["_id"] as String
- assertNotEquals("Response is missing Id", Rollup.NO_ID, createdId)
+ assertNotEquals("Response is missing Id", NO_ID, createdId)
assertEquals("Not same id", rollup.id, createdId)
assertEquals("Incorrect Location header", "$ROLLUP_JOBS_BASE_URI/$createdId", response.getHeader("Location"))
}
@@ -116,7 +116,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() {
val responseBody = updateResponse.asMap()
val updatedId = responseBody[_ID] as String
val updatedSeqNo = (responseBody[_SEQ_NO] as Int).toLong()
- assertNotEquals("response is missing Id", Rollup.NO_ID, updatedId)
+ assertNotEquals("response is missing Id", NO_ID, updatedId)
assertEquals("not same id", rollup.id, updatedId)
assertTrue("incorrect seqNo", rollup.seqNo < updatedSeqNo)
}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt
new file mode 100644
index 000000000..f89742d02
--- /dev/null
+++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt
@@ -0,0 +1,15 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement
+
+import org.opensearch.test.OpenSearchTestCase
+
+class SMUtilsTests : OpenSearchTestCase() {
+ fun `test sm policy name and id conversion`() {
+ val policyName = "daily-snapshot-sm-sm"
+ assertEquals(policyName, smDocIdToPolicyName(smPolicyNameToDocId(policyName)))
+ }
+}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt
new file mode 100644
index 000000000..635038f71
--- /dev/null
+++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt
@@ -0,0 +1,142 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement
+
+import com.nhaarman.mockitokotlin2.doReturn
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
+import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
+import org.opensearch.action.index.IndexResponse
+import org.opensearch.common.UUIDs
+import org.opensearch.common.unit.TimeValue
+import org.opensearch.indexmanagement.randomCronSchedule
+import org.opensearch.indexmanagement.randomInstant
+import org.opensearch.indexmanagement.randomIntervalSchedule
+import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMState
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
+import org.opensearch.jobscheduler.spi.schedule.CronSchedule
+import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
+import org.opensearch.rest.RestStatus
+import org.opensearch.snapshots.SnapshotId
+import org.opensearch.snapshots.SnapshotInfo
+import org.opensearch.test.OpenSearchTestCase.randomNonNegativeLong
+import java.time.Instant
+import java.time.Instant.now
+
+fun randomSMMetadata(
+ currentState: SMState,
+ nextCreationTime: Instant = now(),
+ nextDeletionTime: Instant = now(),
+ policySeqNo: Long = randomNonNegativeLong(),
+ policyPrimaryTerm: Long = randomNonNegativeLong(),
+ atomic: Boolean = false,
+): SMMetadata {
+ return SMMetadata(
+ policySeqNo = policySeqNo,
+ policyPrimaryTerm = policyPrimaryTerm,
+ currentState = currentState,
+ creation = SMMetadata.Creation(
+ trigger = SMMetadata.Trigger(
+ time = nextCreationTime
+ )
+ ),
+ deletion = SMMetadata.Deletion(
+ trigger = SMMetadata.Trigger(
+ time = nextDeletionTime
+ )
+ ),
+ )
+}
+
+fun randomSMPolicy(
+ jobEnabled: Boolean = false,
+ jobLastUpdateTime: Instant = randomInstant(),
+ creationSchedule: CronSchedule = randomCronSchedule(),
+ creationTimeout: ActionTimeout? = null,
+ deletionSchedule: CronSchedule = randomCronSchedule(),
+ deletionTimeout: ActionTimeout? = null,
+ deletionMaxCount: Int = 5,
+ deletionMaxAge: TimeValue? = null,
+ deletionMinCount: Int? = null,
+ snapshotConfig: Map = mapOf(
+ "repository" to "repo",
+ "date_format" to "yyyy-MM-dd-HH:mm"
+ ),
+ jobEnabledTime: Instant = randomInstant(),
+ jobSchedule: IntervalSchedule = randomIntervalSchedule()
+): SMPolicy {
+ return SMPolicy(
+ jobEnabled = jobEnabled,
+ jobLastUpdateTime = jobLastUpdateTime,
+ creation = SMPolicy.Creation(
+ schedule = creationSchedule,
+ timeout = creationTimeout,
+ ),
+ deletion = SMPolicy.Deletion(
+ schedule = deletionSchedule,
+ timeout = deletionTimeout,
+ condition = SMPolicy.DeleteCondition(
+ maxCount = deletionMaxCount,
+ maxAge = deletionMaxAge,
+ minCount = deletionMinCount
+ )
+ ),
+ snapshotConfig = snapshotConfig,
+ jobEnabledTime = jobEnabledTime,
+ jobSchedule = jobSchedule
+ )
+}
+
+fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse {
+ val indexResponse: IndexResponse = mock()
+ whenever(indexResponse.status()).doReturn(status)
+ whenever(indexResponse.seqNo).doReturn(0L)
+ whenever(indexResponse.primaryTerm).doReturn(1L)
+
+ return indexResponse
+}
+
+fun mockCreateSnapshotResponse(status: RestStatus = RestStatus.ACCEPTED): CreateSnapshotResponse {
+ val createSnapshotRes: CreateSnapshotResponse = mock()
+ whenever(createSnapshotRes.status()).doReturn(status)
+ return createSnapshotRes
+}
+
+fun mockGetSnapshotResponse(num: Int): GetSnapshotsResponse {
+ val getSnapshotsRes: GetSnapshotsResponse = mock()
+ whenever(getSnapshotsRes.snapshots).doReturn(mockSnapshots(num))
+ return getSnapshotsRes
+}
+
+fun mockSnapshots(num: Int): List {
+ val result = mutableListOf()
+ for (i in 1..num) {
+ result.add(mockSnapshotInfo(idNum = i))
+ }
+ return result.toList()
+}
+
+/**
+ * For our use case, only mock snapshotId, startTime and endTime
+ */
+fun mockSnapshotInfo(idNum: Int, startTime: Long = randomNonNegativeLong(), endTime: Long = randomNonNegativeLong()): SnapshotInfo {
+ val snapshotId = SnapshotId("mock_snapshot-$idNum}", UUIDs.randomBase64UUID())
+ return SnapshotInfo(
+ snapshotId,
+ listOf("index1"),
+ listOf("ds-1"),
+ startTime,
+ "",
+ endTime,
+ 5,
+ emptyList(),
+ false,
+ emptyMap(),
+ )
+}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt
index 0857c1900..d184f1c2c 100644
--- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt
+++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt
@@ -11,8 +11,8 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.transform.TransformRestTestCase
-import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.randomTransform
+import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.rest.RestStatus
import org.opensearch.test.junit.annotations.TestLogging
@@ -34,7 +34,7 @@ class RestIndexTransformActionIT : TransformRestTestCase() {
assertEquals("Create transform failed", RestStatus.CREATED, response.restStatus())
val responseBody = response.asMap()
val createdId = responseBody["_id"] as String
- assertNotEquals("Response is missing Id", Transform.NO_ID, createdId)
+ assertNotEquals("Response is missing Id", NO_ID, createdId)
assertEquals("Not same id", transform.id, createdId)
assertEquals("Incorrect Location header", "$TRANSFORM_BASE_URI/$createdId", response.getHeader("Location"))
}
diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json
index bf2b61e1c..1fe996ecd 100644
--- a/src/test/resources/mappings/cached-opendistro-ism-config.json
+++ b/src/test/resources/mappings/cached-opendistro-ism-config.json
@@ -1,6 +1,6 @@
{
"_meta" : {
- "schema_version": 14
+ "schema_version": 15
},
"dynamic": "strict",
"properties": {
@@ -1305,6 +1305,250 @@
"enabled": false
}
}
+ },
+ "sm": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "description": {
+ "type": "text"
+ },
+ "creation": {
+ "properties": {
+ "schedule": {
+ "properties": {
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "timeout": {
+ "type": "keyword"
+ }
+ }
+ },
+ "deletion": {
+ "properties": {
+ "schedule": {
+ "properties": {
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "timeout": {
+ "type": "keyword"
+ },
+ "condition": {
+ "properties": {
+ "max_count": {
+ "type": "integer"
+ },
+ "max_age": {
+ "type": "keyword"
+ },
+ "min_count": {
+ "type": "integer"
+ }
+ }
+ }
+ }
+ },
+ "snapshot_config": {
+ "type": "object",
+ "enabled": false
+ },
+ "enabled": {
+ "type": "boolean"
+ },
+ "enabled_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "last_updated_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "schedule": {
+ "properties": {
+ "interval": {
+ "properties": {
+ "period": {
+ "type": "integer"
+ },
+ "unit": {
+ "type": "keyword"
+ },
+ "start_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ },
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "user": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "backend_roles": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ },
+ "roles": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ },
+ "custom_attribute_names": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "sm_metadata": {
+ "properties": {
+ "policy_seq_no": {
+ "type": "long"
+ },
+ "policy_primary_term": {
+ "type": "long"
+ },
+ "current_state": {
+ "type": "keyword"
+ },
+ "info": {
+ "type": "object",
+ "enabled": false
+ },
+ "creation": {
+ "properties": {
+ "trigger": {
+ "properties": {
+ "time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "started": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "finished": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ }
+ }
+ },
+ "deletion": {
+ "properties": {
+ "trigger": {
+ "properties": {
+ "time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "started": {
+ "type": "nested",
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ }
+ }
+ }
+ }
}
}
}
diff --git a/worksheets/ism/all_actions.http b/worksheets/ism/all_actions.http
new file mode 100644
index 000000000..f9c625f11
--- /dev/null
+++ b/worksheets/ism/all_actions.http
@@ -0,0 +1,487 @@
+###
+PUT localhost:9200/_opendistro/_ism/policies/rollover
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "rollover": {
+ "min_doc_count": 0
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["rollover*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/rollover_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/force_merge
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "force_merge": {
+ "max_num_segments": 1
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["force_merge*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/force_merge_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/read_only
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "read_only": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["read_only*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/read_only_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/read_write
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "read_write": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["read_write*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/read_write_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/replica_count
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "replica_count": {
+ "number_of_replicas": 2
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["replica_count*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/replica_count_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/close
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "close": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["close*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/close_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/open
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "open": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["open*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/open_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/notification
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "notification": {
+ "destination": {
+ "chime": {
+ "url": ""
+ }
+ },
+ "message_template": {
+ "source": "the index is {{ctx.index}}"
+ }
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["notification*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/notification_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/snapshot
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "snapshot": {
+ "repository": "my_backup",
+ "snapshot": "snapshot1"
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["snapshot*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/snapshot_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/index_priority
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "index_priority": {
+ "priority": 50
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["index_priority*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/index_priority_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/allocation
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "allocation": {
+ "require": { "temp": "warm" }
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["allocation*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/allocation_index
+Content-Type: application/json
diff --git a/worksheets/sm/create.http b/worksheets/sm/create.http
new file mode 100644
index 000000000..9b0d4b648
--- /dev/null
+++ b/worksheets/sm/create.http
@@ -0,0 +1,84 @@
+###
+GET localhost:9200/
+Accept: application/json
+
+###
+POST localhost:9200/.opendistro-ism-config/_search
+Content-Type: application/json
+
+{
+ "seq_no_primary_term": true
+}
+
+###
+DELETE localhost:9200/.opendistro-ism-config
+
+### index sm
+PUT localhost:9200/_plugins/_sm/daily_snapshot
+Content-Type: application/json
+
+{
+ "description": "A daily snapshot policy",
+ "creation": {
+ "schedule": {
+ "cron": {
+ "expression": "* * * * *",
+ "timezone": "America/Los_Angeles"
+ }
+ }
+ },
+ "deletion": {
+ "schedule": {
+ "cron": {
+ "expression": "*/2 * * * *",
+ "timezone": "America/Los_Angeles"
+ }
+ },
+ "condition": {
+ "max_count": 5
+ }
+ },
+ "snapshot_config": {
+ "repository": "repo",
+ "date_format": "yyyy-MM-dd-HH:mm",
+ "indices": "*",
+ "partial": "false",
+ "include_global_state": "false",
+ "ignore_unavailable": "true",
+ "metadata": {
+ "taken_by": "snapshot management"
+ }
+ }
+}
+
+###
+# "schedule": {
+# "interval": {
+# "start_time": 1645596970,
+# "period": 1,
+# "unit": "MINUTES"
+# }
+# },
+
+### register repo
+PUT localhost:9200/_snapshot/repo
+Content-Type: application/json
+
+{
+ "type": "fs",
+ "settings": {
+ "location": "my_backup_location"
+ }
+}
+
+### delete repo
+DELETE localhost:9200/_snapshot/repo
+
+### create snapshot
+PUT localhost:9200/_snapshot/repo/my_snapshot
+
+### delete snapshot
+DELETE localhost:9200/_snapshot/repo/test
+
+### get snapshot status
+GET localhost:9200/_snapshot/repo/daily_snapshot*