Skip to content

Commit

Permalink
Simplify async opening code
Browse files Browse the repository at this point in the history
PendingDataSource turned out to be more trouble than it's worth. The
completable future can do the job as good, even providing saner error
handling code.

Issue: n/a
  • Loading branch information
mlopatkin committed Nov 17, 2023
1 parent 5c9fe82 commit 12365be
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 613 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

import name.mlopatkin.andlogview.base.MyThrowables;

import com.google.errorprone.annotations.CanIgnoreReturnValue;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -57,7 +60,7 @@ public static Cancellable toCancellable(CompletableFuture<?> future) {
*/
public static Consumer<Throwable> ignoreCancellations(Consumer<? super Throwable> failureHandler) {
return th -> {
if (!(MyThrowables.unwrapUninteresting(th) instanceof CancellationException)) {
if (!isCancellation(th)) {
failureHandler.accept(th);
}
};
Expand Down Expand Up @@ -208,6 +211,49 @@ public static Function<Throwable, Void> exceptionHandler(Runnable consumer) {
return consumingHandler(ignored -> {}, errorConsumer);
}

/**
* Arranges the futures in such a way that cancellation of the {@code canceller} cancels the provided
* {@code future}. This method returns the given {@code future} to allow for simpler expressions. Cancellations of
* canceller's upstream stages also cancel the {@code future}, if they cause the canceller to complete.
*
* @param future the future to be cancelled
* @param canceller the stage, cancellation of which cancels the {@code future}
* @param <F> the exact type of the given {@code future}
* @return the {@code future}
*/
@CanIgnoreReturnValue
public static <F extends CompletableFuture<?>> F cancelBy(F future, CompletionStage<?> canceller) {
canceller.exceptionally(th -> {
if (isCancellation(th)) {
future.cancel(false);
}
return null;
}).exceptionally(MyFutures::uncaughtException);
return future;
}

/**
* Arranges the futures in such a way that cancellation of the {@code canceller} cancels the provided
* {@code cancellable}. This method returns the given {@code cancellable} to allow for simpler expressions.
* Cancellations of canceller's upstream stages also cancel the {@code cancellable}, if they cause the canceller to
* complete.
*
* @param cancellable the cancellable to be cancelled
* @param canceller the stage, cancellation of which cancels the {@code cancellable}
* @param <T> the exact type of the given {@code cancellable}
* @return the {@code cancellable}
*/
@CanIgnoreReturnValue
public static <T extends Cancellable> T cancelBy(T cancellable, CompletionStage<?> canceller) {
canceller.exceptionally(th -> {
if (isCancellation(th)) {
cancellable.cancel();
}
return null;
}).exceptionally(MyFutures::uncaughtException);
return cancellable;
}

/**
* Helper for {@link CompletableFuture#exceptionally(Function)} that forwards exception to thread's default
* exception handler. The failure is rethrown, so the downstream chain still fails.
Expand All @@ -227,4 +273,8 @@ public static Function<Throwable, Void> exceptionHandler(Runnable consumer) {
private static <E extends Throwable> RuntimeException sneakyRethrow(Throwable th) throws E {
throw (E) th;
}

private static boolean isCancellation(Throwable th) {
return (MyThrowables.unwrapUninteresting(th) instanceof CancellationException);
}
}
163 changes: 163 additions & 0 deletions base/src/test/java/name/mlopatkin/andlogview/utils/MyFuturesTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2023 the Andlogview authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package name.mlopatkin.andlogview.utils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import name.mlopatkin.andlogview.base.MyThrowables;
import name.mlopatkin.andlogview.test.ThreadTestUtils;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;

import java.util.concurrent.CompletableFuture;

class MyFuturesTest {
@Test
void cancelByTouchesNoCancellableBeforeCancel() {
Cancellable cancellable = mock();
var canceller = new CompletableFuture<Void>();

MyFutures.cancelBy(cancellable, canceller);

verify(cancellable, never()).cancel();
}

@Test
void cancelBTouchesNoFutureBeforeCancel() {
var cancellable = new CompletableFuture<>();
var canceller = new CompletableFuture<Void>();

MyFutures.cancelBy(cancellable, canceller);

assertThat(cancellable).isNotCancelled();
}

@Test
void cancelByCancelsCancellableAfterCancel() {
Cancellable cancellable = mock();
var canceller = new CompletableFuture<Void>();

MyFutures.cancelBy(cancellable, canceller);
canceller.cancel(false);

verify(cancellable).cancel();
}

@Test
void cancelByCancelsFutureAfterCancel() {
var cancellable = new CompletableFuture<>();
var canceller = new CompletableFuture<Void>();

MyFutures.cancelBy(cancellable, canceller);
canceller.cancel(false);

assertThat(cancellable).isCancelled();
}

@Test
void exceptionsFromCancellableAreForwarded() throws Exception {
Cancellable cancellable = mock();
var ex = new RuntimeException("test");
when(cancellable.cancel()).thenThrow(ex);

var canceller = new CompletableFuture<Void>();

MyFutures.cancelBy(cancellable, canceller);

var handler = ThreadTestUtils.withUncaughtExceptionHandler(mock(), () -> {
canceller.cancel(false);
});

verify(handler).uncaughtException(any(),
ArgumentMatchers.argThat(argument -> ex.equals(MyThrowables.unwrapUninteresting(argument))));
}

@Test
void cancelByCancelledFutureImmediatelyCancelsCancellable() {
Cancellable cancellable = mock();
var future = cancelledFuture();

MyFutures.cancelBy(cancellable, future);

verify(cancellable).cancel();
}

@Test
void cancelByCancelledFutureImmediatelyCancelsFuture() {
var cancellable = new CompletableFuture<>();
var future = cancelledFuture();

MyFutures.cancelBy(cancellable, future);

assertThat(cancellable).isCancelled();
}

@Test
void cancelByFailedFutureDoesNotCancel() {
Cancellable cancellable1 = mock();
var cancellable2 = new CompletableFuture<>();
var future = MyFutures.failedFuture(new Exception("already failed"));

MyFutures.cancelBy(cancellable1, future);
MyFutures.cancelBy(cancellable2, future);

verify(cancellable1, never()).cancel();
assertThat(cancellable2).isNotCancelled();
}

@Test
void attachingCancellableDoesNotPropagateExceptions() throws Exception {
Cancellable cancellable1 = mock();
var cancellable2 = new CompletableFuture<>();
var future = MyFutures.failedFuture(new Exception("already failed"));

var handler = ThreadTestUtils.withUncaughtExceptionHandler(mock(), () -> {
MyFutures.cancelBy(cancellable1, future);
MyFutures.cancelBy(cancellable2, future);
});

verify(handler, never()).uncaughtException(any(), any());
}

@Test
void indirectCancellationPropagates() {
Cancellable cancellable1 = mock();
var cancellable2 = new CompletableFuture<>();
var future = new CompletableFuture<String>();
var indirect = future.thenApply(String::trim);

MyFutures.cancelBy(cancellable1, indirect);
MyFutures.cancelBy(cancellable2, indirect);

future.cancel(false);

verify(cancellable1).cancel();
assertThat(cancellable2).isCancelled();
}

private static CompletableFuture<Void> cancelledFuture() {
var future = new CompletableFuture<Void>();
future.cancel(false);
return future;
}
}
33 changes: 22 additions & 11 deletions src/name/mlopatkin/andlogview/MainFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import name.mlopatkin.andlogview.ui.FileDialog;
import name.mlopatkin.andlogview.ui.FrameDimensions;
import name.mlopatkin.andlogview.ui.FrameLocation;
import name.mlopatkin.andlogview.ui.PendingDataSource;
import name.mlopatkin.andlogview.ui.bookmarks.BookmarkController;
import name.mlopatkin.andlogview.ui.device.AdbOpener;
import name.mlopatkin.andlogview.ui.device.AdbServicesInitializationPresenter;
Expand All @@ -54,6 +53,7 @@
import name.mlopatkin.andlogview.ui.search.logtable.TablePosition;
import name.mlopatkin.andlogview.ui.status.StatusPanel;
import name.mlopatkin.andlogview.utils.CommonChars;
import name.mlopatkin.andlogview.utils.MyFutures;
import name.mlopatkin.andlogview.widgets.UiHelper;

import com.google.common.io.Files;
Expand All @@ -74,7 +74,9 @@
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

import javax.inject.Inject;
Expand Down Expand Up @@ -164,7 +166,7 @@ public class MainFrame implements MainFrameSearchUi {
@Inject
FileDialog fileDialog;

private @Nullable PendingDataSource<? extends @Nullable DataSource> pendingDataSource;
private @Nullable CompletableFuture<? extends @Nullable DataSource> pendingDataSource;

private final LogModel.Observer autoscrollObserver = new LogModel.Observer() {
@Override
Expand Down Expand Up @@ -216,7 +218,7 @@ private MainFrame(AppGlobals globals, CommandLine commandLine) {
initialize(commandLine.isDebug());
}

private void consumePendingSourceAsync(PendingDataSource<? extends @Nullable DataSource> origin,
private void consumePendingSourceAsync(CompletableFuture<? extends @Nullable DataSource> origin,
@Nullable DataSource newSource) {
if (EventQueue.isDispatchThread()) {
consumePendingSource(origin, newSource);
Expand All @@ -225,7 +227,7 @@ private void consumePendingSourceAsync(PendingDataSource<? extends @Nullable Dat
}
}

private void consumePendingSource(PendingDataSource<? extends @Nullable DataSource> origin,
private void consumePendingSource(CompletableFuture<? extends @Nullable DataSource> origin,
@Nullable DataSource newSource) {
assert EventQueue.isDispatchThread();
// Clean up current pending operation.
Expand Down Expand Up @@ -448,8 +450,7 @@ private void selectAndOpenFile() {

private void connectToDevice() {
var source = adbOpener.selectAndOpenDevice();
source.whenFailed(this::disableAdbCommandsAsync);
startOpeningDataSource(() -> source);
startOpeningDataSource(() -> source, this::disableAdbCommandsAsync);
}

private void selectAndDumpDevice() {
Expand All @@ -464,7 +465,13 @@ public void tryToConnectToFirstAvailableDevice() {
}

private void startOpeningDataSource(
Supplier<? extends PendingDataSource<? extends @Nullable DataSource>> pendingDataSource) {
Supplier<? extends CompletableFuture<? extends @Nullable DataSource>> pendingDataSource) {
startOpeningDataSource(pendingDataSource, MyFutures::uncaughtException);
}

private void startOpeningDataSource(
Supplier<? extends CompletableFuture<? extends @Nullable DataSource>> pendingDataSource,
Consumer<? super Throwable> failureHandler) {
cancelPendingOperation();
// Cancelling before obtaining is important, because obtaining a pending data source may start a nested event
// loop and block.
Expand All @@ -474,14 +481,19 @@ private void startOpeningDataSource(
// overwrite data source too.
this.pendingDataSource = newPendingDataSource;

newPendingDataSource.whenAvailable((source) -> consumePendingSourceAsync(newPendingDataSource, source));
newPendingDataSource.handle(MyFutures.consumingHandler(
source -> consumePendingSourceAsync(newPendingDataSource, source),
MyFutures.ignoreCancellations(failureHandler)))
// The failureHandler has consumed all exceptions at this point.
// The next stage handles exceptions from the handler itself.
.exceptionally(MyFutures::uncaughtException);
}

private void cancelPendingOperation() {
assert EventQueue.isDispatchThread();
var operation = pendingDataSource;
if (operation != null) {
operation.cancel();
operation.cancel(false);
pendingDataSource = null;
}
}
Expand All @@ -491,8 +503,7 @@ private void cancelPendingOperation() {
*/
public void waitForDevice() {
var source = adbOpener.awaitDevice();
source.whenFailed(this::disableAdbCommandsAsync);
startOpeningDataSource(() -> source);
startOpeningDataSource(() -> source, this::disableAdbCommandsAsync);
}

private void saveToFile() {
Expand Down
Loading

0 comments on commit 12365be

Please sign in to comment.