diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java index 404a2e581857d..a51df45eaccc4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/IncrementalBulkIT.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.query.QueryBuilders; @@ -23,7 +22,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; @@ -32,8 +30,6 @@ public class IncrementalBulkIT extends ESIntegTestCase { - - public void testSingleBulkRequest() { String index = "test"; createIndex(index); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index aad3dcc457241..fd6a3e1098a6a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; @@ -48,6 +49,8 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.IncrementalBulkService; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; @@ -186,11 +189,13 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1762,11 +1767,48 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma ); logger.info("Index [{}] docs async: [{}] bulk: [{}] partitions [{}]", builders.size(), false, true, partition.size()); for (List segmented : partition) { - BulkRequestBuilder bulkBuilder = client().prepareBulk(); - for (IndexRequestBuilder indexRequestBuilder : segmented) { - bulkBuilder.add(indexRequestBuilder); + BulkResponse actionGet; + if (randomBoolean()) { + BulkRequestBuilder bulkBuilder = client().prepareBulk(); + for (IndexRequestBuilder indexRequestBuilder : segmented) { + bulkBuilder.add(indexRequestBuilder); + } + actionGet = bulkBuilder.get(); + } else { + IncrementalBulkService bulkService = internalCluster().getInstance(IncrementalBulkService.class); + IncrementalBulkService.Handler handler = bulkService.newBulkRequest(); + + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + segmented.forEach(b -> queue.add(b.request())); + + PlainActionFuture future = new PlainActionFuture<>(); + AtomicInteger runs = new AtomicInteger(0); + Runnable r = new Runnable() { + + @Override + public void run() { + int toRemove = Math.min(randomIntBetween(5, 10), queue.size()); + ArrayList> docs = new ArrayList<>(); + for (int i = 0; i < toRemove; i++) { + docs.add(queue.poll()); + } + + if (queue.isEmpty()) { + handler.lastItems(docs, () -> {}, future); + } else { + handler.addItems(docs, () -> {}, () -> { + if (runs.incrementAndGet() % 100 == 0) { + new Thread(this).start(); + } else { + this.run(); + } + }); + } + } + }; + r.run(); + actionGet = future.actionGet(); } - BulkResponse actionGet = bulkBuilder.get(); assertThat(actionGet.hasFailures() ? actionGet.buildFailureMessage() : "", actionGet.hasFailures(), equalTo(false)); } }