Skip to content

Commit

Permalink
Enhances index, get, and delete snapshot management apis (#369)
Browse files Browse the repository at this point in the history
* Adds tests and fills out index, get, delete apis

Signed-off-by: Clay Downs <[email protected]>

* Fixes detekt issues

Signed-off-by: Clay Downs <[email protected]>

* Refactors based on comments

Signed-off-by: Clay Downs <[email protected]>

* Renames getSMPolicyName method

Signed-off-by: Clay Downs <[email protected]>

* Refactors SMUtils

Signed-off-by: Clay Downs <[email protected]>
  • Loading branch information
downsrob authored May 24, 2022
1 parent 7d6fd16 commit 9d2b61a
Show file tree
Hide file tree
Showing 27 changed files with 872 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.opensearch.indexmanagement

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.Alias
Expand All @@ -27,6 +29,10 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

@OpenForTesting
class IndexManagementIndices(
Expand Down Expand Up @@ -71,6 +77,23 @@ class IndexManagementIndices(
}
}

suspend fun checkAndUpdateIMConfigIndex(logger: Logger) {
val response: AcknowledgedResponse = suspendCoroutine { cont ->
checkAndUpdateIMConfigIndex(
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) = cont.resume(response)
override fun onFailure(e: Exception) = cont.resumeWithException(e)
}
)
}
if (response.isAcknowledged) {
logger.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
} else {
logger.error("Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping.")
throw OpenSearchStatusException("Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping.", RestStatus.INTERNAL_SERVER_ERROR)
}
}

fun indexManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_MANAGEMENT_INDEX)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"

const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm"
const val SM_POLICIES_URI = "$SM_BASE_URI/policies"

const val OLD_PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,80 @@
package org.opensearch.indexmanagement.snapshotmanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_DOC_ID_SUFFIX
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_METADATA_ID_SUFFIX
import org.opensearch.rest.RestStatus

private val log = LogManager.getLogger("Snapshot Management Helper")

const val smSuffix = "-sm"
fun smPolicyNameToDocId(policyName: String) = "$policyName$smSuffix"
fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(smSuffix)
fun getSMMetadataDocId(policyName: String) = "$policyName-sm-metadata"
fun smPolicyNameToDocId(policyName: String) = "$policyName$SM_DOC_ID_SUFFIX"
fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(SM_DOC_ID_SUFFIX)
fun getSMMetadataDocId(policyName: String) = "$policyName$SM_METADATA_ID_SUFFIX"

@Suppress("RethrowCaughtException", "ThrowsCount")
suspend fun Client.getSMPolicy(policyID: String): SMPolicy {
try {
val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, policyID)
val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) }
if (!getResponse.isExists || getResponse.isSourceEmpty) {
throw OpenSearchStatusException("Snapshot management policy not found", RestStatus.NOT_FOUND)
}
return parseSMPolicy(getResponse)
} catch (e: OpenSearchStatusException) {
throw e
} catch (e: IndexNotFoundException) {
throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND)
} catch (e: IllegalArgumentException) {
log.error("Failed to retrieve snapshot management policy [$policyID]", e)
throw OpenSearchStatusException("Snapshot management policy could not be parsed", RestStatus.INTERNAL_SERVER_ERROR)
} catch (e: Exception) {
log.error("Failed to retrieve snapshot management policy [$policyID]", e)
throw OpenSearchStatusException("Failed to retrieve Snapshot management policy.", RestStatus.NOT_FOUND)
}
}

@Suppress("RethrowCaughtException", "ThrowsCount")
suspend fun Client.getSMMetadata(metadataID: String): SMMetadata {
try {
val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, metadataID)
val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) }
if (!getResponse.isExists || getResponse.isSourceEmpty) {
throw OpenSearchStatusException("Snapshot management metadata not found", RestStatus.NOT_FOUND)
}
return parseSMMetadata(getResponse)
} catch (e: OpenSearchStatusException) {
throw e
} catch (e: IndexNotFoundException) {
throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND)
} catch (e: IllegalArgumentException) {
log.error("Failed to retrieve snapshot management metadata [$metadataID]", e)
throw OpenSearchStatusException("Snapshot management metadata could not be parsed", RestStatus.INTERNAL_SERVER_ERROR)
} catch (e: Exception) {
log.error("Failed to retrieve snapshot management metadata [$metadataID]", e)
throw OpenSearchStatusException("Failed to retrieve Snapshot management metadata.", RestStatus.NOT_FOUND)
}
}

fun parseSMPolicy(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): SMPolicy {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON)
return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, SMPolicy.Companion::parse)
}

