Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SM dev - SM datamodel and CRUD skeleton #355

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/ISSUE_TEMPLATE/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
**Is your feature request related to a problem?**
A new feature has been added.

**What solution would you like?**
Document the usage of the new feature.

**What alternatives have you considered?**
N/A

**Do you have any additional context?**
_Add any other context or screenshots about the feature request here._
41 changes: 41 additions & 0 deletions .github/workflows/create-documentation-issue.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Create Documentation Issue
on:
pull_request:
types:
- labeled
env:
PR_NUMBER: ${{ github.event.number }}

jobs:
create-issue:
if: ${{ github.event.label.name == 'needs-documentation' }}
runs-on: ubuntu-latest
name: Create Documentation Issue
steps:
- name: GitHub App token
id: github_app_token
uses: tibdex/[email protected]
with:
app_id: ${{ secrets.APP_ID }}
private_key: ${{ secrets.APP_PRIVATE_KEY }}
installation_id: 22958780

- name: Checkout code
uses: actions/checkout@v2

- name: Edit the issue template
run: |
echo "https://github.com/opensearch-project/index-management/pull/${{ env.PR_NUMBER }}." >> ./.github/ISSUE_TEMPLATE/documentation.md

- name: Create Issue From File
id: create-issue
uses: peter-evans/create-issue-from-file@v4
with:
title: Add documentation related to new feature
content-filepath: ./.github/ISSUE_TEMPLATE/documentation.md
labels: documentation
repository: opensearch-project/documentation-website
token: ${{ steps.github_app_token.outputs.token }}

- name: Print Issue
run: echo Created related documentation issue ${{ steps.create-issue.outputs.issue-number }}
2 changes: 1 addition & 1 deletion .idea/copyright/OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions .idea/copyright/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import java.util.function.Predicate
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.0.0-rc1-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "rc1")
// 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
opensearch_version = System.getProperty("opensearch.version", "2.0.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// 2.0.0-SNAPSHOT -> 2.0.0.0-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
opensearch_build = version_tokens[0] + '.0'
job_scheduler_no_snapshot = opensearch_build
Expand Down Expand Up @@ -99,6 +99,10 @@ configurations.all {
}
}

idea.module {
excludeDirs -= file("$buildDir")
}

def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
def usingMultiNode = project.properties.containsKey('numNodes')
// Only apply jacoco test coverage if we are running a local single node cluster
Expand Down
2 changes: 1 addition & 1 deletion detekt.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
maxIssues: 20
maxIssues: 100
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved

exceptions:
TooGenericExceptionCaught:
Expand Down
10 changes: 10 additions & 0 deletions spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ apply plugin: 'opensearch.java-rest-test'
apply plugin: 'kotlin'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
apply plugin: 'idea'

