* Should throw no exceptions except {@link DownstreamInitialRootException}.
*
From e32fc742db7a561506a34898e6a1329ece369bc1 Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Mon, 22 Jan 2024 09:16:14 -0500
Subject: [PATCH 4/9] Tweak comments and logging.
Also, change ChangeReceiver so it doesn't call processEvent if closed.
---
.../bosk/drivers/mongo/ChangeReceiver.java | 16 ++++++++++++++-
.../vena/bosk/drivers/mongo/MainDriver.java | 20 +++++++++++++++----
2 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java
index b1328207..7d503605 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java
@@ -25,6 +25,17 @@
* Houses a background thread that repeatedly initializes, processes, and closes a change stream cursor.
* Ideally, the opening and closing happen just once, but they're done in a loop for fault tolerance,
* so that the driver can reinitialize if certain unusual conditions arise.
+ *
+ *
+ * We're maintaining an in-memory replica of database state, and so
+ * the loading of database state and the subsequent handling of events are inherently coupled.
+ * To coordinate this, we call back into {@link ChangeListener#onConnectionSucceeded()} to indicate the
+ * appropriate time to load the initial state before any event processing begins.
+ * The code ends up looking a bit complicated, with {@link MainDriver} creating a {@link ChangeReceiver}
+ * that calls back into {@link MainDriver} via {@link ChangeListener};
+ * but it eliminates all race conditions
+ * (which dramatically simplifies the reasoning about parallelism and corner cases)
+ * because the state loading and event processing happen on the same thread.
*/
class ChangeReceiver implements Closeable {
private final String boskName;
@@ -53,6 +64,7 @@ class ChangeReceiver implements Closeable {
public void close() {
isClosed = true;
ex.shutdownNow();
+ // Note: don't awaitTermination. It seems like a good idea, but it makes the tests crawl, and it doesn't matter for correctness
}
/**
@@ -207,7 +219,9 @@ private void eventLoop(MongoChangeStreamCursor implements MongoDriver {
+class MainDriver implements MongoDriver {
private final Bosk bosk;
private final ChangeReceiver receiver;
private final MongoDriverSettings driverSettings;
@@ -101,6 +101,8 @@ public class MainDriver implements MongoDriver {
@Override
public R initialRoot(Type rootType) throws InvalidTypeException, InterruptedException, IOException {
try (MDCScope __ = beginDriverOperation("initialRoot({})", rootType)) {
+ // The actual loading of the initial state happens on the ChangeReceiver thread.
+ // Here, we just wait for that to finish and deal with the consequences.
FutureTask task = listener.taskRef.get();
if (task == null) {
throw new IllegalStateException("initialRoot has already run");
@@ -140,8 +142,9 @@ public R initialRoot(Type rootType) throws InvalidTypeException, InterruptedExce
}
/**
- * Called on the {@link ChangeReceiver}'s background thread via {@link Listener#taskRef},
- * so there are no concerns with more events arriving concurrently.
+ * Called on the {@link ChangeReceiver}'s background thread via {@link Listener#taskRef}
+ * because it's important that this logic finishes before processing any change events,
+ * and no other change events can arrive concurrently.
*
* Should throw no exceptions except {@link DownstreamInitialRootException}.
*
@@ -151,8 +154,17 @@ public R initialRoot(Type rootType) throws InvalidTypeException, InterruptedExce
* and {@link InitialDatabaseUnavailableMode#FAIL} is active.
*/
private R doInitialRoot(Type rootType) {
+ // This establishes a safe fallback in case things go wrong. It also causes any
+ // calls to driver update methods to wait until we're finished here. (There shouldn't
+ // be any such calls while initialRoot is still running, but this ensures that if any
+ // do happen, they won't corrupt our internal state in confusing ways.)
+ quietlySetFormatDriver(new DisconnectedDriver<>(FAILURE_TO_COMPUTE_INITIAL_ROOT));
+
+ // In effect, at this point, the entire driver is now single-threaded for the remainder
+ // of this method. Our only concurrency concerns now involve database operations performed
+ // by other processes.
+
R root;
- quietlySetFormatDriver(new DisconnectedDriver<>(FAILURE_TO_COMPUTE_INITIAL_ROOT)); // Pessimistic fallback
try (var __ = collection.newReadOnlySession()){
FormatDriver detectedDriver = detectFormat();
StateAndMetadata loadedState = detectedDriver.loadAllState();
From b3e1e32f1cb76a0b164f1151330449cb90f05e70 Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Mon, 22 Jan 2024 09:45:10 -0500
Subject: [PATCH 5/9] Add "bosk." prefix to MDC keys for logging
---
.../main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java | 2 +-
.../io/vena/bosk/drivers/mongo/MappedDiagnosticContext.java | 2 +-
.../io/vena/bosk/drivers/mongo/TransactionalCollection.java | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java
index 7d503605..54519d75 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/ChangeReceiver.java
@@ -263,6 +263,6 @@ private void processEvent(ChangeStreamDocument event) throws Unpro
}
private static final AtomicLong EVENT_COUNTER = new AtomicLong(0);
- public static final String MDC_KEY = "MongoDriver.event";
+ public static final String MDC_KEY = "bosk.MongoDriver.event";
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeReceiver.class);
}
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MappedDiagnosticContext.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MappedDiagnosticContext.java
index 6369228d..60d9f4e0 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MappedDiagnosticContext.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MappedDiagnosticContext.java
@@ -26,5 +26,5 @@ static final class MDCScope implements AutoCloseable {
@Override public void close() { MDC.put(MDC_KEY, oldValue); }
}
- private static final String MDC_KEY = "MongoDriver";
+ private static final String MDC_KEY = "bosk.MongoDriver";
}
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/TransactionalCollection.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/TransactionalCollection.java
index 944617b9..f8816e5c 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/TransactionalCollection.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/TransactionalCollection.java
@@ -133,7 +133,7 @@ public void close() {
MDC.put(MDC_KEY, oldMDC);
}
- private static final String MDC_KEY = "MongoDriver.transaction";
+ private static final String MDC_KEY = "bosk.MongoDriver.transaction";
}
/**
From b18e50286f9b83f57cb225f3f25e7a03b00f815f Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Mon, 22 Jan 2024 10:47:04 -0500
Subject: [PATCH 6/9] Refactor setDisconnectedDriver
---
.../vena/bosk/drivers/mongo/MainDriver.java | 33 ++++++++++++-------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
index 9675a57c..1f0da85a 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
@@ -60,7 +60,15 @@ class MainDriver implements MongoDriver {
private final Listener listener;
final Formatter formatter;
+ /**
+ * Hold this while waiting on {@link #formatDriverChanged}
+ */
private final ReentrantLock formatDriverLock = new ReentrantLock();
+
+ /**
+ * Wait on this in cases where the {@link #formatDriver} isn't working
+ * and might be asynchronously repaired.
+ */
private final Condition formatDriverChanged = formatDriverLock.newCondition();
private volatile FormatDriver formatDriver = new DisconnectedDriver<>(new Exception("Driver not yet initialized"));
@@ -158,7 +166,7 @@ private R doInitialRoot(Type rootType) {
// calls to driver update methods to wait until we're finished here. (There shouldn't
// be any such calls while initialRoot is still running, but this ensures that if any
// do happen, they won't corrupt our internal state in confusing ways.)
- quietlySetFormatDriver(new DisconnectedDriver<>(FAILURE_TO_COMPUTE_INITIAL_ROOT));
+ setDisconnectedDriver(FAILURE_TO_COMPUTE_INITIAL_ROOT);
// In effect, at this point, the entire driver is now single-threaded for the remainder
// of this method. Our only concurrency concerns now involve database operations performed
@@ -183,7 +191,7 @@ private R doInitialRoot(Type rootType) {
publishFormatDriver(preferredDriver);
} catch (RuntimeException | IOException e2) {
LOGGER.warn("Failed to initialize database; disconnecting", e2);
- quietlySetFormatDriver(new DisconnectedDriver<>(e2));
+ setDisconnectedDriver(e2);
}
} catch (RuntimeException | UnrecognizedFormatException | IOException e) {
switch (driverSettings.initialDatabaseUnavailableMode()) {
@@ -192,7 +200,7 @@ private R doInitialRoot(Type rootType) {
throw new InitialRootFailureException("Unable to load initial state from MongoDB", e);
case DISCONNECT:
LOGGER.info("Unable to load initial root from database; will proceed with downstream.initialRoot", e);
- quietlySetFormatDriver(new DisconnectedDriver<>(e));
+ setDisconnectedDriver(e);
root = callDownstreamInitialRoot(rootType);
break;
default:
@@ -443,7 +451,7 @@ public void onConnectionFailed(Exception e) throws InterruptedException, Initial
public void onDisconnect(Throwable e) {
LOGGER.debug("onDisconnect({})", e.toString());
formatDriver.close();
- quietlySetFormatDriver(new DisconnectedDriver<>(e));
+ setDisconnectedDriver(e);
}
}
@@ -539,7 +547,7 @@ private void doRetryableDriverOperati
operation.run();
session.commitTransactionIfAny();
} catch (FailedSessionException e) {
- quietlySetFormatDriver(new DisconnectedDriver<>(e));
+ setDisconnectedDriver(e);
throw new DisconnectedException(e);
}
};
@@ -587,22 +595,25 @@ private void waitAndRetry(RetryableOp
}
/**
- * Sets {@link #formatDriver} but does not signal threads waiting to retry,
- * because there's very likely a better driver on its way.
+ * Sets {@link #formatDriver} but does not signal threads waiting on {@link #formatDriverChanged},
+ * because there's no point waking them to retry with {@link DisconnectedDriver}
+ * (which is bound to fail); they might as well keep waiting and hope for another
+ * better driver to arrive instead.
*/
- void quietlySetFormatDriver(FormatDriver newFormatDriver) {
- LOGGER.debug("quietlySetFormatDriver({}) (was {})", newFormatDriver.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
+ void setDisconnectedDriver(Throwable reason) {
+ LOGGER.debug("quietlySetDisconnectedDriver({}) (previously {})", reason.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
try {
formatDriverLock.lock();
formatDriver.close();
- formatDriver = newFormatDriver;
+ formatDriver = new DisconnectedDriver<>(reason);
} finally {
formatDriverLock.unlock();
}
}
/**
- * Sets {@link #formatDriver} and also signals any threads waiting to retry.
+ * Sets {@link #formatDriver} and also signals any threads waiting on {@link #formatDriverChanged}
+ * to retry their operation.
*/
void publishFormatDriver(FormatDriver newFormatDriver) {
LOGGER.debug("publishFormatDriver({}) (was {})", newFormatDriver.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
From 224e220d0b9e7b1d171b14615ee1dd6988171272 Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Mon, 22 Jan 2024 10:52:12 -0500
Subject: [PATCH 7/9] Refactor - minor tweak
---
.../main/java/io/vena/bosk/drivers/mongo/PandoFormatDriver.java | 2 +-
.../java/io/vena/bosk/drivers/mongo/SequoiaFormatDriver.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/PandoFormatDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/PandoFormatDriver.java
index d4c6abb5..6b52f723 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/PandoFormatDriver.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/PandoFormatDriver.java
@@ -93,7 +93,7 @@ final class PandoFormatDriver implements FormatDriver downstream
) {
- this.description = PandoFormatDriver.class.getSimpleName() + ": " + driverSettings;
+ this.description = getClass().getSimpleName() + ": " + driverSettings;
this.settings = driverSettings;
this.format = format;
this.formatter = new Formatter(bosk, bsonPlugin);
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SequoiaFormatDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SequoiaFormatDriver.java
index 735b10f8..b1a78178 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SequoiaFormatDriver.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SequoiaFormatDriver.java
@@ -70,7 +70,7 @@ final class SequoiaFormatDriver implements FormatDriver
FlushLock flushLock,
BoskDriver downstream
) {
- this.description = SequoiaFormatDriver.class.getSimpleName() + ": " + driverSettings;
+ this.description = getClass().getSimpleName() + ": " + driverSettings;
this.settings = driverSettings;
this.formatter = new Formatter(bosk, bsonPlugin);
this.collection = collection;
From 4062e9345992fbd3a09ce00cd7df7f2ba0751f20 Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Mon, 22 Jan 2024 12:11:11 -0500
Subject: [PATCH 8/9] Refactor: use doRetryableDriverOperation for flush
---
.../vena/bosk/drivers/mongo/MainDriver.java | 26 +++----------------
1 file changed, 4 insertions(+), 22 deletions(-)
diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
index 1f0da85a..62646b90 100644
--- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
+++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
@@ -296,33 +296,15 @@ public void submitConditionalDeletion(Reference target, Reference flushOperation = this::doFlush;
- try (MDCScope __ = beginDriverOperation("flush")) {
- try {
- flushOperation.run();
- } catch (DisconnectedException e) {
- LOGGER.debug("Driver is disconnected ({}); will wait and retry operation", e.getMessage());
- waitAndRetry(flushOperation, "flush");
- } catch (RevisionFieldDisruptedException e) {
- // TODO: Really, at the moment the damage is noticed, we should probably make the receiver reboot; but we currently have no way to do so!
- LOGGER.debug("Revision field has been disrupted; wait for receiver to notice something is wrong", e);
- waitAndRetry(flushOperation, "flush");
- } catch (FailedSessionException e) {
- LOGGER.debug("Cannot open MongoDB session; will wait and retry flush", e);
- waitAndRetry(flushOperation, "flush");
- }
- }
+ this.doRetryableDriverOperation(() -> {
+ formatDriver.flush();
+ }, "flush");
} catch (DisconnectedException | FailedSessionException e) {
+ // Callers are expecting a FlushFailureException in these cases
throw new FlushFailureException(e);
}
}
- private void doFlush() throws IOException, InterruptedException {
- try (var ___ = collection.newReadOnlySession()) {
- formatDriver.flush();
- }
- }
-
@Override
public void refurbish() throws IOException {
doRetryableDriverOperation(() -> {
From 8808bc33fc7247b1590a6d18ab1f0f95184e52e0 Mon Sep 17 00:00:00 2001
From: Patrick Doyle
Date: Mon, 22 Jan 2024 12:52:40 -0500
Subject: [PATCH 9/9] Modernize AbstractBoskTest
---
.../java/io/vena/bosk/AbstractBoskTest.java | 114 +++++++++---------
.../java/io/vena/bosk/AbstractEntity.java | 40 ------
2 files changed, 57 insertions(+), 97 deletions(-)
delete mode 100644 lib-testing/src/main/java/io/vena/bosk/AbstractEntity.java
diff --git a/lib-testing/src/main/java/io/vena/bosk/AbstractBoskTest.java b/lib-testing/src/main/java/io/vena/bosk/AbstractBoskTest.java
index 6c1c5118..98d7306b 100644
--- a/lib-testing/src/main/java/io/vena/bosk/AbstractBoskTest.java
+++ b/lib-testing/src/main/java/io/vena/bosk/AbstractBoskTest.java
@@ -14,16 +14,14 @@
import static java.util.Arrays.asList;
public abstract class AbstractBoskTest {
- @Value
- @EqualsAndHashCode(callSuper=false)
@With
@FieldNameConstants
- public static class TestRoot extends AbstractEntity {
- Identifier id;
- Catalog entities;
- StringListValueSubclass someStrings;
- MapValue someMappedStrings;
- }
+ public record TestRoot(
+ Identifier id,
+ Catalog entities,
+ StringListValueSubclass someStrings,
+ MapValue someMappedStrings
+ ) implements Entity { }
public static class StringListValueSubclass extends ListValue {
public StringListValueSubclass(String... entries) {
@@ -31,74 +29,76 @@ public StringListValueSubclass(String... entries) {
}
}
- @Value
- @EqualsAndHashCode(callSuper=false)
@With
@FieldNameConstants
- public static class TestEntity extends AbstractEntity {
- Identifier id;
- String string;
- TestEnum testEnum;
- Catalog children;
- Listing oddChildren;
- SideTable stringSideTable;
- Phantoms phantoms;
- Optionals optionals;
- ImplicitRefs implicitRefs;
-
+ public record TestEntity(
+ Identifier id,
+ String string,
+ TestEnum testEnum,
+ Catalog children,
+ Listing oddChildren,
+ SideTable stringSideTable,
+ Phantoms phantoms,
+ Optionals optionals,
+ ImplicitRefs implicitRefs
+ ) implements Entity {
public TestEntity withChild(TestChild child) {
return this.withChildren(children.with(child));
}
}
- @Value
- @EqualsAndHashCode(callSuper=false)
@With
@FieldNameConstants
- public static class TestChild extends AbstractEntity {
- Identifier id;
- String string;
- TestEnum testEnum;
- Catalog recursiveChildren;
- }
+ public record TestChild(
+ Identifier id,
+ String string,
+ TestEnum testEnum,
+ Catalog recursiveChildren
+ ) implements Entity { }
- @Value
- @EqualsAndHashCode(callSuper=false)
@With
@FieldNameConstants
- public static class Optionals extends AbstractEntity {
- Identifier id;
- Optional optionalString;
- Optional optionalEntity;
- Optional> optionalRef;
- Optional> optionalCatalog;
- Optional> optionalListing;
- Optional> optionalSideTable;
-
+ public record Optionals(
+ Identifier id,
+ Optional optionalString,
+ Optional optionalEntity,
+ Optional> optionalRef,
+ Optional> optionalCatalog,
+ Optional> optionalListing,
+ Optional> optionalSideTable
+ ) implements Entity {
public static Optionals empty(Identifier id) {
- return new Optionals(id,
- Optional.empty(), Optional.empty(), Optional.empty(),
- Optional.empty(), Optional.empty(), Optional.empty());
+ return new Optionals(
+ id,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
}
}
- @Value
- @EqualsAndHashCode(callSuper=false)
@With
@FieldNameConstants
- public static class Phantoms extends AbstractEntity {
- Identifier id;
- Phantom phantomString;
- Phantom phantomEntity;
- Phantom> phantomRef;
- Phantom> phantomCatalog;
- Phantom> phantomListing;
- Phantom> phantomSideTable;
-
+ public record Phantoms(
+ Identifier id,
+ Phantom phantomString,
+ Phantom phantomEntity,
+ Phantom> phantomRef,
+ Phantom> phantomCatalog,
+ Phantom> phantomListing,
+ Phantom> phantomSideTable
+ ) implements Entity {
public static Phantoms empty(Identifier id) {
- return new Phantoms(id,
- Phantom.empty(), Phantom.empty(), Phantom.empty(),
- Phantom.empty(), Phantom.empty(), Phantom.empty());
+ return new Phantoms(
+ id,
+ Phantom.empty(),
+ Phantom.empty(),
+ Phantom.empty(),
+ Phantom.empty(),
+ Phantom.empty(),
+ Phantom.empty());
}
}
diff --git a/lib-testing/src/main/java/io/vena/bosk/AbstractEntity.java b/lib-testing/src/main/java/io/vena/bosk/AbstractEntity.java
deleted file mode 100644
index 8b083c08..00000000
--- a/lib-testing/src/main/java/io/vena/bosk/AbstractEntity.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package io.vena.bosk;
-
-/**
- * Some handy defaults for {@link Entity} implementations that don't
- * inherit {@link ReflectiveEntity}.
- *
- * @deprecated This dates back to a time when value-based equals and hashCode
- * were discouraged, but this is no longer the case. There's no reason newly
- * written Entities should inherit this.
- */
-@Deprecated
-public abstract class AbstractEntity implements Entity {
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "(" + id() + ")";
- }
-
- @Override
- public int hashCode() {
- throw notSupported("hashCode");
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- // Required to be reflexive by the specification of Object.equals.
- return true;
- } else {
- throw notSupported("equals");
- }
- }
-
- private IllegalArgumentException notSupported(String methodName) {
- return new IllegalArgumentException(
- getClass().getSimpleName() + "." + methodName
- + " not supported; see `Entity` javadocs for more information");
- }
-
-}