Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhances index, get, and delete snapshot management apis #369

Merged
merged 5 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.opensearch.indexmanagement

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.alias.Alias
Expand All @@ -27,6 +29,10 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.OpenForTesting
import org.opensearch.rest.RestStatus
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

@OpenForTesting
class IndexManagementIndices(
Expand Down Expand Up @@ -71,6 +77,23 @@ class IndexManagementIndices(
}
}

suspend fun checkAndUpdateIMConfigIndex(logger: Logger) {
val response: AcknowledgedResponse = suspendCoroutine { cont ->
checkAndUpdateIMConfigIndex(
object : ActionListener<AcknowledgedResponse> {
override fun onResponse(response: AcknowledgedResponse) = cont.resume(response)
override fun onFailure(e: Exception) = cont.resumeWithException(e)
}
)
}
if (response.isAcknowledged) {
logger.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
} else {
logger.error("Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping.")
throw OpenSearchStatusException("Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping.", RestStatus.INTERNAL_SERVER_ERROR)
}
}

fun indexManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_MANAGEMENT_INDEX)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"

const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm"
const val SM_POLICIES_URI = "$SM_BASE_URI/policies"

const val OLD_PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,80 @@
package org.opensearch.indexmanagement.snapshotmanagement

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_DOC_ID_SUFFIX
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy.Companion.SM_METADATA_ID_SUFFIX
import org.opensearch.rest.RestStatus

private val log = LogManager.getLogger("Snapshot Management Helper")

const val smSuffix = "-sm"
fun smPolicyNameToDocId(policyName: String) = "$policyName$smSuffix"
fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(smSuffix)
fun getSMMetadataDocId(policyName: String) = "$policyName-sm-metadata"
fun smPolicyNameToDocId(policyName: String) = "$policyName$SM_DOC_ID_SUFFIX"
fun smDocIdToPolicyName(id: String) = id.substringBeforeLast(SM_DOC_ID_SUFFIX)
fun getSMMetadataDocId(policyName: String) = "$policyName$SM_METADATA_ID_SUFFIX"

@Suppress("RethrowCaughtException", "ThrowsCount")
suspend fun Client.getSMPolicy(policyID: String): SMPolicy {
try {
val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, policyID)
val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) }
if (!getResponse.isExists || getResponse.isSourceEmpty) {
throw OpenSearchStatusException("Snapshot management policy not found", RestStatus.NOT_FOUND)
}
return parseSMPolicy(getResponse)
} catch (e: OpenSearchStatusException) {
throw e
} catch (e: IndexNotFoundException) {
throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND)
} catch (e: IllegalArgumentException) {
log.error("Failed to retrieve snapshot management policy [$policyID]", e)
throw OpenSearchStatusException("Snapshot management policy could not be parsed", RestStatus.INTERNAL_SERVER_ERROR)
} catch (e: Exception) {
log.error("Failed to retrieve snapshot management policy [$policyID]", e)
throw OpenSearchStatusException("Failed to retrieve Snapshot management policy.", RestStatus.NOT_FOUND)
}
}

@Suppress("RethrowCaughtException", "ThrowsCount")
suspend fun Client.getSMMetadata(metadataID: String): SMMetadata {
try {
val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, metadataID)
val getResponse: GetResponse = this.suspendUntil { get(getRequest, it) }
if (!getResponse.isExists || getResponse.isSourceEmpty) {
throw OpenSearchStatusException("Snapshot management metadata not found", RestStatus.NOT_FOUND)
}
return parseSMMetadata(getResponse)
} catch (e: OpenSearchStatusException) {
throw e
} catch (e: IndexNotFoundException) {
throw OpenSearchStatusException("Snapshot management config index not found", RestStatus.NOT_FOUND)
} catch (e: IllegalArgumentException) {
log.error("Failed to retrieve snapshot management metadata [$metadataID]", e)
throw OpenSearchStatusException("Snapshot management metadata could not be parsed", RestStatus.INTERNAL_SERVER_ERROR)
} catch (e: Exception) {
log.error("Failed to retrieve snapshot management metadata [$metadataID]", e)
throw OpenSearchStatusException("Failed to retrieve Snapshot management metadata.", RestStatus.NOT_FOUND)
}
}

fun parseSMPolicy(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): SMPolicy {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON)
return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, SMPolicy.Companion::parse)
}

fun parseSMMetadata(response: GetResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): SMMetadata {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON)
return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, SMMetadata.Companion::parse)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@
package org.opensearch.indexmanagement.snapshotmanagement.api.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_BASE_URI
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.SM_POLICIES_URI
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.DELETE_SM_ACTION_TYPE
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions.GET_SM_ACTION_TYPE
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyRequest
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyRequest
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyRequest
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.smPolicyNameToDocId
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestHandler.Route
Expand All @@ -24,8 +31,11 @@ import org.opensearch.rest.RestRequest.Method.POST
import org.opensearch.rest.RestRequest.Method.PUT
import org.opensearch.rest.RestRequest.Method.GET
import org.opensearch.rest.RestRequest.Method.DELETE
import org.opensearch.rest.RestResponse
import org.opensearch.rest.RestStatus
import org.opensearch.rest.action.RestResponseListener
import org.opensearch.rest.action.RestToXContentListener
import java.time.Instant

