Skip to content

Commit

Permalink
support broadcast join stream worker with right child
Browse files Browse the repository at this point in the history
  • Loading branch information
huasiy committed Oct 24, 2024
1 parent c2715fb commit f82f85d
Show file tree
Hide file tree
Showing 3 changed files with 569 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public CompletableFuture<Void> executePrev()
CompletableFuture<CompletableFuture<? extends Output>[]> largeChildFuture = null;
if (largeChild != null)
{
throw new InterruptedException();
largeChildFuture = largeChild.execute();
}
if (smallChildFuture != null)
{
Expand All @@ -114,7 +114,7 @@ public CompletableFuture<Void> executePrev()
if (largeChildFuture != null)
{
CompletableFuture<? extends Output>[] largeChildOutputs = largeChildFuture.join();
waitForCompletion(largeChildOutputs, LargeSideCompletionRatio);
// waitForCompletion(largeChildOutputs, LargeSideCompletionRatio);
}
prevStagesFuture.complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,53 @@ public static TypeDescription getSchemaFromPaths(Storage storage, List<String> p
return WorkerCommon.getFileSchemaFromPaths(storage, paths);
}

public static void getSchemaFromTwoPaths(ExecutorService executor,
Storage leftStorage, Storage rightStorage,
AtomicReference<TypeDescription> leftSchema,
AtomicReference<TypeDescription> rightSchema,
List<InputSplit> leftInputSplits, List<String> rightEndpoints)
{
requireNonNull(executor, "executor is null");
requireNonNull(leftSchema, "leftSchema is null");
requireNonNull(rightSchema, "rightSchema is null");
requireNonNull(leftInputSplits, "leftPaths is null");
requireNonNull(rightEndpoints, "rightPaths is null");

if (leftStorage != http && rightStorage == http)
{
Future<?> leftFuture = executor.submit(() -> {
try
{
leftSchema.set(getFileSchemaFromSplits(leftStorage, leftInputSplits));
} catch (IOException | InterruptedException e)
{
logger.error("failed to read the file schema for the left table", e);
}
});
Future<?> rightFuture = executor.submit(() -> {
try
{
PixelsReader pixelsReader = new PixelsReaderStreamImpl(rightEndpoints.get(0));
rightSchema.set(pixelsReader.getFileSchema());
pixelsReader.close();
// XXX: This `close()` makes the test noticeably slower. Will need to look into it.
} catch (Exception e)
{
e.printStackTrace();
}
});
try
{
leftFuture.get();
rightFuture.get();
} catch (Throwable e)
{
logger.error("interrupted while waiting for the termination of schema read", e);
}
} else
throw new UnsupportedOperationException("schema is not compatible");
}

public static void getSchemaFromPaths(ExecutorService executor,
Storage leftStorage, Storage rightStorage,
AtomicReference<TypeDescription> leftSchema,
Expand Down
Loading

0 comments on commit f82f85d

Please sign in to comment.