From 7d6fd166ab4653a5387a97766e93c5b205e4a9d3 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 17 May 2022 13:24:00 -0700 Subject: [PATCH] SM dev - SM datamodel and CRUD skeleton (#355) * Strengthen scroll search in Coordinator (#356) * Removes rc1 qualifier (#353) * Allows error in shrink test * Adds workflow to create documentation issues (#358) * Adds documentation issue workflow * Makes the issue template more relevant * Data model and Basic CRUD APIs Signed-off-by: bowenlan-amzn --- .github/ISSUE_TEMPLATE/documentation.md | 11 + .../workflows/create-documentation-issue.yml | 41 ++ .idea/copyright/OpenSearch.xml | 2 +- .idea/copyright/profiles_settings.xml | 4 +- build.gradle | 10 +- detekt.yml | 2 +- spi/build.gradle | 10 + .../indexmanagement/IndexManagementPlugin.kt | 26 +- .../ManagedIndexCoordinator.kt | 61 ++- .../model/ManagedIndexConfig.kt | 2 +- .../indexstatemanagement/model/Policy.kt | 2 +- .../resthandler/RestIndexPolicyAction.kt | 5 +- .../action/indexpolicy/IndexPolicyRequest.kt | 3 +- .../util/ManagedIndexUtils.kt | 11 +- .../opensearchapi/OpenSearchExtensions.kt | 34 ++ .../rollup/RollupMetadataService.kt | 2 +- .../indexmanagement/rollup/model/Rollup.kt | 2 +- .../rollup/model/RollupMetadata.kt | 2 +- .../resthandler/RestIndexRollupAction.kt | 5 +- .../resthandler/RestStartRollupAction.kt | 4 +- .../resthandler/RestStopRollupAction.kt | 6 +- .../snapshotmanagement/SMUtils.kt | 15 + .../api/resthandler/RestSMPolicyHandler.kt | 115 +++++ .../api/transport/BaseTransportAction.kt | 67 +++ .../api/transport/SMActions.kt | 34 ++ .../transport/delete/DeleteSMPolicyRequest.kt | 27 + .../delete/DeleteSMPolicyResponse.kt | 32 ++ .../delete/TransportDeleteSMPolicyAction.kt | 42 ++ .../api/transport/get/GetSMPolicyRequest.kt | 27 + .../api/transport/get/GetSMPolicyResponse.kt | 32 ++ .../get/TransportGetSMPolicyAction.kt | 47 ++ .../transport/index/IndexSMPolicyRequest.kt | 31 ++ .../transport/index/IndexSMPolicyResponse.kt | 31 ++ .../index/TransportIndexSMPolicyAction.kt | 47 ++ .../engine/statemachine/SMState.kt | 8 + .../snapshotmanagement/model/SMMetadata.kt | 361 +++++++++++++ .../snapshotmanagement/model/SMPolicy.kt | 369 +++++++++++++ .../transform/model/Transform.kt | 2 +- .../resthandler/RestIndexTransformAction.kt | 5 +- .../resthandler/RestStartTransformAction.kt | 4 +- .../resthandler/RestStopTransformAction.kt | 6 +- .../indexmanagement/util/IndexUtils.kt | 1 + .../mappings/opendistro-ism-config.json | 246 ++++++++- .../IndexManagementRestTestCase.kt | 2 +- ...IndexManagementBackwardsCompatibilityIT.kt | 4 +- .../indexstatemanagement/model/ActionTests.kt | 5 +- .../IndexStateManagementRestApiIT.kt | 5 +- .../resthandler/RestIndexRollupActionIT.kt | 6 +- .../snapshotmanagement/SMUtilsTests.kt | 15 + .../snapshotmanagement/utils.kt | 142 +++++ .../resthandler/RestIndexTransformActionIT.kt | 4 +- .../cached-opendistro-ism-config.json | 246 ++++++++- worksheets/ism/all_actions.http | 487 ++++++++++++++++++ worksheets/sm/create.http | 84 +++ 54 files changed, 2725 insertions(+), 69 deletions(-) create mode 100644 .github/ISSUE_TEMPLATE/documentation.md create mode 100644 .github/workflows/create-documentation-issue.yml create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt create mode 100644 worksheets/ism/all_actions.http create mode 100644 worksheets/sm/create.http diff --git a/.github/ISSUE_TEMPLATE/documentation.md b/.github/ISSUE_TEMPLATE/documentation.md new file mode 100644 index 000000000..9c4ca695c --- /dev/null +++ b/.github/ISSUE_TEMPLATE/documentation.md @@ -0,0 +1,11 @@ +**Is your feature request related to a problem?** +A new feature has been added. + +**What solution would you like?** +Document the usage of the new feature. + +**What alternatives have you considered?** +N/A + +**Do you have any additional context?** +_Add any other context or screenshots about the feature request here._ diff --git a/.github/workflows/create-documentation-issue.yml b/.github/workflows/create-documentation-issue.yml new file mode 100644 index 000000000..23bc47e6f --- /dev/null +++ b/.github/workflows/create-documentation-issue.yml @@ -0,0 +1,41 @@ +name: Create Documentation Issue +on: + pull_request: + types: + - labeled +env: + PR_NUMBER: ${{ github.event.number }} + +jobs: + create-issue: + if: ${{ github.event.label.name == 'needs-documentation' }} + runs-on: ubuntu-latest + name: Create Documentation Issue + steps: + - name: GitHub App token + id: github_app_token + uses: tibdex/github-app-token@v1.5.0 + with: + app_id: ${{ secrets.APP_ID }} + private_key: ${{ secrets.APP_PRIVATE_KEY }} + installation_id: 22958780 + + - name: Checkout code + uses: actions/checkout@v2 + + - name: Edit the issue template + run: | + echo "https://github.com/opensearch-project/index-management/pull/${{ env.PR_NUMBER }}." >> ./.github/ISSUE_TEMPLATE/documentation.md + + - name: Create Issue From File + id: create-issue + uses: peter-evans/create-issue-from-file@v4 + with: + title: Add documentation related to new feature + content-filepath: ./.github/ISSUE_TEMPLATE/documentation.md + labels: documentation + repository: opensearch-project/documentation-website + token: ${{ steps.github_app_token.outputs.token }} + + - name: Print Issue + run: echo Created related documentation issue ${{ steps.create-issue.outputs.issue-number }} diff --git a/.idea/copyright/OpenSearch.xml b/.idea/copyright/OpenSearch.xml index dadc9ed84..7c7f3c26f 100644 --- a/.idea/copyright/OpenSearch.xml +++ b/.idea/copyright/OpenSearch.xml @@ -1,6 +1,6 @@ - \ No newline at end of file diff --git a/.idea/copyright/profiles_settings.xml b/.idea/copyright/profiles_settings.xml index 5f45523cc..97a289b2f 100644 --- a/.idea/copyright/profiles_settings.xml +++ b/.idea/copyright/profiles_settings.xml @@ -1,7 +1,7 @@ - + - + \ No newline at end of file diff --git a/build.gradle b/build.gradle index c2cdb3c6f..499f13e1c 100644 --- a/build.gradle +++ b/build.gradle @@ -14,9 +14,9 @@ import java.util.function.Predicate buildscript { ext { isSnapshot = "true" == System.getProperty("build.snapshot", "true") - opensearch_version = System.getProperty("opensearch.version", "2.0.0-rc1-SNAPSHOT") - buildVersionQualifier = System.getProperty("build.version_qualifier", "rc1") - // 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT + opensearch_version = System.getProperty("opensearch.version", "2.0.0-SNAPSHOT") + buildVersionQualifier = System.getProperty("build.version_qualifier", "") + // 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT version_tokens = opensearch_version.tokenize('-') opensearch_build = version_tokens[0] + '.0' job_scheduler_no_snapshot = opensearch_build @@ -99,6 +99,10 @@ configurations.all { } } +idea.module { + excludeDirs -= file("$buildDir") +} + def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster') def usingMultiNode = project.properties.containsKey('numNodes') // Only apply jacoco test coverage if we are running a local single node cluster diff --git a/detekt.yml b/detekt.yml index 47b9d163c..78ff93acb 100644 --- a/detekt.yml +++ b/detekt.yml @@ -1,6 +1,6 @@ # TODO: Remove this before initial release, only for developmental purposes build: - maxIssues: 20 + maxIssues: 100 exceptions: TooGenericExceptionCaught: diff --git a/spi/build.gradle b/spi/build.gradle index f8ba3f69b..4b398668a 100644 --- a/spi/build.gradle +++ b/spi/build.gradle @@ -15,6 +15,7 @@ apply plugin: 'opensearch.java-rest-test' apply plugin: 'kotlin' apply plugin: 'org.jetbrains.kotlin.jvm' apply plugin: 'org.jetbrains.kotlin.plugin.allopen' +apply plugin: 'idea' ext { projectSubstitutions = [:] @@ -64,6 +65,15 @@ dependencies { testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}" } +idea.module { + excludeDirs -= file("$buildDir") +} + +task sourcesJar(type: Jar, dependsOn: classes) { + classifier = 'sources' + from sourceSets.main.allSource +} + test { doFirst { test.classpath -= project.files(project.tasks.named('shadowJar')) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 609f87330..8463b60df 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -111,6 +111,13 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestSMPolicyHandler +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction +import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.indexmanagement.spi.IndexManagementExtension import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker @@ -164,6 +171,8 @@ import java.util.function.Supplier @Suppress("TooManyFunctions") class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() { + private val log = LogManager.getLogger(javaClass) + private val logger = LogManager.getLogger(javaClass) lateinit var indexManagementIndices: IndexManagementIndices lateinit var clusterService: ClusterService @@ -187,6 +196,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management" const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data" + const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm" + const val OLD_PLUGIN_NAME = "opendistro-im" const val OPEN_DISTRO_BASE_URI = "/_opendistro" const val LEGACY_ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism" @@ -208,6 +219,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin @Suppress("ComplexMethod") override fun getJobParser(): ScheduledJobParser { return ScheduledJobParser { xcp, id, jobDocVersion -> + log.info("sm dev: plugin job parser") ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { val fieldName = xcp.currentName() @@ -235,6 +247,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> { return@ScheduledJobParser null } + SMPolicy.SM_TYPE -> { + return@ScheduledJobParser SMPolicy.parse(xcp, id, jobDocVersion.seqNo, jobDocVersion.primaryTerm) + } + SMMetadata.SM_METADATA_TYPE -> { + return@ScheduledJobParser null + } else -> { logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName") xcp.skipChildren() @@ -300,7 +318,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RestDeleteTransformAction(), RestExplainTransformAction(), RestStartTransformAction(), - RestStopTransformAction() + RestStopTransformAction(), + RestSMPolicyHandler(), ) } @@ -502,7 +521,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin ActionPlugin.ActionHandler(ExplainTransformAction.INSTANCE, TransportExplainTransformAction::class.java), ActionPlugin.ActionHandler(StartTransformAction.INSTANCE, TransportStartTransformAction::class.java), ActionPlugin.ActionHandler(StopTransformAction.INSTANCE, TransportStopTransformAction::class.java), - ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java) + ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java), + ActionPlugin.ActionHandler(SMActions.INDEX_SM_ACTION_TYPE, TransportIndexSMPolicyAction::class.java), + ActionPlugin.ActionHandler(SMActions.GET_SM_ACTION_TYPE, TransportGetSMPolicyAction::class.java), + ActionPlugin.ActionHandler(SMActions.DELETE_SM_ACTION_TYPE, TransportDeleteSMPolicyAction::class.java), ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index ae43a4829..23da3b8a4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -633,28 +633,49 @@ class ManagedIndexCoordinator( suspend fun sweepManagedIndexJobs(client: Client): List { val managedIndexUuids = mutableListOf() - val managedIndexSearchRequest = getSweptManagedIndexSearchRequest() - var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) } - var uuids = response.hits.map { it.id } - val scrollIDsToClear = mutableSetOf() - - while (uuids.isNotEmpty()) { - managedIndexUuids.addAll(uuids) - val scrollID = response.scrollId - scrollIDsToClear.add(scrollID) - val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1)) - response = client.suspendUntil { searchScroll(scrollRequest, it) } - uuids = response.hits.map { it.id } - } + // if # of documents below 10k, don't use scroll search + val countReq = getSweptManagedIndexSearchRequest(size = 0) + val countRes: SearchResponse = client.suspendUntil { search(countReq, it) } + val totalHits = countRes.hits.totalHits ?: return managedIndexUuids - if (scrollIDsToClear.isNotEmpty()) { - val clearScrollRequest = ClearScrollRequest() - clearScrollRequest.scrollIds(scrollIDsToClear.toList()) - val clearScrollResponse: ClearScrollResponse = - client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) } + if (totalHits.value >= MAX_HITS) { + val scrollIDsToClear = mutableSetOf() + try { + val managedIndexSearchRequest = getSweptManagedIndexSearchRequest(scroll = true) + var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) } + var uuids = transformManagedIndexSearchRes(response) + + while (uuids.isNotEmpty()) { + managedIndexUuids.addAll(uuids) + val scrollID = response.scrollId + scrollIDsToClear.add(scrollID) + val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1)) + response = client.suspendUntil { searchScroll(scrollRequest, it) } + uuids = transformManagedIndexSearchRes(response) + } + } finally { + if (scrollIDsToClear.isNotEmpty()) { + val clearScrollRequest = ClearScrollRequest() + clearScrollRequest.scrollIds(scrollIDsToClear.toList()) + val clearScrollResponse: ClearScrollResponse = + client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) } + } + } + return managedIndexUuids } - return managedIndexUuids + val response: SearchResponse = client.suspendUntil { search(getSweptManagedIndexSearchRequest(), it) } + return transformManagedIndexSearchRes(response) + } + + fun transformManagedIndexSearchRes(response: SearchResponse): List { + if (response.isTimedOut || response.failedShards > 0 || response.skippedShards > 0) { + val errorMsg = "Sweep managed indices failed. Timed out: ${response.isTimedOut} | " + + "Failed shards: ${response.failedShards} | Skipped shards: ${response.skippedShards}." + logger.error(errorMsg) + throw ISMCoordinatorSearchException(message = errorMsg) + } + return response.hits.map { it.id } } /** @@ -736,3 +757,5 @@ class ManagedIndexCoordinator( const val BUFFER = 20L } } + +class ISMCoordinatorSearchException(message: String, cause: Throwable? = null) : Exception(message, cause) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index ee32f50ea..acaabc864 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -14,6 +14,7 @@ import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE import org.opensearch.indexmanagement.opensearchapi.instant import org.opensearch.indexmanagement.opensearchapi.optionalTimeField +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.schedule.Schedule import org.opensearch.jobscheduler.spi.schedule.ScheduleParser @@ -86,7 +87,6 @@ data class ManagedIndexConfig( companion object { const val MANAGED_INDEX_TYPE = "managed_index" - const val NO_ID = "" const val NAME_FIELD = "name" const val ENABLED_FIELD = "enabled" const val SCHEDULE_FIELD = "schedule" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt index 432fa8e0d..6f5dd0377 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/Policy.kt @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.opensearchapi.optionalTimeField import org.opensearch.indexmanagement.opensearchapi.optionalUserField import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.NO_ID import java.io.IOException import java.time.Instant @@ -147,7 +148,6 @@ data class Policy( const val POLICY_TYPE = "policy" const val POLICY_ID_FIELD = "policy_id" const val DESCRIPTION_FIELD = "description" - const val NO_ID = "" const val LAST_UPDATED_TIME_FIELD = "last_updated_time" const val SCHEMA_VERSION_FIELD = "schema_version" const val ERROR_NOTIFICATION_FIELD = "error_notification" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt index dd98093a0..77f57689c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestIndexPolicyAction.kt @@ -21,6 +21,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.inde import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM import org.opensearch.indexmanagement.util.IF_SEQ_NO +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.indexmanagement.util.REFRESH import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer @@ -69,8 +70,8 @@ class RestIndexPolicyAction( @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("policyID", Policy.NO_ID) - if (Policy.NO_ID == id) { + val id = request.param("policyID", NO_ID) + if (NO_ID == id) { throw IllegalArgumentException("Missing policy ID") } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt index fe1f60c74..4cc515cd6 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/IndexPolicyRequest.kt @@ -12,6 +12,7 @@ import org.opensearch.action.support.WriteRequest import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.util.NO_ID import java.io.IOException class IndexPolicyRequest : ActionRequest { @@ -47,7 +48,7 @@ class IndexPolicyRequest : ActionRequest { override fun validate(): ActionRequestValidationException? { var validationException: ActionRequestValidationException? = null - if (Policy.NO_ID == policyID) { + if (NO_ID == policyID) { validationException = ValidateActions.addValidationError("Missing policyID", validationException) } return validationException diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 769b2d22a..17c6fdeba 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -188,18 +188,19 @@ fun getManagedIndicesToDelete( } } -fun getSweptManagedIndexSearchRequest(): SearchRequest { +fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = ManagedIndexCoordinator.MAX_HITS): SearchRequest { val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE)) - return SearchRequest() - .indices(INDEX_MANAGEMENT_INDEX) - .scroll(TimeValue.timeValueMinutes(1)) + val req = SearchRequest().indices(INDEX_MANAGEMENT_INDEX) + .allowPartialSearchResults(false) .source( SearchSourceBuilder.searchSource() - .size(ManagedIndexCoordinator.MAX_HITS) + .size(size) .seqNoAndPrimaryTerm(true) .fetchSource(emptyArray(), emptyArray()) .query(boolQueryBuilder) ) + if (scroll) req.scroll(TimeValue.timeValueMinutes(1)) + return req } @Suppress("ReturnCount", "ComplexCondition") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 726df1adf..834284334 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -23,6 +23,9 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.DefaultShardOperationFailedException import org.opensearch.client.OpenSearchClient import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.common.util.concurrent.ThreadContext @@ -306,3 +309,34 @@ suspend fun withClosableContext( context.injector.close() } } + +fun XContentBuilder.optionalField(name: String, value: Any?): XContentBuilder { + return if (value != null) { this.field(name, value) } else this +} + +inline fun XContentParser.nullValueHandler(block: XContentParser.() -> T): T? { + return if (currentToken() == Token.VALUE_NULL) null else block() +} + +inline fun XContentParser.parseArray(block: XContentParser.() -> T): List { + val resArr = mutableListOf() + ensureExpectedToken(Token.START_ARRAY, currentToken(), this) + while (nextToken() != Token.END_ARRAY) { + resArr.add(block()) + } + return resArr +} + +// similar to readOptionalWriteable +fun StreamInput.readOptionalValue(value: T): T? { + return if (readBoolean()) { value } else null +} + +fun StreamOutput.writeOptionalValue(value: T, writer: Writeable.Writer) { + if (value == null) { + writeBoolean(false) + } else { + writeBoolean(true) + writer.write(this, value) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt index 823dee303..bfa939c55 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt @@ -33,9 +33,9 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata -import org.opensearch.indexmanagement.rollup.model.RollupMetadata.Companion.NO_ID import org.opensearch.indexmanagement.rollup.model.RollupStats import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_EPOCH_MILLIS_FORMAT +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.search.aggregations.bucket.composite.InternalComposite import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index dea0cfe0d..efe5c52a7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -25,6 +25,7 @@ import org.opensearch.indexmanagement.opensearchapi.instant import org.opensearch.indexmanagement.opensearchapi.optionalTimeField import org.opensearch.indexmanagement.opensearchapi.optionalUserField import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.indexmanagement.util._ID import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.schedule.CronSchedule @@ -220,7 +221,6 @@ data class Rollup( const val ROLLUP_LOCK_DURATION_SECONDS = 1800L // 30 minutes const val ROLLUP_TYPE = "rollup" const val ROLLUP_ID_FIELD = "rollup_id" - const val NO_ID = "" const val ENABLED_FIELD = "enabled" const val SCHEMA_VERSION_FIELD = "schema_version" const val SCHEDULE_FIELD = "schedule" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt index c56a1648c..5fb708719 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/RollupMetadata.kt @@ -18,6 +18,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE import org.opensearch.indexmanagement.opensearchapi.instant +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.search.aggregations.bucket.composite.InternalComposite import java.io.IOException import java.time.Instant @@ -228,7 +229,6 @@ data class RollupMetadata( companion object { const val ROLLUP_METADATA_TYPE = "rollup_metadata" - const val NO_ID = "" const val ROLLUP_ID_FIELD = "rollup_id" const val AFTER_KEY_FIELD = "after_key" const val LAST_UPDATED_FIELD = "last_updated_time" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt index 3c4ae5aa4..4f8a8f112 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupAction.kt @@ -18,6 +18,7 @@ import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM import org.opensearch.indexmanagement.util.IF_SEQ_NO +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.indexmanagement.util.REFRESH import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer @@ -58,8 +59,8 @@ class RestIndexRollupAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("rollupID", Rollup.NO_ID) - if (Rollup.NO_ID == id) { + val id = request.param("rollupID", NO_ID) + if (NO_ID == id) { throw IllegalArgumentException("Missing rollup ID") } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt index 6e1289f39..ece151c24 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStartRollupAction.kt @@ -10,7 +10,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ROL import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction import org.opensearch.indexmanagement.rollup.action.start.StartRollupRequest -import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer import org.opensearch.rest.RestHandler.ReplacedRoute @@ -41,7 +41,7 @@ class RestStartRollupAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("rollupID", Rollup.NO_ID) + val id = request.param("rollupID", NO_ID) val startRequest = StartRollupRequest(id) return RestChannelConsumer { channel -> client.execute(StartRollupAction.INSTANCE, startRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt index 799e0d7a2..f4de4d63b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestStopRollupAction.kt @@ -10,7 +10,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ROL import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction import org.opensearch.indexmanagement.rollup.action.stop.StopRollupRequest -import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer import org.opensearch.rest.RestHandler.ReplacedRoute @@ -41,8 +41,8 @@ class RestStopRollupAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("rollupID", Rollup.NO_ID) - if (Rollup.NO_ID == id) { + val id = request.param("rollupID", NO_ID) + if (NO_ID == id) { throw IllegalArgumentException("Missing rollup ID") } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt new file mode 100644 index 000000000..51f3bcf7d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement + +import org.apache.logging.log4j.LogManager + +private val log = LogManager.getLogger("Snapshot Management Helper") + +const val smSuffix = "-sm" +fun smPolicyNameToDocId(policyName: String) = "$policyName$smSuffix" +fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(smSuffix) +fun getSMMetadataDocId(policyName: String) = "$policyName-sm-metadata" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt new file mode 100644 index 000000000..7a3237b08 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.client.node.NodeClient +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_BASE_URI +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyRequest +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyRequest +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyRequest +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestRequest.Method.POST +import org.opensearch.rest.RestRequest.Method.PUT +import org.opensearch.rest.RestRequest.Method.GET +import org.opensearch.rest.RestRequest.Method.DELETE +import org.opensearch.rest.RestStatus +import org.opensearch.rest.action.RestToXContentListener + +class RestSMPolicyHandler : BaseRestHandler() { + + private val log = LogManager.getLogger(javaClass) + + override fun getName(): String { + return "snapshot_management_policy_rest_handler" + } + + override fun routes(): List { + return listOf( + Route(POST, "$SM_BASE_URI/{policyName}"), + Route(PUT, "$SM_BASE_URI/{policyName}"), + Route(GET, "$SM_BASE_URI/{policyName}"), + Route(DELETE, "$SM_BASE_URI/{policyName}"), + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + return when (request.method()) { + POST -> indexRequest(request, client, true) + PUT -> indexRequest(request, client, false) + GET -> getRequest(request, client) + DELETE -> deleteRequest(request, client) + else -> RestChannelConsumer { + it.sendResponse( + BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, + "${request.method()} is not allowed" + ) + ) + } + } + } + + private fun getRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val policyName = request.param("policyName", "") + if (policyName == "") { + throw IllegalArgumentException("Missing policy name") + } + + return RestChannelConsumer { + client.execute( + GET_SM_ACTION_TYPE, + GetSMPolicyRequest(policyName), + RestToXContentListener(it) + ) + } + } + + private fun indexRequest(request: RestRequest, client: NodeClient, create: Boolean): RestChannelConsumer { + val policyName = request.param("policyName", "") + if (policyName == "") { + throw IllegalArgumentException("Missing policy name") + } + // TODO validate policy name validateGeneratedSnapshotName + + log.info("sm dev: receive request ${request.requiredContent().utf8ToString()}") + + val xcp = request.contentParser() + val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName)) + log.info("sm dev: policy parsed $policy") + + return RestChannelConsumer { + client.execute( + SMActions.INDEX_SM_ACTION_TYPE, + IndexSMPolicyRequest(policy, create), + RestToXContentListener(it) + ) + } + } + + private fun deleteRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val policyName = request.param("policyName", "") + if (policyName == "") { + throw IllegalArgumentException("Missing policy name") + } + + return RestChannelConsumer { + client.execute( + DELETE_SM_ACTION_TYPE, + DeleteSMPolicyRequest(policyName), + RestToXContentListener(it) + ) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt new file mode 100644 index 000000000..f0d0dcc9c --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.client.Client +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.util.concurrent.ThreadContext.StoredContext +import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT +import org.opensearch.commons.authuser.User +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +abstract class BaseTransportAction( + name: String, + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + requestReader: Writeable.Reader, +) : HandledTransportAction( + name, transportService, actionFilters, requestReader +) { + + private val log = LogManager.getLogger(javaClass) + private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO) + + override fun doExecute( + task: Task, + request: Request, + listener: ActionListener + ) { + val userStr: String? = + client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + log.info("sm dev: user and roles string from thread context: $userStr") + val user: User? = User.parse(userStr) + coroutineScope.launch { + try { + client.threadPool().threadContext.stashContext().use { threadContext -> + listener.onResponse(executeRequest(request, user, threadContext)) + } + } catch (ex: Exception) { + log.error("Uncaught exception:", ex) + listener.onFailure( + OpenSearchStatusException( + ex.message, RestStatus.INTERNAL_SERVER_ERROR + ) + ) + } + } + } + + // TODO SM test could we get userStr from threadContext? suppose no + abstract suspend fun executeRequest(request: Request, user: User?, threadContext: StoredContext): Response +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt new file mode 100644 index 000000000..753acbf57 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport + +import org.opensearch.action.ActionType +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyResponse +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyResponse +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction + +object SMActions { + /** + * [TransportIndexSMPolicyAction] + */ + const val INDEX_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/write" + val INDEX_SM_ACTION_TYPE = ActionType(INDEX_SM_ACTION_NAME, ::IndexSMPolicyResponse) + + /** + * [TransportGetSMPolicyAction] + */ + const val GET_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/get" + val GET_SM_ACTION_TYPE = ActionType(GET_SM_ACTION_NAME, ::GetSMPolicyResponse) + + /** + * [TransportDeleteSMPolicyAction] + */ + const val DELETE_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/delete" + val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteSMPolicyResponse) +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt new file mode 100644 index 000000000..418eefb09 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput + +class DeleteSMPolicyRequest( + val policyName: String +) : ActionRequest() { + override fun validate(): ActionRequestValidationException? { + return null + } + + constructor(sin: StreamInput) : this( + policyName = sin.readString() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(policyName) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt new file mode 100644 index 000000000..6a0ecd524 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder + +class DeleteSMPolicyResponse( + val status: String +) : ActionResponse(), ToXContentObject { + + constructor(sin: StreamInput) : this( + status = sin.readString() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(status) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("delete", status) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt new file mode 100644 index 000000000..5091f8d9d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_NAME +import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId +import org.opensearch.transport.TransportService + +class TransportDeleteSMPolicyAction @Inject constructor( + client: Client, + transportService: TransportService, + actionFilters: ActionFilters, +) : BaseTransportAction( + DELETE_SM_ACTION_NAME, transportService, client, actionFilters, ::DeleteSMPolicyRequest +) { + + private val log = LogManager.getLogger(javaClass) + + override suspend fun executeRequest( + request: DeleteSMPolicyRequest, + user: User?, + threadContext: ThreadContext.StoredContext + ): DeleteSMPolicyResponse { + val deleteReq = DeleteRequest(INDEX_MANAGEMENT_INDEX, smPolicyNameToDocId(request.policyName)) + val deleteRes: DeleteResponse = client.suspendUntil { delete(deleteReq, it) } + return DeleteSMPolicyResponse(deleteRes.status().toString()) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt new file mode 100644 index 000000000..cd70585c9 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput + +class GetSMPolicyRequest( + val policyName: String +) : ActionRequest() { + override fun validate(): ActionRequestValidationException? { + return null + } + + constructor(sin: StreamInput) : this( + policyName = sin.readString(), + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(policyName) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt new file mode 100644 index 000000000..02c80bcca --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy + +class GetSMPolicyResponse( + val policy: SMPolicy +) : ActionResponse(), ToXContentObject { + + constructor(sin: StreamInput) : this( + policy = SMPolicy(sin) + ) + + override fun writeTo(out: StreamOutput) { + policy.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return policy.toXContent(builder, EMPTY_PARAMS) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt new file mode 100644 index 000000000..730f727f7 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.opensearchapi.contentParser +import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_NAME +import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.transport.TransportService + +class TransportGetSMPolicyAction @Inject constructor( + client: Client, + transportService: TransportService, + actionFilters: ActionFilters, +) : BaseTransportAction( + GET_SM_ACTION_NAME, transportService, client, actionFilters, ::GetSMPolicyRequest +) { + + private val log = LogManager.getLogger(javaClass) + + override suspend fun executeRequest( + request: GetSMPolicyRequest, + user: User?, + threadContext: ThreadContext.StoredContext + ): GetSMPolicyResponse { + val getReq = GetRequest(INDEX_MANAGEMENT_INDEX, smPolicyNameToDocId(request.policyName)) + val getRes: GetResponse = client.suspendUntil { get(getReq, it) } + val xcp = contentParser(getRes.sourceAsBytesRef) + val policy = xcp.parseWithType(getRes.id, getRes.seqNo, getRes.primaryTerm, SMPolicy.Companion::parse) + return GetSMPolicyResponse(policy) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt new file mode 100644 index 000000000..ad5b23341 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy + +class IndexSMPolicyRequest( + val policy: SMPolicy, + val create: Boolean, +) : ActionRequest() { + override fun validate(): ActionRequestValidationException? { + return null + } + + constructor(sin: StreamInput) : this( + policy = SMPolicy(sin), + create = sin.readBoolean(), + ) + + override fun writeTo(out: StreamOutput) { + policy.writeTo(out) + out.writeBoolean(create) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt new file mode 100644 index 000000000..c3c45cdd3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy + +class IndexSMPolicyResponse(val policy: SMPolicy) : ActionResponse(), ToXContentObject { + + constructor(sin: StreamInput) : this( + policy = SMPolicy(sin) + ) + + override fun writeTo(out: StreamOutput) { + policy.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(SMPolicy.SM_TYPE, policy) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt new file mode 100644 index 000000000..ab042379d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.INDEX_SM_ACTION_NAME +import org.opensearch.transport.TransportService + +class TransportIndexSMPolicyAction @Inject constructor( + client: Client, + transportService: TransportService, + actionFilters: ActionFilters, +) : BaseTransportAction( + INDEX_SM_ACTION_NAME, transportService, client, actionFilters, ::IndexSMPolicyRequest +) { + + private val log = LogManager.getLogger(javaClass) + + override suspend fun executeRequest( + request: IndexSMPolicyRequest, + user: User?, + threadContext: ThreadContext.StoredContext + ): IndexSMPolicyResponse { + val policy = request.policy + val indexReq = IndexRequest(INDEX_MANAGEMENT_INDEX) + .source(policy.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .id(policy.id) + .create(request.create) + val indexRes: IndexResponse = client.suspendUntil { index(indexReq, it) } + return IndexSMPolicyResponse(policy) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt new file mode 100644 index 000000000..99ca43705 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/statemachine/SMState.kt @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine + +enum class SMState diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt new file mode 100644 index 000000000..59a551e8f --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt @@ -0,0 +1,361 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentFragment +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParser.Token +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.opensearchapi.instant +import org.opensearch.indexmanagement.opensearchapi.nullValueHandler +import org.opensearch.indexmanagement.opensearchapi.optionalField +import org.opensearch.indexmanagement.opensearchapi.optionalTimeField +import org.opensearch.indexmanagement.opensearchapi.parseArray +import org.opensearch.indexmanagement.opensearchapi.readOptionalValue +import org.opensearch.indexmanagement.opensearchapi.writeOptionalValue +import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMState +import org.opensearch.indexmanagement.util.NO_ID +import java.time.Instant + +typealias InfoType = Map? + +data class SMMetadata( + val policySeqNo: Long, + val policyPrimaryTerm: Long, + val currentState: SMState, + val creation: Creation, + val deletion: Deletion, + val info: InfoType = null, + val id: String = NO_ID, + val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, +) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(SM_METADATA_TYPE) + .field(POLICY_SEQ_NO_FIELD, policySeqNo) + .field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm) + .field(CURRENT_STATE_FIELD, currentState.toString()) + .field(CREATION_FIELD, creation) + .field(DELETION_FIELD, deletion) + .optionalField(INFO_FIELD, info) + .endObject() + .endObject() + } + + companion object { + const val SM_METADATA_TYPE = "sm_metadata" + const val POLICY_SEQ_NO_FIELD = "policy_seq_no" + const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term" + const val CURRENT_STATE_FIELD = "current_state" + const val CREATION_FIELD = "creation" + const val DELETION_FIELD = "deletion" + const val INFO_FIELD = "info" + + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): SMMetadata { + var policySeqNo: Long? = null + var policyPrimaryTerm: Long? = null + var currentState: SMState? = null + var atomic = false + var creation: Creation? = null + var deletion: Deletion? = null + var info: InfoType = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + POLICY_SEQ_NO_FIELD -> policySeqNo = xcp.longValue() + POLICY_PRIMARY_TERM_FIELD -> policyPrimaryTerm = xcp.longValue() + CURRENT_STATE_FIELD -> currentState = SMState.valueOf(xcp.text()) + CREATION_FIELD -> creation = Creation.parse(xcp) + DELETION_FIELD -> deletion = Deletion.parse(xcp) + INFO_FIELD -> info = xcp.nullValueHandler { xcp.map() } + } + } + + return SMMetadata( + policySeqNo = requireNotNull(policySeqNo) {}, + policyPrimaryTerm = requireNotNull(policyPrimaryTerm) {}, + currentState = requireNotNull(currentState) {}, + creation = requireNotNull(creation) {}, + deletion = requireNotNull(deletion) {}, + info = info, + id = id, + seqNo = seqNo, + primaryTerm = primaryTerm + ) + } + + fun InfoType.upsert(keyValuePair: Pair): InfoType { + val info: MutableMap = this?.toMutableMap() ?: mutableMapOf() + info[keyValuePair.first] = keyValuePair.second + return info + } + } + + constructor(sin: StreamInput) : this( + policySeqNo = sin.readLong(), + policyPrimaryTerm = sin.readLong(), + currentState = sin.readEnum(SMState::class.java), + creation = Creation(sin), + deletion = Deletion(sin), + info = sin.readMap(), + id = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + ) + + override fun writeTo(out: StreamOutput) { + out.writeLong(policySeqNo) + out.writeLong(policyPrimaryTerm) + out.writeEnum(currentState) + creation.writeTo(out) + deletion.writeTo(out) + out.writeMap(info) + out.writeString(id) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + } + + data class Creation( + val trigger: Trigger, + val started: SnapshotInfo? = null, + val finished: SnapshotInfo? = null, + ) : Writeable, ToXContentFragment { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(TRIGGER_FIELD, trigger) + .optionalField(STARTED_FIELD, started) + .optionalField(FINISHED_FIELD, finished) + .endObject() + } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val STARTED_FIELD = "started" + const val FINISHED_FIELD = "finished" + + fun parse(xcp: XContentParser): Creation { + var trigger: Trigger? = null + var started: SnapshotInfo? = null + var finished: SnapshotInfo? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + TRIGGER_FIELD -> trigger = Trigger.parse(xcp) + STARTED_FIELD -> started = xcp.nullValueHandler { SnapshotInfo.parse(xcp) } + FINISHED_FIELD -> finished = xcp.nullValueHandler { SnapshotInfo.parse(xcp) } + } + } + + return Creation( + trigger = requireNotNull(trigger) { "trigger field must not be null" }, + started = started, + finished = finished, + ) + } + } + + constructor(sin: StreamInput) : this( + trigger = Trigger(sin), + started = sin.readOptionalWriteable { SnapshotInfo(it) }, + finished = sin.readOptionalWriteable { SnapshotInfo(it) }, + ) + + override fun writeTo(out: StreamOutput) { + trigger.writeTo(out) + out.writeOptionalWriteable(started) + out.writeOptionalWriteable(finished) + } + } + + data class Deletion( + val trigger: Trigger, + val started: List? = null, + val startedTime: Instant? = null + ) : Writeable, ToXContentFragment { + + init { + require(!(started != null).xor(startedTime != null)) { + "deletion started and startedTime must exist at the same time." + } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(TRIGGER_FIELD, trigger) + .optionalField(STARTED_FIELD, started) + .optionalField(STARTED_TIME_FIELD, startedTime) + .endObject() + } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val STARTED_FIELD = "started" + const val STARTED_TIME_FIELD = "started_time" + + fun parse(xcp: XContentParser): Deletion { + var trigger: Trigger? = null + var started: List? = null + var startedTime: Instant? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + TRIGGER_FIELD -> trigger = Trigger.parse(xcp) + STARTED_FIELD -> started = xcp.nullValueHandler { parseArray { SnapshotInfo.parse(xcp) } } + STARTED_TIME_FIELD -> startedTime = xcp.instant() + } + } + + return Deletion( + trigger = requireNotNull(trigger) { "trigger field must not be null" }, + started = started, + startedTime = startedTime, + ) + } + } + + constructor(sin: StreamInput) : this( + trigger = Trigger(sin), + started = sin.readOptionalValue(sin.readList { SnapshotInfo(it) }), + startedTime = sin.readOptionalInstant(), + ) + + override fun writeTo(out: StreamOutput) { + trigger.writeTo(out) + out.writeOptionalValue(started, StreamOutput::writeList) + out.writeOptionalInstant(startedTime) + } + } + + /** + * Trigger for recurring condition check + * + * index_size can be another possible trigger, e.g.: snapshot will be created + * every time index size increases 50gb + */ + data class Trigger( + val time: Instant, + ) : Writeable, ToXContentFragment { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .optionalTimeField(TIME_FIELD, time) + .endObject() + } + + companion object { + const val TIME_FIELD = "time" + + fun parse(xcp: XContentParser): Trigger { + var nextExecutionTime: Instant? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + TIME_FIELD -> nextExecutionTime = xcp.instant() + } + } + + return Trigger( + time = requireNotNull(nextExecutionTime) { "trigger time field must not be null." }, + ) + } + } + + constructor(sin: StreamInput) : this( + time = sin.readInstant() + ) + + override fun writeTo(out: StreamOutput) { + out.writeInstant(time) + } + } + + data class SnapshotInfo( + val name: String, + val startTime: Instant? = null, + val endTime: Instant? = null, + ) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(NAME_FIELD, name) + .optionalTimeField(START_TIME_FIELD, startTime) + .optionalField(END_TIME_FIELD, endTime) + .endObject() + } + + companion object { + const val NAME_FIELD = "name" + const val START_TIME_FIELD = "start_time" + const val END_TIME_FIELD = "end_time" + + fun parse(xcp: XContentParser): SnapshotInfo { + var name: String? = null + var startTime: Instant? = null + var endTime: Instant? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + NAME_FIELD -> name = xcp.text() + START_TIME_FIELD -> startTime = xcp.instant() + END_TIME_FIELD -> endTime = xcp.instant() + } + } + + return SnapshotInfo( + name = requireNotNull(name) { "snapshot info name must not be null." }, + startTime = startTime, + endTime = endTime, + ) + } + } + + constructor(sin: StreamInput) : this( + name = sin.readString(), + startTime = sin.readOptionalInstant(), + endTime = sin.readOptionalInstant(), + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(name) + out.writeOptionalInstant(startTime) + out.writeOptionalInstant(endTime) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt new file mode 100644 index 000000000..45ebdb415 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -0,0 +1,369 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.model + +import org.apache.logging.log4j.LogManager +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParser.Token +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.opensearchapi.instant +import org.opensearch.indexmanagement.opensearchapi.nullValueHandler +import org.opensearch.indexmanagement.opensearchapi.optionalField +import org.opensearch.indexmanagement.opensearchapi.optionalTimeField +import org.opensearch.indexmanagement.snapshotmanagement.smDocIdToPolicyName +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout +import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.jobscheduler.spi.ScheduledJobParameter +import org.opensearch.jobscheduler.spi.schedule.CronSchedule +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.jobscheduler.spi.schedule.Schedule +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser +import java.time.Instant +import java.time.ZoneId +import java.time.temporal.ChronoUnit + +private val log = LogManager.getLogger(SMPolicy::class.java) + +data class SMPolicy( + val id: String = NO_ID, + val description: String? = null, + val creation: Creation, + val deletion: Deletion, + val snapshotConfig: Map, + val jobEnabled: Boolean, + val jobLastUpdateTime: Instant, + val jobEnabledTime: Instant, + val jobSchedule: Schedule, + val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, +) : ScheduledJobParameter, Writeable { + + init { + require(snapshotConfig["repository"] != null) { "Must provide the repository in snapshot config." } + // Other fields in snapshotConfig: date_expression, indices, partial, include_global_state, ignore_unavailable, metadata + // TODO SM validate date_format is of right format + } + + override fun getName() = id + + override fun getLastUpdateTime() = jobLastUpdateTime + + override fun getEnabledTime() = jobEnabledTime + + override fun getSchedule() = jobSchedule + + override fun isEnabled() = jobEnabled + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder + .startObject() + .startObject(SM_TYPE) + .field(NAME_FIELD, smDocIdToPolicyName(id)) // for searching policy by name + .optionalField(DESCRIPTION_FIELD, description) + .field(CREATION_FIELD, creation) + .field(DELETION_FIELD, deletion) + .field(SNAPSHOT_CONFIG_FIELD, snapshotConfig) + .field(SCHEDULE_FIELD, jobSchedule) + .field(ENABLED_FIELD, jobEnabled) + .optionalTimeField(LAST_UPDATED_TIME_FIELD, jobLastUpdateTime) + .optionalTimeField(ENABLED_TIME_FIELD, jobEnabledTime) + .endObject() + .endObject() + } + + companion object { + const val SM_TYPE = "sm" + const val NAME_FIELD = "name" + const val DESCRIPTION_FIELD = "description" + const val CREATION_FIELD = "creation" + const val DELETION_FIELD = "deletion" + const val SNAPSHOT_CONFIG_FIELD = "snapshot_config" + const val ENABLED_FIELD = "enabled" + const val LAST_UPDATED_TIME_FIELD = "last_updated_time" + const val ENABLED_TIME_FIELD = "enabled_time" + const val SCHEDULE_FIELD = "schedule" + + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + ): SMPolicy { + var description: String? = null + var creation: Creation? = null + var deletion: Deletion? = null + var snapshotConfig: Map? = null + var lastUpdatedTime: Instant? = null + var enabledTime: Instant? = null + var schedule: Schedule? = null + var enabled = true + + log.info("sm dev: first token: ${xcp.currentToken()}, ${xcp.currentName()}") + if (xcp.currentToken() == null) xcp.nextToken() + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + log.info("sm dev: current token loop: ${xcp.currentToken()}, ${xcp.currentName()}") + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + NAME_FIELD -> requireNotNull(xcp.text()) { "The name field of SMPolicy must not be null." } + DESCRIPTION_FIELD -> description = xcp.nullValueHandler { text() } + CREATION_FIELD -> creation = Creation.parse(xcp) + DELETION_FIELD -> deletion = Deletion.parse(xcp) + SNAPSHOT_CONFIG_FIELD -> snapshotConfig = xcp.map() + LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = xcp.instant() + ENABLED_TIME_FIELD -> enabledTime = xcp.instant() + SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp) + ENABLED_FIELD -> enabled = xcp.booleanValue() + } + } + + if (enabled && enabledTime == null) { + enabledTime = Instant.now() + } else if (!enabled) { + enabledTime = null + } + + // TODO SM update policy API can update this value + if (lastUpdatedTime == null) { + lastUpdatedTime = Instant.now() + } + + if (schedule == null) { + schedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES) + } + + return SMPolicy( + description = description, + creation = requireNotNull(creation) { "creation field must not be null" }, + deletion = requireNotNull(deletion) { "deletion field must not be null" }, + snapshotConfig = requireNotNull(snapshotConfig) { "snapshot_config field must not be null" }, + jobLastUpdateTime = requireNotNull(lastUpdatedTime) { "last_updated_at field must not be null" }, + jobEnabledTime = requireNotNull(enabledTime) { "job_enabled_time field must not be null" }, + jobSchedule = schedule, + jobEnabled = enabled, + id = id, + seqNo = seqNo, + primaryTerm = primaryTerm, + ) + } + } + + constructor(sin: StreamInput) : this( + description = sin.readOptionalString(), + creation = Creation(sin), + deletion = Deletion(sin), + snapshotConfig = sin.readMap() as Map, + jobLastUpdateTime = sin.readInstant(), + jobEnabledTime = sin.readInstant(), + jobSchedule = IntervalSchedule(sin), + jobEnabled = sin.readBoolean(), + id = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + ) + + override fun writeTo(out: StreamOutput) { + out.writeOptionalString(description) + creation.writeTo(out) + deletion.writeTo(out) + out.writeMap(snapshotConfig) + out.writeInstant(jobLastUpdateTime) + out.writeInstant(jobEnabledTime) + out.writeOptionalWriteable(jobSchedule) + out.writeBoolean(jobEnabled) + out.writeString(id) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + } + + data class Creation( + val schedule: Schedule, + val timeout: ActionTimeout? = null, + ) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(SCHEDULE_FIELD, schedule) + .optionalField(TIMEOUT_FIELD, timeout) + .endObject() + } + + companion object { + const val SCHEDULE_FIELD = "schedule" + const val TIMEOUT_FIELD = "timeout" + + fun parse(xcp: XContentParser): Creation { + var schedule: Schedule? = null + var timeout: ActionTimeout? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp) + TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp) + } + } + + return Creation( + schedule = requireNotNull(schedule) { "schedule field must not be null" }, + timeout = timeout + ) + } + } + + constructor(sin: StreamInput) : this( + schedule = CronSchedule(sin), + timeout = sin.readOptionalWriteable(::ActionTimeout), + ) + + override fun writeTo(out: StreamOutput) { + schedule.writeTo(out) + out.writeOptionalWriteable(timeout) + } + } + + data class Deletion( + val schedule: Schedule, + val timeout: ActionTimeout? = null, + val condition: DeleteCondition, + ) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(SCHEDULE_FIELD, schedule) + .optionalField(TIMEOUT_FIELD, timeout) + .field(CONDITION_FIELD, condition) + .endObject() + } + + companion object { + const val SCHEDULE_FIELD = "schedule" + const val TIMEOUT_FIELD = "timeout" + const val CONDITION_FIELD = "condition" + + fun parse(xcp: XContentParser): Deletion { + var schedule: Schedule? = null + var timeout: ActionTimeout? = null + var condition: DeleteCondition? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp) + TIMEOUT_FIELD -> timeout = ActionTimeout.parse(xcp) + CONDITION_FIELD -> condition = DeleteCondition.parse(xcp) + } + } + + // If user doesn't provide delete schedule, defaults to every day 1AM + if (schedule == null) { + schedule = CronSchedule("0 1 * * *", ZoneId.systemDefault()) + } + + return Deletion( + schedule = schedule, + timeout = timeout, + condition = requireNotNull(condition) { "condition field must not be null" }, + ) + } + } + + constructor(sin: StreamInput) : this( + schedule = CronSchedule(sin), + timeout = sin.readOptionalWriteable(::ActionTimeout), + condition = DeleteCondition(sin), + ) + + override fun writeTo(out: StreamOutput) { + schedule.writeTo(out) + out.writeOptionalWriteable(timeout) + condition.writeTo(out) + } + } + + data class DeleteCondition( + val maxCount: Int, + val maxAge: TimeValue? = null, + val minCount: Int? = null, + ) : Writeable, ToXContent { + + init { + require(maxCount > 0) { "$MAX_COUNT_FIELD should be bigger than 0." } + require(minCount == null || maxCount >= minCount && minCount > 0) { + "$MIN_COUNT_FIELD should be bigger than 0 and smaller than $MAX_COUNT_FIELD." + } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(MAX_COUNT_FIELD, maxCount) + .optionalField(MAX_AGE_FIELD, maxAge) + .optionalField(MIN_COUNT_FIELD, minCount) + .endObject() + } + + companion object { + const val MAX_COUNT_FIELD = "max_count" + const val MAX_AGE_FIELD = "max_age" + const val MIN_COUNT_FIELD = "min_count" + + fun parse(xcp: XContentParser): DeleteCondition { + var maxCount = 50 + var maxAge: TimeValue? = null + var minCount: Int? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MAX_COUNT_FIELD -> maxCount = xcp.intValue() + MAX_AGE_FIELD -> maxAge = TimeValue.parseTimeValue(xcp.text(), MAX_AGE_FIELD) + MIN_COUNT_FIELD -> minCount = xcp.intValue() + } + } + + if (maxAge != null && minCount == null) { + minCount = minOf(5, maxCount) + } + + return DeleteCondition( + maxCount = maxCount, + maxAge = maxAge, + minCount = minCount, + ) + } + } + + constructor(sin: StreamInput) : this( + maxCount = sin.readInt(), + maxAge = sin.readOptionalTimeValue(), + minCount = sin.readOptionalInt() + ) + + override fun writeTo(out: StreamOutput) { + out.writeInt(maxCount) + out.writeOptionalTimeValue(maxAge) + out.writeOptionalInt(minCount) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt index 070cf4f5a..dd9acfc2d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt @@ -39,6 +39,7 @@ import org.opensearch.indexmanagement.opensearchapi.optionalUserField import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.transform.TransformSearchService import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule @@ -264,7 +265,6 @@ data class Transform( val supportedAggregations = listOf("sum", "max", "min", "value_count", "avg", "scripted_metric", "percentiles") const val LOCK_DURATION_SECONDS = 1800L - const val NO_ID = "" const val TRANSFORM_TYPE = "transform" const val TRANSFORM_ID_FIELD = "transform_id" const val ENABLED_FIELD = "enabled" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt index 8609c3672..c75b50df7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformAction.kt @@ -17,6 +17,7 @@ import org.opensearch.indexmanagement.transform.action.index.IndexTransformRespo import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM import org.opensearch.indexmanagement.util.IF_SEQ_NO +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.indexmanagement.util.REFRESH import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer @@ -46,8 +47,8 @@ class RestIndexTransformAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("transformID", Transform.NO_ID) - if (Transform.NO_ID == id) { + val id = request.param("transformID", NO_ID) + if (NO_ID == id) { throw IllegalArgumentException("Missing transform ID") } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt index 9518c1f75..9cfc1d97c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStartTransformAction.kt @@ -9,7 +9,7 @@ import org.opensearch.client.node.NodeClient import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.transform.action.start.StartTransformAction import org.opensearch.indexmanagement.transform.action.start.StartTransformRequest -import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer import org.opensearch.rest.RestHandler.Route @@ -32,7 +32,7 @@ class RestStartTransformAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("transformID", Transform.NO_ID) + val id = request.param("transformID", NO_ID) val startRequest = StartTransformRequest(id) return RestChannelConsumer { channel -> client.execute(StartTransformAction.INSTANCE, startRequest, RestToXContentListener(channel)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt index 515a346d1..a48034e5c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestStopTransformAction.kt @@ -9,7 +9,7 @@ import org.opensearch.client.node.NodeClient import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.transform.action.stop.StopTransformAction import org.opensearch.indexmanagement.transform.action.stop.StopTransformRequest -import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer import org.opensearch.rest.RestHandler.Route @@ -32,8 +32,8 @@ class RestStopTransformAction : BaseRestHandler() { @Throws(IOException::class) override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val id = request.param("transformID", Transform.NO_ID) - if (Transform.NO_ID == id) { + val id = request.param("transformID", NO_ID) + if (NO_ID == id) { throw IllegalArgumentException("Missing transform ID") } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index c075bf2b1..db1fe9e73 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -35,6 +35,7 @@ class IndexUtils { const val ODFE_MAGIC_NULL = "#ODFE-MAGIC-NULL-MAGIC-ODFE#" private const val BYTE_ARRAY_SIZE = 16 private const val DOCUMENT_ID_SEED = 72390L + val logger = LogManager.getLogger(IndexUtils::class.java) var indexManagementConfigSchemaVersion: Long diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index bf2b61e1c..1fe996ecd 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 14 + "schema_version": 15 }, "dynamic": "strict", "properties": { @@ -1305,6 +1305,250 @@ "enabled": false } } + }, + "sm": { + "properties": { + "name": { + "type": "keyword" + }, + "description": { + "type": "text" + }, + "creation": { + "properties": { + "schedule": { + "properties": { + "cron": { + "properties": { + "expression": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + }, + "schedule_delay": { + "type": "long" + } + } + } + } + }, + "timeout": { + "type": "keyword" + } + } + }, + "deletion": { + "properties": { + "schedule": { + "properties": { + "cron": { + "properties": { + "expression": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + }, + "schedule_delay": { + "type": "long" + } + } + } + } + }, + "timeout": { + "type": "keyword" + }, + "condition": { + "properties": { + "max_count": { + "type": "integer" + }, + "max_age": { + "type": "keyword" + }, + "min_count": { + "type": "integer" + } + } + } + } + }, + "snapshot_config": { + "type": "object", + "enabled": false + }, + "enabled": { + "type": "boolean" + }, + "enabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "integer" + }, + "unit": { + "type": "keyword" + }, + "start_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "schedule_delay": { + "type": "long" + } + } + }, + "cron": { + "properties": { + "expression": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + }, + "schedule_delay": { + "type": "long" + } + } + } + } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + } + } + }, + "sm_metadata": { + "properties": { + "policy_seq_no": { + "type": "long" + }, + "policy_primary_term": { + "type": "long" + }, + "current_state": { + "type": "keyword" + }, + "info": { + "type": "object", + "enabled": false + }, + "creation": { + "properties": { + "trigger": { + "properties": { + "time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + }, + "started": { + "properties": { + "name": { + "type": "keyword" + }, + "started_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + }, + "finished": { + "properties": { + "name": { + "type": "keyword" + }, + "started_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + } + } + }, + "deletion": { + "properties": { + "trigger": { + "properties": { + "time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + }, + "started": { + "type": "nested", + "properties": { + "name": { + "type": "keyword" + }, + "started_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + } + } + } + } } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index fbe64cf8a..63ee4181c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -27,7 +27,7 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 14 + val configSchemaVersion = 15 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt index 0b793be17..630fcca42 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt @@ -13,11 +13,11 @@ import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_ISM_BASE_URI import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.LEGACY_POLICY_BASE_URI import org.opensearch.indexmanagement.IndexManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_USER import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.opensearchapi.string +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder @@ -137,7 +137,7 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() { val responseBody = createResponse.asMap() val createdId = responseBody["_id"] as String val createdVersion = responseBody["_version"] as Int - assertNotEquals("Create policy response is missing id", Policy.NO_ID, createdId) + assertNotEquals("Create policy response is missing id", NO_ID, createdId) assertTrue("Create policy response has incorrect version", createdVersion > 0) Thread.sleep(10000) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt index 1dd3ca7c9..a4c719537 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt @@ -46,6 +46,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeo import org.opensearch.test.OpenSearchTestCase import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.lang.Math.abs import kotlin.test.assertFailsWith class ActionTests : OpenSearchTestCase() { @@ -191,7 +192,9 @@ class ActionTests : OpenSearchTestCase() { val totalNodeBytes = randomByteSizeValue().bytes val thresholdBytes = getFreeBytesThresholdHigh(clusterSettings, totalNodeBytes) val expectedThreshold: Long = ((1 - (rawPercentage.toDouble() / 100.0)) * totalNodeBytes).toLong() - assertEquals("Free bytes threshold not being calculated correctly for percentage setting.", thresholdBytes, expectedThreshold) + // To account for some rounding issues, allow an error of 1 + val approximatelyEqual = kotlin.math.abs(thresholdBytes - expectedThreshold) <= 1 + assertTrue("Free bytes threshold not being calculated correctly for percentage setting.", approximatelyEqual) } fun `test shrink disk threshold byte settings`() { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt index cb50d18ed..898e198bb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.randomReadOnlyActionC import org.opensearch.indexmanagement.indexstatemanagement.randomState import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO @@ -71,7 +72,7 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { val createdId = responseBody["_id"] as String val createdSeqNo = responseBody[_SEQ_NO] as Int val createdPrimaryTerm = responseBody[_PRIMARY_TERM] as Int - assertNotEquals("response is missing Id", Policy.NO_ID, createdId) + assertNotEquals("response is missing Id", NO_ID, createdId) assertEquals("not same id", policyId, createdId) assertEquals("incorrect seqNo", 0, createdSeqNo) assertEquals("incorrect primaryTerm", 1, createdPrimaryTerm) @@ -189,7 +190,7 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { val responseBody = updateResponse.asMap() val updatedId = responseBody[_ID] as String val updatedSeqNo = (responseBody[_SEQ_NO] as Int).toLong() - assertNotEquals("response is missing Id", Policy.NO_ID, updatedId) + assertNotEquals("response is missing Id", NO_ID, updatedId) assertEquals("not same id", policy.id, updatedId) assertTrue("incorrect seqNo", policy.seqNo < updatedSeqNo) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt index 066debaa6..9ac1b8af6 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/resthandler/RestIndexRollupActionIT.kt @@ -15,7 +15,6 @@ import org.opensearch.indexmanagement.common.model.dimension.Histogram import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.rollup.RollupRestTestCase -import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetrics import org.opensearch.indexmanagement.rollup.model.metric.Average import org.opensearch.indexmanagement.rollup.model.metric.Max @@ -26,6 +25,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.ValueCount import org.opensearch.indexmanagement.rollup.randomRollup import org.opensearch.indexmanagement.rollup.randomRollupDimensions import org.opensearch.indexmanagement.rollup.randomRollupMetrics +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._SEQ_NO import org.opensearch.rest.RestStatus @@ -43,7 +43,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() { assertEquals("Create rollup failed", RestStatus.CREATED, response.restStatus()) val responseBody = response.asMap() val createdId = responseBody["_id"] as String - assertNotEquals("Response is missing Id", Rollup.NO_ID, createdId) + assertNotEquals("Response is missing Id", NO_ID, createdId) assertEquals("Not same id", rollup.id, createdId) assertEquals("Incorrect Location header", "$ROLLUP_JOBS_BASE_URI/$createdId", response.getHeader("Location")) } @@ -116,7 +116,7 @@ class RestIndexRollupActionIT : RollupRestTestCase() { val responseBody = updateResponse.asMap() val updatedId = responseBody[_ID] as String val updatedSeqNo = (responseBody[_SEQ_NO] as Int).toLong() - assertNotEquals("response is missing Id", Rollup.NO_ID, updatedId) + assertNotEquals("response is missing Id", NO_ID, updatedId) assertEquals("not same id", rollup.id, updatedId) assertTrue("incorrect seqNo", rollup.seqNo < updatedSeqNo) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt new file mode 100644 index 000000000..f89742d02 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtilsTests.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement + +import org.opensearch.test.OpenSearchTestCase + +class SMUtilsTests : OpenSearchTestCase() { + fun `test sm policy name and id conversion`() { + val policyName = "daily-snapshot-sm-sm" + assertEquals(policyName, smDocIdToPolicyName(smPolicyNameToDocId(policyName))) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt new file mode 100644 index 000000000..635038f71 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt @@ -0,0 +1,142 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement + +import com.nhaarman.mockitokotlin2.doReturn +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse +import org.opensearch.action.index.IndexResponse +import org.opensearch.common.UUIDs +import org.opensearch.common.unit.TimeValue +import org.opensearch.indexmanagement.randomCronSchedule +import org.opensearch.indexmanagement.randomInstant +import org.opensearch.indexmanagement.randomIntervalSchedule +import org.opensearch.indexmanagement.snapshotmanagement.engine.statemachine.SMState +import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout +import org.opensearch.jobscheduler.spi.schedule.CronSchedule +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.rest.RestStatus +import org.opensearch.snapshots.SnapshotId +import org.opensearch.snapshots.SnapshotInfo +import org.opensearch.test.OpenSearchTestCase.randomNonNegativeLong +import java.time.Instant +import java.time.Instant.now + +fun randomSMMetadata( + currentState: SMState, + nextCreationTime: Instant = now(), + nextDeletionTime: Instant = now(), + policySeqNo: Long = randomNonNegativeLong(), + policyPrimaryTerm: Long = randomNonNegativeLong(), + atomic: Boolean = false, +): SMMetadata { + return SMMetadata( + policySeqNo = policySeqNo, + policyPrimaryTerm = policyPrimaryTerm, + currentState = currentState, + creation = SMMetadata.Creation( + trigger = SMMetadata.Trigger( + time = nextCreationTime + ) + ), + deletion = SMMetadata.Deletion( + trigger = SMMetadata.Trigger( + time = nextDeletionTime + ) + ), + ) +} + +fun randomSMPolicy( + jobEnabled: Boolean = false, + jobLastUpdateTime: Instant = randomInstant(), + creationSchedule: CronSchedule = randomCronSchedule(), + creationTimeout: ActionTimeout? = null, + deletionSchedule: CronSchedule = randomCronSchedule(), + deletionTimeout: ActionTimeout? = null, + deletionMaxCount: Int = 5, + deletionMaxAge: TimeValue? = null, + deletionMinCount: Int? = null, + snapshotConfig: Map = mapOf( + "repository" to "repo", + "date_format" to "yyyy-MM-dd-HH:mm" + ), + jobEnabledTime: Instant = randomInstant(), + jobSchedule: IntervalSchedule = randomIntervalSchedule() +): SMPolicy { + return SMPolicy( + jobEnabled = jobEnabled, + jobLastUpdateTime = jobLastUpdateTime, + creation = SMPolicy.Creation( + schedule = creationSchedule, + timeout = creationTimeout, + ), + deletion = SMPolicy.Deletion( + schedule = deletionSchedule, + timeout = deletionTimeout, + condition = SMPolicy.DeleteCondition( + maxCount = deletionMaxCount, + maxAge = deletionMaxAge, + minCount = deletionMinCount + ) + ), + snapshotConfig = snapshotConfig, + jobEnabledTime = jobEnabledTime, + jobSchedule = jobSchedule + ) +} + +fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse { + val indexResponse: IndexResponse = mock() + whenever(indexResponse.status()).doReturn(status) + whenever(indexResponse.seqNo).doReturn(0L) + whenever(indexResponse.primaryTerm).doReturn(1L) + + return indexResponse +} + +fun mockCreateSnapshotResponse(status: RestStatus = RestStatus.ACCEPTED): CreateSnapshotResponse { + val createSnapshotRes: CreateSnapshotResponse = mock() + whenever(createSnapshotRes.status()).doReturn(status) + return createSnapshotRes +} + +fun mockGetSnapshotResponse(num: Int): GetSnapshotsResponse { + val getSnapshotsRes: GetSnapshotsResponse = mock() + whenever(getSnapshotsRes.snapshots).doReturn(mockSnapshots(num)) + return getSnapshotsRes +} + +fun mockSnapshots(num: Int): List { + val result = mutableListOf() + for (i in 1..num) { + result.add(mockSnapshotInfo(idNum = i)) + } + return result.toList() +} + +/** + * For our use case, only mock snapshotId, startTime and endTime + */ +fun mockSnapshotInfo(idNum: Int, startTime: Long = randomNonNegativeLong(), endTime: Long = randomNonNegativeLong()): SnapshotInfo { + val snapshotId = SnapshotId("mock_snapshot-$idNum}", UUIDs.randomBase64UUID()) + return SnapshotInfo( + snapshotId, + listOf("index1"), + listOf("ds-1"), + startTime, + "", + endTime, + 5, + emptyList(), + false, + emptyMap(), + ) +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt index 0857c1900..d184f1c2c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestIndexTransformActionIT.kt @@ -11,8 +11,8 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.transform.TransformRestTestCase -import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.randomTransform +import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.rest.RestStatus import org.opensearch.test.junit.annotations.TestLogging @@ -34,7 +34,7 @@ class RestIndexTransformActionIT : TransformRestTestCase() { assertEquals("Create transform failed", RestStatus.CREATED, response.restStatus()) val responseBody = response.asMap() val createdId = responseBody["_id"] as String - assertNotEquals("Response is missing Id", Transform.NO_ID, createdId) + assertNotEquals("Response is missing Id", NO_ID, createdId) assertEquals("Not same id", transform.id, createdId) assertEquals("Incorrect Location header", "$TRANSFORM_BASE_URI/$createdId", response.getHeader("Location")) } diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index bf2b61e1c..1fe996ecd 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 14 + "schema_version": 15 }, "dynamic": "strict", "properties": { @@ -1305,6 +1305,250 @@ "enabled": false } } + }, + "sm": { + "properties": { + "name": { + "type": "keyword" + }, + "description": { + "type": "text" + }, + "creation": { + "properties": { + "schedule": { + "properties": { + "cron": { + "properties": { + "expression": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + }, + "schedule_delay": { + "type": "long" + } + } + } + } + }, + "timeout": { + "type": "keyword" + } + } + }, + "deletion": { + "properties": { + "schedule": { + "properties": { + "cron": { + "properties": { + "expression": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + }, + "schedule_delay": { + "type": "long" + } + } + } + } + }, + "timeout": { + "type": "keyword" + }, + "condition": { + "properties": { + "max_count": { + "type": "integer" + }, + "max_age": { + "type": "keyword" + }, + "min_count": { + "type": "integer" + } + } + } + } + }, + "snapshot_config": { + "type": "object", + "enabled": false + }, + "enabled": { + "type": "boolean" + }, + "enabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "integer" + }, + "unit": { + "type": "keyword" + }, + "start_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "schedule_delay": { + "type": "long" + } + } + }, + "cron": { + "properties": { + "expression": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + }, + "schedule_delay": { + "type": "long" + } + } + } + } + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + } + } + }, + "sm_metadata": { + "properties": { + "policy_seq_no": { + "type": "long" + }, + "policy_primary_term": { + "type": "long" + }, + "current_state": { + "type": "keyword" + }, + "info": { + "type": "object", + "enabled": false + }, + "creation": { + "properties": { + "trigger": { + "properties": { + "time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + }, + "started": { + "properties": { + "name": { + "type": "keyword" + }, + "started_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + }, + "finished": { + "properties": { + "name": { + "type": "keyword" + }, + "started_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + } + } + }, + "deletion": { + "properties": { + "trigger": { + "properties": { + "time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + }, + "started": { + "type": "nested", + "properties": { + "name": { + "type": "keyword" + }, + "started_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } + } + } + } + } } } } diff --git a/worksheets/ism/all_actions.http b/worksheets/ism/all_actions.http new file mode 100644 index 000000000..f9c625f11 --- /dev/null +++ b/worksheets/ism/all_actions.http @@ -0,0 +1,487 @@ +### +PUT localhost:9200/_opendistro/_ism/policies/rollover +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "rollover": { + "min_doc_count": 0 + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["rollover*"], + "priority": 100 + } + } +} + +### +PUT http://localhost:9200/rollover_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/force_merge +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "force_merge": { + "max_num_segments": 1 + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["force_merge*"], + "priority": 100 + } + } +} + +### +PUT http://localhost:9200/force_merge_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/read_only +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "read_only": {} + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["read_only*"], + "priority": 100 + } + } +} + +### +PUT http://localhost:9200/read_only_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/read_write +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "read_write": {} + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["read_write*"], + "priority": 100 + } + } +} + +### +PUT http://localhost:9200/read_write_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/replica_count +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "replica_count": { + "number_of_replicas": 2 + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["replica_count*"] + } + } +} + +### +PUT http://localhost:9200/replica_count_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/close +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "close": {} + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["close*"] + } + } +} + +### +PUT http://localhost:9200/close_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/open +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "open": {} + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["open*"] + } + } +} + +### +PUT http://localhost:9200/open_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/notification +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "notification": { + "destination": { + "chime": { + "url": "" + } + }, + "message_template": { + "source": "the index is {{ctx.index}}" + } + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["notification*"] + } + } +} + +### +PUT http://localhost:9200/notification_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/snapshot +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "snapshot": { + "repository": "my_backup", + "snapshot": "snapshot1" + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["snapshot*"] + } + } +} + +### +PUT http://localhost:9200/snapshot_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/index_priority +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "index_priority": { + "priority": 50 + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["index_priority*"] + } + } +} + +### +PUT http://localhost:9200/index_priority_index +Content-Type: application/json + +### +PUT localhost:9200/_opendistro/_ism/policies/allocation +Content-Type: application/json + +{ + "policy": { + "description": "action test", + "default_state": "action", + "states": [ + { + "name": "action", + "actions": [ + { + "allocation": { + "require": { "temp": "warm" } + } + } + ], + "transitions": [ + { + "state_name": "delete" + } + ] + }, + { + "name": "delete", + "actions": [ + { + "delete": {} + } + ], + "transitions": [] + } + ], + "ism_template": { + "index_patterns": ["allocation*"] + } + } +} + +### +PUT http://localhost:9200/allocation_index +Content-Type: application/json diff --git a/worksheets/sm/create.http b/worksheets/sm/create.http new file mode 100644 index 000000000..9b0d4b648 --- /dev/null +++ b/worksheets/sm/create.http @@ -0,0 +1,84 @@ +### +GET localhost:9200/ +Accept: application/json + +### +POST localhost:9200/.opendistro-ism-config/_search +Content-Type: application/json + +{ + "seq_no_primary_term": true +} + +### +DELETE localhost:9200/.opendistro-ism-config + +### index sm +PUT localhost:9200/_plugins/_sm/daily_snapshot +Content-Type: application/json + +{ + "description": "A daily snapshot policy", + "creation": { + "schedule": { + "cron": { + "expression": "* * * * *", + "timezone": "America/Los_Angeles" + } + } + }, + "deletion": { + "schedule": { + "cron": { + "expression": "*/2 * * * *", + "timezone": "America/Los_Angeles" + } + }, + "condition": { + "max_count": 5 + } + }, + "snapshot_config": { + "repository": "repo", + "date_format": "yyyy-MM-dd-HH:mm", + "indices": "*", + "partial": "false", + "include_global_state": "false", + "ignore_unavailable": "true", + "metadata": { + "taken_by": "snapshot management" + } + } +} + +### +# "schedule": { +# "interval": { +# "start_time": 1645596970, +# "period": 1, +# "unit": "MINUTES" +# } +# }, + +### register repo +PUT localhost:9200/_snapshot/repo +Content-Type: application/json + +{ + "type": "fs", + "settings": { + "location": "my_backup_location" + } +} + +### delete repo +DELETE localhost:9200/_snapshot/repo + +### create snapshot +PUT localhost:9200/_snapshot/repo/my_snapshot + +### delete snapshot +DELETE localhost:9200/_snapshot/repo/test + +### get snapshot status +GET localhost:9200/_snapshot/repo/daily_snapshot*