Skip to content

Commit

Permalink
SM dev - SM datamodel and CRUD skeleton (#355)
Browse files Browse the repository at this point in the history
* Strengthen scroll search in Coordinator (#356)

* Removes rc1 qualifier (#353)

* Allows error in shrink test

* Adds workflow to create documentation issues (#358)

* Adds documentation issue workflow

* Makes the issue template more relevant

* Data model and Basic CRUD APIs

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored May 17, 2022
1 parent 534bb10 commit 7d6fd16
Show file tree
Hide file tree
Showing 54 changed files with 2,725 additions and 69 deletions.
11 changes: 11 additions & 0 deletions .github/ISSUE_TEMPLATE/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
**Is your feature request related to a problem?**
A new feature has been added.

**What solution would you like?**
Document the usage of the new feature.

**What alternatives have you considered?**
N/A

**Do you have any additional context?**
_Add any other context or screenshots about the feature request here._
41 changes: 41 additions & 0 deletions .github/workflows/create-documentation-issue.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Create Documentation Issue
on:
pull_request:
types:
- labeled
env:
PR_NUMBER: ${{ github.event.number }}

jobs:
create-issue:
if: ${{ github.event.label.name == 'needs-documentation' }}
runs-on: ubuntu-latest
name: Create Documentation Issue
steps:
- name: GitHub App token
id: github_app_token
uses: tibdex/[email protected]
with:
app_id: ${{ secrets.APP_ID }}
private_key: ${{ secrets.APP_PRIVATE_KEY }}
installation_id: 22958780

- name: Checkout code
uses: actions/checkout@v2

- name: Edit the issue template
run: |
echo "https://github.com/opensearch-project/index-management/pull/${{ env.PR_NUMBER }}." >> ./.github/ISSUE_TEMPLATE/documentation.md
- name: Create Issue From File
id: create-issue
uses: peter-evans/create-issue-from-file@v4
with:
title: Add documentation related to new feature
content-filepath: ./.github/ISSUE_TEMPLATE/documentation.md
labels: documentation
repository: opensearch-project/documentation-website
token: ${{ steps.github_app_token.outputs.token }}

- name: Print Issue
run: echo Created related documentation issue ${{ steps.create-issue.outputs.issue-number }}
2 changes: 1 addition & 1 deletion .idea/copyright/OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions .idea/copyright/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import java.util.function.Predicate
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.0.0-rc1-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "rc1")
// 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
opensearch_version = System.getProperty("opensearch.version", "2.0.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
Expand Down Expand Up @@ -99,6 +99,10 @@ configurations.all {
}
}

idea.module {
excludeDirs -= file("$buildDir")
}

def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
def usingMultiNode = project.properties.containsKey('numNodes')
// Only apply jacoco test coverage if we are running a local single node cluster
Expand Down
2 changes: 1 addition & 1 deletion detekt.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
maxIssues: 20
maxIssues: 100

exceptions:
TooGenericExceptionCaught:
Expand Down
10 changes: 10 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
apply plugin: 'idea'

ext {
projectSubstitutions = [:]
Expand Down Expand Up @@ -64,6 +65,15 @@ dependencies {
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

idea.module {
excludeDirs -= file("$buildDir")
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}

test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
Expand Down Expand Up @@ -164,6 +171,8 @@ import java.util.function.Supplier
@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() {

private val log = LogManager.getLogger(javaClass)

private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var clusterService: ClusterService
Expand All @@ -187,6 +196,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"

const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm"

const val OLD_PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val LEGACY_ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
Expand All @@ -208,6 +219,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
@Suppress("ComplexMethod")
override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
log.info("sm dev: plugin job parser")
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -235,6 +247,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
SMPolicy.SM_TYPE -> {
return@ScheduledJobParser SMPolicy.parse(xcp, id, jobDocVersion.seqNo, jobDocVersion.primaryTerm)
}
SMMetadata.SM_METADATA_TYPE -> {
return@ScheduledJobParser null
}
else -> {
logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
xcp.skipChildren()
Expand Down Expand Up @@ -300,7 +318,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestDeleteTransformAction(),
RestExplainTransformAction(),
RestStartTransformAction(),
RestStopTransformAction()
RestStopTransformAction(),
RestSMPolicyHandler(),
)
}

Expand Down Expand Up @@ -502,7 +521,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(ExplainTransformAction.INSTANCE, TransportExplainTransformAction::class.java),
ActionPlugin.ActionHandler(StartTransformAction.INSTANCE, TransportStartTransformAction::class.java),
ActionPlugin.ActionHandler(StopTransformAction.INSTANCE, TransportStopTransformAction::class.java),
ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java)
ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java),
ActionPlugin.ActionHandler(SMActions.INDEX_SM_ACTION_TYPE, TransportIndexSMPolicyAction::class.java),
ActionPlugin.ActionHandler(SMActions.GET_SM_ACTION_TYPE, TransportGetSMPolicyAction::class.java),
ActionPlugin.ActionHandler(SMActions.DELETE_SM_ACTION_TYPE, TransportDeleteSMPolicyAction::class.java),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,28 +633,49 @@ class ManagedIndexCoordinator(
suspend fun sweepManagedIndexJobs(client: Client): List<String> {
val managedIndexUuids = mutableListOf<String>()

val managedIndexSearchRequest = getSweptManagedIndexSearchRequest()
var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
var uuids = response.hits.map { it.id }
val scrollIDsToClear = mutableSetOf<String>()

while (uuids.isNotEmpty()) {
managedIndexUuids.addAll(uuids)
val scrollID = response.scrollId
scrollIDsToClear.add(scrollID)
val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
response = client.suspendUntil { searchScroll(scrollRequest, it) }
uuids = response.hits.map { it.id }
}
// if # of documents below 10k, don't use scroll search
val countReq = getSweptManagedIndexSearchRequest(size = 0)
val countRes: SearchResponse = client.suspendUntil { search(countReq, it) }
val totalHits = countRes.hits.totalHits ?: return managedIndexUuids

if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
if (totalHits.value >= MAX_HITS) {
val scrollIDsToClear = mutableSetOf<String>()
try {
val managedIndexSearchRequest = getSweptManagedIndexSearchRequest(scroll = true)
var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
var uuids = transformManagedIndexSearchRes(response)

while (uuids.isNotEmpty()) {
managedIndexUuids.addAll(uuids)
val scrollID = response.scrollId
scrollIDsToClear.add(scrollID)
val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
response = client.suspendUntil { searchScroll(scrollRequest, it) }
uuids = transformManagedIndexSearchRes(response)
}
} finally {
if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
}
}
return managedIndexUuids
}

return managedIndexUuids
val response: SearchResponse = client.suspendUntil { search(getSweptManagedIndexSearchRequest(), it) }
return transformManagedIndexSearchRes(response)
}

fun transformManagedIndexSearchRes(response: SearchResponse): List<String> {
if (response.isTimedOut || response.failedShards > 0 || response.skippedShards > 0) {
val errorMsg = "Sweep managed indices failed. Timed out: ${response.isTimedOut} | " +
"Failed shards: ${response.failedShards} | Skipped shards: ${response.skippedShards}."
logger.error(errorMsg)
throw ISMCoordinatorSearchException(message = errorMsg)
}
return response.hits.map { it.id }
}

/**
Expand Down Expand Up @@ -736,3 +757,5 @@ class ManagedIndexCoordinator(
const val BUFFER = 20L
}
}

class ISMCoordinatorSearchException(message: String, cause: Throwable? = null) : Exception(message, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.instant
import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.Schedule
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
Expand Down Expand Up @@ -86,7 +87,6 @@ data class ManagedIndexConfig(

companion object {
const val MANAGED_INDEX_TYPE = "managed_index"
const val NO_ID = ""
const val NAME_FIELD = "name"
const val ENABLED_FIELD = "enabled"
const val SCHEDULE_FIELD = "schedule"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.opensearchapi.optionalUserField
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.NO_ID
import java.io.IOException
import java.time.Instant

Expand Down Expand Up @@ -147,7 +148,6 @@ data class Policy(
const val POLICY_TYPE = "policy"
const val POLICY_ID_FIELD = "policy_id"
const val DESCRIPTION_FIELD = "description"
const val NO_ID = ""
const val LAST_UPDATED_TIME_FIELD = "last_updated_time"
const val SCHEMA_VERSION_FIELD = "schema_version"
const val ERROR_NOTIFICATION_FIELD = "error_notification"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.inde
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
Expand Down Expand Up @@ -69,8 +70,8 @@ class RestIndexPolicyAction(

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("policyID", Policy.NO_ID)
if (Policy.NO_ID == id) {
val id = request.param("policyID", NO_ID)
if (NO_ID == id) {
throw IllegalArgumentException("Missing policy ID")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.util.NO_ID
import java.io.IOException

class IndexPolicyRequest : ActionRequest {
Expand Down Expand Up @@ -47,7 +48,7 @@ class IndexPolicyRequest : ActionRequest {

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (Policy.NO_ID == policyID) {
if (NO_ID == policyID) {
validationException = ValidateActions.addValidationError("Missing policyID", validationException)
}
return validationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,19 @@ fun getManagedIndicesToDelete(
}
}

fun getSweptManagedIndexSearchRequest(): SearchRequest {
fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = ManagedIndexCoordinator.MAX_HITS): SearchRequest {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE))
return SearchRequest()
.indices(INDEX_MANAGEMENT_INDEX)
.scroll(TimeValue.timeValueMinutes(1))
val req = SearchRequest().indices(INDEX_MANAGEMENT_INDEX)
.allowPartialSearchResults(false)
.source(
SearchSourceBuilder.searchSource()
.size(ManagedIndexCoordinator.MAX_HITS)
.size(size)
.seqNoAndPrimaryTerm(true)
.fetchSource(emptyArray(), emptyArray())
.query(boolQueryBuilder)
)
if (scroll) req.scroll(TimeValue.timeValueMinutes(1))
return req
}

@Suppress("ReturnCount", "ComplexCondition")
Expand Down
Loading

0 comments on commit 7d6fd16

Please sign in to comment.