Skip to content

Commit

Permalink
[Backport 2.x] add a feature that flattens custom result index when enabled (#1401) (#1405)
Browse files Browse the repository at this point in the history

* add a feature that flattens custom result index when enabled (#1401)

* add a feature that flattens custom result index when enabled

Signed-off-by: Jackie Han <[email protected]>

* clean up

Signed-off-by: Jackie Han <[email protected]>

* add IT

Signed-off-by: Jackie Han <[email protected]>

* cleanup

Signed-off-by: Jackie Han <[email protected]>

* address comments

Signed-off-by: Jackie Han <[email protected]>

* update IT

Signed-off-by: Jackie Han <[email protected]>

* update IT

Signed-off-by: Jackie Han <[email protected]>

* clean up

Signed-off-by: Jackie Han <[email protected]>

* add more IT

Signed-off-by: Jackie Han <[email protected]>

* add more IT

Signed-off-by: Jackie Han <[email protected]>

* add more IT

Signed-off-by: Jackie Han <[email protected]>

* address comments

* address comments

Signed-off-by: Jackie Han <[email protected]>

* address comments

Signed-off-by: Jackie Han <[email protected]>

* utilizing a node state manager when writing results into flattened result index

Signed-off-by: Jackie Han <[email protected]>

* build flatten result index enabled into the ResultWriteRequest

Signed-off-by: Jackie Han <[email protected]>

* unbind ingest pipeline with flattened result index when it's disabled

Signed-off-by: Jackie Han <[email protected]>

* test

* address comments

* cleanup

Signed-off-by: Jackie Han <[email protected]>

* make flatten result index use detector name

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>

* spotless apply

Signed-off-by: Jackie Han <[email protected]>

* update recency_emphasis to be greater than 1 in test cases

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang authored Jan 29, 2025
1 parent 78e2b5e commit 328f901
Show file tree
Hide file tree
Showing 33 changed files with 888 additions and 103 deletions.
19 changes: 19 additions & 0 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -45,6 +46,8 @@
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This class provides utility methods for various anomaly detection indices.
*/
Expand Down Expand Up @@ -122,6 +125,22 @@ public static String getResultMappings() throws IOException {
return getMappings(ANOMALY_RESULTS_INDEX_MAPPING_FILE);
}

/**
* Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true
* @return JSON mapping for the flattened result index.
* @throws IOException if the mapping file cannot be read.
*/
/**
 * Retrieves the JSON mapping for the flattened result index with the "dynamic" field set to true,
 * so that flattened result documents with arbitrary field names are accepted at index time.
 *
 * @return JSON mapping for the flattened result index.
 * @throws IOException if the mapping file is missing from the classpath or cannot be read.
 */
public static String getFlattenedResultMappings() throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();

    // try-with-resources guarantees the resource stream is closed even if parsing fails
    try (var mappingStream = ADIndexManagement.class.getClassLoader().getResourceAsStream(ANOMALY_RESULTS_INDEX_MAPPING_FILE)) {
        if (mappingStream == null) {
            // fail with a descriptive IOException instead of an NPE/IllegalArgumentException from Jackson
            throw new IOException("Mapping file not found on classpath: " + ANOMALY_RESULTS_INDEX_MAPPING_FILE);
        }

        @SuppressWarnings("unchecked")
        Map<String, Object> mapping = objectMapper.readValue(mappingStream, Map.class);

        // enable dynamic mapping for the flattened copy of the result index
        mapping.put("dynamic", true);

        return objectMapper.writeValueAsString(mapping);
    }
}

/**
* Get anomaly detector state index mapping json content.
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ public static AnomalyDetector parse(
case RESULT_INDEX_FIELD_TTL:
customResultIndexTTL = onlyParseNumberValue(parser);
break;
case FLATTEN_RESULT_INDEX_MAPPING:
case FLATTEN_CUSTOM_RESULT_INDEX:
flattenResultIndexMapping = onlyParseBooleanValue(parser);
break;
case BREAKING_UI_CHANGE_TIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public ADResultWriteRequest(
String detectorId,
RequestPriority priority,
AnomalyResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
super(expirationEpochMs, detectorId, priority, result, resultIndex);
super(expirationEpochMs, detectorId, priority, result, resultIndex, flattenResultIndex);
}

public ADResultWriteRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ protected ADResultWriteRequest createResultWriteRequest(
String configId,
RequestPriority priority,
AnomalyResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
return new ADResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void saveResult(AnomalyResult result, Config config) {
config.getId(),
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
public class ADResultBulkTransportAction extends ResultBulkTransportAction<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest> {

private static final Logger LOG = LogManager.getLogger(ADResultBulkTransportAction.class);
private final ClusterService clusterService;
private final Client client;

@Inject
public ADResultBulkTransportAction(
Expand All @@ -61,39 +63,77 @@ public ADResultBulkTransportAction(
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
ADResultBulkRequest::new
);
this.clusterService = clusterService;
this.client = client;
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);
}

/**
* Prepares a {@link BulkRequest} for indexing anomaly detection results.
*
* This method processes a list of anomaly detection results provided in the {@link ADResultBulkRequest}.
* Each result is evaluated based on the current indexing pressure and result priority. If a flattened
* result index exists for the result, the result is also added to the flattened index.
*
* @param indexingPressurePercent the current percentage of indexing pressure. This value influences
* whether a result is indexed based on predefined thresholds and probabilities.
* @param request the {@link ADResultBulkRequest} containing anomaly detection results
* to be processed.
* @return a {@link BulkRequest} containing all results that are eligible for indexing.
*
* <p><b>Behavior:</b></p>
* <ul>
* <li>Results are added to the bulk request if the indexing pressure is within acceptable limits
* or the result has high priority.</li>
* <li>If a flattened result index exists for a result, it is added to the flattened index in addition
* to the primary index.</li>
* </ul>
*
* <p><b>Indexing Pressure Thresholds:</b></p>
* <ul>
* <li>Below the soft limit: All results are added.</li>
* <li>Between the soft limit and the hard limit: High-priority results are always added, and
* other results are added based on a probability that decreases with increasing pressure.</li>
* <li>Above the hard limit: Only high-priority results are added.</li>
* </ul>
*
* @see ADResultBulkRequest
* @see BulkRequest
* @see ADResultWriteRequest
*/
@Override
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
BulkRequest bulkRequest = new BulkRequest();
List<ADResultWriteRequest> results = request.getResults();

if (indexingPressurePercent <= softLimit) {
for (ADResultWriteRequest resultWriteRequest : results) {
addResult(bulkRequest, resultWriteRequest.getResult(), resultWriteRequest.getResultIndex());
for (ADResultWriteRequest resultWriteRequest : results) {
AnomalyResult result = resultWriteRequest.getResult();
String resultIndex = resultWriteRequest.getResultIndex();

if (shouldAddResult(indexingPressurePercent, result)) {
addResult(bulkRequest, result, resultIndex);
if (resultWriteRequest.getFlattenResultIndex() != null) {
addResult(bulkRequest, result, resultWriteRequest.getFlattenResultIndex());
}
}
}

return bulkRequest;
}

/**
 * Decides whether a single anomaly result should be indexed given the current indexing pressure.
 *
 * @param indexingPressurePercent the current indexing pressure, compared against the soft and hard limits
 * @param result the anomaly result under consideration
 * @return true if the result should be added to the bulk request
 */
private boolean shouldAddResult(float indexingPressurePercent, AnomalyResult result) {
    if (indexingPressurePercent <= softLimit) {
        // Always add when below soft limit
        return true;
    } else if (indexingPressurePercent <= hardLimit) {
        // exceed soft limit (60%) but smaller than hard limit (90%):
        // always keep high-priority results; accept others with probability decreasing as pressure rises
        float acceptProbability = 1 - indexingPressurePercent;
        return result.isHighPriority() || random.nextFloat() < acceptProbability;
    } else {
        // if exceeding hard limit, only index non-zero grade or error result
        return result.isHighPriority();
    }
}

private void addResult(BulkRequest bulkRequest, AnomalyResult result, String resultIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public static Forecaster parse(
case RESULT_INDEX_FIELD_TTL:
customResultIndexTTL = parser.intValue();
break;
case FLATTEN_RESULT_INDEX_MAPPING:
case FLATTEN_CUSTOM_RESULT_INDEX:
flattenResultIndexMapping = parser.booleanValue();
break;
case BREAKING_UI_CHANGE_TIME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public ForecastResultWriteRequest(
String forecasterId,
RequestPriority priority,
ForecastResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
super(expirationEpochMs, forecasterId, priority, result, resultIndex);
super(expirationEpochMs, forecasterId, priority, result, resultIndex, flattenResultIndex);
}

public ForecastResultWriteRequest(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ protected ForecastResultWriteRequest createResultWriteRequest(
String configId,
RequestPriority priority,
ForecastResult result,
String resultIndex
String resultIndex,
String flattenResultIndex
) {
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex);
return new ForecastResultWriteRequest(expirationEpochMs, configId, priority, result, resultIndex, flattenResultIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void saveResult(ForecastResult result, Config config) {
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndexOrAlias()
config.getCustomResultIndexOrAlias(),
config.getFlattenResultIndexAlias()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static String FAIL_TO_FIND_CONFIG_MSG = "Can't find config with id: ";
public static final String CAN_NOT_CHANGE_CATEGORY_FIELD = "Can't change category field";
public static final String CAN_NOT_CHANGE_CUSTOM_RESULT_INDEX = "Can't change custom result index";
public static final String CAN_NOT_CHANGE_FLATTEN_RESULT_INDEX = "Can't change flatten result index";
public static final String CATEGORICAL_FIELD_TYPE_ERR_MSG = "Categorical field %s must be of type keyword or ip.";
// Modifying message for FEATURE below may break the parseADValidationException method of ValidateAnomalyDetectorTransportAction
public static final String FEATURE_INVALID_MSG_PREFIX = "Feature has an invalid query";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.ad.indices.ADIndexManagement.getFlattenedResultMappings;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

Expand Down Expand Up @@ -89,6 +90,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;

Expand Down Expand Up @@ -136,6 +138,7 @@ public abstract class IndexManagement<IndexType extends Enum<IndexType> & TimeSe
private NamedXContentRegistry xContentRegistry;
protected BiCheckedFunction<XContentParser, String, ? extends Config, IOException> configParser;
protected String customResultIndexPrefix;
private final ObjectMapper objectMapper = new ObjectMapper();

protected class IndexState {
// keep track of whether the mapping version is up-to-date
Expand Down Expand Up @@ -272,6 +275,11 @@ protected static String getMappings(String mappingFileRelativePath) throws IOExc
return Resources.toString(url, Charsets.UTF_8);
}

/**
 * Reads a script file bundled on the classpath into a string.
 *
 * @param scriptFileRelativePath the classpath-relative path of the script file
 * @return the script file content, decoded as UTF-8
 * @throws IOException if the script file is missing or cannot be read
 */
public static String getScripts(String scriptFileRelativePath) throws IOException {
    URL url = IndexManagement.class.getClassLoader().getResource(scriptFileRelativePath);
    if (url == null) {
        // fail with a descriptive IOException rather than an NPE from Resources.toString(null, ...)
        throw new IOException("Script file not found on classpath: " + scriptFileRelativePath);
    }
    return Resources.toString(url, Charsets.UTF_8);
}

protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenIndex) {
request
.settings(
Expand Down Expand Up @@ -1008,6 +1016,45 @@ public <T> void initCustomResultIndexAndExecute(String resultIndexOrAlias, Execu
}
}

/**
 * Creates the flattened result index backing the given alias.
 *
 * The concrete index name is derived from the alias via {@code getCustomResultIndexPattern},
 * the index is created with dynamic ("flattened") result mappings, and the alias is attached
 * so writers can address it without knowing the rolled-over index name.
 *
 * @param flattenedResultIndexAlias the flattened result index alias
 * @param actionListener notified with the create response on success, or the failure cause
 */
public void initFlattenedResultIndex(String flattenedResultIndexAlias, ActionListener<CreateIndexResponse> actionListener) {
    try {
        final String concreteIndexName = getCustomResultIndexPattern(flattenedResultIndexAlias);
        logger.info("Initializing flattened result index: {}", concreteIndexName);

        final CreateIndexRequest createRequest = new CreateIndexRequest(concreteIndexName)
            .mapping(getFlattenedResultMappings(), XContentType.JSON)
            .settings(settings);
        if (flattenedResultIndexAlias != null) {
            createRequest.alias(new Alias(flattenedResultIndexAlias));
        }
        choosePrimaryShards(createRequest, false);

        final ActionListener<CreateIndexResponse> createListener = ActionListener.wrap(response -> {
            if (!response.isAcknowledged()) {
                String errorMsg = "Index creation not acknowledged for index: " + concreteIndexName;
                logger.error(errorMsg);
                actionListener.onFailure(new IllegalStateException(errorMsg));
            } else {
                logger.info("Successfully created flattened result index: {} with alias: {}", concreteIndexName, flattenedResultIndexAlias);
                actionListener.onResponse(response);
            }
        }, exception -> {
            logger.error("Failed to create flattened result index: {}", concreteIndexName, exception);
            actionListener.onFailure(exception);
        });

        adminClient.indices().create(createRequest, createListener);
    } catch (Exception e) {
        logger.error("Error while initializing flattened result index: {}", flattenedResultIndexAlias, e);
        actionListener.onFailure(e);
    }
}

public <T> void validateCustomIndexForBackendJob(
String resultIndexOrAlias,
String securityLogId,
Expand Down Expand Up @@ -1252,15 +1299,18 @@ protected void rolloverAndDeleteHistoryIndex(
}

// perform rollover and delete on found custom result index alias
candidateResultAliases.forEach(config -> handleCustomResultIndex(config, resultIndex));
candidateResultAliases.forEach(config -> {
handleResultIndexRolloverAndDelete(config.getCustomResultIndexOrAlias(), config, resultIndex);
if (config.getFlattenResultIndexMapping()) {
String flattenedResultIndexAlias = config.getFlattenResultIndexAlias();
handleResultIndexRolloverAndDelete(flattenedResultIndexAlias, config, resultIndex);
}
});
}, e -> { logger.error("Failed to get configs with custom result index alias.", e); }));
}

private void handleCustomResultIndex(Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(
config.getCustomResultIndexOrAlias(),
getCustomResultIndexPattern(config.getCustomResultIndexOrAlias())
);
private void handleResultIndexRolloverAndDelete(String indexAlias, Config config, IndexType resultIndex) {
RolloverRequest rolloverRequest = buildRolloverRequest(indexAlias, getCustomResultIndexPattern(indexAlias));

// add rollover conditions if found in config
if (config.getCustomResultIndexMinAge() != null) {
Expand All @@ -1272,9 +1322,9 @@ private void handleCustomResultIndex(Config config, IndexType resultIndex) {

// perform rollover and delete on custom result index alias
proceedWithRolloverAndDelete(
config.getCustomResultIndexOrAlias(),
indexAlias,
rolloverRequest,
getAllCustomResultIndexPattern(config.getCustomResultIndexOrAlias()),
getAllCustomResultIndexPattern(indexAlias),
resultIndex,
config.getCustomResultIndexTTL()
);
Expand Down
Loading

0 comments on commit 328f901

Please sign in to comment.