From c188571b741d20467dffb4fb0e775449c93d225e Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Thu, 21 Nov 2024 08:34:19 -0500 Subject: [PATCH] Rough to show AndreKurait's idea for how to leverage Flux to preserve the sequence of bulk puts in Document Migration to have a flux that effectively reports progress. Signed-off-by: Greg Schohn --- .../migrations/RfsMigrateDocuments.java | 33 +++++++++++++------ .../bulkload/common/DocumentReindexer.java | 17 +++++----- .../bulkload/worker/DocumentsRunner.java | 26 +++++++++++++-- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index 569e8b4a6..dd8c5e1bd 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -6,6 +6,7 @@ import java.time.Clock; import java.time.Duration; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor; @@ -270,7 +271,9 @@ public static void main(String[] args) throws Exception { } IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig); - try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC()); + AtomicReference lastIndexedDocument = new AtomicReference<>(); + try (var processManager = + new LeaseExpireTrigger(w -> exitOnLeaseTimeout(w, lastIndexedDocument), Clock.systemUTC()); var workCoordinator = new OpenSearchWorkCoordinator( new CoordinateWorkHttpClient(connectionContext), TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, @@ -307,6 +310,7 @@ public static void main(String[] args) throws Exception { run( LuceneDocumentsReader.getFactory(sourceResourceProvider), reindexer, + lastIndexedDocument, workCoordinator, arguments.initialLeaseDuration, processManager, @@ -326,7 +330,9 @@ public static void main(String[] args) throws Exception { } } - private static void exitOnLeaseTimeout(String workItemId) { + private static void exitOnLeaseTimeout(String workItemId, + AtomicReference lastDocIndexed) { + // DO MORE HERE log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); System.exit(PROCESS_TIMED_OUT_EXIT_CODE); } @@ -346,6 +352,7 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri public static DocumentsRunner.CompletionStatus run(Function readerFactory, DocumentReindexer reindexer, + AtomicReference lastDocIndexedRef, IWorkCoordinator workCoordinator, Duration maxInitialLeaseDuration, LeaseExpireTrigger leaseExpireTrigger, @@ -370,14 +377,20 @@ public static DocumentsRunner.CompletionStatus run(Function { - var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard); - log.info("Shard size: " + shardMetadata.getTotalSizeBytes()); - if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) { - throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes); - } - return shardMetadata; - }, unpackerFactory, readerFactory, reindexer); + var runner = new DocumentsRunner(scopedWorkCoordinator, + maxInitialLeaseDuration, + reindexer, + unpackerFactory, + (name, shard) -> { + var shardMetadata = 
shardMetadataFactory.fromRepo(snapshotName, name, shard); + log.info("Shard size: " + shardMetadata.getTotalSizeBytes()); + if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) { + throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes); + } + return shardMetadata; + }, + readerFactory, + lastDocIndexedRef::set); return runner.migrateNextShard(rootDocumentContext::createReindexContext); } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java index 63a020191..21be4b826 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java @@ -26,19 +26,17 @@ public class DocumentReindexer { private final int maxConcurrentWorkItems; private final IJsonTransformer transformer; - public Mono reindex(String indexName, Flux documentStream, IDocumentReindexContext context) { + public Flux reindex(String indexName, Flux documentStream, IDocumentReindexContext context) { var scheduler = Schedulers.newParallel("DocumentBulkAggregator"); var bulkDocs = documentStream .publishOn(scheduler, 1) .map(doc -> transformDocument(doc,indexName)); return this.reindexDocsInParallelBatches(bulkDocs, indexName, context) - .doOnSuccess(unused -> log.debug("All batches processed")) - .doOnError(e -> log.error("Error prevented all batches from being processed", e)) .doOnTerminate(scheduler::dispose); } - Mono reindexDocsInParallelBatches(Flux docs, String indexName, IDocumentReindexContext context) { + Flux reindexDocsInParallelBatches(Flux docs, String indexName, IDocumentReindexContext context) { // Use parallel scheduler for send subscription due on non-blocking io client var scheduler = Schedulers.newParallel("DocumentBatchReindexer"); var bulkDocsBatches = batchDocsBySizeOrCount(docs); @@ -47,10 +45,9 @@ Mono reindexDocsInParallelBatches(Flux docs, String indexN return bulkDocsBatches .limitRate(bulkDocsToBuffer, 1) // Bulk Doc Buffer, Keep Full .publishOn(scheduler, 1) // Switch scheduler - .flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler), + .flatMapSequential(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler), maxConcurrentWorkItems) - .doOnTerminate(scheduler::dispose) - .then(); + .doOnTerminate(scheduler::dispose); } @SneakyThrows @@ -63,7 +60,9 @@ BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) { return BulkDocSection.fromMap(original.toMap()); } - Mono sendBulkRequest(UUID batchId, List docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) { + public static class SegmentDocumentCursor {} + + Mono sendBulkRequest(UUID batchId, List docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) { return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request .doFirst(() -> log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.") .addArgument(batchId) @@ -76,7 +75,7 @@ Mono sendBulkRequest(UUID batchId, List docsBatch, String .log()) // Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest .onErrorResume(e -> Mono.empty()) - .then() // Discard the response object + .then(Mono.just(new SegmentDocumentCursor())) // Discard the response object 
.subscribeOn(scheduler); } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java index ebc8e7844..fae499836 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/DocumentsRunner.java @@ -4,6 +4,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -17,13 +18,11 @@ import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator; import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts; -import lombok.AllArgsConstructor; import lombok.Lombok; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @Slf4j -@AllArgsConstructor public class DocumentsRunner { private final ScopedWorkCoordinator workCoordinator; @@ -32,6 +31,24 @@ public class DocumentsRunner { private final SnapshotShardUnpacker.Factory unpackerFactory; private final Function readerFactory; private final DocumentReindexer reindexer; + private final Consumer segmentDocumentCursorConsumer; + + public DocumentsRunner(ScopedWorkCoordinator workCoordinator, + Duration maxInitialLeaseDuration, + DocumentReindexer reindexer, + SnapshotShardUnpacker.Factory unpackerFactory, + BiFunction shardMetadataFactory, + Function readerFactory, + Consumer segmentDocumentCursorConsumer) { + this.maxInitialLeaseDuration = maxInitialLeaseDuration; + this.readerFactory = readerFactory; + this.reindexer = reindexer; + this.shardMetadataFactory = shardMetadataFactory; + this.unpackerFactory = unpackerFactory; + this.workCoordinator = workCoordinator; + + this.segmentDocumentCursorConsumer = segmentDocumentCursorConsumer; + } public enum CompletionStatus { NOTHING_DONE, @@ -99,7 +116,10 @@ private void doDocumentsMigration( Flux documents = reader.readDocuments(indexAndShardCursor.startingSegmentIndex, indexAndShardCursor.startingDocId); reindexer.reindex(shardMetadata.getIndexName(), documents, context) - .doOnError(error -> log.error("Error during reindexing: " + error)) + .doOnNext(segmentDocumentCursorConsumer) + .then() + .doOnError(e -> + log.atError().setCause(e).setMessage("Error prevented all batches from being processed").log()) .doOnSuccess( done -> log.atInfo().setMessage("Reindexing completed for Index {}, Shard {}") .addArgument(shardMetadata::getIndexName)
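
The ordering guarantee this patch relies on comes from swapping flatMap for flatMapSequential in DocumentReindexer: bulk requests still run concurrently, but their results are re-emitted downstream in the order the batches were submitted, so each emitted cursor means every earlier batch has already completed. The standalone sketch below (illustrative names only, not the repo's classes) shows that pattern with Reactor and how doOnNext can record the most recent cursor.

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Standalone sketch of the ordered-progress pattern; class and method names here are
// illustrative stand-ins rather than the repo's types.
public class OrderedProgressSketch {

    // Plays the role of DocumentReindexer.SegmentDocumentCursor: marks the last batch
    // whose bulk request is known to have finished.
    record BatchCursor(int batchNumber) {}

    // Hypothetical stand-in for sendBulkRequest(...): completes after simulated latency
    // and emits a cursor instead of the raw bulk response.
    static Mono<BatchCursor> sendBatch(int batchNumber) {
        return Mono.delay(Duration.ofMillis(50))
            .thenReturn(new BatchCursor(batchNumber));
    }

    public static void main(String[] args) {
        var lastCompleted = new AtomicReference<BatchCursor>();

        Flux.range(1, 20)                                           // ordered stream of batches
            .flatMapSequential(OrderedProgressSketch::sendBatch, 4) // up to 4 requests in flight,
                                                                    // but cursors arrive in submission order
            .doOnNext(lastCompleted::set)                           // record progress as it is observed
            .blockLast();

        System.out.println("Last fully completed batch: " + lastCompleted.get());
    }
}

Because flatMapSequential holds back the results of later inner publishers until earlier ones complete, lastCompleted.get() is always a safe resume point; the trade-off is some head-of-line blocking when an early batch is slow.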
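
On the worker side, the cursor travels through three hands: DocumentsRunner is constructed with a Consumer of SegmentDocumentCursor and attaches it via doOnNext(...), RfsMigrateDocuments supplies lastDocIndexedRef::set, and the lease-expiry trigger is handed the same AtomicReference so that exitOnLeaseTimeout can eventually act on the last recorded position (the patch leaves that body as a "DO MORE HERE" placeholder). A minimal sketch of that wiring, again with illustrative names rather than the repo's:

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Sketch of the checkpoint wiring; names are illustrative, and what the real
// exitOnLeaseTimeout will do with the cursor is still marked "DO MORE HERE" in the patch.
public class ProgressCheckpointSketch {

    // Plays the role of DocumentReindexer.SegmentDocumentCursor.
    record Cursor(int lastCompletedBatch) {}

    // Plays the role of the lease-expiry callback: it can consult the shared reference
    // before the worker terminates (persisting or logging it is left open by the patch).
    static void onLeaseExpired(String workItemId, AtomicReference<Cursor> lastIndexed) {
        System.err.println("Lease expired for " + workItemId
            + "; last known-complete position: " + lastIndexed.get());
    }

    public static void main(String[] args) {
        var lastIndexed = new AtomicReference<Cursor>();

        // What the runner receives (the patch passes lastDocIndexedRef::set); the reindex
        // pipeline invokes it from doOnNext(...) as cursors are emitted.
        Consumer<Cursor> progressConsumer = lastIndexed::set;

        for (int batch = 1; batch <= 3; batch++) {
            progressConsumer.accept(new Cursor(batch));
        }

        onLeaseExpired("index-shard-0", lastIndexed);
    }
}

An AtomicReference fits here because the reindex pipeline's doOnNext and the lease-expiry callback run on different threads; a plain field would not give the same visibility guarantee.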