Skip to content

Commit

Permalink
Upgrades supported ES version to 7.10 (opendistro-for-elasticsearch#349)
Browse files Browse the repository at this point in the history
* Upgrades gradle wrapper

* Upgrades ES to 7.10, fixes gradle tasks, and updates dependencies

* Fixes compile errors from upgrading to 7.10

* Updates version to 1.12.0.0

* Updates set-env command in workflow
  • Loading branch information
dbbaughe authored Nov 20, 2020
1 parent f62beef commit 5a8b647
Show file tree
Hide file tree
Showing 67 changed files with 209 additions and 168 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
cp ./build/distributions/*.deb index-management-artifacts_deb
cp ./build/distributions/*.rpm index-management-artifacts
cp ./build/distributions/*.rpm index-management-artifacts_rpm
echo ::set-env name=TAG_VERSION::${GITHUB_REF/refs\/tags\//}
echo "TAG_VERSION=${GITHUB_REF/refs\/tags\//}" >> $GITHUB_ENV
# AWS authentication
- name: Configure AWS Credentials
Expand Down
4 changes: 2 additions & 2 deletions build-tools/esplugin-coverage.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ task dummyIntegTest(type: Test) {
}
}

integTest.runner {
integTest {
systemProperty 'jacoco.dir', "${jacocoDir}"
}

Expand All @@ -68,7 +68,7 @@ jacocoTestReport {

allprojects{
afterEvaluate {
jacocoTestReport.dependsOn integTest.runner
jacocoTestReport.dependsOn integTest

testClusters.integTest {
jvmArgs " ${dummyIntegTest.jacoco.getAsJvmArg()}".replace('javaagent:','javaagent:/')
Expand Down
35 changes: 26 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.util.function.Predicate

buildscript {
ext {
es_version = System.getProperty("es.version", "7.9.1")
es_version = System.getProperty("es.version", "7.10.0")
kotlin_version = System.getProperty("kotlin.version", "1.3.72")
}

Expand Down Expand Up @@ -50,6 +50,7 @@ apply plugin: 'jacoco'
apply plugin: 'idea'
apply plugin: 'elasticsearch.esplugin'
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'io.gitlab.arturbosch.detekt'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
Expand All @@ -69,6 +70,10 @@ esplugin {
extendedPlugins = ['opendistro-job-scheduler']
}

tasks.named("integTest").configure {
it.dependsOn(project.tasks.named("bundlePlugin"))
}

allOpen {
annotation("com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting")
}
Expand Down Expand Up @@ -96,12 +101,12 @@ configurations.all {

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.11.0.0"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.12.0.0"
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
compile "org.jetbrains:annotations:13.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.11.0.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.12.0.0"

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down Expand Up @@ -143,6 +148,7 @@ javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
licenseHeaders.enabled = true
dependencyLicenses.enabled = false
thirdPartyAudit.enabled = false
loggerUsageCheck.enabled = false
validateNebulaPom.enabled = false

def es_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile
Expand Down Expand Up @@ -187,6 +193,7 @@ test {
File repo = file("$buildDir/testclusters/repo")
def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters.integTest {
plugin(project.tasks.bundlePlugin.archiveFile)
testDistribution = "OSS"
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
Expand All @@ -200,18 +207,27 @@ testClusters.integTest {
debugPort += 1
}
}
plugin(fileTree("src/test/resources/job-scheduler").getSingleFile())
plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/job-scheduler").getSingleFile() }
}
}))
if (securityEnabled) {
plugin(fileTree("src/test/resources/security") { include "opendistro_security*" }.getSingleFile())
// TODO: Update the actual zip to 1.12
plugin(provider({
new RegularFile() {
@Override
File getAsFile() { fileTree("src/test/resources/security") { include "opendistro_security*" }.getSingleFile() }
}
}))
}
setting 'path.repo', repo.absolutePath
}

integTest.runner {
integTest {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'tests.path.repo', repo.absolutePath
systemProperty 'buildDir', buildDir
systemProperty 'buildDir', buildDir.path
systemProperty "https", System.getProperty("https", securityEnabled.toString())
systemProperty "user", System.getProperty("user", "admin")
systemProperty "password", System.getProperty("password", "admin")
Expand All @@ -235,6 +251,7 @@ integTest.runner {
}

run {
useCluster project.testClusters.integTest
doFirst {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# permissions and limitations under the License.
#

version = 1.11.0
version = 1.12.0
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
5 changes: 2 additions & 3 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#Tue May 19 22:58:14 PDT 2020
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 1 addition & 1 deletion gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ fi
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`

JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
Expand Down
21 changes: 3 additions & 18 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome

set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
if "%ERRORLEVEL%" == "0" goto execute

echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Expand All @@ -54,7 +54,7 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe

if exist "%JAVA_EXE%" goto init
if exist "%JAVA_EXE%" goto execute

echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
Expand All @@ -64,29 +64,14 @@ echo location of your Java installation.

goto fail

:init
@rem Get command-line arguments, handling Windows variants

if not "%OS%" == "Windows_NT" goto win9xME_args

:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2

:win9xME_args_slurp
if "x%~1" == "x" goto execute

set CMD_LINE_ARGS=%*

:execute
@rem Setup the command line

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar


@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*

:end
@rem End local scope for the variables with windows NT shell
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act

override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent
if (instant == null) {
return nullField(name)
}
return this.timeField(name, name, instant.toEpochMilli())
return this.timeField(name, "${name}_in_millis", instant.toEpochMilli())
}

/**
Expand Down Expand Up @@ -163,10 +163,10 @@ fun <T> XContentParser.parseWithType(
primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
parse: (xcp: XContentParser, id: String, seqNo: Long, primaryTerm: Long) -> T
): T {
ensureExpectedToken(Token.START_OBJECT, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.FIELD_NAME, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this)
ensureExpectedToken(Token.FIELD_NAME, nextToken(), this)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this)
val parsed = parse(this, id, seqNo, primaryTerm)
ensureExpectedToken(Token.END_OBJECT, this.nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.END_OBJECT, this.nextToken(), this)
return parsed
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class IndexStateManagementHistory(
// try to rollover immediately as we might be restarting the cluster
rolloverHistoryIndex()
// schedule the next rollover for approx MAX_AGE later
scheduledRollover = threadPool.scheduleWithFixedDelay({ rolloverAndDeleteHistoryIndex() }, historyRolloverCheckPeriod, executorName())
scheduledRollover = threadPool.scheduleWithFixedDelay({ rolloverAndDeleteHistoryIndex() },
historyRolloverCheckPeriod, ThreadPool.Names.MANAGEMENT)
} catch (e: Exception) {
// This should be run on cluster startup
logger.error("Error creating ISM history index.", e)
Expand All @@ -96,14 +97,11 @@ class IndexStateManagementHistory(
scheduledRollover?.cancel()
}

override fun executorName(): String {
return ThreadPool.Names.MANAGEMENT
}

private fun rescheduleRollover() {
if (clusterService.state().nodes.isLocalNodeElectedMaster) {
scheduledRollover?.cancel()
scheduledRollover = threadPool.scheduleWithFixedDelay({ rolloverAndDeleteHistoryIndex() }, historyRolloverCheckPeriod, executorName())
scheduledRollover = threadPool.scheduleWithFixedDelay({ rolloverAndDeleteHistoryIndex() },
historyRolloverCheckPeriod, ThreadPool.Names.MANAGEMENT)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.ClusterChangedEvent
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.ClusterStateListener
import org.elasticsearch.cluster.LocalNodeMasterListener
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.bytes.BytesReference
Expand Down Expand Up @@ -107,7 +106,7 @@ class ManagedIndexCoordinator(
private val clusterService: ClusterService,
private val threadPool: ThreadPool,
indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener, ClusterStateListener,
) : LocalNodeMasterListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {

Expand Down Expand Up @@ -153,10 +152,6 @@ class ManagedIndexCoordinator(
scheduledFullSweep?.cancel()
}

override fun executorName(): String {
return ThreadPool.Names.MANAGEMENT
}

@Suppress("ReturnCount")
override fun clusterChanged(event: ClusterChangedEvent) {
if (!isIndexStateManagementEnabled()) return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,9 @@ object ManagedIndexRunner : ScheduledJobRunner,
return withContext(Dispatchers.IO) {
val intervalJsonString = managedIndexConfig.schedule.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string()
val xcp = XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, intervalJsonString)
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) // start of schedule block
ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp::getTokenLocation) // "interval"
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp::getTokenLocation) // start of interval block
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) // start of schedule block
ensureExpectedToken(Token.FIELD_NAME, xcp.nextToken(), xcp) // "interval"
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) // start of interval block
var startTime: Long? = null
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ data class ChangePolicy(
var isSafe: Boolean = false
val include = mutableListOf<StateFilter>()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand All @@ -93,7 +93,7 @@ data class ChangePolicy(
POLICY_ID_FIELD -> policyID = xcp.text()
STATE_FIELD -> state = xcp.textOrNull()
INCLUDE_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
include.add(StateFilter.parse(xcp))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ data class ErrorNotification(
var destination: Destination? = null
var messageTemplate: Script? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ data class ManagedIndexConfig(
var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ data class ManagedIndexMetaData(

var info: Map<String, Any>? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ data class Policy(
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
val states: MutableList<State> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand All @@ -143,7 +143,7 @@ data class Policy(
ERROR_NOTIFICATION_FIELD -> errorNotification = if (xcp.currentToken() == Token.VALUE_NULL) null else ErrorNotification.parse(xcp)
DEFAULT_STATE_FIELD -> defaultState = xcp.text()
STATES_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
states.add(State.parse(xcp))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,21 @@ data class State(
val actions: MutableList<ActionConfig> = mutableListOf()
val transitions: MutableList<Transition> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
NAME_FIELD -> name = xcp.text()
ACTIONS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
actions.add(ActionConfig.parse(xcp, actions.size))
}
}
TRANSITIONS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
transitions.add(Transition.parse(xcp))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data class StateFilter(val state: String) : Writeable {
fun parse(xcp: XContentParser): StateFilter {
var state: String? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
Expand Down
Loading

0 comments on commit 5a8b647

Please sign in to comment.