Skip to content

Commit

Permalink
Refactors files/classes to prepare for multiple features under the In…
Browse files Browse the repository at this point in the history
…dex Management suite
  • Loading branch information
dbbaughe committed Aug 13, 2020
1 parent a8ac5a3 commit 4c49870
Show file tree
Hide file tree
Showing 144 changed files with 1,699 additions and 1,413 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ check.dependsOn jacocoTestReport

esplugin {
name 'opendistro_index_management'
description 'Open Distro Index State Management Plugin'
classname 'com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin'
description 'Open Distro Index Management Plugin'
classname 'com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin'
extendedPlugins = ['opendistro-job-scheduler']
}

allOpen {
annotation("com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting")
annotation("com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting")
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._DOC
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
import com.amazon.opendistroforelasticsearch.indexmanagement.settings.IndexManagementSettings
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
Expand All @@ -41,39 +42,49 @@ import org.elasticsearch.threadpool.Scheduler
import org.elasticsearch.threadpool.ThreadPool
import java.time.Instant

class IndexStateManagementHistory(
class IndexManagementHistory(
settings: Settings,
private val client: Client,
private val threadPool: ThreadPool,
private val clusterService: ClusterService,
private val indexStateManagementIndices: IndexStateManagementIndices
private val indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener {

private val logger = LogManager.getLogger(javaClass)
private var scheduledRollover: Scheduler.Cancellable? = null

@Volatile private var historyEnabled = ManagedIndexSettings.HISTORY_ENABLED.get(settings)
@Volatile private var historyEnabled = IndexManagementSettings.HISTORY_ENABLED.get(settings)

@Volatile private var historyMaxDocs = ManagedIndexSettings.HISTORY_MAX_DOCS.get(settings)
// Setting<Long> does not have a fallbackSetting method signature so we are doing it manually here
@Volatile private var historyMaxDocs = when {
IndexManagementSettings.HISTORY_MAX_DOCS.exists(settings) -> IndexManagementSettings.HISTORY_MAX_DOCS.get(settings)
ManagedIndexSettings.DEPRECATED_HISTORY_MAX_DOCS.exists(settings) -> ManagedIndexSettings.DEPRECATED_HISTORY_MAX_DOCS.get(settings)
else -> IndexManagementSettings.HISTORY_MAX_DOCS.get(settings)
}

@Volatile private var historyMaxAge = ManagedIndexSettings.HISTORY_INDEX_MAX_AGE.get(settings)
@Volatile private var historyMaxAge = IndexManagementSettings.HISTORY_INDEX_MAX_AGE.get(settings)

@Volatile private var historyRolloverCheckPeriod = ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.get(settings)
@Volatile private var historyRolloverCheckPeriod = IndexManagementSettings.HISTORY_ROLLOVER_CHECK_PERIOD.get(settings)

@Volatile private var historyRetentionPeriod = ManagedIndexSettings.HISTORY_RETENTION_PERIOD.get(settings)
@Volatile private var historyRetentionPeriod = IndexManagementSettings.HISTORY_RETENTION_PERIOD.get(settings)

init {
clusterService.addLocalNodeMasterListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_ENABLED) {
historyEnabled = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_MAX_DOCS) { historyMaxDocs = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_INDEX_MAX_AGE) { historyMaxAge = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD) {
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.HISTORY_ENABLED) { historyEnabled = it }
// Setting<Long> does not have a fallbackSetting method signature so we are doing it manually here
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.HISTORY_MAX_DOCS, ManagedIndexSettings.DEPRECATED_HISTORY_MAX_DOCS) { im, ism ->
historyMaxDocs = when {
IndexManagementSettings.HISTORY_MAX_DOCS.exists(settings) -> im
ManagedIndexSettings.DEPRECATED_HISTORY_MAX_DOCS.exists(settings) -> ism
else -> im
}
}
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.HISTORY_INDEX_MAX_AGE) { historyMaxAge = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.HISTORY_ROLLOVER_CHECK_PERIOD) {
historyRolloverCheckPeriod = it
rescheduleRollover()
}
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_RETENTION_PERIOD) {
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.HISTORY_RETENTION_PERIOD) {
historyRetentionPeriod = it
}
}
Expand Down Expand Up @@ -111,19 +122,19 @@ class IndexStateManagementHistory(
}

