diff --git a/src/main/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheck.java b/src/main/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheck.java deleted file mode 100644 index 4ef8c055..00000000 --- a/src/main/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheck.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.cryptomator.cryptofs.health.api; - -import org.cryptomator.cryptofs.VaultConfig; -import org.cryptomator.cryptolib.api.Cryptor; -import org.cryptomator.cryptolib.api.Masterkey; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.file.Path; -import java.util.Spliterators; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.TransferQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -public abstract class AbstractHealthCheck implements HealthCheck { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractHealthCheck.class); - - private final AtomicBoolean cancelled = new AtomicBoolean(); - private Future task; - - @Override - public final Stream check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, ExecutorService executor) { - ResultSpliterator resultSpliterator = new ResultSpliterator(); - - task = executor.submit(() -> { - try { - check(pathToVault, config, masterkey, cryptor, resultSpliterator); - } catch (CancellationException e) { - assert cancelled.get(); - LOG.debug("{} cancelled.", identifier()); - } finally { - resultSpliterator.end(); - } - }); - - return StreamSupport.stream(resultSpliterator, false); - } - - /** - * Checks the vault at the given path. - * - * @param pathToVault Path to the vault's root directory - * @param config The parsed and verified vault config - * @param masterkey The masterkey - * @param cryptor A cryptor initialized for this vault - * @param resultCollector A callback collecting results. Invoking this method may block until the result is processed. - */ - protected abstract void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector); - - @Override - public void cancel() { - if (task != null) { - cancelled.set(true); - task.cancel(true); - } - } - - private class ResultSpliterator extends Spliterators.AbstractSpliterator implements Consumer { - - private static final DiagnosticResult POISON = new PoisonResult(); - - private final TransferQueue queue = new LinkedTransferQueue<>(); - - public ResultSpliterator() { - super(Long.MAX_VALUE, DISTINCT | NONNULL | IMMUTABLE); - } - - @Override - public boolean tryAdvance(Consumer action) { - try { - var result = queue.take(); - if (result == POISON) { - return false; - } else { - action.accept(result); - return true; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - - @Override - public void accept(DiagnosticResult diagnosticResult) throws CancellationException { - if (cancelled.get()) { - throw new CancellationException(); - } - try { - queue.transfer(diagnosticResult); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new CancellationException(); - } - } - - public void end() { - queue.offer(POISON); - } - - } - - private static record PoisonResult() implements DiagnosticResult { - @Override - public Severity getServerity() { - return null; - } - } - -} diff --git a/src/main/java/org/cryptomator/cryptofs/health/api/HealthCheck.java b/src/main/java/org/cryptomator/cryptofs/health/api/HealthCheck.java index 1fd7eb76..eedbd647 100644 --- a/src/main/java/org/cryptomator/cryptofs/health/api/HealthCheck.java +++ b/src/main/java/org/cryptomator/cryptofs/health/api/HealthCheck.java @@ -3,17 +3,25 @@ import org.cryptomator.cryptofs.VaultConfig; import org.cryptomator.cryptolib.api.Cryptor; import org.cryptomator.cryptolib.api.Masterkey; +import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.util.Collection; import java.util.ServiceLoader; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ForkJoinPool; +import java.util.function.Consumer; import java.util.stream.Stream; +import java.util.stream.StreamSupport; public interface HealthCheck { + /** + * @return All known health checks + */ + static Collection allChecks() { + return ServiceLoader.load(HealthCheck.class).stream().map(ServiceLoader.Provider::get).toList(); + } + /** * @return A unique name for this check (that might be used as a translation key) */ @@ -24,24 +32,42 @@ default String identifier() { /** * Checks the vault at the given path. * - * @param pathToVault Path to the vault's root directory - * @param config The parsed and verified vault config - * @param masterkey The masterkey - * @param cryptor A cryptor initialized for this vault - * @param executor An executor service to run the health check - * @return Diagnostic results + * @param pathToVault Path to the vault's root directory + * @param config The parsed and verified vault config + * @param masterkey The masterkey + * @param cryptor A cryptor initialized for this vault + * @param resultCollector Callback called for each result. */ - Stream check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, ExecutorService executor); + void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector); /** - * Attempts to cancel this health check (if it is running). + * Invokes the health check on a background thread scheduled using the given executor service. The results will be + * streamed. If the stream gets {@link Stream#close() closed} before it terminates, an attempt is made to cancel + * the health check. + *

+ * The check blocks if the stream is not consumed * - * Calling this method does not guarantee that no further results are produced due to async behaviour. + * @param pathToVault Path to the vault's root directory + * @param config The parsed and verified vault config + * @param masterkey The masterkey + * @param cryptor A cryptor initialized for this vault + * @param executor An executor service to run the health check + * @return A lazily filled stream of diagnostic results. */ - void cancel(); + default Stream check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, ExecutorService executor) { + var resultSpliterator = new TransferSpliterator(new PoisonResult()); - static Collection allChecks() { - return ServiceLoader.load(HealthCheck.class).stream().map(ServiceLoader.Provider::get).toList(); + var task = executor.submit(() -> { + try { + check(pathToVault, config, masterkey, cryptor, resultSpliterator); + } catch (TransferSpliterator.TransferClosedException e) { + LoggerFactory.getLogger(HealthCheck.class).debug("{} cancelled.", identifier()); + } finally { + resultSpliterator.close(); + } + }); + + return StreamSupport.stream(resultSpliterator, false).onClose(() -> task.cancel(true)); } -} +} \ No newline at end of file diff --git a/src/main/java/org/cryptomator/cryptofs/health/api/PoisonResult.java b/src/main/java/org/cryptomator/cryptofs/health/api/PoisonResult.java new file mode 100644 index 00000000..416e64af --- /dev/null +++ b/src/main/java/org/cryptomator/cryptofs/health/api/PoisonResult.java @@ -0,0 +1,8 @@ +package org.cryptomator.cryptofs.health.api; + +record PoisonResult() implements DiagnosticResult { + @Override + public Severity getServerity() { + return null; + } +} \ No newline at end of file diff --git a/src/main/java/org/cryptomator/cryptofs/health/api/TransferSpliterator.java b/src/main/java/org/cryptomator/cryptofs/health/api/TransferSpliterator.java new file mode 100644 index 00000000..31472238 --- /dev/null +++ b/src/main/java/org/cryptomator/cryptofs/health/api/TransferSpliterator.java @@ -0,0 +1,82 @@ +package org.cryptomator.cryptofs.health.api; + +import com.google.common.base.Preconditions; + +import java.util.Objects; +import java.util.Spliterators; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TransferQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * A concurrent spliterator that only {@link java.util.Spliterator#tryAdvance(Consumer) advances} by transferring + * elements it {@link Consumer#accept(Object) consumes}. Consumption blocks if the spliterator is not advanced. + *

+ * Once no futher elements are expected, this spliterator must be {@link #close() closed}, otherwise it'll + * wait indefinitely. + * + * @param the type of elements consumed and returned + */ +class TransferSpliterator extends Spliterators.AbstractSpliterator implements Consumer, AutoCloseable { + + private final TransferQueue queue = new LinkedTransferQueue<>(); + private final AtomicBoolean poisoned = new AtomicBoolean(); + private final T poison; + + /** + * @param poison A unique value that must be distinct to every single value expected to be transferred. + */ + public TransferSpliterator(T poison) { + super(Long.MAX_VALUE, DISTINCT | NONNULL | IMMUTABLE); + this.poison = Objects.requireNonNull(poison); + } + + @Override + public boolean tryAdvance(Consumer action) { + try { + var element = queue.take(); + if (element == poison) { + return false; + } else { + action.accept(element); + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + /** + * Transfers the value to consuming thread. Blocks until transfer is complete or thread is interrupted. + * @param value The value to transfer + * @throws TransferClosedException If the transfer has been closed or this thread is interrupted while waiting for the consuming side. + */ + @Override + public void accept(T value) throws TransferClosedException { + Preconditions.checkArgument(value != poison, "must not feed poison"); + if (poisoned.get()) { + throw new TransferClosedException(); + } + try { + queue.transfer(value); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TransferClosedException(); + } + } + + @Override + public void close() { + poisoned.set(true); + queue.offer(poison); + } + + /** + * Thrown if an attempt is made to {@link #accept(Object) transfer} further elements after the TransferSpliterator + * has been closed. + */ + public static class TransferClosedException extends IllegalStateException {} + +} diff --git a/src/main/java/org/cryptomator/cryptofs/health/dirid/DirIdCheck.java b/src/main/java/org/cryptomator/cryptofs/health/dirid/DirIdCheck.java index f6575e96..a288d39b 100644 --- a/src/main/java/org/cryptomator/cryptofs/health/dirid/DirIdCheck.java +++ b/src/main/java/org/cryptomator/cryptofs/health/dirid/DirIdCheck.java @@ -2,7 +2,6 @@ import org.cryptomator.cryptofs.VaultConfig; import org.cryptomator.cryptofs.common.Constants; -import org.cryptomator.cryptofs.health.api.AbstractHealthCheck; import org.cryptomator.cryptofs.health.api.CheckFailed; import org.cryptomator.cryptofs.health.api.DiagnosticResult; import org.cryptomator.cryptofs.health.api.HealthCheck; @@ -18,30 +17,22 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; import java.util.function.Consumer; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** * Reads all dir.c9r files and checks if the corresponding dir exists. */ -public class DirIdCheck extends AbstractHealthCheck { +public class DirIdCheck implements HealthCheck { private static final Logger LOG = LoggerFactory.getLogger(DirIdCheck.class); private static final int MAX_TRAVERSAL_DEPTH = 4; // d/2/30/Fo0==.c9r/dir.c9r @Override - protected void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector) { + public void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector) { // scan vault structure: var dataDirPath = pathToVault.resolve(Constants.DATA_DIR_NAME); var dirVisitor = new DirVisitor(dataDirPath, resultCollector); diff --git a/src/test/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheckTest.java b/src/test/java/org/cryptomator/cryptofs/health/api/HealthCheckTest.java similarity index 63% rename from src/test/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheckTest.java rename to src/test/java/org/cryptomator/cryptofs/health/api/HealthCheckTest.java index f760c295..f2168ff6 100644 --- a/src/test/java/org/cryptomator/cryptofs/health/api/AbstractHealthCheckTest.java +++ b/src/test/java/org/cryptomator/cryptofs/health/api/HealthCheckTest.java @@ -17,9 +17,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -class AbstractHealthCheckTest { +class HealthCheckTest { private Path pathToVault = Mockito.mock(Path.class); private VaultConfig config = Mockito.mock(VaultConfig.class); @@ -42,12 +41,9 @@ public void teardown() { public void testConsumeStream() { DiagnosticResult result1 = Mockito.mock(DiagnosticResult.class, "result1"); DiagnosticResult result2 = Mockito.mock(DiagnosticResult.class, "result2"); - AbstractHealthCheck check = new AbstractHealthCheck() { - @Override - protected void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector) { - resultCollector.accept(result1); - resultCollector.accept(result2); - } + HealthCheck check = (pathToVault, config, masterkey, cryptor, resultCollector) -> { + resultCollector.accept(result1); + resultCollector.accept(result2); }; var stream = check.check(pathToVault, config, masterkey, cryptor, executor); @@ -63,13 +59,10 @@ public void testCheckerBlocks() throws InterruptedException { DiagnosticResult result1 = Mockito.mock(DiagnosticResult.class, "result1"); CountDownLatch cdl1 = new CountDownLatch(1); CountDownLatch cdl2 = new CountDownLatch(1); - AbstractHealthCheck check = new AbstractHealthCheck() { - @Override - protected void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector) { - cdl1.countDown(); - resultCollector.accept(result1); - cdl2.countDown(); - } + HealthCheck check = (pathToVault, config, masterkey, cryptor, resultCollector) -> { + cdl1.countDown(); + resultCollector.accept(result1); + cdl2.countDown(); }; check.check(pathToVault, config, masterkey, cryptor, executor); @@ -79,35 +72,31 @@ protected void check(Path pathToVault, VaultConfig config, Masterkey masterkey, } @RepeatedTest(100) - @DisplayName("health check can be cancelled") - public void testCancel() { + @DisplayName("closing stream cancels health check") + public void testClose() { DiagnosticResult result1 = Mockito.mock(DiagnosticResult.class, "result1"); DiagnosticResult result2 = Mockito.mock(DiagnosticResult.class, "result2"); CountDownLatch cdl1 = new CountDownLatch(1); - AbstractHealthCheck check = new AbstractHealthCheck() { - @Override - protected void check(Path pathToVault, VaultConfig config, Masterkey masterkey, Cryptor cryptor, Consumer resultCollector) { + CountDownLatch cdl2 = new CountDownLatch(1); + CountDownLatch cdl3 = new CountDownLatch(1); + HealthCheck check = (pathToVault, config, masterkey, cryptor, resultCollector) -> { + try { + cdl1.countDown(); // job started resultCollector.accept(result1); - try { - cdl1.await(); - } catch (InterruptedException e) { - // expected, as cancel() interrupts the check worker - } resultCollector.accept(result2); + cdl2.countDown(); // job not finished + } finally { + cdl3.countDown(); // control } }; - var stream = check.check(pathToVault, config, masterkey, cryptor, executor); Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { - var results = stream.peek(result -> { - if (result == result1) { - // cancel check after first result - check.cancel(); - cdl1.countDown(); - } - }).toArray(DiagnosticResult[]::new); - Assertions.assertArrayEquals(new DiagnosticResult[]{result1}, results); + try (var stream = check.check(pathToVault, config, masterkey, cryptor, executor)) { + cdl1.await(); + } + cdl3.await(); + Assertions.assertEquals(1, cdl2.getCount()); }); } diff --git a/src/test/java/org/cryptomator/cryptofs/health/api/TransferSpliteratorTest.java b/src/test/java/org/cryptomator/cryptofs/health/api/TransferSpliteratorTest.java new file mode 100644 index 00000000..38e78731 --- /dev/null +++ b/src/test/java/org/cryptomator/cryptofs/health/api/TransferSpliteratorTest.java @@ -0,0 +1,98 @@ +package org.cryptomator.cryptofs.health.api; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +class TransferSpliteratorTest { + + private ExecutorService executor; + + @BeforeEach + public void setup() { + executor = Executors.newCachedThreadPool(); + } + + @AfterEach + public void teardown() { + executor.shutdown(); + } + + @RepeatedTest(100) + @DisplayName("spliterator terminates after reading all results") + public void testTerminatesWhenClosed() { + TransferSpliterator transferSpliterator = new TransferSpliterator<>("POISON"); + executor.submit(() -> { + try { + transferSpliterator.accept("one"); + transferSpliterator.accept("two"); + transferSpliterator.accept("three"); + } finally { + transferSpliterator.close(); + } + }); + + Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + Assertions.assertTrue(transferSpliterator.tryAdvance(s -> Assertions.assertEquals("one", s))); + Assertions.assertTrue(transferSpliterator.tryAdvance(s -> Assertions.assertEquals("two", s))); + Assertions.assertTrue(transferSpliterator.tryAdvance(s -> Assertions.assertEquals("three", s))); + Assertions.assertFalse(transferSpliterator.tryAdvance(s -> {})); + }); + } + + @Test + @DisplayName("constructor rejects null poison") + public void testFailOnNullPoison() { + Assertions.assertThrows(NullPointerException.class, () -> { + new TransferSpliterator<>(null); + }); + } + + @Test + @DisplayName("spliterator throws exception when attempting to transfer after close()") + public void testDoesNotAcceptAfterClose() { + TransferSpliterator transferSpliterator = new TransferSpliterator<>("POISON"); + transferSpliterator.close(); + Assertions.assertThrows(TransferSpliterator.TransferClosedException.class, () -> { + transferSpliterator.accept("one"); + }); + } + + @RepeatedTest(100) + @DisplayName("spliterator handles interrupt gracefully") + public void testHandleInterruptGracefully() { + TransferSpliterator transferSpliterator = new TransferSpliterator<>("POISON"); + CountDownLatch cdl1 = new CountDownLatch(1); + CountDownLatch cdl2 = new CountDownLatch(1); + Future task = executor.submit(() -> { + try { + transferSpliterator.accept("one"); + cdl1.countDown(); + transferSpliterator.accept("two"); // will be cancelled by interrupt + transferSpliterator.accept("three"); + } catch (TransferSpliterator.TransferClosedException e) { + cdl2.countDown(); + } finally { + transferSpliterator.close(); + } + }); + + Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + Assertions.assertTrue(transferSpliterator.tryAdvance(s -> Assertions.assertEquals("one", s))); + cdl1.await(); + task.cancel(true); + cdl2.await(); + Assertions.assertFalse(transferSpliterator.tryAdvance(s -> {})); + }); + } + +} \ No newline at end of file