ext {
projectSubstitutions = [:]
Expand Down Expand Up @@ -64,6 +65,15 @@ dependencies {
testImplementation "org.apache.logging.log4j:log4j-core:${versions.log4j}"
}

idea.module {
excludeDirs -= file("$buildDir")
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}

test {
doFirst {
test.classpath -= project.files(project.tasks.named('shadowJar'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.SMActions
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.delete.TransportDeleteSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.TransportGetSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction
import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata
import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
Expand Down Expand Up @@ -164,6 +171,8 @@ import java.util.function.Supplier
@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, Plugin() {

private val log = LogManager.getLogger(javaClass)

private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var clusterService: ClusterService
Expand All @@ -187,6 +196,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
const val INDEX_STATE_MANAGEMENT_HISTORY_TYPE = "managed_index_meta_data"

const val SM_BASE_URI = "$PLUGINS_BASE_URI/_sm"

const val OLD_PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val LEGACY_ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
Expand All @@ -208,6 +219,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
@Suppress("ComplexMethod")
override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
log.info("sm dev: plugin job parser")
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -235,6 +247,12 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
SMPolicy.SM_TYPE -> {
return@ScheduledJobParser SMPolicy.parse(xcp, id, jobDocVersion.seqNo, jobDocVersion.primaryTerm)
}
SMMetadata.SM_METADATA_TYPE -> {
return@ScheduledJobParser null
}
else -> {
logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
xcp.skipChildren()
Expand Down Expand Up @@ -300,7 +318,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RestDeleteTransformAction(),
RestExplainTransformAction(),
RestStartTransformAction(),
RestStopTransformAction()
RestStopTransformAction(),
RestSMPolicyHandler(),
)
}

Expand Down Expand Up @@ -502,7 +521,10 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ActionPlugin.ActionHandler(ExplainTransformAction.INSTANCE, TransportExplainTransformAction::class.java),
ActionPlugin.ActionHandler(StartTransformAction.INSTANCE, TransportStartTransformAction::class.java),
ActionPlugin.ActionHandler(StopTransformAction.INSTANCE, TransportStopTransformAction::class.java),
ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java)
ActionPlugin.ActionHandler(ManagedIndexAction.INSTANCE, TransportManagedIndexAction::class.java),
ActionPlugin.ActionHandler(SMActions.INDEX_SM_ACTION_TYPE, TransportIndexSMPolicyAction::class.java),
ActionPlugin.ActionHandler(SMActions.GET_SM_ACTION_TYPE, TransportGetSMPolicyAction::class.java),
ActionPlugin.ActionHandler(SMActions.DELETE_SM_ACTION_TYPE, TransportDeleteSMPolicyAction::class.java),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,28 +633,49 @@ class ManagedIndexCoordinator(
suspend fun sweepManagedIndexJobs(client: Client): List<String> {
val managedIndexUuids = mutableListOf<String>()

val managedIndexSearchRequest = getSweptManagedIndexSearchRequest()
var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
var uuids = response.hits.map { it.id }
val scrollIDsToClear = mutableSetOf<String>()

while (uuids.isNotEmpty()) {
managedIndexUuids.addAll(uuids)
val scrollID = response.scrollId
scrollIDsToClear.add(scrollID)
val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
response = client.suspendUntil { searchScroll(scrollRequest, it) }
uuids = response.hits.map { it.id }
}
// if # of documents below 10k, don't use scroll search
val countReq = getSweptManagedIndexSearchRequest(size = 0)
val countRes: SearchResponse = client.suspendUntil { search(countReq, it) }
val totalHits = countRes.hits.totalHits ?: return managedIndexUuids

if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
if (totalHits.value >= MAX_HITS) {
val scrollIDsToClear = mutableSetOf<String>()
try {
val managedIndexSearchRequest = getSweptManagedIndexSearchRequest(scroll = true)
var response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
var uuids = transformManagedIndexSearchRes(response)

while (uuids.isNotEmpty()) {
managedIndexUuids.addAll(uuids)
val scrollID = response.scrollId
scrollIDsToClear.add(scrollID)
val scrollRequest = SearchScrollRequest().scrollId(scrollID).scroll(TimeValue.timeValueMinutes(1))
response = client.suspendUntil { searchScroll(scrollRequest, it) }
uuids = transformManagedIndexSearchRes(response)
}
} finally {
if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
}
}
return managedIndexUuids
}

return managedIndexUuids
val response: SearchResponse = client.suspendUntil { search(getSweptManagedIndexSearchRequest(), it) }
return transformManagedIndexSearchRes(response)
}

fun transformManagedIndexSearchRes(response: SearchResponse): List<String> {
if (response.isTimedOut || response.failedShards > 0 || response.skippedShards > 0) {
val errorMsg = "Sweep managed indices failed. Timed out: ${response.isTimedOut} | " +
"Failed shards: ${response.failedShards} | Skipped shards: ${response.skippedShards}."
logger.error(errorMsg)
throw ISMCoordinatorSearchException(message = errorMsg)
}
return response.hits.map { it.id }
}

/**
Expand Down Expand Up @@ -736,3 +757,5 @@ class ManagedIndexCoordinator(
const val BUFFER = 20L
}
}

class ISMCoordinatorSearchException(message: String, cause: Throwable? = null) : Exception(message, cause)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.instant
import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.Schedule
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
Expand Down Expand Up @@ -86,7 +87,6 @@ data class ManagedIndexConfig(

companion object {
const val MANAGED_INDEX_TYPE = "managed_index"
const val NO_ID = ""
const val NAME_FIELD = "name"
const val ENABLED_FIELD = "enabled"
const val SCHEDULE_FIELD = "schedule"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.opensearchapi.optionalTimeField
import org.opensearch.indexmanagement.opensearchapi.optionalUserField
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.indexmanagement.util.NO_ID
import java.io.IOException
import java.time.Instant

Expand Down Expand Up @@ -147,7 +148,6 @@ data class Policy(
const val POLICY_TYPE = "policy"
const val POLICY_ID_FIELD = "policy_id"
const val DESCRIPTION_FIELD = "description"
const val NO_ID = ""
const val LAST_UPDATED_TIME_FIELD = "last_updated_time"
const val SCHEMA_VERSION_FIELD = "schema_version"
const val ERROR_NOTIFICATION_FIELD = "error_notification"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.transport.action.inde
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.util.IF_PRIMARY_TERM
import org.opensearch.indexmanagement.util.IF_SEQ_NO
import org.opensearch.indexmanagement.util.NO_ID
import org.opensearch.indexmanagement.util.REFRESH
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
Expand Down Expand Up @@ -69,8 +70,8 @@ class RestIndexPolicyAction(

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("policyID", Policy.NO_ID)
if (Policy.NO_ID == id) {
val id = request.param("policyID", NO_ID)
if (NO_ID == id) {
throw IllegalArgumentException("Missing policy ID")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.util.NO_ID
import java.io.IOException

class IndexPolicyRequest : ActionRequest {
Expand Down Expand Up @@ -47,7 +48,7 @@ class IndexPolicyRequest : ActionRequest {

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (Policy.NO_ID == policyID) {
if (NO_ID == policyID) {
validationException = ValidateActions.addValidationError("Missing policyID", validationException)
}
return validationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,19 @@ fun getManagedIndicesToDelete(
}
}

fun getSweptManagedIndexSearchRequest(): SearchRequest {
fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = ManagedIndexCoordinator.MAX_HITS): SearchRequest {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.existsQuery(ManagedIndexConfig.MANAGED_INDEX_TYPE))
return SearchRequest()
.indices(INDEX_MANAGEMENT_INDEX)
.scroll(TimeValue.timeValueMinutes(1))
val req = SearchRequest().indices(INDEX_MANAGEMENT_INDEX)
.allowPartialSearchResults(false)
.source(
SearchSourceBuilder.searchSource()
.size(ManagedIndexCoordinator.MAX_HITS)
.size(size)
.seqNoAndPrimaryTerm(true)
.fetchSource(emptyArray(), emptyArray())
.query(boolQueryBuilder)
)
if (scroll) req.scroll(TimeValue.timeValueMinutes(1))
return req
}

@Suppress("ReturnCount", "ComplexCondition")
Expand Down
Loading