private fun rolloverHistoryIndex(): Boolean {
if (!indexStateManagementIndices.indexStateManagementIndexHistoryExists()) {
if (!indexManagementIndices.indexManagementIndexHistoryExists()) {
return false
}

// We have to pass null for newIndexName in order to get Elastic to increment the index count.
val request = RolloverRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null)
request.createIndexRequest.index(IndexStateManagementIndices.HISTORY_INDEX_PATTERN)
.mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
val request = RolloverRequest(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null)
request.createIndexRequest.index(IndexManagementIndices.HISTORY_INDEX_PATTERN)
.mapping(_DOC, IndexManagementIndices.indexManagementHistoryMappings, XContentType.JSON)
request.addMaxIndexDocsCondition(historyMaxDocs)
request.addMaxIndexAgeCondition(historyMaxAge)
val response = client.admin().indices().rolloverIndex(request).actionGet()
if (!response.isRolledOver) {
logger.info("${IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS} not rolled over. Conditions were: ${response.conditionStatus}")
logger.info("${IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS} not rolled over. Conditions were: ${response.conditionStatus}")
}
return response.isRolledOver
}
Expand All @@ -134,7 +145,7 @@ class IndexStateManagementHistory(

val clusterStateRequest = ClusterStateRequest()
.clear()
.indices(IndexStateManagementIndices.HISTORY_ALL)
.indices(IndexManagementIndices.HISTORY_ALL)
.metadata(true)
.local(true)
.indicesOptions(IndicesOptions.strictExpand())
Expand All @@ -146,7 +157,7 @@ class IndexStateManagementHistory(
val creationTime = indexMetaData.creationDate

if ((Instant.now().toEpochMilli() - creationTime) > historyRetentionPeriod.millis) {
val alias = indexMetaData.aliases.firstOrNull { IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias }
val alias = indexMetaData.aliases.firstOrNull { IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias }
if (alias != null && historyEnabled) {
// If index has write alias and history is enable, don't delete the index.
continue
Expand Down Expand Up @@ -177,20 +188,20 @@ class IndexStateManagementHistory(
}

@Suppress("NestedBlockDepth")
suspend fun addHistory(managedIndexMetaData: List<ManagedIndexMetaData>) {
suspend fun addManagedIndexMetaDataHistory(managedIndexMetaData: List<ManagedIndexMetaData>) {
if (!historyEnabled) {
logger.debug("Index State Management history is not enabled")
return
}

if (!indexStateManagementIndices.checkAndUpdateHistoryIndex()) {
if (!indexManagementIndices.checkAndUpdateHistoryIndex()) {
logger.error("Failed to create or update the ism history index:")
return // we can't continue to add the history documents below as it would potentially create dynamic mappings
}

val docWriteRequest: List<DocWriteRequest<*>> = managedIndexMetaData
.filter { shouldAddToHistory(it) }
.map { indexHistory(it) }
.filter { shouldAddManagedIndexMetaDataToHistory(it) }
.map { createManagedIndexMetaDataHistoryIndexRequest(it) }

if (docWriteRequest.isNotEmpty()) {
val bulkRequest = BulkRequest().add(docWriteRequest)
Expand All @@ -209,24 +220,24 @@ class IndexStateManagementHistory(
}
}

private fun shouldAddToHistory(managedIndexMetaData: ManagedIndexMetaData): Boolean {
return when {
managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.STARTING -> false
managedIndexMetaData.stepMetaData?.stepStatus == Step.StepStatus.CONDITION_NOT_MET -> false
private fun shouldAddManagedIndexMetaDataToHistory(managedIndexMetaData: ManagedIndexMetaData): Boolean {
return when (managedIndexMetaData.stepMetaData?.stepStatus) {
Step.StepStatus.STARTING -> false
Step.StepStatus.CONDITION_NOT_MET -> false
else -> true
}
}

private fun indexHistory(managedIndexMetaData: ManagedIndexMetaData): IndexRequest {
private fun createManagedIndexMetaDataHistoryIndexRequest(managedIndexMetaData: ManagedIndexMetaData): IndexRequest {
val builder = XContentFactory.jsonBuilder()
.startObject()
.startObject(IndexStateManagementPlugin.INDEX_STATE_MANAGEMENT_HISTORY_TYPE)
.startObject(IndexManagementPlugin.INDEX_STATE_MANAGEMENT_HISTORY_TYPE)
managedIndexMetaData.toXContent(builder, ToXContent.EMPTY_PARAMS)
builder
.field("history_timestamp", Instant.now().toEpochMilli())
.endObject()
.endObject()
return IndexRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS)
return IndexRequest(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS)
.source(builder)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._DOC
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.action.ActionListener
Expand All @@ -36,17 +36,17 @@ import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.XContentType

@OpenForTesting
class IndexStateManagementIndices(
class IndexManagementIndices(
private val client: IndicesAdminClient,
private val clusterService: ClusterService
) {

private val logger = LogManager.getLogger(javaClass)

fun checkAndUpdateISMConfigIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!indexStateManagementIndexExists()) {
val indexRequest = CreateIndexRequest(INDEX_STATE_MANAGEMENT_INDEX)
.mapping(_DOC, indexStateManagementMappings, XContentType.JSON)
if (!indexManagementIndexExists()) {
val indexRequest = CreateIndexRequest(INDEX_MANAGEMENT_INDEX)
.mapping(_DOC, indexManagementMappings, XContentType.JSON)
.settings(Settings.builder().put("index.hidden", true).build())
client.create(indexRequest, object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
Expand All @@ -62,38 +62,38 @@ class IndexStateManagementIndices(
}
}

fun indexStateManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_STATE_MANAGEMENT_INDEX)
fun indexManagementIndexExists(): Boolean = clusterService.state().routingTable.hasIndex(INDEX_MANAGEMENT_INDEX)

/**
* Attempt to create [INDEX_STATE_MANAGEMENT_INDEX] and return whether it exists
* Attempt to create [INDEX_MANAGEMENT_INDEX] and return whether it exists
*/
@Suppress("ReturnCount")
suspend fun attemptInitStateManagementIndex(client: Client): Boolean {
if (indexStateManagementIndexExists()) return true
if (indexManagementIndexExists()) return true

return try {
val response: AcknowledgedResponse = client.suspendUntil { checkAndUpdateISMConfigIndex(it) }
if (response.isAcknowledged) {
return true
}
logger.error("Creating $INDEX_STATE_MANAGEMENT_INDEX with mappings NOT acknowledged")
logger.error("Creating $INDEX_MANAGEMENT_INDEX with mappings NOT acknowledged")
return false
} catch (e: ResourceAlreadyExistsException) {
true
} catch (e: Exception) {
logger.error("Error trying to create $INDEX_STATE_MANAGEMENT_INDEX", e)
logger.error("Error trying to create $INDEX_MANAGEMENT_INDEX", e)
false
}
}

/**
* ============== History =============
*/
fun indexStateManagementIndexHistoryExists(): Boolean = clusterService.state().metadata.hasAlias(HISTORY_WRITE_INDEX_ALIAS)
fun indexManagementIndexHistoryExists(): Boolean = clusterService.state().metadata.hasAlias(HISTORY_WRITE_INDEX_ALIAS)

@Suppress("ReturnCount")
suspend fun checkAndUpdateHistoryIndex(): Boolean {
if (!indexStateManagementIndexHistoryExists()) {
if (!indexManagementIndexHistoryExists()) {
return createHistoryIndex(HISTORY_INDEX_PATTERN, HISTORY_WRITE_INDEX_ALIAS)
} else {
val response: AcknowledgedResponse = client.suspendUntil {
Expand All @@ -117,7 +117,7 @@ class IndexStateManagementIndices(
if (existsResponse.isExists) return true

val request = CreateIndexRequest(index)
.mapping(_DOC, indexStateManagementHistoryMappings, XContentType.JSON)
.mapping(_DOC, indexManagementHistoryMappings, XContentType.JSON)
.settings(Settings.builder().put("index.hidden", true).build())
if (alias != null) request.alias(Alias(alias))
return try {
Expand All @@ -142,9 +142,9 @@ class IndexStateManagementIndices(
const val HISTORY_INDEX_PATTERN = "<$HISTORY_INDEX_BASE-{now/d{yyyy.MM.dd}}-1>"
const val HISTORY_ALL = "$HISTORY_INDEX_BASE*"

val indexStateManagementMappings = IndexStateManagementIndices::class.java.classLoader
val indexManagementMappings = IndexManagementIndices::class.java.classLoader
.getResource("mappings/opendistro-ism-config.json").readText()
val indexStateManagementHistoryMappings = IndexStateManagementIndices::class.java.classLoader
val indexManagementHistoryMappings = IndexManagementIndices::class.java.classLoader
.getResource("mappings/opendistro-ism-history.json").readText()
}
}
Loading

0 comments on commit 4c49870

Please sign in to comment.