class RestSMPolicyHandler : BaseRestHandler() {

Expand All @@ -37,10 +47,10 @@ class RestSMPolicyHandler : BaseRestHandler() {

override fun routes(): List<Route> {
return listOf(
Route(POST, "$SM_BASE_URI/{policyName}"),
Route(PUT, "$SM_BASE_URI/{policyName}"),
Route(GET, "$SM_BASE_URI/{policyName}"),
Route(DELETE, "$SM_BASE_URI/{policyName}"),
Route(POST, "$SM_POLICIES_URI/{policyName}"),
Route(PUT, "$SM_POLICIES_URI/{policyName}"),
Route(GET, "$SM_POLICIES_URI/{policyName}"),
Route(DELETE, "$SM_POLICIES_URI/{policyName}"),
)
}

Expand Down Expand Up @@ -70,7 +80,7 @@ class RestSMPolicyHandler : BaseRestHandler() {
return RestChannelConsumer {
client.execute(
GET_SM_ACTION_TYPE,
GetSMPolicyRequest(policyName),
GetSMPolicyRequest(smPolicyNameToDocId(policyName)),
RestToXContentListener(it)
)
}
Expand All @@ -83,17 +93,33 @@ class RestSMPolicyHandler : BaseRestHandler() {
}
// TODO validate policy name validateGeneratedSnapshotName

log.info("sm dev: receive request ${request.requiredContent().utf8ToString()}")

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val xcp = request.contentParser()
val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName))
val policy = SMPolicy.parse(xcp, id = smPolicyNameToDocId(policyName), seqNo = seqNo, primaryTerm = primaryTerm)
.copy(jobLastUpdateTime = Instant.now())
log.info("sm dev: policy parsed $policy")

val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}

return RestChannelConsumer {
client.execute(
SMActions.INDEX_SM_ACTION_TYPE,
IndexSMPolicyRequest(policy, create),
RestToXContentListener(it)
IndexSMPolicyRequest(policy, create, refreshPolicy),
object : RestResponseListener<IndexSMPolicyResponse>(it) {
override fun buildResponse(response: IndexSMPolicyResponse): RestResponse {
val restResponse = BytesRestResponse(response.status, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (response.status == RestStatus.CREATED || response.status == RestStatus.OK) {
val location = "$SM_POLICIES_URI/${response.policy.policyName}"
restResponse.addHeader("Location", location)
}
return restResponse
}
}
)
}
}
Expand All @@ -104,10 +130,16 @@ class RestSMPolicyHandler : BaseRestHandler() {
throw IllegalArgumentException("Missing policy name")
}

val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}

return RestChannelConsumer {
client.execute(
DELETE_SM_ACTION_TYPE,
DeleteSMPolicyRequest(policyName),
DeleteSMPolicyRequest(smPolicyNameToDocId(policyName)).setRefreshPolicy(refreshPolicy),
RestToXContentListener(it)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
import org.opensearch.commons.authuser.User
import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -51,13 +52,13 @@ abstract class BaseTransportAction<Request : ActionRequest, Response : ActionRes
client.threadPool().threadContext.stashContext().use { threadContext ->
listener.onResponse(executeRequest(request, user, threadContext))
}
} catch (ex: VersionConflictEngineException) {
listener.onFailure(ex)
} catch (ex: OpenSearchStatusException) {
listener.onFailure(ex)
} catch (ex: Exception) {
log.error("Uncaught exception:", ex)
listener.onFailure(
OpenSearchStatusException(
ex.message, RestStatus.INTERNAL_SERVER_ERROR
)
)
listener.onFailure(OpenSearchStatusException(ex.message, RestStatus.INTERNAL_SERVER_ERROR))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
package org.opensearch.indexmanagement.snapshotmanagement.api.transport

import org.opensearch.action.ActionType
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.GetSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.IndexSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.DeleteSMPolicyResponse
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction

object SMActions {
/**
Expand All @@ -30,5 +30,5 @@ object SMActions {
* [TransportDeleteSMPolicyAction]
*/
const val DELETE_SM_ACTION_NAME = "cluster:admin/opensearch/snapshot_management/policy/delete"
val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteSMPolicyResponse)
val DELETE_SM_ACTION_TYPE = ActionType(DELETE_SM_ACTION_NAME, ::DeleteResponse)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@

package org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput

class DeleteSMPolicyRequest(
val policyName: String
) : ActionRequest() {
class DeleteSMPolicyRequest : DeleteRequest {
override fun validate(): ActionRequestValidationException? {
return null
}

constructor(sin: StreamInput) : this(
policyName = sin.readString()
)
constructor(sin: StreamInput) : super(sin)

constructor(id: String) {
super.id(id)
}

override fun writeTo(out: StreamOutput) {
out.writeString(policyName)
super.writeTo(out)
}
}

This file was deleted.

Loading