diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementIndices.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementIndices.kt index 54a2f1cf7..61a6e1d28 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementIndices.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementIndices.kt @@ -7,6 +7,8 @@ package org.opensearch.indexmanagement import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.Logger +import org.opensearch.OpenSearchStatusException import org.opensearch.ResourceAlreadyExistsException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.alias.Alias @@ -27,6 +29,10 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.indexmanagement.util.OpenForTesting +import org.opensearch.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine @OpenForTesting class IndexManagementIndices( @@ -71,6 +77,23 @@ class IndexManagementIndices( } } + suspend fun checkAndUpdateIMConfigIndex(logger: Logger) { + val response: AcknowledgedResponse = suspendCoroutine { cont -> + checkAndUpdateIMConfigIndex( + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) = cont.resume(response) + override fun onFailure(e: Exception) = cont.resumeWithException(e) + } + ) + } + if (response.isAcknowledged) { + logger.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.") + } else { + logger.error("Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping.") + throw OpenSearchStatusException("Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping.", RestStatus.INTERNAL_SERVER_ERROR) + } + } + fun indexManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_MANAGEMENT_INDEX) /** diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 8463b60df..99fdb6eab 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -197,6 +197,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data" const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm" + const val SM_POLICIES_URI = "$SM_BASE_URI/policies" const val OLD_PLUGIN_NAME = "opendistro-im" const val OPEN_DISTRO_BASE_URI = "/_opendistro" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt index 51f3bcf7d..d475f51dc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMUtils.kt @@ -6,10 +6,80 @@ package org.opensearch.indexmanagement.snapshotmanagement import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.client.Client +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_DOC_ID_SUFFIX +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_METADATA_ID_SUFFIX +import org.opensearch.rest.RestStatus private val log = LogManager.getLogger("Snapshot Management Helper") -const val smSuffix = "-sm" -fun smPolicyNameToDocId(policyName: String) = "$policyName$smSuffix" -fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(smSuffix) -fun getSMMetadataDocId(policyName: String) = "$policyName-sm-metadata" +fun smPolicyNameToDocId(policyName: String) = "$policyName$SM_DOC_ID_SUFFIX" +fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(SM_DOC_ID_SUFFIX) +fun getSMMetadataDocId(policyName: String) = "$policyName$SM_METADATA_ID_SUFFIX" + +@Suppress("RethrowCaughtException", "ThrowsCount") +suspend fun Client.getSMPolicy(policyID: String): SMPolicy { + try { + val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, policyID) + val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) } + if (!getResponse.isExists || getResponse.isSourceEmpty) { + throw OpenSearchStatusException("Snapshot management policy not found", RestStatus.NOT_FOUND) + } + return parseSMPolicy(getResponse) + } catch (e: OpenSearchStatusException) { + throw e + } catch (e: IndexNotFoundException) { + throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND) + } catch (e: IllegalArgumentException) { + log.error("Failed to retrieve snapshot management policy [$policyID]", e) + throw OpenSearchStatusException("Snapshot management policy could not be parsed", RestStatus.INTERNAL_SERVER_ERROR) + } catch (e: Exception) { + log.error("Failed to retrieve snapshot management policy [$policyID]", e) + throw OpenSearchStatusException("Failed to retrieve Snapshot management policy.", RestStatus.NOT_FOUND) + } +} + +@Suppress("RethrowCaughtException", "ThrowsCount") +suspend fun Client.getSMMetadata(metadataID: String): SMMetadata { + try { + val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, metadataID) + val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) } + if (!getResponse.isExists || getResponse.isSourceEmpty) { + throw OpenSearchStatusException("Snapshot management metadata not found", RestStatus.NOT_FOUND) + } + return parseSMMetadata(getResponse) + } catch (e: OpenSearchStatusException) { + throw e + } catch (e: IndexNotFoundException) { + throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND) + } catch (e: IllegalArgumentException) { + log.error("Failed to retrieve snapshot management metadata [$metadataID]", e) + throw OpenSearchStatusException("Snapshot management metadata could not be parsed", RestStatus.INTERNAL_SERVER_ERROR) + } catch (e: Exception) { + log.error("Failed to retrieve snapshot management metadata [$metadataID]", e) + throw OpenSearchStatusException("Failed to retrieve Snapshot management metadata.", RestStatus.NOT_FOUND) + } +} + +fun parseSMPolicy(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): SMPolicy { + val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON) + return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, SMPolicy.Companion::parse) +} + +fun parseSMMetadata(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): SMMetadata { + val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON) + return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, SMMetadata.Companion::parse) +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt index 7a3237b08..21926964f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/resthandler/RestSMPolicyHandler.kt @@ -6,16 +6,23 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.resthandler import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.WriteRequest import org.opensearch.client.node.NodeClient -import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_BASE_URI +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_POLICIES_URI import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyRequest import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyRequest import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyRequest -import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM +import org.opensearch.indexmanagement.util.IF_SEQ_NO +import org.opensearch.indexmanagement.util.REFRESH import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BytesRestResponse import org.opensearch.rest.RestHandler.Route @@ -24,8 +31,11 @@ import org.opensearch.rest.RestRequest.Method.POST import org.opensearch.rest.RestRequest.Method.PUT import org.opensearch.rest.RestRequest.Method.GET import org.opensearch.rest.RestRequest.Method.DELETE +import org.opensearch.rest.RestResponse import org.opensearch.rest.RestStatus +import org.opensearch.rest.action.RestResponseListener import org.opensearch.rest.action.RestToXContentListener +import java.time.Instant class RestSMPolicyHandler : BaseRestHandler() { @@ -37,10 +47,10 @@ class RestSMPolicyHandler : BaseRestHandler() { override fun routes(): List { return listOf( - Route(POST, "$SM_BASE_URI/{policyName}"), - Route(PUT, "$SM_BASE_URI/{policyName}"), - Route(GET, "$SM_BASE_URI/{policyName}"), - Route(DELETE, "$SM_BASE_URI/{policyName}"), + Route(POST, "$SM_POLICIES_URI/{policyName}"), + Route(PUT, "$SM_POLICIES_URI/{policyName}"), + Route(GET, "$SM_POLICIES_URI/{policyName}"), + Route(DELETE, "$SM_POLICIES_URI/{policyName}"), ) } @@ -70,7 +80,7 @@ class RestSMPolicyHandler : BaseRestHandler() { return RestChannelConsumer { client.execute( GET_SM_ACTION_TYPE, - GetSMPolicyRequest(policyName), + GetSMPolicyRequest(smPolicyNameToDocId(policyName)), RestToXContentListener(it) ) } @@ -83,17 +93,33 @@ class RestSMPolicyHandler : BaseRestHandler() { } // TODO validate policy name validateGeneratedSnapshotName - log.info("sm dev: receive request ${request.requiredContent().utf8ToString()}") - + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) val xcp = request.contentParser() - val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName)) + val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName), seqNo = seqNo, primaryTerm = primaryTerm) + .copy(jobLastUpdateTime = Instant.now()) log.info("sm dev: policy parsed $policy") + val refreshPolicy = if (request.hasParam(REFRESH)) { + WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) + } else { + WriteRequest.RefreshPolicy.IMMEDIATE + } + return RestChannelConsumer { client.execute( SMActions.INDEX_SM_ACTION_TYPE, - IndexSMPolicyRequest(policy, create), - RestToXContentListener(it) + IndexSMPolicyRequest(policy, create, refreshPolicy), + object : RestResponseListener(it) { + override fun buildResponse(response: IndexSMPolicyResponse): RestResponse { + val restResponse = BytesRestResponse(response.status, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)) + if (response.status == RestStatus.CREATED || response.status == RestStatus.OK) { + val location = "$SM_POLICIES_URI/${response.policy.policyName}" + restResponse.addHeader("Location", location) + } + return restResponse + } + } ) } } @@ -104,10 +130,16 @@ class RestSMPolicyHandler : BaseRestHandler() { throw IllegalArgumentException("Missing policy name") } + val refreshPolicy = if (request.hasParam(REFRESH)) { + WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) + } else { + WriteRequest.RefreshPolicy.IMMEDIATE + } + return RestChannelConsumer { client.execute( DELETE_SM_ACTION_TYPE, - DeleteSMPolicyRequest(policyName), + DeleteSMPolicyRequest(smPolicyNameToDocId(policyName)).setRefreshPolicy(refreshPolicy), RestToXContentListener(it) ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt index f0d0dcc9c..20ef6d2a8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/BaseTransportAction.kt @@ -20,6 +20,7 @@ import org.opensearch.common.io.stream.Writeable import org.opensearch.common.util.concurrent.ThreadContext.StoredContext import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT import org.opensearch.commons.authuser.User +import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -51,13 +52,13 @@ abstract class BaseTransportAction listener.onResponse(executeRequest(request, user, threadContext)) } + } catch (ex: VersionConflictEngineException) { + listener.onFailure(ex) + } catch (ex: OpenSearchStatusException) { + listener.onFailure(ex) } catch (ex: Exception) { log.error("Uncaught exception:", ex) - listener.onFailure( - OpenSearchStatusException( - ex.message, RestStatus.INTERNAL_SERVER_ERROR - ) - ) + listener.onFailure(OpenSearchStatusException(ex.message, RestStatus.INTERNAL_SERVER_ERROR)) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt index 753acbf57..a4969b017 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/SMActions.kt @@ -6,12 +6,12 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.transport import org.opensearch.action.ActionType +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyResponse import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction -import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyResponse -import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction object SMActions { /** @@ -30,5 +30,5 @@ object SMActions { * [TransportDeleteSMPolicyAction] */ const val DELETE_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/delete" - val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteSMPolicyResponse) + val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteResponse) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt index 418eefb09..e594a7801 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyRequest.kt @@ -5,23 +5,23 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete -import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.delete.DeleteRequest import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput -class DeleteSMPolicyRequest( - val policyName: String -) : ActionRequest() { +class DeleteSMPolicyRequest : DeleteRequest { override fun validate(): ActionRequestValidationException? { return null } - constructor(sin: StreamInput) : this( - policyName = sin.readString() - ) + constructor(sin: StreamInput) : super(sin) + + constructor(id: String) { + super.id(id) + } override fun writeTo(out: StreamOutput) { - out.writeString(policyName) + super.writeTo(out) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt deleted file mode 100644 index 6a0ecd524..000000000 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/DeleteSMPolicyResponse.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete - -import org.opensearch.action.ActionResponse -import org.opensearch.common.io.stream.StreamInput -import org.opensearch.common.io.stream.StreamOutput -import org.opensearch.common.xcontent.ToXContent -import org.opensearch.common.xcontent.ToXContentObject -import org.opensearch.common.xcontent.XContentBuilder - -class DeleteSMPolicyResponse( - val status: String -) : ActionResponse(), ToXContentObject { - - constructor(sin: StreamInput) : this( - status = sin.readString() - ) - - override fun writeTo(out: StreamOutput) { - out.writeString(status) - } - - override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder.startObject() - .field("delete", status) - .endObject() - } -} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt index 5091f8d9d..0dd733b5b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/delete/TransportDeleteSMPolicyAction.kt @@ -6,7 +6,6 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete import org.apache.logging.log4j.LogManager -import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.delete.DeleteResponse import org.opensearch.action.support.ActionFilters import org.opensearch.client.Client @@ -17,14 +16,14 @@ import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANA import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_NAME -import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId +import org.opensearch.indexmanagement.snapshotmanagement.getSMPolicy import org.opensearch.transport.TransportService class TransportDeleteSMPolicyAction @Inject constructor( client: Client, transportService: TransportService, actionFilters: ActionFilters, -) : BaseTransportAction( +) : BaseTransportAction( DELETE_SM_ACTION_NAME, transportService, client, actionFilters, ::DeleteSMPolicyRequest ) { @@ -34,9 +33,12 @@ class TransportDeleteSMPolicyAction @Inject constructor( request: DeleteSMPolicyRequest, user: User?, threadContext: ThreadContext.StoredContext - ): DeleteSMPolicyResponse { - val deleteReq = DeleteRequest(INDEX_MANAGEMENT_INDEX, smPolicyNameToDocId(request.policyName)) - val deleteRes: DeleteResponse = client.suspendUntil { delete(deleteReq, it) } - return DeleteSMPolicyResponse(deleteRes.status().toString()) + ): DeleteResponse { + val smPolicy = client.getSMPolicy(request.id()) + + // TODO sm verify permissions + + val deleteReq = request.index(INDEX_MANAGEMENT_INDEX) + return client.suspendUntil { delete(deleteReq, it) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt index cd70585c9..899705927 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyRequest.kt @@ -11,17 +11,17 @@ import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput class GetSMPolicyRequest( - val policyName: String + val policyID: String ) : ActionRequest() { override fun validate(): ActionRequestValidationException? { return null } constructor(sin: StreamInput) : this( - policyName = sin.readString(), + policyID = sin.readString(), ) override fun writeTo(out: StreamOutput) { - out.writeString(policyName) + out.writeString(policyID) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt index 02c80bcca..6efd03260 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/GetSMPolicyResponse.kt @@ -9,24 +9,46 @@ import org.opensearch.action.ActionResponse import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent -import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.util._ID +import org.opensearch.indexmanagement.util._PRIMARY_TERM +import org.opensearch.indexmanagement.util._SEQ_NO +import org.opensearch.indexmanagement.util._VERSION class GetSMPolicyResponse( + val id: String, + val version: Long, + val seqNo: Long, + val primaryTerm: Long, val policy: SMPolicy ) : ActionResponse(), ToXContentObject { constructor(sin: StreamInput) : this( + id = sin.readString(), + version = sin.readLong(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), policy = SMPolicy(sin) ) override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) policy.writeTo(out) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return policy.toXContent(builder, EMPTY_PARAMS) + builder.startObject() + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + .field(SMPolicy.SM_TYPE, policy, XCONTENT_WITHOUT_TYPE_AND_USER) + return builder.endObject() } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt index 730f727f7..703d75525 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/get/TransportGetSMPolicyAction.kt @@ -6,6 +6,7 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.transport.get import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters @@ -13,14 +14,13 @@ import org.opensearch.client.Client import org.opensearch.common.inject.Inject import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.commons.authuser.User -import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX -import org.opensearch.indexmanagement.opensearchapi.contentParser -import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_NAME -import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId -import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.parseSMPolicy +import org.opensearch.rest.RestStatus import org.opensearch.transport.TransportService class TransportGetSMPolicyAction @Inject constructor( @@ -38,10 +38,24 @@ class TransportGetSMPolicyAction @Inject constructor( user: User?, threadContext: ThreadContext.StoredContext ): GetSMPolicyResponse { - val getReq = GetRequest(INDEX_MANAGEMENT_INDEX, smPolicyNameToDocId(request.policyName)) - val getRes: GetResponse = client.suspendUntil { get(getReq, it) } - val xcp = contentParser(getRes.sourceAsBytesRef) - val policy = xcp.parseWithType(getRes.id, getRes.seqNo, getRes.primaryTerm, SMPolicy.Companion::parse) - return GetSMPolicyResponse(policy) + val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.policyID) + val getResponse: GetResponse = try { + client.suspendUntil { get(getRequest, it) } + } catch (e: IndexNotFoundException) { + throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND) + } + if (!getResponse.isExists) { + throw OpenSearchStatusException("Snapshot management policy not found", RestStatus.NOT_FOUND) + } + val smPolicy = try { + parseSMPolicy(getResponse) + } catch (e: IllegalArgumentException) { + log.error("Error while parsing snapshot management policy ${request.policyID}", e) + throw OpenSearchStatusException("Snapshot management policy not found", RestStatus.INTERNAL_SERVER_ERROR) + } + // TODO SM security integration + + log.info("sm dev: Parsed SM policy: $smPolicy") + return GetSMPolicyResponse(getResponse.id, getResponse.version, getResponse.seqNo, getResponse.primaryTerm, smPolicy) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt index ad5b23341..255949592 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyRequest.kt @@ -5,27 +5,54 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index -import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.ValidateActions +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.support.WriteRequest import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy -class IndexSMPolicyRequest( - val policy: SMPolicy, - val create: Boolean, -) : ActionRequest() { +class IndexSMPolicyRequest : IndexRequest { + + val policy: SMPolicy + + constructor( + policy: SMPolicy, + create: Boolean, + refreshPolicy: WriteRequest.RefreshPolicy + ) : super() { + this.policy = policy + this.create(create) + if (policy.seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO && policy.primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + this.setIfSeqNo(policy.seqNo).setIfPrimaryTerm(policy.primaryTerm) + } + this.refreshPolicy = refreshPolicy + } + override fun validate(): ActionRequestValidationException? { - return null + var validationException: ActionRequestValidationException? = null + val invalidSeqNumPrimaryTerm = this.ifSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || + this.ifPrimaryTerm() == SequenceNumbers.UNASSIGNED_PRIMARY_TERM + if (this.opType() != DocWriteRequest.OpType.CREATE && invalidSeqNumPrimaryTerm) { + validationException = ValidateActions.addValidationError(SEQ_NUM_PRIMARY_TERM_UPDATE_ERROR, validationException) + } + return validationException } - constructor(sin: StreamInput) : this( - policy = SMPolicy(sin), - create = sin.readBoolean(), - ) + constructor(sin: StreamInput) : super(sin) { + this.policy = SMPolicy(sin) + } override fun writeTo(out: StreamOutput) { + super.writeTo(out) policy.writeTo(out) - out.writeBoolean(create) + } + + companion object { + private const val SEQ_NUM_PRIMARY_TERM_UPDATE_ERROR = + "Sequence number and primary term must be provided when updating a snapshot management policy" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt index c3c45cdd3..5663b064e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/IndexSMPolicyResponse.kt @@ -11,21 +11,49 @@ import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE_AND_USER import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_TYPE +import org.opensearch.indexmanagement.util._ID +import org.opensearch.indexmanagement.util._PRIMARY_TERM +import org.opensearch.indexmanagement.util._SEQ_NO +import org.opensearch.indexmanagement.util._VERSION +import org.opensearch.rest.RestStatus -class IndexSMPolicyResponse(val policy: SMPolicy) : ActionResponse(), ToXContentObject { +class IndexSMPolicyResponse( + val id: String, + val version: Long, + val seqNo: Long, + val primaryTerm: Long, + val policy: SMPolicy, + val status: RestStatus +) : ActionResponse(), ToXContentObject { constructor(sin: StreamInput) : this( - policy = SMPolicy(sin) + id = sin.readString(), + version = sin.readLong(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + policy = SMPolicy(sin), + status = sin.readEnum(RestStatus::class.java) ) override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) policy.writeTo(out) + out.writeEnum(status) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { return builder.startObject() - .field(SMPolicy.SM_TYPE, policy) + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + .field(SM_TYPE, policy, XCONTENT_WITHOUT_TYPE_AND_USER) .endObject() } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt index ab042379d..29b3d5035 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/api/transport/index/TransportIndexSMPolicyAction.kt @@ -6,7 +6,6 @@ package org.opensearch.indexmanagement.snapshotmanagement.api.transport.index import org.apache.logging.log4j.LogManager -import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.ActionFilters import org.opensearch.client.Client @@ -15,6 +14,7 @@ import org.opensearch.common.util.concurrent.ThreadContext import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory import org.opensearch.commons.authuser.User +import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.snapshotmanagement.api.transport.BaseTransportAction @@ -24,6 +24,7 @@ import org.opensearch.transport.TransportService class TransportIndexSMPolicyAction @Inject constructor( client: Client, transportService: TransportService, + val indexManagementIndices: IndexManagementIndices, actionFilters: ActionFilters, ) : BaseTransportAction( INDEX_SM_ACTION_NAME, transportService, client, actionFilters, ::IndexSMPolicyRequest @@ -36,12 +37,18 @@ class TransportIndexSMPolicyAction @Inject constructor( user: User?, threadContext: ThreadContext.StoredContext ): IndexSMPolicyResponse { + indexManagementIndices.checkAndUpdateIMConfigIndex(log) + return indexSMPolicy(request) + } + + private suspend fun indexSMPolicy(request: IndexSMPolicyRequest): IndexSMPolicyResponse { val policy = request.policy - val indexReq = IndexRequest(INDEX_MANAGEMENT_INDEX) + val indexReq = request.index(INDEX_MANAGEMENT_INDEX) .source(policy.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) .id(policy.id) - .create(request.create) val indexRes: IndexResponse = client.suspendUntil { index(indexReq, it) } - return IndexSMPolicyResponse(policy) + log.info("Index SM policy response: $indexRes") + + return IndexSMPolicyResponse(indexRes.id, indexRes.version, indexRes.seqNo, indexRes.primaryTerm, policy, indexRes.status()) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt index 59a551e8f..eab8bfeff 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMMetadata.kt @@ -71,7 +71,6 @@ data class SMMetadata( var policySeqNo: Long? = null var policyPrimaryTerm: Long? = null var currentState: SMState? = null - var atomic = false var creation: Creation? = null var deletion: Deletion? = null var info: InfoType = null diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt index 45ebdb415..3eb1ffca9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/SMPolicy.kt @@ -16,10 +16,12 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.indexstatemanagement.util.WITH_TYPE import org.opensearch.indexmanagement.opensearchapi.instant import org.opensearch.indexmanagement.opensearchapi.nullValueHandler import org.opensearch.indexmanagement.opensearchapi.optionalField import org.opensearch.indexmanagement.opensearchapi.optionalTimeField +import org.opensearch.indexmanagement.snapshotmanagement.getSMMetadataDocId import org.opensearch.indexmanagement.snapshotmanagement.smDocIdToPolicyName import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionTimeout import org.opensearch.indexmanagement.util.NO_ID @@ -42,7 +44,7 @@ data class SMPolicy( val snapshotConfig: Map, val jobEnabled: Boolean, val jobLastUpdateTime: Instant, - val jobEnabledTime: Instant, + val jobEnabledTime: Instant?, val jobSchedule: Schedule, val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, @@ -54,8 +56,14 @@ data class SMPolicy( // TODO SM validate date_format is of right format } + // This name is used by the job scheduler and needs to match the id to avoid namespace conflicts with ISM policies sharing the same name override fun getName() = id + // This is the name which the user provided when creating the policy, and should be used when outputting to the user in REST responses + val policyName get() = smDocIdToPolicyName(id) + + val metadataID get() = getSMMetadataDocId(smDocIdToPolicyName(id)) + override fun getLastUpdateTime() = jobLastUpdateTime override fun getEnabledTime() = jobEnabledTime @@ -65,10 +73,9 @@ data class SMPolicy( override fun isEnabled() = jobEnabled override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder - .startObject() - .startObject(SM_TYPE) - .field(NAME_FIELD, smDocIdToPolicyName(id)) // for searching policy by name + builder.startObject() + if (params.paramAsBoolean(WITH_TYPE, true)) builder.startObject(SM_TYPE) + builder.field(NAME_FIELD, smDocIdToPolicyName(id)) // for searching policy by name .optionalField(DESCRIPTION_FIELD, description) .field(CREATION_FIELD, creation) .field(DELETION_FIELD, deletion) @@ -77,12 +84,14 @@ data class SMPolicy( .field(ENABLED_FIELD, jobEnabled) .optionalTimeField(LAST_UPDATED_TIME_FIELD, jobLastUpdateTime) .optionalTimeField(ENABLED_TIME_FIELD, jobEnabledTime) - .endObject() - .endObject() + if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() + return builder.endObject() } companion object { const val SM_TYPE = "sm" + const val SM_DOC_ID_SUFFIX = "-sm-policy" + const val SM_METADATA_ID_SUFFIX = "-sm-metadata" const val NAME_FIELD = "name" const val DESCRIPTION_FIELD = "description" const val CREATION_FIELD = "creation" @@ -150,7 +159,7 @@ data class SMPolicy( deletion = requireNotNull(deletion) { "deletion field must not be null" }, snapshotConfig = requireNotNull(snapshotConfig) { "snapshot_config field must not be null" }, jobLastUpdateTime = requireNotNull(lastUpdatedTime) { "last_updated_at field must not be null" }, - jobEnabledTime = requireNotNull(enabledTime) { "job_enabled_time field must not be null" }, + jobEnabledTime = enabledTime, jobSchedule = schedule, jobEnabled = enabled, id = id, @@ -166,7 +175,7 @@ data class SMPolicy( deletion = Deletion(sin), snapshotConfig = sin.readMap() as Map, jobLastUpdateTime = sin.readInstant(), - jobEnabledTime = sin.readInstant(), + jobEnabledTime = sin.readOptionalInstant(), jobSchedule = IntervalSchedule(sin), jobEnabled = sin.readBoolean(), id = sin.readString(), @@ -180,8 +189,8 @@ data class SMPolicy( deletion.writeTo(out) out.writeMap(snapshotConfig) out.writeInstant(jobLastUpdateTime) - out.writeInstant(jobEnabledTime) - out.writeOptionalWriteable(jobSchedule) + out.writeOptionalInstant(jobEnabledTime) + jobSchedule.writeTo(out) out.writeBoolean(jobEnabled) out.writeString(id) out.writeLong(seqNo) @@ -322,11 +331,13 @@ data class SMPolicy( companion object { const val MAX_COUNT_FIELD = "max_count" + private const val DEFAULT_MAX_COUNT = 50 const val MAX_AGE_FIELD = "max_age" const val MIN_COUNT_FIELD = "min_count" + private const val DEFAULT_MIN_COUNT = 5 fun parse(xcp: XContentParser): DeleteCondition { - var maxCount = 50 + var maxCount = DEFAULT_MAX_COUNT var maxAge: TimeValue? = null var minCount: Int? = null @@ -343,7 +354,7 @@ data class SMPolicy( } if (maxAge != null && minCount == null) { - minCount = minOf(5, maxCount) + minCount = minOf(DEFAULT_MIN_COUNT, maxCount) } return DeleteCondition( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt new file mode 100644 index 000000000..b2fa76463 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement + +import org.apache.http.HttpEntity +import org.apache.http.HttpHeaders +import org.apache.http.entity.ContentType.APPLICATION_JSON +import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicHeader +import org.opensearch.client.Response +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.common.xcontent.XContentType +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.IndexManagementRestTestCase +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.util._ID +import org.opensearch.indexmanagement.util._PRIMARY_TERM +import org.opensearch.indexmanagement.util._SEQ_NO +import org.opensearch.rest.RestStatus +import java.io.InputStream + +abstract class SnapshotManagementRestTestCase : IndexManagementRestTestCase() { + + protected fun createSMPolicy( + smPolicy: SMPolicy, + refresh: Boolean = true, + ): SMPolicy { + val response = createSMPolicyJson(smPolicy.toJsonString(), smPolicy.policyName, refresh) + return parseSMPolicy(response.entity.content) + } + + protected fun createSMPolicyJson( + smPolicyString: String, + smPolicyName: String, + refresh: Boolean = true, + ): Response { + val response = client() + .makeRequest( + "POST", + "${IndexManagementPlugin.SM_POLICIES_URI}/$smPolicyName?refresh=$refresh", + emptyMap(), + StringEntity(smPolicyString, APPLICATION_JSON) + ) + assertEquals("Unable to create a new snapshot management policy", RestStatus.CREATED, response.restStatus()) + return response + } + + protected fun getSMPolicy( + smPolicyName: String, + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), + ): SMPolicy { + val response = client().makeRequest("GET", "${IndexManagementPlugin.SM_POLICIES_URI}/$smPolicyName", null, header) + assertEquals("Unable to get snapshot management policy $smPolicyName", RestStatus.OK, response.restStatus()) + return parseSMPolicy(response.entity.content) + } + + protected fun parseSMPolicy(inputStream: InputStream): SMPolicy { + val parser = createParser(XContentType.JSON.xContent(), inputStream) + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + var seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO + lateinit var smPolicy: SMPolicy + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + _ID -> id = parser.text() + _SEQ_NO -> seqNo = parser.longValue() + _PRIMARY_TERM -> primaryTerm = parser.longValue() + SMPolicy.SM_TYPE -> smPolicy = SMPolicy.parse(parser, id, seqNo, primaryTerm) + } + } + return smPolicy + } + + protected fun SMPolicy.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), APPLICATION_JSON) +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/ActionTests.kt new file mode 100644 index 000000000..689663291 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/ActionTests.kt @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.action + +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_NAME +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_NAME +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.INDEX_SM_ACTION_NAME +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.INDEX_SM_ACTION_TYPE +import org.opensearch.test.OpenSearchTestCase + +class ActionTests : OpenSearchTestCase() { + + fun `test delete action name`() { + assertNotNull(DELETE_SM_ACTION_TYPE.name()) + assertEquals(DELETE_SM_ACTION_TYPE.name(), DELETE_SM_ACTION_NAME) + } + + fun `test index action name`() { + assertNotNull(INDEX_SM_ACTION_TYPE.name()) + assertEquals(INDEX_SM_ACTION_TYPE.name(), INDEX_SM_ACTION_NAME) + } + + fun `test get action name`() { + assertNotNull(GET_SM_ACTION_TYPE.name()) + assertEquals(GET_SM_ACTION_TYPE.name(), GET_SM_ACTION_NAME) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/RequestTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/RequestTests.kt new file mode 100644 index 000000000..3d11be98d --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/RequestTests.kt @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.action + +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyRequest +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyRequest +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyRequest +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.test.OpenSearchTestCase + +class RequestTests : OpenSearchTestCase() { + + fun `test delete sm policy request`() { + val id = "some_id" + val req = DeleteSMPolicyRequest(id).index(INDEX_MANAGEMENT_INDEX) + + val out = BytesStreamOutput().apply { req.writeTo(this) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedReq = DeleteSMPolicyRequest(sin) + assertEquals(id, streamedReq.id()) + } + + fun `test get sm policy request`() { + val id = "some_id" + val req = GetSMPolicyRequest(id) + + val out = BytesStreamOutput().apply { req.writeTo(this) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedReq = GetSMPolicyRequest(sin) + assertEquals(id, streamedReq.policyID) + } + + fun `test index sm policy put request`() { + val smPolicy = randomSMPolicy().copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + val req = IndexSMPolicyRequest(policy = smPolicy, false, WriteRequest.RefreshPolicy.IMMEDIATE).index(INDEX_MANAGEMENT_INDEX) + + val out = BytesStreamOutput().apply { req.writeTo(this) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedReq = IndexSMPolicyRequest(sin) + assertEquals(smPolicy, streamedReq.policy) + assertEquals(smPolicy.seqNo, streamedReq.ifSeqNo()) + assertEquals(smPolicy.primaryTerm, streamedReq.ifPrimaryTerm()) + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, streamedReq.refreshPolicy) + assertEquals(DocWriteRequest.OpType.INDEX, streamedReq.opType()) + } + + fun `test index sm policy post request`() { + val smPolicy = randomSMPolicy().copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + val req = IndexSMPolicyRequest(policy = smPolicy, true, WriteRequest.RefreshPolicy.IMMEDIATE).index(INDEX_MANAGEMENT_INDEX) + + val out = BytesStreamOutput().apply { req.writeTo(this) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedReq = IndexSMPolicyRequest(sin) + assertEquals(smPolicy, streamedReq.policy) + assertEquals(smPolicy.seqNo, streamedReq.ifSeqNo()) + assertEquals(smPolicy.primaryTerm, streamedReq.ifPrimaryTerm()) + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, streamedReq.refreshPolicy) + assertEquals(DocWriteRequest.OpType.CREATE, streamedReq.opType()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/ResponseTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/ResponseTests.kt new file mode 100644 index 000000000..4f8d47d0a --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/action/ResponseTests.kt @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyResponse +import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.rest.RestStatus +import org.opensearch.test.OpenSearchTestCase + +class ResponseTests : OpenSearchTestCase() { + + fun `test index sm policy response`() { + val smPolicy = randomSMPolicy() + val res = IndexSMPolicyResponse("someid", 1L, 2L, 3L, smPolicy, RestStatus.OK) + val out = BytesStreamOutput().apply { res.writeTo(this) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedRes = IndexSMPolicyResponse(sin) + assertEquals("someid", streamedRes.id) + assertEquals(1L, streamedRes.version) + assertEquals(2L, streamedRes.seqNo) + assertEquals(3L, streamedRes.primaryTerm) + assertEquals(RestStatus.OK, streamedRes.status) + assertEquals(smPolicy, streamedRes.policy) + } + + fun `test get sm policy response`() { + val smPolicy = randomSMPolicy() + val res = GetSMPolicyResponse("someid", 1L, 2L, 3L, smPolicy) + val out = BytesStreamOutput().apply { res.writeTo(this) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedRes = GetSMPolicyResponse(sin) + assertEquals("someid", streamedRes.id) + assertEquals(1L, streamedRes.version) + assertEquals(2L, streamedRes.seqNo) + assertEquals(3L, streamedRes.primaryTerm) + assertEquals(smPolicy, streamedRes.policy) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt new file mode 100644 index 000000000..a130579e5 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/WriteableTests.kt @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.model + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.test.OpenSearchTestCase + +class WriteableTests : OpenSearchTestCase() { + + fun `test sm policy as stream`() { + val smPolicy = randomSMPolicy() + val out = BytesStreamOutput().also { smPolicy.writeTo(it) } + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val streamedSMPolicy = SMPolicy(sin) + assertEquals("Round tripping sm policy stream doesn't work", smPolicy, streamedSMPolicy) + } + + // TODO SM add tests for sm metadata once SM State enum is filled out +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt new file mode 100644 index 000000000..003d343be --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/model/XContentTests.kt @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.model + +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentType +import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.toJsonString +import org.opensearch.test.OpenSearchTestCase + +class XContentTests : OpenSearchTestCase() { + + fun `test sm policy parsing`() { + val smPolicy = randomSMPolicy() + val smPolicyString = smPolicy.toJsonString() + val parser = parserWithType(smPolicyString) + val parsedSMPolicy = parser.parseWithType(smPolicy.id, smPolicy.seqNo, smPolicy.primaryTerm, SMPolicy.Companion::parse) + assertEquals("Round tripping sm policy with type doesn't work", smPolicy, parsedSMPolicy) + } + + fun `test sm policy parsing without type`() { + val smPolicy = randomSMPolicy() + val smPolicyString = smPolicy.toJsonString() + val parsedSMPolicy = SMPolicy.parse(parser(smPolicyString), smPolicy.id, smPolicy.seqNo, smPolicy.primaryTerm) + assertEquals("Round tripping sm policy without type doesn't work", smPolicy, parsedSMPolicy) + } + + // TODO SM add tests for sm metadata once SM State enum is filled out + + private fun parser(xc: String): XContentParser { + val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc) + parser.nextToken() + return parser + } + + private fun parserWithType(xc: String): XContentParser { + return XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestDeleteSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestDeleteSnapshotManagementIT.kt new file mode 100644 index 000000000..7bffd87a5 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestDeleteSnapshotManagementIT.kt @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.resthandler + +import org.opensearch.client.ResponseException +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementRestTestCase +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.rest.RestStatus + +class RestDeleteSnapshotManagementIT : SnapshotManagementRestTestCase() { + + fun `test deleting a snapshot management policy`() { + val smPolicy = createSMPolicy(randomSMPolicy()) + val deleteResponse = client().makeRequest("DELETE", "${IndexManagementPlugin.SM_POLICIES_URI}/${smPolicy.policyName}?refresh=true") + assertEquals("Delete failed", RestStatus.OK, deleteResponse.restStatus()) + + try { + client().makeRequest("GET", "${IndexManagementPlugin.SM_POLICIES_URI}/${smPolicy.policyName}") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun `test deleting a snapshot management policy that doesn't exist in existing config index`() { + try { + createSMPolicy(randomSMPolicy()) + client().makeRequest("DELETE", "${IndexManagementPlugin.SM_POLICIES_URI}/nonexistent_policy") + fail("expected 404 ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun `test deleting a snapshot management policy that doesn't exist and config index doesnt exist`() { + try { + deleteIndex(INDEX_MANAGEMENT_INDEX) + client().makeRequest("DELETE", "${IndexManagementPlugin.SM_POLICIES_URI}/nonexistent_policy") + fail("expected 404 ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestGetSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestGetSnapshotManagementIT.kt new file mode 100644 index 000000000..53193980a --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestGetSnapshotManagementIT.kt @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.resthandler + +import org.opensearch.client.ResponseException +import org.opensearch.indexmanagement.IndexManagementPlugin +import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementRestTestCase +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.rest.RestStatus + +class RestGetSnapshotManagementIT : SnapshotManagementRestTestCase() { + + fun `test getting a snapshot management policy`() { + var smPolicy = createSMPolicy(randomSMPolicy().copy(jobEnabled = false, jobEnabledTime = null)) + val indexedSMPolicy = getSMPolicy(smPolicy.policyName) + // Schema version and last updated time are updated during the creation so we need to update the original too for comparison + // Job schedule interval will have a dynamic start time + smPolicy = smPolicy.copy( + id = indexedSMPolicy.id, + seqNo = indexedSMPolicy.seqNo, + primaryTerm = indexedSMPolicy.primaryTerm, + jobLastUpdateTime = indexedSMPolicy.jobLastUpdateTime, + jobSchedule = indexedSMPolicy.jobSchedule + ) + assertEquals("Indexed and retrieved snapshot management policies differ", smPolicy, indexedSMPolicy) + } + + @Throws(Exception::class) + fun `test getting a snapshot management policy that doesn't exist`() { + try { + getSMPolicy(randomAlphaOfLength(20)) + fail("expected response exception") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + @Throws(Exception::class) + fun `test getting a snapshot management policy that doesn't exist and config index doesnt exist`() { + try { + deleteIndex(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + getSMPolicy(randomAlphaOfLength(20)) + fail("expected response exception") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt new file mode 100644 index 000000000..868585598 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/resthandler/RestIndexSnapshotManagementIT.kt @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.snapshotmanagement.resthandler + +import org.opensearch.client.ResponseException +import org.opensearch.common.xcontent.XContentType +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_POLICIES_URI +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.opensearchapi.convertToMap +import org.opensearch.indexmanagement.snapshotmanagement.SnapshotManagementRestTestCase +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy +import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_TYPE +import org.opensearch.indexmanagement.snapshotmanagement.randomSMPolicy +import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.indexmanagement.util._ID +import org.opensearch.indexmanagement.util._SEQ_NO +import org.opensearch.rest.RestStatus +import java.time.Instant + +class RestIndexSnapshotManagementIT : SnapshotManagementRestTestCase() { + + @Suppress("UNCHECKED_CAST") + fun `test creating a snapshot management policy`() { + var smPolicy = randomSMPolicy() + val response = client().makeRequest("POST", "$SM_POLICIES_URI/${smPolicy.policyName}", emptyMap(), smPolicy.toHttpEntity()) + assertEquals("Create SM policy failed", RestStatus.CREATED, response.restStatus()) + val responseBody = response.asMap() + val createdId = responseBody["_id"] as String + assertNotEquals("Response is missing Id", NO_ID, createdId) + assertEquals("Not same id", smPolicy.id, createdId) + assertEquals("Incorrect Location header", "$SM_POLICIES_URI/${smPolicy.policyName}", response.getHeader("Location")) + val responseSMPolicy = responseBody[SM_TYPE] as Map + smPolicy = smPolicy.copy(jobLastUpdateTime = Instant.ofEpochMilli(responseSMPolicy[SMPolicy.LAST_UPDATED_TIME_FIELD] as Long)) + assertEquals("Created and returned snapshot management policies differ", smPolicy.convertToMap()[SM_TYPE], responseSMPolicy) + } + + fun `test updating a snapshot management policy with correct seq_no and primary_term`() { + val smPolicy = createSMPolicy(randomSMPolicy()) + val updateResponse = client().makeRequest( + "PUT", + "$SM_POLICIES_URI/${smPolicy.policyName}?refresh=true&if_seq_no=${smPolicy.seqNo}&if_primary_term=${smPolicy.primaryTerm}", + emptyMap(), smPolicy.toHttpEntity() + ) + + assertEquals("Update snapshot management policy failed", RestStatus.OK, updateResponse.restStatus()) + val responseBody = updateResponse.asMap() + val updatedId = responseBody[_ID] as String + val updatedSeqNo = (responseBody[_SEQ_NO] as Int).toLong() + assertNotEquals("response is missing Id", NO_ID, updatedId) + assertEquals("not same id", smPolicy.id, updatedId) + assertTrue("incorrect seqNo", smPolicy.seqNo < updatedSeqNo) + assertEquals("Incorrect Location header", "$SM_POLICIES_URI/${smPolicy.policyName}", updateResponse.getHeader("Location")) + } + + fun `test updating a snapshot management policy with incorrect seq_no and primary_term`() { + val smPolicy = createSMPolicy(randomSMPolicy()) + try { + client().makeRequest( + "PUT", + "$SM_POLICIES_URI/${smPolicy.policyName}?refresh=true&if_seq_no=10251989&if_primary_term=2342", + emptyMap(), smPolicy.toHttpEntity() + ) + fail("expected 409 ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.CONFLICT, e.response.restStatus()) + } + try { + client().makeRequest( + "PUT", + "$SM_POLICIES_URI/${smPolicy.policyName}?refresh=true", + emptyMap(), smPolicy.toHttpEntity() + ) + fail("expected exception") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + + fun `test updating a nonexistent snapshot management policy`() { + val smPolicy = randomSMPolicy() + try { + client().makeRequest( + "PUT", + "$SM_POLICIES_URI/${smPolicy.policyName}?refresh=true&if_seq_no=10251989&if_primary_term=2342", + emptyMap(), smPolicy.toHttpEntity() + ) + fail("expected exception") + } catch (e: ResponseException) { + assertEquals(RestStatus.CONFLICT, e.response.restStatus()) + } + } + + @Throws(Exception::class) + fun `test creating sm policy with no name fails`() { + try { + val smPolicy = randomSMPolicy() + client().makeRequest("POST", SM_POLICIES_URI, emptyMap(), smPolicy.toHttpEntity()) + fail("Expected 400 Method BAD_REQUEST response") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + + @Throws(Exception::class) + fun `test creating sm policy with PUT fails`() { + try { + val smPolicy = randomSMPolicy() + client().makeRequest("PUT", SM_POLICIES_URI, emptyMap(), smPolicy.toHttpEntity()) + fail("Expected 400 Method BAD_REQUEST response") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + + @Throws(Exception::class) + @Suppress("UNCHECKED_CAST") + fun `test mappings after sm policy creation`() { + deleteIndex(INDEX_MANAGEMENT_INDEX) + createSMPolicy(randomSMPolicy()) + + val response = client().makeRequest("GET", "/$INDEX_MANAGEMENT_INDEX/_mapping") + val parserMap = createParser(XContentType.JSON.xContent(), response.entity.content).map() as Map> + val mappingsMap = parserMap[INDEX_MANAGEMENT_INDEX]!!["mappings"] as Map + val expected = createParser( + XContentType.JSON.xContent(), + javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText() + ) + val expectedMap = expected.map() + + assertEquals("Mappings are different", expectedMap, mappingsMap) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt index 635038f71..c27788900 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/utils.kt @@ -13,6 +13,9 @@ import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse import org.opensearch.action.index.IndexResponse import org.opensearch.common.UUIDs import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.randomCronSchedule import org.opensearch.indexmanagement.randomInstant import org.opensearch.indexmanagement.randomIntervalSchedule @@ -25,7 +28,9 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestStatus import org.opensearch.snapshots.SnapshotId import org.opensearch.snapshots.SnapshotInfo +import org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength import org.opensearch.test.OpenSearchTestCase.randomNonNegativeLong +import org.opensearch.test.rest.OpenSearchRestTestCase import java.time.Instant import java.time.Instant.now @@ -34,8 +39,7 @@ fun randomSMMetadata( nextCreationTime: Instant = now(), nextDeletionTime: Instant = now(), policySeqNo: Long = randomNonNegativeLong(), - policyPrimaryTerm: Long = randomNonNegativeLong(), - atomic: Boolean = false, + policyPrimaryTerm: Long = randomNonNegativeLong() ): SMMetadata { return SMMetadata( policySeqNo = policySeqNo, @@ -55,7 +59,8 @@ fun randomSMMetadata( } fun randomSMPolicy( - jobEnabled: Boolean = false, + policyName: String = randomAlphaOfLength(10), + jobEnabled: Boolean = OpenSearchRestTestCase.randomBoolean(), jobLastUpdateTime: Instant = randomInstant(), creationSchedule: CronSchedule = randomCronSchedule(), creationTimeout: ActionTimeout? = null, @@ -68,10 +73,11 @@ fun randomSMPolicy( "repository" to "repo", "date_format" to "yyyy-MM-dd-HH:mm" ), - jobEnabledTime: Instant = randomInstant(), + jobEnabledTime: Instant? = randomInstant(), jobSchedule: IntervalSchedule = randomIntervalSchedule() ): SMPolicy { return SMPolicy( + id = smPolicyNameToDocId(policyName), jobEnabled = jobEnabled, jobLastUpdateTime = jobLastUpdateTime, creation = SMPolicy.Creation( @@ -88,11 +94,13 @@ fun randomSMPolicy( ) ), snapshotConfig = snapshotConfig, - jobEnabledTime = jobEnabledTime, + jobEnabledTime = if (jobEnabled) jobEnabledTime else null, jobSchedule = jobSchedule ) } +fun SMPolicy.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string() + fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse { val indexResponse: IndexResponse = mock() whenever(indexResponse.status()).doReturn(status)