Skip to content

Commit

Permalink
Move metadata (opensearch-project#280)
Browse files Browse the repository at this point in the history
* Migrates metadata from cluster state to config index
  • Loading branch information
bowenlan-amzn authored Mar 8, 2021
1 parent 3d60704 commit 23a6945
Show file tree
Hide file tree
Showing 42 changed files with 2,374 additions and 347 deletions.
45 changes: 44 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ ext {
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.0"
version = "${opendistroVersion}.1"

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
Expand Down Expand Up @@ -315,3 +315,46 @@ task ktlintFormat(type: JavaExec, group: "formatting") {
compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }

apply from: 'build-tools/pkgbuild.gradle'

// This IT is to simulate the situation
// when there are old version (without metadata change)
// and new version mixed in one cluster
import org.elasticsearch.gradle.test.RestIntegTestTask

def mixedClusterTest = project.tasks.create('mixedCluster', RestIntegTestTask.class)
def mixedClusterFlag = findProperty('mixed') as Boolean ?: false
println("mixed cluster flag: $mixedClusterFlag")
mixedClusterTest.dependsOn(bundlePlugin)

testClusters.mixedCluster {
testDistribution = "OSS"
if (_numNodes > 1) numberOfNodes = _numNodes
getNodes().each { node ->
node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/job-scheduler").getSingleFile() }
}
}))

if (mixedClusterFlag && node.name == "mixedCluster-1") {
node.plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/index-management").getSingleFile() }
}
}))
} else {
node.plugin(project.tasks.bundlePlugin.archiveFile)
}
node.plugins.each { println("plugin in the node: ${it.get()}") }
}
setting 'path.repo', repo.absolutePath
}

mixedCluster {
systemProperty 'tests.security.manager', 'false'
systemProperty 'tests.path.repo', repo.absolutePath
systemProperty 'cluster.mixed', "$mixedClusterFlag"
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* permissions and limitations under the License.
*/

@file:Suppress("ReturnCount")
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
Expand Down Expand Up @@ -84,7 +85,6 @@ class IndexManagementIndices(
/**
* Attempt to create [INDEX_MANAGEMENT_INDEX] and return whether it exists
*/
@Suppress("ReturnCount")
suspend fun attemptInitStateManagementIndex(client: Client): Boolean {
if (indexManagementIndexExists()) return true

Expand All @@ -103,6 +103,19 @@ class IndexManagementIndices(
}
}

suspend fun attemptUpdateConfigIndexMapping(): Boolean {
return try {
val response: AcknowledgedResponse = client.suspendUntil {
IndexUtils.checkAndUpdateConfigIndexMapping(clusterService.state(), client, it) }
if (response.isAcknowledged) return true
logger.error("Trying to update config index mapping not acknowledged.")
return false
} catch (e: Exception) {
logger.error("Failed when trying to update config index mapping.", e)
false
}
}

/**
* ============== History =============
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package com.amazon.opendistroforelasticsearch.indexmanagement
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementHistory
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ManagedIndexRunner
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.MetadataService
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.SkipExecution
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
Expand Down Expand Up @@ -165,6 +168,9 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
RollupMetadata.ROLLUP_METADATA_TYPE -> {
return@ScheduledJobParser null
}
ManagedIndexMetaData.MANAGED_INDEX_METADATA_TYPE -> {
return@ScheduledJobParser null
}
else -> {
logger.warn("Unsupported document was indexed in $INDEX_MANAGEMENT_INDEX with type: $fieldName")
xcp.skipChildren()
Expand Down Expand Up @@ -218,13 +224,6 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
): Collection<Any> {
val settings = environment.settings()
this.clusterService = clusterService
val managedIndexRunner = ManagedIndexRunner
.registerClient(client)
.registerClusterService(clusterService)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerConsumers() // registerConsumers must happen after registerSettings/clusterService
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand All @@ -239,6 +238,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
.registerConsumers()
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
val indexStateManagementHistory =
IndexStateManagementHistory(
Expand All @@ -249,8 +250,20 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
indexManagementIndices
)

val managedIndexRunner = ManagedIndexRunner
.registerClient(client)
.registerClusterService(clusterService)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerConsumers() // registerConsumers must happen after registerSettings/clusterService
.registerHistoryIndex(indexStateManagementHistory)
.registerSkipFlag(skipFlag)

val metadataService = MetadataService(client, clusterService, skipFlag, indexManagementIndices)

val managedIndexCoordinator = ManagedIndexCoordinator(environment.settings(),
client, clusterService, threadPool, indexManagementIndices)
client, clusterService, threadPool, indexManagementIndices, metadataService)

return listOf(managedIndexRunner, rollupRunner, indexManagementIndices, managedIndexCoordinator, indexStateManagementHistory)
}
Expand All @@ -267,6 +280,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
ManagedIndexSettings.POLICY_ID,
ManagedIndexSettings.ROLLOVER_ALIAS,
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.DocWriteRequest
Expand All @@ -46,6 +47,7 @@ import org.elasticsearch.threadpool.Scheduler
import org.elasticsearch.threadpool.ThreadPool
import java.time.Instant

@OpenForTesting
class IndexStateManagementHistory(
settings: Settings,
private val client: Client,
Expand Down
Loading

0 comments on commit 23a6945

Please sign in to comment.