fun parseSMMetadata(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): SMMetadata {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON)
return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, SMMetadata.Companion::parse)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@
package org.opensearch.indexmanagement.snapshotmanagement.api.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_BASE_URI
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_POLICIES_URI
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyRequest
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyRequest
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyRequest
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestHandler.Route
Expand All @@ -24,8 +31,11 @@ import org.opensearch.rest.RestRequest.Method.POST
import org.opensearch.rest.RestRequest.Method.PUT
import org.opensearch.rest.RestRequest.Method.GET
import org.opensearch.rest.RestRequest.Method.DELETE
import org.opensearch.rest.RestResponse
import org.opensearch.rest.RestStatus
import org.opensearch.rest.action.RestResponseListener
import org.opensearch.rest.action.RestToXContentListener
import java.time.Instant

class RestSMPolicyHandler : BaseRestHandler() {

Expand All @@ -37,10 +47,10 @@ class RestSMPolicyHandler : BaseRestHandler() {

override fun routes(): List<Route> {
return listOf(
Route(POST, "$SM_BASE_URI/{policyName}"),
Route(PUT, "$SM_BASE_URI/{policyName}"),
Route(GET, "$SM_BASE_URI/{policyName}"),
Route(DELETE, "$SM_BASE_URI/{policyName}"),
Route(POST, "$SM_POLICIES_URI/{policyName}"),
Route(PUT, "$SM_POLICIES_URI/{policyName}"),
Route(GET, "$SM_POLICIES_URI/{policyName}"),
Route(DELETE, "$SM_POLICIES_URI/{policyName}"),
)
}

Expand Down Expand Up @@ -70,7 +80,7 @@ class RestSMPolicyHandler : BaseRestHandler() {
return RestChannelConsumer {
client.execute(
GET_SM_ACTION_TYPE,
GetSMPolicyRequest(policyName),
GetSMPolicyRequest(smPolicyNameToDocId(policyName)),
RestToXContentListener(it)
)
}
Expand All @@ -83,17 +93,33 @@ class RestSMPolicyHandler : BaseRestHandler() {
}
// TODO validate policy name validateGeneratedSnapshotName

log.info("sm dev: receive request ${request.requiredContent().utf8ToString()}")

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val xcp = request.contentParser()
val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName))
val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName), seqNo = seqNo, primaryTerm = primaryTerm)
.copy(jobLastUpdateTime = Instant.now())
log.info("sm dev: policy parsed $policy")

val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}

return RestChannelConsumer {
client.execute(
SMActions.INDEX_SM_ACTION_TYPE,
IndexSMPolicyRequest(policy, create),
RestToXContentListener(it)
IndexSMPolicyRequest(policy, create, refreshPolicy),
object : RestResponseListener<IndexSMPolicyResponse>(it) {
override fun buildResponse(response: IndexSMPolicyResponse): RestResponse {
val restResponse = BytesRestResponse(response.status, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (response.status == RestStatus.CREATED || response.status == RestStatus.OK) {
val location = "$SM_POLICIES_URI/${response.policy.policyName}"
restResponse.addHeader("Location", location)
}
return restResponse
}
}
)
}
}
Expand All @@ -104,10 +130,16 @@ class RestSMPolicyHandler : BaseRestHandler() {
throw IllegalArgumentException("Missing policy name")
}

val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}

return RestChannelConsumer {
client.execute(
DELETE_SM_ACTION_TYPE,
DeleteSMPolicyRequest(policyName),
DeleteSMPolicyRequest(smPolicyNameToDocId(policyName)).setRefreshPolicy(refreshPolicy),
RestToXContentListener(it)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
import org.opensearch.commons.authuser.User
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -51,13 +52,13 @@ abstract class BaseTransportAction<Request : ActionRequest, Response : ActionRes
client.threadPool().threadContext.stashContext().use { threadContext ->
listener.onResponse(executeRequest(request, user, threadContext))
}
} catch (ex: VersionConflictEngineException) {
listener.onFailure(ex)
} catch (ex: OpenSearchStatusException) {
listener.onFailure(ex)
} catch (ex: Exception) {
log.error("Uncaught exception:", ex)
listener.onFailure(
OpenSearchStatusException(
ex.message, RestStatus.INTERNAL_SERVER_ERROR
)
)
listener.onFailure(OpenSearchStatusException(ex.message, RestStatus.INTERNAL_SERVER_ERROR))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
package org.opensearch.indexmanagement.snapshotmanagement.api.transport

import org.opensearch.action.ActionType
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction

object SMActions {
/**
Expand All @@ -30,5 +30,5 @@ object SMActions {
* [TransportDeleteSMPolicyAction]
*/
const val DELETE_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/delete"
val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteSMPolicyResponse)
val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteResponse)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@

package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput

class DeleteSMPolicyRequest(
val policyName: String
) : ActionRequest() {
class DeleteSMPolicyRequest : DeleteRequest {
override fun validate(): ActionRequestValidationException? {
return null
}

constructor(sin: StreamInput) : this(
policyName = sin.readString()
)
constructor(sin: StreamInput) : super(sin)

constructor(id: String) {
super.id(id)
}

override fun writeTo(out: StreamOutput) {
out.writeString(policyName)
super.writeTo(out)
}
}

This file was deleted.

Loading

0 comments on commit 9d2b61a

Please sign in to comment.