Commit c188571

Rough to show AndreKurait's idea for how to leverage Flux to preserve the sequence of bulk puts in Document Migration to have a flux that effectively reports progress.

Signed-off-by: Greg Schohn <[email protected]>
gregschohn committed Nov 21, 2024
1 parent 41e0b5d commit c188571
Showing 3 changed files with 54 additions and 22 deletions.
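
The commit message above describes the idea; the following is a minimal, self-contained sketch of it, not project code — ProgressFluxSketch, BatchCursor, and sendBulk are illustrative stand-ins for DocumentReindexer's real types. Each bulk send completes asynchronously, flatMapSequential re-emits results in submission order, and an AtomicReference updated in doOnNext therefore always holds the latest in-order progress point that a lease-expiry handler could act on.

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ProgressFluxSketch {
    // Hypothetical stand-in for DocumentReindexer.SegmentDocumentCursor.
    record BatchCursor(int batchNumber) {}

    // Pretend bulk request: completes after a random delay, so completion order != submission order.
    static Mono<BatchCursor> sendBulk(int batchNumber, List<Integer> docIds) {
        long delayMs = (long) (Math.random() * 50);
        return Mono.delay(Duration.ofMillis(delayMs)).thenReturn(new BatchCursor(batchNumber));
    }

    public static void main(String[] args) {
        AtomicReference<BatchCursor> lastCompleted = new AtomicReference<>();

        Flux.range(0, 10)
            .map(i -> List.of(i * 2, i * 2 + 1))                           // fake batches of doc ids
            .index()                                                       // pair each batch with its number
            .flatMapSequential(t -> sendBulk(t.getT1().intValue(), t.getT2()), 4)
            .doOnNext(lastCompleted::set)                                  // progress visible to other threads
            .blockLast();

        // Because emission order is preserved, the reference always holds the furthest
        // contiguous point of progress; here that is batch 9.
        System.out.println("last in-order cursor: " + lastCompleted.get().batchNumber());
    }
}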
RfsMigrateDocuments.java
@@ -6,6 +6,7 @@
 import java.time.Clock;
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor;
@@ -270,7 +271,9 @@ public static void main(String[] args) throws Exception {
         }
         IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);
 
-        try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
+        AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastIndexedDocument = new AtomicReference<>();
+        try (var processManager =
+                 new LeaseExpireTrigger(w -> exitOnLeaseTimeout(w, lastIndexedDocument), Clock.systemUTC());
             var workCoordinator = new OpenSearchWorkCoordinator(
                 new CoordinateWorkHttpClient(connectionContext),
                 TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
@@ -307,6 +310,7 @@ public static void main(String[] args) throws Exception {
                 run(
                     LuceneDocumentsReader.getFactory(sourceResourceProvider),
                     reindexer,
+                    lastIndexedDocument,
                     workCoordinator,
                     arguments.initialLeaseDuration,
                     processManager,
@@ -326,7 +330,9 @@ public static void main(String[] args) throws Exception {
         }
     }
 
-    private static void exitOnLeaseTimeout(String workItemId) {
+    private static void exitOnLeaseTimeout(String workItemId,
+                                           AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastDocIndexed) {
+        // DO MORE HERE
         log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
         System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
     }
@@ -346,6 +352,7 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri
 
     public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> readerFactory,
                                                        DocumentReindexer reindexer,
+                                                       AtomicReference<DocumentReindexer.SegmentDocumentCursor> lastDocIndexedRef,
                                                        IWorkCoordinator workCoordinator,
                                                        Duration maxInitialLeaseDuration,
                                                        LeaseExpireTrigger leaseExpireTrigger,
@@ -370,14 +377,20 @@ public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocument
         )) {
             throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
         }
-        var runner = new DocumentsRunner(scopedWorkCoordinator, maxInitialLeaseDuration, (name, shard) -> {
-            var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
-            log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
-            if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
-                throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
-            }
-            return shardMetadata;
-        }, unpackerFactory, readerFactory, reindexer);
+        var runner = new DocumentsRunner(scopedWorkCoordinator,
+            maxInitialLeaseDuration,
+            reindexer,
+            unpackerFactory,
+            (name, shard) -> {
+                var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
+                log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
+                if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
+                    throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
+                }
+                return shardMetadata;
+            },
+            readerFactory,
+            lastDocIndexedRef::set);
         return runner.migrateNextShard(rootDocumentContext::createReindexContext);
     }

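In this file the lease-expiry callback changes from the method reference RfsMigrateDocuments::exitOnLeaseTimeout to a lambda so that it can close over the shared AtomicReference that the pipeline updates; what the handler should ultimately do with the cursor is left open by the "// DO MORE HERE" placeholder. The sketch below is standalone and uses hypothetical names, and the checkpointing step is only a guess at that open question:

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class LeaseTimeoutWiringSketch {
    record Cursor(int lastBatchIndexed) {}          // stand-in for DocumentReindexer.SegmentDocumentCursor

    public static void main(String[] args) {
        AtomicReference<Cursor> lastIndexed = new AtomicReference<>();

        // The trigger only hands the callback a work item id, so the callback must be a lambda
        // that closes over the shared reference rather than a plain method reference.
        Consumer<String> onLeaseExpired = workItemId -> exitOnLeaseTimeout(workItemId, lastIndexed);

        lastIndexed.set(new Cursor(41));            // the pipeline would update this as batches complete
        onLeaseExpired.accept("shard-work-item-7");
    }

    static void exitOnLeaseTimeout(String workItemId, AtomicReference<Cursor> lastDocIndexed) {
        // A real handler could checkpoint lastDocIndexed.get() here before exiting ("DO MORE HERE").
        System.err.println("lease expired for " + workItemId + "; last confirmed cursor = " + lastDocIndexed.get());
    }
}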
DocumentReindexer.java
@@ -26,19 +26,17 @@ public class DocumentReindexer {
     private final int maxConcurrentWorkItems;
     private final IJsonTransformer transformer;
 
-    public Mono<Void> reindex(String indexName, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
+    public Flux<SegmentDocumentCursor> reindex(String indexName, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
         var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
         var bulkDocs = documentStream
             .publishOn(scheduler, 1)
             .map(doc -> transformDocument(doc,indexName));
 
         return this.reindexDocsInParallelBatches(bulkDocs, indexName, context)
-            .doOnSuccess(unused -> log.debug("All batches processed"))
-            .doOnError(e -> log.error("Error prevented all batches from being processed", e))
             .doOnTerminate(scheduler::dispose);
     }
 
-    Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
+    Flux<SegmentDocumentCursor> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
         // Use parallel scheduler for send subscription due on non-blocking io client
         var scheduler = Schedulers.newParallel("DocumentBatchReindexer");
         var bulkDocsBatches = batchDocsBySizeOrCount(docs);
@@ -47,10 +45,9 @@ Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexN
         return bulkDocsBatches
             .limitRate(bulkDocsToBuffer, 1) // Bulk Doc Buffer, Keep Full
             .publishOn(scheduler, 1) // Switch scheduler
-            .flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
+            .flatMapSequential(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
                 maxConcurrentWorkItems)
-            .doOnTerminate(scheduler::dispose)
-            .then();
+            .doOnTerminate(scheduler::dispose);
     }
 
     @SneakyThrows
@@ -63,7 +60,9 @@ BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) {
         return BulkDocSection.fromMap(original.toMap());
     }
 
-    Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
+    public static class SegmentDocumentCursor {}
+
+    Mono<SegmentDocumentCursor> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
         return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
             .doFirst(() -> log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.")
                 .addArgument(batchId)
@@ -76,7 +75,7 @@ Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String
                 .log())
             // Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
             .onErrorResume(e -> Mono.empty())
-            .then() // Discard the response object
+            .then(Mono.just(new SegmentDocumentCursor())) // Discard the response object
             .subscribeOn(scheduler);
     }

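The key change in DocumentReindexer is flatMap → flatMapSequential: both run up to maxConcurrentWorkItems bulk requests concurrently, but flatMapSequential buffers results so that downstream sees them in submission order, which is what makes the emitted SegmentDocumentCursor values usable as a monotonic progress marker. A standalone sketch of the difference (not project code):

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderingSketch {
    // Later batches finish sooner, to make any reordering visible.
    static Mono<Integer> fakeSend(int batch) {
        return Mono.delay(Duration.ofMillis(100L - batch * 10L)).thenReturn(batch);
    }

    public static void main(String[] args) {
        // Completion order, e.g. [4, 3, 2, 1, 0]: unusable as a "how far have we gotten" marker.
        System.out.println(Flux.range(0, 5).flatMap(OrderingSketch::fakeSend, 5).collectList().block());

        // Submission order, always [0, 1, 2, 3, 4], even though the sends still overlap.
        System.out.println(Flux.range(0, 5).flatMapSequential(OrderingSketch::fakeSend, 5).collectList().block());
    }
}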
DocumentsRunner.java
@@ -4,6 +4,7 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;

@@ -17,13 +18,11 @@
 import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
 import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;
 
-import lombok.AllArgsConstructor;
 import lombok.Lombok;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 
 @Slf4j
-@AllArgsConstructor
 public class DocumentsRunner {
 
     private final ScopedWorkCoordinator workCoordinator;
@@ -32,6 +31,24 @@ public class DocumentsRunner {
     private final SnapshotShardUnpacker.Factory unpackerFactory;
     private final Function<Path, LuceneDocumentsReader> readerFactory;
     private final DocumentReindexer reindexer;
+    private final Consumer<DocumentReindexer.SegmentDocumentCursor> segmentDocumentCursorConsumer;
+
+    public DocumentsRunner(ScopedWorkCoordinator workCoordinator,
+                           Duration maxInitialLeaseDuration,
+                           DocumentReindexer reindexer,
+                           SnapshotShardUnpacker.Factory unpackerFactory,
+                           BiFunction<String, Integer, ShardMetadata> shardMetadataFactory,
+                           Function<Path, LuceneDocumentsReader> readerFactory,
+                           Consumer<DocumentReindexer.SegmentDocumentCursor> segmentDocumentCursorConsumer) {
+        this.maxInitialLeaseDuration = maxInitialLeaseDuration;
+        this.readerFactory = readerFactory;
+        this.reindexer = reindexer;
+        this.shardMetadataFactory = shardMetadataFactory;
+        this.unpackerFactory = unpackerFactory;
+        this.workCoordinator = workCoordinator;
+
+        this.segmentDocumentCursorConsumer = segmentDocumentCursorConsumer;
+    }
 
     public enum CompletionStatus {
         NOTHING_DONE,
@@ -99,7 +116,10 @@ private void doDocumentsMigration(
         Flux<RfsLuceneDocument> documents = reader.readDocuments(indexAndShardCursor.startingSegmentIndex, indexAndShardCursor.startingDocId);
 
         reindexer.reindex(shardMetadata.getIndexName(), documents, context)
-            .doOnError(error -> log.error("Error during reindexing: " + error))
+            .doOnNext(segmentDocumentCursorConsumer)
+            .then()
+            .doOnError(e ->
+                log.atError().setCause(e).setMessage("Error prevented all batches from being processed").log())
             .doOnSuccess(
                 done -> log.atInfo().setMessage("Reindexing completed for Index {}, Shard {}")
                     .addArgument(shardMetadata::getIndexName)
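DocumentsRunner drops Lombok's @AllArgsConstructor in favor of an explicit constructor so the new Consumer<SegmentDocumentCursor> parameter is wired by name, and doDocumentsMigration now observes every cursor with doOnNext(segmentDocumentCursorConsumer) before .then() collapses the Flux back to the completion signal the method already waited on. A small standalone sketch of that doOnNext/then pattern, with assumed names:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ThenSketch {
    public static void main(String[] args) {
        Mono<Void> done = Flux.just("cursor-1", "cursor-2", "cursor-3")
            .doOnNext(cursor -> System.out.println("progress: " + cursor))  // side effect per emitted cursor
            .then();                                                        // completes after the last element
        done.block();                                                       // returns null once the Flux finishes
        System.out.println("all batches observed");
    }
}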
