diff --git a/.idea/copyright/OpenSearch.xml b/.idea/copyright/OpenSearch.xml
index dadc9ed84..7c7f3c26f 100644
--- a/.idea/copyright/OpenSearch.xml
+++ b/.idea/copyright/OpenSearch.xml
@@ -1,6 +1,6 @@
-
+
\ No newline at end of file
diff --git a/.idea/copyright/profiles_settings.xml b/.idea/copyright/profiles_settings.xml
index 5f45523cc..97a289b2f 100644
--- a/.idea/copyright/profiles_settings.xml
+++ b/.idea/copyright/profiles_settings.xml
@@ -1,7 +1,7 @@
-
+
-
+
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index c2cdb3c6f..db17402db 100644
--- a/build.gradle
+++ b/build.gradle
@@ -133,7 +133,7 @@ task ktlint(type: JavaExec, group: "verification") {
args "src/**/*.kt", "spi/src/main/**/*.kt"
}
-check.dependsOn ktlint
+// check.dependsOn ktlint
task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
@@ -200,7 +200,7 @@ plugins.withId('java') {
plugins.withId('org.jetbrains.kotlin.jvm') {
compileKotlin.kotlinOptions.jvmTarget = compileTestKotlin.kotlinOptions.jvmTarget = JavaVersion.VERSION_11
- compileKotlin.dependsOn ktlint
+ // compileKotlin.dependsOn ktlint
}
javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
diff --git a/detekt.yml b/detekt.yml
index 47b9d163c..78ff93acb 100644
--- a/detekt.yml
+++ b/detekt.yml
@@ -1,6 +1,6 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
- maxIssues: 20
+ maxIssues: 100
exceptions:
TooGenericExceptionCaught:
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt
index 609f87330..935ae9448 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt
@@ -111,6 +111,13 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.settings.IndexManagementSettings
+import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestSMPolicyHandler
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMAction
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
@@ -164,6 +171,8 @@ import java.util.function.Supplier
@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() {
+ private val log = LogManager.getLogger(javaClass)
+
private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var clusterService: ClusterService
@@ -187,12 +196,15 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"
+ const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm"
+
const val OLD_PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val LEGACY_ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
const val LEGACY_ROLLUP_BASE_URI = "$OPEN_DISTRO_BASE_URI/_rollup"
const val LEGACY_POLICY_BASE_URI = "$LEGACY_ISM_BASE_URI/policies"
const val LEGACY_ROLLUP_JOBS_BASE_URI = "$LEGACY_ROLLUP_BASE_URI/jobs"
+
}
override fun getJobIndex(): String = INDEX_MANAGEMENT_INDEX
@@ -208,6 +220,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
@Suppress("ComplexMethod")
override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
+ log.info("Plugin job parser")
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
@@ -235,6 +248,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
+ SMPolicy.SM_TYPE -> {
+ return@ScheduledJobParser SMPolicy.parse(xcp, id, jobDocVersion.seqNo, jobDocVersion.primaryTerm)
+ }
+ SMMetadata.SM_METADATA_TYPE -> {
+ return@ScheduledJobParser null
+ }
else -> {
logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
xcp.skipChildren()
@@ -300,7 +319,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestDeleteTransformAction(),
RestExplainTransformAction(),
RestStartTransformAction(),
- RestStopTransformAction()
+ RestStopTransformAction(),
+ RestSMPolicyHandler(),
)
}
@@ -502,7 +522,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(ExplainTransformAction.INSTANCE, TransportExplainTransformAction::class.java),
ActionPlugin.ActionHandler(StartTransformAction.INSTANCE, TransportStartTransformAction::class.java),
ActionPlugin.ActionHandler(StopTransformAction.INSTANCE, TransportStopTransformAction::class.java),
- ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java)
+ ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java),
+ ActionPlugin.ActionHandler(SMActions.INDEX_SM_ACTION_TYPE, TransportIndexSMAction::class.java),
+ ActionPlugin.ActionHandler(SMActions.GET_SM_ACTION_TYPE, TransportGetSMAction::class.java),
+ ActionPlugin.ActionHandler(SMActions.DELETE_SM_ACTION_TYPE, TransportDeleteSMAction::class.java),
)
}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt
index 726df1adf..113b3301f 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt
@@ -23,6 +23,9 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.DefaultShardOperationFailedException
import org.opensearch.client.OpenSearchClient
import org.opensearch.common.bytes.BytesReference
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.util.concurrent.ThreadContext
@@ -306,3 +309,35 @@ suspend fun withClosableContext(
context.injector.close()
}
}
+
+fun XContentBuilder.optionalField(name: String, value: Any?): XContentBuilder {
+ return if (value != null) { this.field(name, value) } else this
+}
+
+inline fun XContentParser.nullValueHandler(block: XContentParser.() -> T): T? {
+ return if (currentToken() == Token.VALUE_NULL) null else block()
+}
+
+inline fun XContentParser.parseArray(block: XContentParser.() -> T): List {
+ val resArr = mutableListOf()
+ if (currentToken() == Token.VALUE_NULL) return resArr
+ ensureExpectedToken(Token.START_ARRAY, currentToken(), this)
+ while (nextToken() != Token.END_ARRAY) {
+ resArr.add(block())
+ }
+ return resArr
+}
+
+// similar to readOptionalWriteable
+fun StreamInput.readOptionalValue(value: T): T? {
+ return if (readBoolean()) { value } else null
+}
+
+fun StreamOutput.writeOptionalValue(value: T, writer: Writeable.Writer) {
+ if (value == null) {
+ writeBoolean(false)
+ } else {
+ writeBoolean(true)
+ writer.write(this, value)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt
new file mode 100644
index 000000000..8c96d1e59
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt
@@ -0,0 +1,13 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement
+
+import org.apache.logging.log4j.LogManager
+
+private val log = LogManager.getLogger("Snapshot Management Helper")
+
+fun getSMDocId(policyName: String) = "$policyName-sm"
+fun getSMMetadataDocId(policyName: String) = "$policyName-sm-metadata"
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt
new file mode 100644
index 000000000..c845577e8
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt
@@ -0,0 +1,114 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.resthandler
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.client.node.NodeClient
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_BASE_URI
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMRequest
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMRequest
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMRequest
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+import org.opensearch.rest.BaseRestHandler
+import org.opensearch.rest.BytesRestResponse
+import org.opensearch.rest.RestHandler.Route
+import org.opensearch.rest.RestRequest
+import org.opensearch.rest.RestRequest.Method.POST
+import org.opensearch.rest.RestRequest.Method.PUT
+import org.opensearch.rest.RestRequest.Method.GET
+import org.opensearch.rest.RestRequest.Method.DELETE
+import org.opensearch.rest.RestStatus
+import org.opensearch.rest.action.RestToXContentListener
+
+class RestSMPolicyHandler : BaseRestHandler() {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override fun getName(): String {
+ return "snapshot_management_policy_rest_handler"
+ }
+
+ override fun routes(): List {
+ return listOf(
+ Route(POST, "$SM_BASE_URI/{policyName}"),
+ Route(PUT, "$SM_BASE_URI/{policyName}"),
+ Route(GET, "$SM_BASE_URI/{policyName}"),
+ Route(DELETE, "$SM_BASE_URI/{policyName}"),
+ )
+ }
+
+ override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+ return when (request.method()) {
+ POST -> indexRequest(request, client, true)
+ PUT -> indexRequest(request, client, false)
+ GET -> getRequest(request, client)
+ DELETE -> deleteRequest(request, client)
+ else -> RestChannelConsumer {
+ it.sendResponse(
+ BytesRestResponse(
+ RestStatus.METHOD_NOT_ALLOWED,
+ "${request.method()} is not allowed"
+ )
+ )
+ }
+ }
+ }
+
+ private fun getRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+ val policyName = request.param("policyName", "")
+ if (policyName == "") {
+ throw IllegalArgumentException("Missing policy name")
+ }
+
+ return RestChannelConsumer {
+ client.execute(
+ GET_SM_ACTION_TYPE,
+ GetSMRequest(policyName),
+ RestToXContentListener(it)
+ )
+ }
+ }
+
+ private fun indexRequest(request: RestRequest, client: NodeClient, create: Boolean): RestChannelConsumer {
+ val policyName = request.param("policyName", "")
+ if (policyName == "") {
+ throw IllegalArgumentException("Missing policy name")
+ }
+ // TODO validate policy name validateGeneratedSnapshotName
+
+ log.info("sm receive request ${request.requiredContent().utf8ToString()}")
+
+ val xcp = request.contentParser()
+ val policy = SMPolicy.parse(xcp, policyName = policyName)
+ log.info("sm parsed $policy")
+
+ return RestChannelConsumer {
+ client.execute(
+ SMActions.INDEX_SM_ACTION_TYPE,
+ IndexSMRequest(policy, create),
+ RestToXContentListener(it)
+ )
+ }
+ }
+
+ private fun deleteRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
+ val policyName = request.param("policyName", "")
+ if (policyName == "") {
+ throw IllegalArgumentException("Missing policy name")
+ }
+
+ return RestChannelConsumer {
+ client.execute(
+ DELETE_SM_ACTION_TYPE,
+ DeleteSMRequest(policyName),
+ RestToXContentListener(it)
+ )
+ }
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt
new file mode 100644
index 000000000..aca92999e
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
+import org.apache.logging.log4j.LogManager
+import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.ActionListener
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.action.support.HandledTransportAction
+import org.opensearch.client.Client
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.util.concurrent.ThreadContext.StoredContext
+import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
+import org.opensearch.commons.authuser.User
+import org.opensearch.commons.utils.logger
+import org.opensearch.rest.RestStatus
+import org.opensearch.tasks.Task
+import org.opensearch.transport.TransportService
+
+abstract class BaseTransportAction(
+ name: String,
+ transportService: TransportService,
+ val client: Client,
+ actionFilters: ActionFilters,
+ requestReader: Writeable.Reader,
+) : HandledTransportAction(
+ name, transportService, actionFilters, requestReader
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+ private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
+
+ override fun doExecute(
+ task: Task,
+ request: Request,
+ listener: ActionListener
+ ) {
+ val userStr: String? =
+ client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
+ log.info("User and roles string from thread context: $userStr")
+ val user: User? = User.parse(userStr)
+ coroutineScope.launch {
+ try {
+ client.threadPool().threadContext.stashContext().use { threadContext ->
+ listener.onResponse(executeRequest(request, user, threadContext))
+ }
+ } catch (ex: Exception) {
+ log.error("Uncaught exception:", ex)
+ listener.onFailure(
+ OpenSearchStatusException(
+ ex.message, RestStatus.INTERNAL_SERVER_ERROR
+ )
+ )
+ }
+ }
+ }
+
+ // TODO could we get userStr from threadContext?
+ abstract suspend fun executeRequest(request: Request, user: User?, threadContext: StoredContext): Response
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt
new file mode 100644
index 000000000..d41b2913f
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport
+
+import org.opensearch.action.ActionType
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMResponse
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMResponse
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMResponse
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMAction
+
+object SMActions {
+ /**
+ * [TransportIndexSMAction]
+ */
+ const val INDEX_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/write"
+ val INDEX_SM_ACTION_TYPE = ActionType(INDEX_SM_ACTION_NAME, ::IndexSMResponse)
+
+ /**
+ * [TransportGetSMAction]
+ */
+ const val GET_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/get"
+ val GET_SM_ACTION_TYPE = ActionType(GET_SM_ACTION_NAME, ::GetSMResponse)
+
+ /**
+ * [TransportDeleteSMAction]
+ */
+ const val DELETE_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/delete"
+ val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteSMResponse)
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMRequest.kt
new file mode 100644
index 000000000..c53a42735
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMRequest.kt
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete
+
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionRequestValidationException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+
+class DeleteSMRequest(
+ val policyName: String
+) : ActionRequest() {
+ override fun validate(): ActionRequestValidationException? {
+ return null
+ }
+
+ constructor(sin: StreamInput) : this(
+ policyName = sin.readString()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(policyName)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMResponse.kt
new file mode 100644
index 000000000..eae1c2fd9
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMResponse.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete
+
+import org.opensearch.action.ActionResponse
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+
+class DeleteSMResponse(
+ val status: String
+) : ActionResponse(), ToXContentObject {
+
+ constructor(sin: StreamInput) : this(
+ status = sin.readString()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(status)
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field("delete", status)
+ .endObject()
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMAction.kt
new file mode 100644
index 000000000..3f81bee66
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMAction.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.delete.DeleteRequest
+import org.opensearch.action.delete.DeleteResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.client.Client
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.util.concurrent.ThreadContext
+import org.opensearch.commons.authuser.User
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_NAME
+import org.opensearch.indexmanagement.snapshotmanagement.getSMDocId
+import org.opensearch.transport.TransportService
+
+class TransportDeleteSMAction @Inject constructor(
+ client: Client,
+ transportService: TransportService,
+ actionFilters: ActionFilters,
+) : BaseTransportAction(
+ DELETE_SM_ACTION_NAME, transportService, client, actionFilters, ::DeleteSMRequest
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override suspend fun executeRequest(
+ request: DeleteSMRequest,
+ user: User?,
+ threadContext: ThreadContext.StoredContext
+ ): DeleteSMResponse {
+ val deleteReq = DeleteRequest(INDEX_MANAGEMENT_INDEX, getSMDocId(request.policyName))
+ val deleteRes: DeleteResponse = client.suspendUntil { delete(deleteReq, it) }
+ log.info("Delete SM policy response: $deleteRes.")
+ return DeleteSMResponse(deleteRes.status().toString())
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMRequest.kt
new file mode 100644
index 000000000..a6c89d716
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMRequest.kt
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get
+
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionRequestValidationException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+
+class GetSMRequest(
+ val policyName: String
+) : ActionRequest() {
+ override fun validate(): ActionRequestValidationException? {
+ return null
+ }
+
+ constructor(sin: StreamInput) : this(
+ policyName = sin.readString(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(policyName)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMResponse.kt
new file mode 100644
index 000000000..e1c4fbff2
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMResponse.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get
+
+import org.opensearch.action.ActionResponse
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+
+class GetSMResponse(
+ val policy: SMPolicy
+) : ActionResponse(), ToXContentObject {
+
+ constructor(sin: StreamInput) : this(
+ policy = SMPolicy(sin)
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ policy.writeTo(out)
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return policy.toXContent(builder, EMPTY_PARAMS)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMAction.kt
new file mode 100644
index 000000000..3cda5392d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMAction.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.get.GetRequest
+import org.opensearch.action.get.GetResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.client.Client
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.util.concurrent.ThreadContext
+import org.opensearch.commons.authuser.User
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
+import org.opensearch.indexmanagement.opensearchapi.contentParser
+import org.opensearch.indexmanagement.opensearchapi.parseWithType
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_NAME
+import org.opensearch.indexmanagement.snapshotmanagement.getSMDocId
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+import org.opensearch.transport.TransportService
+
+class TransportGetSMAction @Inject constructor(
+ client: Client,
+ transportService: TransportService,
+ actionFilters: ActionFilters,
+) : BaseTransportAction(
+ GET_SM_ACTION_NAME, transportService, client, actionFilters, ::GetSMRequest
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override suspend fun executeRequest(
+ request: GetSMRequest,
+ user: User?,
+ threadContext: ThreadContext.StoredContext
+ ): GetSMResponse {
+ val getReq = GetRequest(INDEX_MANAGEMENT_INDEX, getSMDocId(request.policyName))
+ val getRes: GetResponse = client.suspendUntil { get(getReq, it) }
+ log.info("Get SM policy response: $getRes.")
+ val xcp = contentParser(getRes.sourceAsBytesRef)
+ val policy = xcp.parseWithType(getRes.id, getRes.seqNo, getRes.primaryTerm, SMPolicy.Companion::parse)
+ log.info("Parsed SM policy: $policy")
+ return GetSMResponse(policy)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMRequest.kt
new file mode 100644
index 000000000..a0157cf1b
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMRequest.kt
@@ -0,0 +1,31 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index
+
+import org.opensearch.action.ActionRequest
+import org.opensearch.action.ActionRequestValidationException
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+
+class IndexSMRequest(
+ val policy: SMPolicy,
+ val create: Boolean,
+) : ActionRequest() {
+ override fun validate(): ActionRequestValidationException? {
+ return null
+ }
+
+ constructor(sin: StreamInput) : this(
+ policy = SMPolicy(sin),
+ create = sin.readBoolean(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ policy.writeTo(out)
+ out.writeBoolean(create)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMResponse.kt
new file mode 100644
index 000000000..c1ad7799b
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMResponse.kt
@@ -0,0 +1,33 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index
+
+import org.opensearch.action.ActionResponse
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentObject
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+
+class IndexSMResponse(val policy: SMPolicy) : ActionResponse(), ToXContentObject {
+
+ constructor(sin: StreamInput) : this(
+ policy = SMPolicy(sin)
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ policy.writeTo(out)
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .startObject(policy.policyName)
+ .field(SMPolicy.SNAPSHOT_CONFIG_FIELD, policy.snapshotConfig)
+ .endObject()
+ .endObject()
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMAction.kt
new file mode 100644
index 000000000..7976d3ad5
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMAction.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.action.index.IndexResponse
+import org.opensearch.action.support.ActionFilters
+import org.opensearch.client.Client
+import org.opensearch.common.inject.Inject
+import org.opensearch.common.util.concurrent.ThreadContext
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentFactory
+import org.opensearch.commons.authuser.User
+import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
+import org.opensearch.indexmanagement.opensearchapi.suspendUntil
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction
+import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.INDEX_SM_ACTION_NAME
+import org.opensearch.indexmanagement.snapshotmanagement.getSMDocId
+import org.opensearch.transport.TransportService
+
+class TransportIndexSMAction @Inject constructor(
+ client: Client,
+ transportService: TransportService,
+ actionFilters: ActionFilters,
+) : BaseTransportAction(
+ INDEX_SM_ACTION_NAME, transportService, client, actionFilters, ::IndexSMRequest
+) {
+
+ private val log = LogManager.getLogger(javaClass)
+
+ override suspend fun executeRequest(
+ request: IndexSMRequest,
+ user: User?,
+ threadContext: ThreadContext.StoredContext
+ ): IndexSMResponse {
+ val policy = request.policy
+ val indexReq = IndexRequest(INDEX_MANAGEMENT_INDEX)
+ .source(policy.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
+ .id(getSMDocId(policy.policyName))
+ .create(request.create)
+ val indexRes: IndexResponse = client.suspendUntil { index(indexReq, it) }
+ log.info("Index SM policy response: $indexRes")
+ return IndexSMResponse(policy)
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt
new file mode 100644
index 000000000..cd2abf7dd
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt
@@ -0,0 +1,9 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine
+
+enum class SMState {
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt
new file mode 100644
index 000000000..7f8669a8d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt
@@ -0,0 +1,368 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.model
+
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.ToXContentFragment
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentParser
+import org.opensearch.common.xcontent.XContentParser.Token
+import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indexmanagement.opensearchapi.instant
+import org.opensearch.indexmanagement.opensearchapi.nullValueHandler
+import org.opensearch.indexmanagement.opensearchapi.optionalField
+import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
+import org.opensearch.indexmanagement.opensearchapi.parseArray
+import org.opensearch.indexmanagement.opensearchapi.readOptionalValue
+import org.opensearch.indexmanagement.opensearchapi.writeOptionalValue
+import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMState
+import org.opensearch.indexmanagement.util.NO_ID
+import java.time.Instant
+
+typealias InfoType = Map?
+
+data class SMMetadata(
+ val policySeqNo: Long,
+ val policyPrimaryTerm: Long,
+ val currentState: SMState,
+ val atomic: Boolean = false, // used to indicate an atomic operation started
+ val creation: Creation,
+ val deletion: Deletion,
+ val info: InfoType = null,
+ val id: String = NO_ID,
+ val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .startObject(SM_METADATA_TYPE)
+ .field(POLICY_SEQ_NO_FIELD, policySeqNo)
+ .field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm)
+ .field(CURRENT_STATE_FIELD, currentState.toString())
+ .field(ATOMIC_FIELD, atomic)
+ .field(CREATION_FIELD, creation)
+ .field(DELETION_FIELD, deletion)
+ .optionalField(INFO_FIELD, info)
+ .endObject()
+ .endObject()
+ }
+
+ companion object {
+ const val SM_METADATA_TYPE = "sm_metadata"
+ const val POLICY_SEQ_NO_FIELD = "policy_seq_no"
+ const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term"
+ const val CURRENT_STATE_FIELD = "current_state"
+ const val ATOMIC_FIELD = "atomic"
+ const val CREATION_FIELD = "creation"
+ const val DELETION_FIELD = "deletion"
+ const val INFO_FIELD = "info"
+
+ fun parse(
+ xcp: XContentParser,
+ id: String = NO_ID,
+ seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+ ): SMMetadata {
+ var policySeqNo: Long? = null
+ var policyPrimaryTerm: Long? = null
+ var currentState: SMState? = null
+ var atomic = false
+ var creation: Creation? = null
+ var deletion: Deletion? = null
+ var info: InfoType = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ POLICY_SEQ_NO_FIELD -> policySeqNo = xcp.longValue()
+ POLICY_PRIMARY_TERM_FIELD -> policyPrimaryTerm = xcp.longValue()
+ CURRENT_STATE_FIELD -> currentState = SMState.valueOf(xcp.text())
+ ATOMIC_FIELD -> atomic = xcp.booleanValue()
+ CREATION_FIELD -> creation = Creation.parse(xcp)
+ DELETION_FIELD -> deletion = Deletion.parse(xcp)
+ INFO_FIELD -> info = xcp.nullValueHandler { xcp.map() }
+ }
+ }
+
+ return SMMetadata(
+ policySeqNo = requireNotNull(policySeqNo) {},
+ policyPrimaryTerm = requireNotNull(policyPrimaryTerm) {},
+ currentState = requireNotNull(currentState) {},
+ atomic = atomic,
+ creation = requireNotNull(creation) {},
+ deletion = requireNotNull(deletion) {},
+ info = info,
+ id = id,
+ seqNo = seqNo,
+ primaryTerm = primaryTerm
+ )
+ }
+
+ fun InfoType.upsert(keyValuePair: Pair): InfoType {
+ val info: MutableMap = this?.toMutableMap() ?: mutableMapOf()
+ info[keyValuePair.first] = keyValuePair.second
+ return info
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ policySeqNo = sin.readLong(),
+ policyPrimaryTerm = sin.readLong(),
+ currentState = sin.readEnum(SMState::class.java),
+ atomic = sin.readBoolean(),
+ creation = Creation(sin),
+ deletion = Deletion(sin),
+ info = sin.readMap(),
+ id = sin.readString(),
+ seqNo = sin.readLong(),
+ primaryTerm = sin.readLong(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeLong(policySeqNo)
+ out.writeLong(policyPrimaryTerm)
+ out.writeEnum(currentState)
+ out.writeBoolean(atomic)
+ creation.writeTo(out)
+ deletion.writeTo(out)
+ out.writeMap(info)
+ out.writeString(id)
+ out.writeLong(seqNo)
+ out.writeLong(primaryTerm)
+ }
+
+ data class Creation(
+ val trigger: Trigger,
+ val started: SnapshotInfo? = null,
+ val finished: SnapshotInfo? = null,
+ ) : Writeable, ToXContentFragment {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(TRIGGER_FIELD, trigger)
+ .optionalField(STARTED_FIELD, started)
+ .optionalField(FINISHED_FIELD, finished)
+ .endObject()
+ }
+
+ companion object {
+ const val TRIGGER_FIELD = "trigger"
+ const val STARTED_FIELD = "started"
+ const val FINISHED_FIELD = "finished"
+
+ fun parse(xcp: XContentParser): Creation {
+ var trigger: Trigger? = null
+ var started: SnapshotInfo? = null
+ var finished: SnapshotInfo? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ TRIGGER_FIELD -> trigger = Trigger.parse(xcp)
+ STARTED_FIELD -> started = xcp.nullValueHandler { SnapshotInfo.parse(xcp) }
+ FINISHED_FIELD -> finished = xcp.nullValueHandler { SnapshotInfo.parse(xcp) }
+ }
+ }
+
+ return Creation(
+ trigger = requireNotNull(trigger) { "trigger field must not be null" },
+ started = started,
+ finished = finished,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ trigger = Trigger(sin),
+ started = sin.readOptionalWriteable { SnapshotInfo(it) },
+ finished = sin.readOptionalWriteable { SnapshotInfo(it) },
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ trigger.writeTo(out)
+ out.writeOptionalWriteable(started)
+ out.writeOptionalWriteable(finished)
+ }
+ }
+
+ data class Deletion(
+ val trigger: Trigger,
+ val started: List? = null,
+ val startedTime: Instant? = null
+ ) : Writeable, ToXContentFragment {
+
+ init {
+ require(!(started != null).xor(startedTime != null)) {
+ "deletion started and startedTime must exist at the same time."
+ }
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(TRIGGER_FIELD, trigger)
+ .optionalField(STARTED_FIELD, started)
+ .optionalField(STARTED_TIME_FIELD, startedTime)
+ .endObject()
+ }
+
+ companion object {
+ const val TRIGGER_FIELD = "trigger"
+ const val STARTED_FIELD = "started"
+ const val STARTED_TIME_FIELD = "started_time"
+
+ fun parse(xcp: XContentParser): Deletion {
+ var trigger: Trigger? = null
+ var started: List? = null
+ var startedTime: Instant? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ TRIGGER_FIELD -> trigger = Trigger.parse(xcp)
+ STARTED_FIELD -> started = xcp.nullValueHandler { parseArray { SnapshotInfo.parse(xcp) } }
+ STARTED_TIME_FIELD -> startedTime = xcp.instant()
+ }
+ }
+
+ return Deletion(
+ trigger = requireNotNull(trigger) { "trigger field must not be null" },
+ started = started,
+ startedTime = startedTime,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ trigger = Trigger(sin),
+ started = sin.readOptionalValue(sin.readList { SnapshotInfo(it) }),
+ startedTime = sin.readOptionalInstant(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ trigger.writeTo(out)
+ out.writeOptionalValue(started, StreamOutput::writeList)
+ out.writeOptionalInstant(startedTime)
+ }
+ }
+
+ /**
+ * Trigger for recurring condition check
+ *
+ * index_size can be another possible trigger, e.g.: snapshot will be created
+ * every time index size increases 50gb
+ */
+ data class Trigger(
+ val time: Instant,
+ ) : Writeable, ToXContentFragment {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .optionalTimeField(TIME_FIELD, time)
+ .endObject()
+ }
+
+ companion object {
+ const val TIME_FIELD = "time"
+
+ fun parse(xcp: XContentParser): Trigger {
+ var nextExecutionTime: Instant? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ TIME_FIELD -> nextExecutionTime = xcp.instant()
+ }
+ }
+
+ return Trigger(
+ time = requireNotNull(nextExecutionTime) { "trigger time field must not be null." },
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ time = sin.readInstant()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeInstant(time)
+ }
+ }
+
+ data class SnapshotInfo(
+ val name: String,
+ val startTime: Instant? = null,
+ val endTime: Instant? = null,
+ ) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(NAME_FIELD, name)
+ .optionalTimeField(START_TIME_FIELD, startTime)
+ .optionalField(END_TIME_FIELD, endTime)
+ .endObject()
+ }
+
+ companion object {
+ const val NAME_FIELD = "name"
+ const val START_TIME_FIELD = "start_time"
+ const val END_TIME_FIELD = "end_time"
+
+ fun parse(xcp: XContentParser): SnapshotInfo {
+ var name: String? = null
+ var startTime: Instant? = null
+ var endTime: Instant? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ NAME_FIELD -> name = xcp.text()
+ START_TIME_FIELD -> startTime = xcp.instant()
+ END_TIME_FIELD -> endTime = xcp.instant()
+ }
+ }
+
+ return SnapshotInfo(
+ name = requireNotNull(name) { "snapshot info name must not be null." },
+ startTime = startTime,
+ endTime = endTime,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ name = sin.readString(),
+ startTime = sin.readOptionalInstant(),
+ endTime = sin.readOptionalInstant(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(name)
+ out.writeOptionalInstant(startTime)
+ out.writeOptionalInstant(endTime)
+ }
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt
new file mode 100644
index 000000000..d67fa4b9d
--- /dev/null
+++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt
@@ -0,0 +1,373 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement.model
+
+import org.apache.logging.log4j.LogManager
+import org.opensearch.common.io.stream.StreamInput
+import org.opensearch.common.io.stream.StreamOutput
+import org.opensearch.common.io.stream.Writeable
+import org.opensearch.common.unit.TimeValue
+import org.opensearch.common.xcontent.ToXContent
+import org.opensearch.common.xcontent.XContentBuilder
+import org.opensearch.common.xcontent.XContentParser
+import org.opensearch.common.xcontent.XContentParser.Token
+import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indexmanagement.opensearchapi.instant
+import org.opensearch.indexmanagement.opensearchapi.nullValueHandler
+import org.opensearch.indexmanagement.opensearchapi.optionalField
+import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
+import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
+import org.opensearch.indexmanagement.util.IndexUtils.Companion.NO_ID
+import org.opensearch.jobscheduler.spi.ScheduledJobParameter
+import org.opensearch.jobscheduler.spi.schedule.CronSchedule
+import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
+import org.opensearch.jobscheduler.spi.schedule.Schedule
+import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
+import java.time.Instant
+import java.time.ZoneId
+import java.time.temporal.ChronoUnit
+
+private val log = LogManager.getLogger(SMPolicy::class.java)
+
+data class SMPolicy(
+ val policyName: String,
+ val description: String? = null,
+ val creation: Creation,
+ val deletion: Deletion,
+ val snapshotConfig: Map,
+ val jobEnabled: Boolean,
+ val jobLastUpdateTime: Instant,
+ val jobEnabledTime: Instant,
+ val jobSchedule: Schedule,
+ val id: String = NO_ID,
+ val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+) : ScheduledJobParameter, Writeable {
+
+ init {
+ // TODO validate snapshotConfig
+ require(snapshotConfig["repository"] != null) { "Must provide a repository." }
+ // indices, partial, include_global_state, ignore_unavailable, metadata
+ // TODO validate date_format is of right format
+ }
+
+ override fun getName() = policyName
+
+ override fun getLastUpdateTime() = jobLastUpdateTime
+
+ override fun getEnabledTime() = jobEnabledTime
+
+ override fun getSchedule() = jobSchedule
+
+ override fun isEnabled() = jobEnabled
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder
+ .startObject()
+ .startObject(SM_TYPE)
+ .field(NAME_FIELD, policyName)
+ .optionalField(DESCRIPTION_FIELD, description)
+ .field(CREATION_FIELD, creation)
+ .field(DELETION_FIELD, deletion)
+ .field(SNAPSHOT_CONFIG_FIELD, snapshotConfig)
+ .field(SCHEDULE_FIELD, jobSchedule)
+ .field(ENABLED_FIELD, jobEnabled)
+ .optionalTimeField(LAST_UPDATED_TIME_FIELD, jobLastUpdateTime)
+ .optionalTimeField(ENABLED_TIME_FIELD, jobEnabledTime)
+ .endObject()
+ .endObject()
+ }
+
+ companion object {
+ const val SM_TYPE = "sm"
+ const val NAME_FIELD = "name"
+ const val DESCRIPTION_FIELD = "description"
+ const val CREATION_FIELD = "creation"
+ const val DELETION_FIELD = "deletion"
+ const val SNAPSHOT_CONFIG_FIELD = "snapshot_config"
+ const val ENABLED_FIELD = "enabled"
+ const val LAST_UPDATED_TIME_FIELD = "last_updated_time"
+ const val ENABLED_TIME_FIELD = "enabled_time"
+ const val SCHEDULE_FIELD = "schedule"
+
+ fun parse(
+ xcp: XContentParser,
+ id: String = NO_ID,
+ seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
+ primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+ policyName: String? = null // meant to only be used by Index API
+ ): SMPolicy {
+ var name: String? = policyName
+ var description: String? = null
+ var creation: Creation? = null
+ var deletion: Deletion? = null
+ var snapshotConfig: Map? = null
+ var lastUpdatedTime: Instant? = null
+ var enabledTime: Instant? = null
+ var schedule: Schedule? = null
+ var enabled = true
+
+ log.info("first token: ${xcp.currentToken()}, ${xcp.currentName()}")
+ if (xcp.currentToken() == null) xcp.nextToken()
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ log.info("current token loop: ${xcp.currentToken()}, ${xcp.currentName()}")
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ NAME_FIELD -> name = xcp.text()
+ DESCRIPTION_FIELD -> description = xcp.nullValueHandler { text() }
+ CREATION_FIELD -> creation = Creation.parse(xcp)
+ DELETION_FIELD -> deletion = Deletion.parse(xcp)
+ SNAPSHOT_CONFIG_FIELD -> snapshotConfig = xcp.map()
+ LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = xcp.instant()
+ ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
+ SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
+ ENABLED_FIELD -> enabled = xcp.booleanValue()
+ }
+ }
+
+ if (enabled && enabledTime == null) {
+ enabledTime = Instant.now()
+ } else if (!enabled) {
+ enabledTime = null
+ }
+
+ // TODO update policy API can update this value
+ if (lastUpdatedTime == null) {
+ lastUpdatedTime = Instant.now()
+ }
+
+ if (schedule == null) {
+ schedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES)
+ }
+
+ return SMPolicy(
+ policyName = requireNotNull(name) { "policy_name field must not be null" },
+ description = description,
+ creation = requireNotNull(creation) { "creation field must not be null" },
+ deletion = requireNotNull(deletion) { "deletion field must not be null" },
+ snapshotConfig = requireNotNull(snapshotConfig) { "snapshot_config field must not be null" },
+ jobLastUpdateTime = requireNotNull(lastUpdatedTime) { "last_updated_at field must not be null" },
+ jobEnabledTime = requireNotNull(enabledTime) { "job_enabled_time field must not be null" },
+ jobSchedule = schedule,
+ jobEnabled = enabled,
+ id = id,
+ seqNo = seqNo,
+ primaryTerm = primaryTerm,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ policyName = sin.readString(),
+ description = sin.readOptionalString(),
+ creation = Creation(sin),
+ deletion = Deletion(sin),
+ snapshotConfig = sin.readMap() as Map,
+ jobLastUpdateTime = sin.readInstant(),
+ jobEnabledTime = sin.readInstant(),
+ jobSchedule = IntervalSchedule(sin),
+ jobEnabled = sin.readBoolean(),
+ id = sin.readString(),
+ seqNo = sin.readLong(),
+ primaryTerm = sin.readLong(),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeString(policyName)
+ out.writeOptionalString(description)
+ creation.writeTo(out)
+ deletion.writeTo(out)
+ out.writeMap(snapshotConfig)
+ out.writeInstant(jobLastUpdateTime)
+ out.writeInstant(jobEnabledTime)
+ out.writeOptionalWriteable(jobSchedule)
+ out.writeBoolean(jobEnabled)
+ out.writeString(id)
+ out.writeLong(seqNo)
+ out.writeLong(primaryTerm)
+ }
+
+ data class Creation(
+ val schedule: Schedule,
+ val timeout: ActionTimeout? = null,
+ ) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(SCHEDULE_FIELD, schedule)
+ .optionalField(TIMEOUT_FIELD, timeout)
+ .endObject()
+ }
+
+ companion object {
+ const val SCHEDULE_FIELD = "schedule"
+ const val TIMEOUT_FIELD = "timeout"
+
+ fun parse(xcp: XContentParser): Creation {
+ var schedule: Schedule? = null
+ var timeout: ActionTimeout? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
+ TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp)
+ }
+ }
+
+ return Creation(
+ schedule = requireNotNull(schedule) { "schedule field must not be null" },
+ timeout = timeout
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ schedule = CronSchedule(sin),
+ timeout = sin.readOptionalWriteable(::ActionTimeout),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ schedule.writeTo(out)
+ out.writeOptionalWriteable(timeout)
+ }
+ }
+
+ data class Deletion(
+ val schedule: Schedule,
+ val timeout: ActionTimeout? = null,
+ val condition: DeleteCondition,
+ ) : Writeable, ToXContent {
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(SCHEDULE_FIELD, schedule)
+ .optionalField(TIMEOUT_FIELD, timeout)
+ .field(CONDITION_FIELD, condition)
+ .endObject()
+ }
+
+ companion object {
+ const val SCHEDULE_FIELD = "schedule"
+ const val TIMEOUT_FIELD = "timeout"
+ const val CONDITION_FIELD = "condition"
+
+ fun parse(xcp: XContentParser): Deletion {
+ var schedule: Schedule? = null
+ var timeout: ActionTimeout? = null
+ var condition: DeleteCondition? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
+ TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp)
+ CONDITION_FIELD -> condition = DeleteCondition.parse(xcp)
+ }
+ }
+
+ // If user doesn't provide delete schedule, defaults to every day 1AM
+ if (schedule == null) {
+ schedule = CronSchedule("0 1 * * *", ZoneId.systemDefault())
+ }
+
+ return Deletion(
+ schedule = schedule,
+ timeout = timeout,
+ condition = requireNotNull(condition) { "condition field must not be null" },
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ schedule = CronSchedule(sin),
+ timeout = sin.readOptionalWriteable(::ActionTimeout),
+ condition = DeleteCondition(sin),
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ schedule.writeTo(out)
+ out.writeOptionalWriteable(timeout)
+ condition.writeTo(out)
+ }
+ }
+
+ data class DeleteCondition(
+ val maxCount: Int,
+ val maxAge: TimeValue? = null,
+ val minCount: Int? = null,
+ ) : Writeable, ToXContent {
+
+ init {
+ require(
+ (maxAge != null && minCount != null) || (maxAge == null && minCount == null)
+ ) { "max_age and min_count should exist at the same time." }
+ require(minCount == null || maxCount > minCount) {
+ "max_count should be bigger than min_count."
+ }
+ }
+
+ override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
+ return builder.startObject()
+ .field(MAX_COUNT_FIELD, maxCount)
+ .optionalField(MAX_AGE_FIELD, maxAge)
+ .optionalField(MIN_COUNT_FIELD, minCount)
+ .endObject()
+ }
+
+ companion object {
+ const val MAX_COUNT_FIELD = "max_count"
+ const val MAX_AGE_FIELD = "max_age"
+ const val MIN_COUNT_FIELD = "min_count"
+
+ fun parse(xcp: XContentParser): DeleteCondition {
+ var maxCount: Int? = null
+ var maxAge: TimeValue? = null
+ var minCount: Int? = null
+
+ ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
+ while (xcp.nextToken() != Token.END_OBJECT) {
+ val fieldName = xcp.currentName()
+ xcp.nextToken()
+
+ when (fieldName) {
+ MAX_COUNT_FIELD -> maxCount = xcp.intValue()
+ MAX_AGE_FIELD -> maxAge = TimeValue.parseTimeValue(xcp.text(), MAX_AGE_FIELD)
+ MIN_COUNT_FIELD -> minCount = xcp.intValue()
+ }
+ }
+
+ return DeleteCondition(
+ maxCount = requireNotNull(maxCount) { "max_count field must not be null" },
+ maxAge = maxAge,
+ minCount = minCount,
+ )
+ }
+ }
+
+ constructor(sin: StreamInput) : this(
+ maxCount = sin.readInt(),
+ maxAge = sin.readOptionalTimeValue(),
+ minCount = sin.readOptionalInt()
+ )
+
+ override fun writeTo(out: StreamOutput) {
+ out.writeInt(maxCount)
+ out.writeOptionalTimeValue(maxAge)
+ out.writeOptionalInt(minCount)
+ }
+ }
+}
diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
index c075bf2b1..25a40c35b 100644
--- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
+++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt
@@ -35,6 +35,9 @@ class IndexUtils {
const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#"
private const val BYTE_ARRAY_SIZE = 16
private const val DOCUMENT_ID_SEED = 72390L
+
+ const val NO_ID = ""
+
val logger = LogManager.getLogger(IndexUtils::class.java)
var indexManagementConfigSchemaVersion: Long
diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json
index f6461c80d..6c2990fcf 100644
--- a/src/main/resources/mappings/opendistro-ism-config.json
+++ b/src/main/resources/mappings/opendistro-ism-config.json
@@ -1305,6 +1305,253 @@
"enabled": false
}
}
+ },
+ "sm": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "description": {
+ "type": "text"
+ },
+ "creation": {
+ "properties": {
+ "schedule": {
+ "properties": {
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "timeout": {
+ "type": "keyword"
+ }
+ }
+ },
+ "deletion": {
+ "properties": {
+ "schedule": {
+ "properties": {
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "timeout": {
+ "type": "keyword"
+ },
+ "condition": {
+ "properties": {
+ "max_count": {
+ "type": "integer"
+ },
+ "max_age": {
+ "type": "keyword"
+ },
+ "min_count": {
+ "type": "integer"
+ }
+ }
+ }
+ }
+ },
+ "snapshot_config": {
+ "type": "object",
+ "enabled": false
+ },
+ "enabled": {
+ "type": "boolean"
+ },
+ "enabled_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "last_updated_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "schedule": {
+ "properties": {
+ "interval": {
+ "properties": {
+ "period": {
+ "type": "integer"
+ },
+ "unit": {
+ "type": "keyword"
+ },
+ "start_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ },
+ "cron": {
+ "properties": {
+ "expression": {
+ "type": "keyword"
+ },
+ "timezone": {
+ "type": "keyword"
+ },
+ "schedule_delay": {
+ "type": "long"
+ }
+ }
+ }
+ }
+ },
+ "user": {
+ "properties": {
+ "name": {
+ "type": "text",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "backend_roles": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ },
+ "roles": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ },
+ "custom_attribute_names": {
+ "type" : "text",
+ "fields" : {
+ "keyword" : {
+ "type" : "keyword"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "sm_metadata": {
+ "properties": {
+ "policy_seq_no": {
+ "type": "long"
+ },
+ "policy_primary_term": {
+ "type": "long"
+ },
+ "current_state": {
+ "type": "keyword"
+ },
+ "atomic": {
+ "type": "boolean"
+ },
+ "info": {
+ "type": "object",
+ "enabled": false
+ },
+ "creation": {
+ "properties": {
+ "trigger": {
+ "properties": {
+ "time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "started": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "finished": {
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ }
+ }
+ },
+ "deletion": {
+ "properties": {
+ "trigger": {
+ "properties": {
+ "time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ },
+ "started": {
+ "type": "nested",
+ "properties": {
+ "name": {
+ "type": "keyword"
+ },
+ "started_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ },
+ "end_time": {
+ "type": "date",
+ "format": "strict_date_time||epoch_millis"
+ }
+ }
+ }
+ }
+ }
+ }
}
}
}
diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt
new file mode 100644
index 000000000..805e7f4e2
--- /dev/null
+++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.indexmanagement.snapshotmanagement
+
+import com.nhaarman.mockitokotlin2.doReturn
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
+import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
+import org.opensearch.action.index.IndexResponse
+import org.opensearch.common.UUIDs
+import org.opensearch.common.unit.TimeValue
+import org.opensearch.indexmanagement.randomCronSchedule
+import org.opensearch.indexmanagement.randomInstant
+import org.opensearch.indexmanagement.randomIntervalSchedule
+import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMState
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
+import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
+import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout
+import org.opensearch.jobscheduler.spi.schedule.CronSchedule
+import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
+import org.opensearch.rest.RestStatus
+import org.opensearch.snapshots.SnapshotId
+import org.opensearch.snapshots.SnapshotInfo
+import org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength
+import org.opensearch.test.OpenSearchTestCase.randomNonNegativeLong
+import java.time.Instant
+import java.time.Instant.now
+
+fun randomSMMetadata(
+ currentState: SMState,
+ nextCreationTime: Instant = now(),
+ nextDeletionTime: Instant = now(),
+ policySeqNo: Long = randomNonNegativeLong(),
+ policyPrimaryTerm: Long = randomNonNegativeLong(),
+ atomic: Boolean = false,
+): SMMetadata {
+ return SMMetadata(
+ policySeqNo = policySeqNo,
+ policyPrimaryTerm = policyPrimaryTerm,
+ currentState = currentState,
+ atomic = atomic,
+ creation = SMMetadata.Creation(
+ trigger = SMMetadata.Trigger(
+ time = nextCreationTime
+ )
+ ),
+ deletion = SMMetadata.Deletion(
+ trigger = SMMetadata.Trigger(
+ time = nextDeletionTime
+ )
+ ),
+ )
+}
+
+fun randomSMPolicy(
+ policyName: String = randomAlphaOfLength(10),
+ jobEnabled: Boolean = false,
+ jobLastUpdateTime: Instant = randomInstant(),
+ creationSchedule: CronSchedule = randomCronSchedule(),
+ creationTimeout: ActionTimeout? = null,
+ deletionSchedule: CronSchedule = randomCronSchedule(),
+ deletionTimeout: ActionTimeout? = null,
+ deletionMaxCount: Int = 5,
+ deletionMaxAge: TimeValue? = null,
+ deletionMinCount: Int? = null,
+ snapshotConfig: Map = mapOf(
+ "repository" to "repo",
+ "date_format" to "yyyy-MM-dd-HH:mm"
+ ),
+ jobEnabledTime: Instant = randomInstant(),
+ jobSchedule: IntervalSchedule = randomIntervalSchedule()
+): SMPolicy {
+ return SMPolicy(
+ policyName = policyName,
+ jobEnabled = jobEnabled,
+ jobLastUpdateTime = jobLastUpdateTime,
+ creation = SMPolicy.Creation(
+ schedule = creationSchedule,
+ timeout = creationTimeout,
+ ),
+ deletion = SMPolicy.Deletion(
+ schedule = deletionSchedule,
+ timeout = deletionTimeout,
+ condition = SMPolicy.DeleteCondition(
+ maxCount = deletionMaxCount,
+ maxAge = deletionMaxAge,
+ minCount = deletionMinCount
+ )
+ ),
+ snapshotConfig = snapshotConfig,
+ jobEnabledTime = jobEnabledTime,
+ jobSchedule = jobSchedule
+ )
+}
+
+fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse {
+ val indexResponse: IndexResponse = mock()
+ whenever(indexResponse.status()).doReturn(status)
+ whenever(indexResponse.seqNo).doReturn(0L)
+ whenever(indexResponse.primaryTerm).doReturn(1L)
+
+ return indexResponse
+}
+
+fun mockCreateSnapshotResponse(status: RestStatus = RestStatus.ACCEPTED): CreateSnapshotResponse {
+ val createSnapshotRes: CreateSnapshotResponse = mock()
+ whenever(createSnapshotRes.status()).doReturn(status)
+ return createSnapshotRes
+}
+
+fun mockGetSnapshotResponse(num: Int): GetSnapshotsResponse {
+ val getSnapshotsRes: GetSnapshotsResponse = mock()
+ whenever(getSnapshotsRes.snapshots).doReturn(mockSnapshots(num))
+ return getSnapshotsRes
+}
+
+fun mockSnapshots(num: Int): List {
+ val result = mutableListOf()
+ for (i in 1..num) {
+ result.add(mockSnapshotInfo(idNum = i))
+ }
+ return result.toList()
+}
+
+/**
+ * For our use case, only mock snapshotId, startTime and endTime
+ */
+fun mockSnapshotInfo(idNum: Int, startTime: Long = randomNonNegativeLong(), endTime: Long = randomNonNegativeLong()): SnapshotInfo {
+ val snapshotId = SnapshotId("mock_snapshot-$idNum}", UUIDs.randomBase64UUID())
+ return SnapshotInfo(
+ snapshotId,
+ listOf("index1"),
+ listOf("ds-1"),
+ startTime,
+ "",
+ endTime,
+ 5,
+ emptyList(),
+ false,
+ emptyMap(),
+ )
+}
diff --git a/worksheets/ism/all_actions.http b/worksheets/ism/all_actions.http
new file mode 100644
index 000000000..f9c625f11
--- /dev/null
+++ b/worksheets/ism/all_actions.http
@@ -0,0 +1,487 @@
+###
+PUT localhost:9200/_opendistro/_ism/policies/rollover
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "rollover": {
+ "min_doc_count": 0
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["rollover*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/rollover_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/force_merge
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "force_merge": {
+ "max_num_segments": 1
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["force_merge*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/force_merge_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/read_only
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "read_only": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["read_only*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/read_only_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/read_write
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "read_write": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["read_write*"],
+ "priority": 100
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/read_write_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/replica_count
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "replica_count": {
+ "number_of_replicas": 2
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["replica_count*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/replica_count_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/close
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "close": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["close*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/close_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/open
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "open": {}
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["open*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/open_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/notification
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "notification": {
+ "destination": {
+ "chime": {
+ "url": ""
+ }
+ },
+ "message_template": {
+ "source": "the index is {{ctx.index}}"
+ }
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["notification*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/notification_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/snapshot
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "snapshot": {
+ "repository": "my_backup",
+ "snapshot": "snapshot1"
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["snapshot*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/snapshot_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/index_priority
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "index_priority": {
+ "priority": 50
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["index_priority*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/index_priority_index
+Content-Type: application/json
+
+###
+PUT localhost:9200/_opendistro/_ism/policies/allocation
+Content-Type: application/json
+
+{
+ "policy": {
+ "description": "action test",
+ "default_state": "action",
+ "states": [
+ {
+ "name": "action",
+ "actions": [
+ {
+ "allocation": {
+ "require": { "temp": "warm" }
+ }
+ }
+ ],
+ "transitions": [
+ {
+ "state_name": "delete"
+ }
+ ]
+ },
+ {
+ "name": "delete",
+ "actions": [
+ {
+ "delete": {}
+ }
+ ],
+ "transitions": []
+ }
+ ],
+ "ism_template": {
+ "index_patterns": ["allocation*"]
+ }
+ }
+}
+
+###
+PUT http://localhost:9200/allocation_index
+Content-Type: application/json
diff --git a/worksheets/sm/create.http b/worksheets/sm/create.http
new file mode 100644
index 000000000..9b0d4b648
--- /dev/null
+++ b/worksheets/sm/create.http
@@ -0,0 +1,84 @@
+###
+GET localhost:9200/
+Accept: application/json
+
+###
+POST localhost:9200/.opendistro-ism-config/_search
+Content-Type: application/json
+
+{
+ "seq_no_primary_term": true
+}
+
+###
+DELETE localhost:9200/.opendistro-ism-config
+
+### index sm
+PUT localhost:9200/_plugins/_sm/daily_snapshot
+Content-Type: application/json
+
+{
+ "description": "A daily snapshot policy",
+ "creation": {
+ "schedule": {
+ "cron": {
+ "expression": "* * * * *",
+ "timezone": "America/Los_Angeles"
+ }
+ }
+ },
+ "deletion": {
+ "schedule": {
+ "cron": {
+ "expression": "*/2 * * * *",
+ "timezone": "America/Los_Angeles"
+ }
+ },
+ "condition": {
+ "max_count": 5
+ }
+ },
+ "snapshot_config": {
+ "repository": "repo",
+ "date_format": "yyyy-MM-dd-HH:mm",
+ "indices": "*",
+ "partial": "false",
+ "include_global_state": "false",
+ "ignore_unavailable": "true",
+ "metadata": {
+ "taken_by": "snapshot management"
+ }
+ }
+}
+
+###
+# "schedule": {
+# "interval": {
+# "start_time": 1645596970,
+# "period": 1,
+# "unit": "MINUTES"
+# }
+# },
+
+### register repo
+PUT localhost:9200/_snapshot/repo
+Content-Type: application/json
+
+{
+ "type": "fs",
+ "settings": {
+ "location": "my_backup_location"
+ }
+}
+
+### delete repo
+DELETE localhost:9200/_snapshot/repo
+
+### create snapshot
+PUT localhost:9200/_snapshot/repo/my_snapshot
+
+### delete snapshot
+DELETE localhost:9200/_snapshot/repo/test
+
+### get snapshot status
+GET localhost:9200/_snapshot/repo/daily_snapshot*