Merge remote-tracking branch 'upstream/main' into incremental_bulk_parser_improvements
Tim-Brooks committed Oct 23, 2024
2 parents 3daab65 + 64281dd commit 4741f81
Showing 155 changed files with 4,735 additions and 1,605 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/114742.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114742
summary: Adding support for additional mapping to simulate ingest API
area: Ingest Node
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/114819.yaml
@@ -0,0 +1,6 @@
pr: 114819
summary: Don't use a `BytesStreamOutput` to copy keys in `BytesRefBlockHash`
area: EQL
type: bug
issues:
- 114599
5 changes: 5 additions & 0 deletions docs/changelog/114951.yaml
@@ -0,0 +1,5 @@
pr: 114951
summary: Expose cluster-state role mappings in APIs
area: Authentication
type: bug
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/115102.yaml
@@ -0,0 +1,6 @@
pr: 115102
summary: Watch Next Run Interval Resets On Shard Move or Node Restart
area: Watcher
type: bug
issues:
- 111433
5 changes: 5 additions & 0 deletions docs/changelog/115317.yaml
@@ -0,0 +1,5 @@
pr: 115317
summary: Revert "Add `ResolvedExpression` wrapper"
area: Indices APIs
type: bug
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/115359.yaml
@@ -0,0 +1,6 @@
pr: 115359
summary: Adding support for simulate ingest mapping addition for indices with mappings
that do not come from templates
area: Ingest Node
type: enhancement
issues: []


13 changes: 13 additions & 0 deletions docs/reference/ingest/apis/simulate-ingest.asciidoc
@@ -108,6 +108,14 @@ POST /_ingest/_simulate
"index_patterns": ["my-index-*"],
"composed_of": ["component_template_1", "component_template_2"]
}
},
"mapping_addition": { <4>
"dynamic": "strict",
"properties": {
"foo": {
"type": "keyword"
}
}
}
}
----
@@ -117,6 +125,7 @@ POST /_ingest/_simulate
These templates can be used to change the pipeline(s) used, or to modify the mapping that will be used to validate the result.
<3> This replaces the existing `my-index-template` index template with the contents given here for the duration of this request.
These templates can be used to change the pipeline(s) used, or to modify the mapping that will be used to validate the result.
<4> This mapping is merged into the index's final mapping just before validation. It is used only for the duration of this request.

[[simulate-ingest-api-request]]
==== {api-request-title}
@@ -246,6 +255,10 @@ include::{es-ref-dir}/indices/put-index-template.asciidoc[tag=request-body]
====

`mapping_addition`::
(Optional, <<mapping,mapping object>>)
Definition of a mapping that will be merged into the index's mapping for validation during the course of this request.
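
The behavior described here — merging a request-scoped mapping into the index's mapping before validating simulated documents — can be sketched in Python. This is a simplified model, not Elasticsearch's actual mapping-merge logic; `deep_merge` is a hypothetical helper.

```python
def deep_merge(base: dict, addition: dict) -> dict:
    """Recursively merge `addition` into a copy of `base`; addition wins on conflicts."""
    merged = dict(base)
    for key, value in addition.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# Index mapping as resolved from templates (example data).
index_mapping = {"properties": {"bar": {"type": "long"}}}

# The `mapping_addition` from the simulate request.
mapping_addition = {"dynamic": "strict", "properties": {"foo": {"type": "keyword"}}}

# The simulated documents are then validated against the merged result;
# the merge exists only for the duration of the request.
final_mapping = deep_merge(index_mapping, mapping_addition)
```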

[[simulate-ingest-api-example]]
==== {api-examples-title}

10 changes: 9 additions & 1 deletion docs/reference/mapping/types/binary.asciidoc
@@ -68,8 +68,16 @@ Synthetic source may sort `binary` values in order of their byte representation.
----
PUT idx
{
"settings": {
"index": {
"mapping": {
"source": {
"mode": "synthetic"
}
}
}
},
"mappings": {
"_source": { "mode": "synthetic" },
"properties": {
"binary": { "type": "binary", "doc_values": true }
}
13 changes: 8 additions & 5 deletions docs/reference/watcher/how-watcher-works.asciidoc
@@ -146,15 +146,18 @@ add, the more distributed the watches can be executed. If you add or remove
replicas, all watches need to be reloaded. If a shard is relocated, the
primary and all replicas of this particular shard will reload.

Because the watches are executed on the node, where the watch shards are, you can create
dedicated watcher nodes by using shard allocation filtering.
Because the watches are executed on the node where the watch shards are, you
can create dedicated watcher nodes by using shard allocation filtering. To do
this, configure nodes with a dedicated `node.attr.role: watcher` property.

You could configure nodes with a dedicated `node.attr.role: watcher` property and
then configure the `.watches` index like this:
As the `.watches` index is a system index, you can't use the normal `.watches/_settings`
endpoint to modify its routing allocation. Instead, you can use the following dedicated
endpoint to adjust the allocation of the `.watches` shards to the nodes with the
`watcher` role attribute:

[source,console]
------------------------
PUT .watches/_settings
PUT _watcher/settings
{
"index.routing.allocation.include.role": "watcher"
}
@@ -20,6 +20,7 @@
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
@@ -170,7 +171,7 @@ public void testTimeRanges() throws Exception {
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99));
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
expectThrows(IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class, () -> client().index(indexRequest).actionGet());
}

// Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index:
@@ -26,6 +26,7 @@
public class DataStreamFeatures implements FeatureSpecification {

public static final NodeFeature DATA_STREAM_LIFECYCLE = new NodeFeature("data_stream.lifecycle");
public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");

@Override
public Map<NodeFeature, Version> getHistoricalFeatures() {
@@ -41,4 +42,9 @@ public Set<NodeFeature> getFeatures() {
DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
);
}

@Override
public Set<NodeFeature> getTestFeatures() {
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
}
}
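
The pattern in this class — always-on node features kept separate from test-only features that YAML tests can gate on via `cluster_features` — can be modeled with a minimal Python analogue. This is an illustration of the idea, not the Elasticsearch `FeatureSpecification` API.

```python
class FeatureSpecification:
    """Minimal analogue of a node-feature registry."""

    def get_features(self) -> set[str]:
        # Features advertised by every node in production.
        return set()

    def get_test_features(self) -> set[str]:
        # Features only advertised in test clusters, so YAML tests can
        # require them via `cluster_features` without shipping a flag.
        return set()


class DataStreamFeatures(FeatureSpecification):
    DATA_STREAM_LIFECYCLE = "data_stream.lifecycle"
    DATA_STREAM_FAILURE_STORE_TSDB_FIX = "data_stream.failure_store.tsdb_fix"

    def get_features(self) -> set[str]:
        return {self.DATA_STREAM_LIFECYCLE}

    def get_test_features(self) -> set[str]:
        return {self.DATA_STREAM_FAILURE_STORE_TSDB_FIX}


features = DataStreamFeatures()
```

A test harness would then skip any test whose required features are not in the union of the two sets.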
@@ -182,6 +182,107 @@ index without timestamp:
body:
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'

---
TSDB failures go to failure store:
- requires:
cluster_features: ["data_stream.failure_store.tsdb_fix"]
reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store."

- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
indices.put_index_template:
name: my-template2
body:
index_patterns: [ "fs-k8s*" ]
data_stream:
failure_store: true
template:
settings:
index:
mode: time_series
number_of_replicas: 1
number_of_shards: 2
routing_path: [ metricset, time_series_dimension ]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
metricset:
type: keyword
time_series_dimension: true
k8s:
properties:
pod:
properties:
uid:
type: keyword
time_series_dimension: true
name:
type: keyword
ip:
type: ip
network:
properties:
tx:
type: long
rx:
type: long
- do:
index:
index: fs-k8s
body:
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- match: { result : "created"}
- match: { failure_store : "used"}

- do:
bulk:
refresh: true
body:
- '{ "create": { "_index": "fs-k8s"} }'
- '{"@timestamp":"2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "k8s"} }'
- '{ "@timestamp": "2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "fs-k8s"} }'
- '{ "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "fs-k8s"} }'
- '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "k8s"} }'
- '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- '{ "create": { "_index": "k8s"} }'
- '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
- is_true: errors

# Successfully indexed to backing index
- match: { items.0.create._index: '/\.ds-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.0.create.status: 201 }
- is_false: items.0.create.failure_store
- match: { items.1.create._index: '/\.ds-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.1.create.status: 201 }
- is_false: items.1.create.failure_store

# Successfully indexed to failure store
- match: { items.2.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- match: { items.2.create.status: 201 }
- match: { items.2.create.failure_store: used }
- match: { items.3.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- match: { items.3.create.status: 201 }
- match: { items.3.create.failure_store: used }

# Rejected, eligible to go to failure store, but failure store not enabled
- match: { items.4.create._index: 'k8s' }
- match: { items.4.create.status: 400 }
- match: { items.4.create.error.type: timestamp_error }
- match: { items.4.create.failure_store: not_enabled }
- match: { items.5.create._index: 'k8s' }
- match: { items.5.create.status: 400 }
- match: { items.5.create.error.type: timestamp_error }
- match: { items.5.create.failure_store: not_enabled }
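
The routing outcomes these assertions exercise can be sketched as a small Python model: a document whose timestamp is missing, unparseable, or outside the index's time-series window goes to the failure store when one is enabled, and is rejected with a 400 `timestamp_error` otherwise. This is an illustration of the expected outcomes, not Elasticsearch's implementation; the window below matches the template above.

```python
from datetime import datetime, timezone

# Time-series window from the index template above.
START = datetime(2021, 4, 28, tzinfo=timezone.utc)
END = datetime(2021, 4, 29, tzinfo=timezone.utc)

def route(doc: dict, failure_store_enabled: bool) -> tuple:
    """Return (destination, http_status) for an index request into a TSDB data stream."""
    ts = doc.get("@timestamp")
    try:
        parsed = datetime.fromisoformat(ts.replace("Z", "+00:00")) if ts else None
    except ValueError:
        parsed = None  # unparseable timestamp
    if parsed is not None and START <= parsed < END:
        return ("backing_index", 201)   # valid: indexed normally
    if failure_store_enabled:
        return ("failure_store", 201)   # timestamp error captured, not surfaced
    return ("rejected", 400)            # timestamp_error returned to the client
```

For example, a document dated in 2000 lands in the failure store for `fs-k8s` but is rejected outright for `k8s`, which has no failure store enabled.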

---
index without timestamp with pipeline:
- do:
@@ -879,7 +879,7 @@ teardown:
# Successfully indexed to backing index
- match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { items.0.create.status: 201 }
- is_false: items.1.create.failure_store
- is_false: items.0.create.failure_store

# Rejected but not eligible to go to failure store
- match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
@@ -43,13 +43,14 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
@@ -238,14 +239,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}

static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
return true;
}

Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
.map(PipelineConfiguration::getId)
.collect(Collectors.toSet());

final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
if (checkReferencedPipelines.isEmpty()) {
return false;
}
@@ -258,22 +256,24 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
}

/**
* Retrieve list of pipelines that have at least one geoip processor.
* Retrieve the set of pipeline ids that have at least one geoip processor.
* @param clusterState Cluster state.
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
* matching the param.
* @return A list of {@link PipelineConfiguration} matching criteria.
* @return A set of pipeline ids matching criteria.
*/
@SuppressWarnings("unchecked")
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
ClusterState clusterState,
boolean downloadDatabaseOnPipelineCreation
) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().filter(pipelineConfig -> {
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
}).toList();
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
Set<String> ids = new HashSet<>();
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (PipelineConfiguration configuration : configurations) {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
ids.add(configuration.getId());
}
}
return Collections.unmodifiableSet(ids);
}

/**
@@ -283,7 +283,15 @@ private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProces
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
if (processors != null) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Map<String, Object> processor : processors) {
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
return true;
}
}
}
return false;
}

/**
@@ -317,7 +325,7 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
}

/**
* Check if a processor config is has an on_failure clause containing at least a geoip processor.
* Check if a processor config has an on_failure clause containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
@@ -327,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
) {
return processor != null
&& processor.values()
.stream()
.anyMatch(
value -> value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)
);
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Object value : processor.values()) {
if (value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)) {
return true;
}
}
return false;
}
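
The detection logic after this refactor — plain loops instead of streams on a hot code path, recursing into `on_failure` blocks to find nested geoip processors — can be sketched in Python. This is a behavioral sketch with simplified processor configs, not the actual Java implementation; the default of `download_database_on_pipeline_creation` being true is an assumption here.

```python
def has_geoip(processors, download_on_creation):
    """True if any processor config (or a processor nested in an on_failure
    list) is a geoip processor whose download_database_on_pipeline_creation
    matches the requested value."""
    if not processors:
        return False
    # Plain loop rather than a stream/generator pipeline: this check runs on
    # every cluster-state change, so it is hot enough to matter.
    for processor in processors:
        if processor_is_geoip(processor, download_on_creation):
            return True
        # A non-geoip processor may still wrap geoip processors in on_failure.
        for value in processor.values():
            if isinstance(value, dict) and has_geoip(
                value.get("on_failure"), download_on_creation
            ):
                return True
    return False

def processor_is_geoip(processor, download_on_creation):
    config = processor.get("geoip")
    if not isinstance(config, dict):
        return False
    # Assumed default: download on pipeline creation unless set to false.
    return config.get("download_database_on_pipeline_creation", True) == download_on_creation

pipeline = [{"set": {"field": "x", "value": 1,
                     "on_failure": [{"geoip": {"field": "ip"}}]}}]
```

Calling `has_geoip(pipeline, True)` finds the geoip processor even though it only appears inside the `set` processor's `on_failure` clause.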

/**