Skip to content

Commit

Permalink
add IT
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Jan 23, 2025
1 parent 352075b commit b131f9a
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -135,7 +136,7 @@ private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult res
}

private void addToFlattenedIndexIfExists(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
String flattenedResultIndexName = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase();
String flattenedResultIndexName = resultIndex + "_flattened_" + result.getDetectorId().toLowerCase(Locale.ROOT);
if (clusterService.state().metadata().hasIndex(flattenedResultIndexName)) {
addResult(bulkRequest, result, flattenedResultIndexName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ protected void validateTimeField(boolean indexingDryRun, ActionListener<T> liste
* If {@code true}, the operation performs validation without creating/updating the configuration.
* If {@code false}, the configuration is created or updated.
* @param listener the {@link ActionListener} to handle the response or failure of the operation.
* @throws IOException if an I/O error occurs during the operation.
*
* <p><b>Behavior:</b></p>
* <ul>
Expand Down Expand Up @@ -467,8 +466,8 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
if (shouldHandleFlattening(indexingDryRun, createConfigResponse)) {
IndexAnomalyDetectorResponse response = (IndexAnomalyDetectorResponse) createConfigResponse;
String detectorId = response.getId();
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase();
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);

timeSeriesIndices
.initFlattenedResultIndex(
Expand All @@ -495,8 +494,8 @@ private boolean shouldHandleFlattening(boolean indexingDryRun, Object createConf
}

protected void setupIngestPipeline(String detectorId, ActionListener<T> listener) {
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase();
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase();
String indexName = config.getCustomResultIndexOrAlias() + "_flattened_" + detectorId.toLowerCase(Locale.ROOT);
String pipelineId = "anomaly_detection_ingest_pipeline_" + detectorId.toLowerCase(Locale.ROOT);

try {
BytesReference pipelineSource = createPipelineDefinition(indexName);
Expand Down
52 changes: 50 additions & 2 deletions src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,20 @@ private AnomalyDetector createIndexAndGetAnomalyDetector(String indexName, List<
}

private AnomalyDetector createIndexAndGetAnomalyDetector(String indexName, List<Feature> features, boolean useDateNanos)
throws IOException {
throws IOException {
return createIndexAndGetAnomalyDetector(indexName, features, useDateNanos, false);
}

private AnomalyDetector createIndexAndGetAnomalyDetector(String indexName, List<Feature> features, boolean useDateNanos,
boolean useFlattenResultIndex) throws IOException {
TestHelpers.createIndexWithTimeField(client(), indexName, TIME_FIELD, useDateNanos);
String testIndexData = "{\"keyword-field\": \"field-1\", \"ip-field\": \"1.2.3.4\", \"timestamp\": 1}";
TestHelpers.ingestDataToIndex(client(), indexName, TestHelpers.toHttpEntity(testIndexData));
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(TIME_FIELD, indexName, features);

AnomalyDetector detector = useFlattenResultIndex
? TestHelpers.randomAnomalyDetectorWithFlattenResultIndex(TIME_FIELD, indexName, features)
: TestHelpers.randomAnomalyDetector(TIME_FIELD, indexName, features);

return detector;
}

Expand Down Expand Up @@ -180,6 +189,45 @@ public void testCreateAnomalyDetectorWithDuplicateName() throws Exception {
);
}

public void testCreateAnomalyDetectorWithFlattenedResultIndex() throws Exception {
AnomalyDetector detector = createIndexAndGetAnomalyDetector(INDEX_NAME,
ImmutableList.of(TestHelpers.randomFeature(true)), false, true);

// test behavior when AD is disabled
updateClusterSettings(ADEnabledSetting.AD_ENABLED, false);
Exception ex = expectThrows(
ResponseException.class,
() -> TestHelpers
.makeRequest(
client(),
"POST",
TestHelpers.AD_BASE_DETECTORS_URI,
ImmutableMap.of(),
TestHelpers.toHttpEntity(detector),
null
)
);
assertThat(ex.getMessage(), containsString(ADCommonMessages.DISABLED_ERR_MSG));

// test behavior when AD is enabled
updateClusterSettings(ADEnabledSetting.AD_ENABLED, true);
Response response = TestHelpers
.makeRequest(client(), "POST", TestHelpers.AD_BASE_DETECTORS_URI, ImmutableMap.of(), TestHelpers.toHttpEntity(detector), null);
assertEquals("Create anomaly detector with flattened result index failed", RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String id = (String) responseMap.get("_id");
int version = (int) responseMap.get("_version");
assertNotEquals("response is missing Id", AnomalyDetector.NO_ID, id);
assertTrue("incorrect version", version > 0);
// ensure the flattened result index was created
String expectedFlattenedIndex = String.format(
"opensearch-ad-plugin-result-test_flattened_%s",
id.toLowerCase(Locale.ROOT)
);
boolean indexExists = indexExists(expectedFlattenedIndex);
assertTrue(indexExists);
}

public void testCreateAnomalyDetector() throws Exception {
AnomalyDetector detector = createIndexAndGetAnomalyDetector(INDEX_NAME);
updateClusterSettings(ADEnabledSetting.AD_ENABLED, false);
Expand Down
34 changes: 34 additions & 0 deletions src/test/java/org/opensearch/timeseries/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,40 @@ public static AnomalyDetector randomAnomalyDetector(String timefield, String ind
);
}

public static AnomalyDetector randomAnomalyDetectorWithFlattenResultIndex(String timefield, String indexName, List<Feature> features) throws IOException {
return new AnomalyDetector(
randomAlphaOfLength(10),
randomLong(),
randomAlphaOfLength(20),
randomAlphaOfLength(30),
timefield,
ImmutableList.of(indexName.toLowerCase(Locale.ROOT)),
features,
randomQuery(),
randomIntervalTimeConfiguration(),
randomIntervalTimeConfiguration(),
randomIntBetween(1, TimeSeriesSettings.MAX_SHINGLE_SIZE),
null,
randomInt(),
Instant.now(),
null,
randomUser(),
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "test",
TestHelpers.randomImputationOption(features),
// timeDecay (reverse of recencyEmphasis) should be less than 1.
// so we start with 2.
randomIntBetween(2, 10000),
randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2),
randomIntBetween(1, 1000),
null,
null,
null,
null,
true,
Instant.now()
);
}

public static AnomalyDetector randomAnomalyDetectorWithEmptyFeature() throws IOException {
return new AnomalyDetector(
randomAlphaOfLength(10),
Expand Down

0 comments on commit b131f9a

Please sign in to comment.