Skip to content

Commit

Permalink
More tweaks (venasolutions#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle authored Jan 22, 2024
2 parents d56d7ae + 8808bc3 commit 5721cff
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 154 deletions.
25 changes: 13 additions & 12 deletions bosk-core/src/main/java/io/vena/bosk/Bosk.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.val;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -322,7 +321,7 @@ private synchronized <T> boolean tryGraftReplacement(Reference<T> target, T newV
*/
private synchronized <T> boolean tryGraftDeletion(Reference<T> target) {
Path targetPath = target.path();
if (targetPath.length() == 0) {
if (targetPath.isEmpty()) {
throw new IllegalArgumentException("Cannot delete root object");
}
Dereferencer dereferencer = dereferencerFor(target);
Expand Down Expand Up @@ -375,6 +374,13 @@ private <T,S> void triggerQueueingOfHooks(Reference<T> target, @Nullable R prior
try (@SuppressWarnings("unused") ReadContext executionContext = new ReadContext(rootForHook)) {
LOGGER.debug("Hook: RUN {}({})", reg.name, changedRef);
reg.hook.onChanged(changedRef);
} catch (Exception e) {
LOGGER.error("Bosk hook \"" + reg.name() + "\" terminated with an exception, which usually indicates a bug. State updates may have been lost", e);

// Note that we don't catch Error. The practical reason is to allow users to write
// unit tests that throw AssertionError from hooks, but the bigger reason is that
// Errors indicate that something has gone dreadfully wrong, and we probably should
// not attempt to continue.
} finally {
LOGGER.debug("Hook: end {}({})", reg.name, changedRef);
}
Expand Down Expand Up @@ -415,11 +421,7 @@ private void drainQueueIfAllowed() {
if (hookExecutionPermit.tryAcquire()) {
try {
for (Runnable ex = hookExecutionQueue.pollFirst(); ex != null; ex = hookExecutionQueue.pollFirst()) {
try {
ex.run();
} catch (Exception e) {
LOGGER.error("Bosk hook terminated with an exception, which usually indicates a bug. State updates may have been lost", e);
}
ex.run();
}
} finally {
hookExecutionPermit.release();
Expand Down Expand Up @@ -844,10 +846,11 @@ public <T> T buildReferences(Class<T> refsClass) throws InvalidTypeException {
}
}

@Getter
@RequiredArgsConstructor
private abstract class ReferenceImpl<T> implements Reference<T> {
@Getter protected final Path path;
@Getter protected final Type targetType;
protected final Path path;
protected final Type targetType;

@Override
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -930,7 +933,7 @@ public final boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof Reference)) {
if (!(obj instanceof @SuppressWarnings({"rawtypes"})Reference other)) {
return false;
}

Expand All @@ -939,8 +942,6 @@ public final boolean equals(Object obj) {
// That means we can compare references from one Bosk to the other
// if they both have the same root type.

@SuppressWarnings({"rawtypes", "unchecked"})
Reference other = (Reference) obj;
return Objects.equals(this.rootType(), other.root().targetType())
&& Objects.equals(path, other.path());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
* Houses a background thread that repeatedly initializes, processes, and closes a change stream cursor.
* Ideally, the opening and closing happen just once, but they're done in a loop for fault tolerance,
* so that the driver can reinitialize if certain unusual conditions arise.
*
* <p>
* We're maintaining an in-memory replica of database state, and so
* the loading of database state and the subsequent handling of events are inherently coupled.
* To coordinate this, we call back into {@link ChangeListener#onConnectionSucceeded()} to indicate the
* appropriate time to load the initial state before any event processing begins.
* The code ends up looking a bit complicated, with {@link MainDriver} creating a {@link ChangeReceiver}
* that calls back into {@link MainDriver} via {@link ChangeListener};
* but it eliminates all race conditions
* (which dramatically simplifies the reasoning about parallelism and corner cases)
* because the state loading and event processing happen on the same thread.
*/
class ChangeReceiver implements Closeable {
private final String boskName;
Expand Down Expand Up @@ -53,6 +64,7 @@ class ChangeReceiver implements Closeable {
public void close() {
isClosed = true;
ex.shutdownNow();
// Note: don't awaitTermination. It seems like a good idea, but it makes the tests crawl, and it doesn't matter for correctness
}

/**
Expand Down Expand Up @@ -207,7 +219,9 @@ private void eventLoop(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument
LOGGER.debug("Interrupted while waiting for change event: {}", e.toString());
break;
}
processEvent(event);
if (!isClosed) {
processEvent(event);
}
}
} catch (RuntimeException e) {
addContextToException(e);
Expand Down Expand Up @@ -249,6 +263,6 @@ private void processEvent(ChangeStreamDocument<BsonDocument> event) throws Unpro
}

private static final AtomicLong EVENT_COUNTER = new AtomicLong(0);
public static final String MDC_KEY = "MongoDriver.event";
public static final String MDC_KEY = "bosk.MongoDriver.event";
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeReceiver.class);
}
84 changes: 45 additions & 39 deletions bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MainDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
* are delegated to a {@link FormatDriver} object that can be swapped out dynamically
* as the database evolves.
*/
public class MainDriver<R extends StateTreeNode> implements MongoDriver<R> {
class MainDriver<R extends StateTreeNode> implements MongoDriver<R> {
private final Bosk<R> bosk;
private final ChangeReceiver receiver;
private final MongoDriverSettings driverSettings;
Expand All @@ -60,13 +60,21 @@ public class MainDriver<R extends StateTreeNode> implements MongoDriver<R> {
private final Listener listener;
final Formatter formatter;

/**
* Hold this while waiting on {@link #formatDriverChanged}
*/
private final ReentrantLock formatDriverLock = new ReentrantLock();

/**
* Wait on this in cases where the {@link #formatDriver} isn't working
* and might be asynchronously repaired.
*/
private final Condition formatDriverChanged = formatDriverLock.newCondition();

private volatile FormatDriver<R> formatDriver = new DisconnectedDriver<>(new Exception("Driver not yet initialized"));
private volatile boolean isClosed = false;

public MainDriver(
MainDriver(
Bosk<R> bosk,
MongoClientSettings clientSettings,
MongoDriverSettings driverSettings,
Expand Down Expand Up @@ -101,6 +109,8 @@ public MainDriver(
@Override
public R initialRoot(Type rootType) throws InvalidTypeException, InterruptedException, IOException {
try (MDCScope __ = beginDriverOperation("initialRoot({})", rootType)) {
// The actual loading of the initial state happens on the ChangeReceiver thread.
// Here, we just wait for that to finish and deal with the consequences.
FutureTask<R> task = listener.taskRef.get();
if (task == null) {
throw new IllegalStateException("initialRoot has already run");
Expand Down Expand Up @@ -140,7 +150,9 @@ public R initialRoot(Type rootType) throws InvalidTypeException, InterruptedExce
}

/**
* Executed on the thread that calls {@link #initialRoot}.
* Called on the {@link ChangeReceiver}'s background thread via {@link Listener#taskRef}
* because it's important that this logic finishes before processing any change events,
* and no other change events can arrive concurrently.
* <p>
* Should throw no exceptions except {@link DownstreamInitialRootException}.
*
Expand All @@ -150,8 +162,17 @@ public R initialRoot(Type rootType) throws InvalidTypeException, InterruptedExce
* and {@link InitialDatabaseUnavailableMode#FAIL} is active.
*/
private R doInitialRoot(Type rootType) {
// This establishes a safe fallback in case things go wrong. It also causes any
// calls to driver update methods to wait until we're finished here. (There shouldn't
// be any such calls while initialRoot is still running, but this ensures that if any
// do happen, they won't corrupt our internal state in confusing ways.)
setDisconnectedDriver(FAILURE_TO_COMPUTE_INITIAL_ROOT);

// In effect, at this point, the entire driver is now single-threaded for the remainder
// of this method. Our only concurrency concerns now involve database operations performed
// by other processes.

R root;
quietlySetFormatDriver(new DisconnectedDriver<>(FAILURE_TO_COMPUTE_INITIAL_ROOT)); // Pessimistic fallback
try (var __ = collection.newReadOnlySession()){
FormatDriver<R> detectedDriver = detectFormat();
StateAndMetadata<R> loadedState = detectedDriver.loadAllState();
Expand All @@ -170,7 +191,7 @@ private R doInitialRoot(Type rootType) {
publishFormatDriver(preferredDriver);
} catch (RuntimeException | IOException e2) {
LOGGER.warn("Failed to initialize database; disconnecting", e2);
quietlySetFormatDriver(new DisconnectedDriver<>(e2));
setDisconnectedDriver(e2);
}
} catch (RuntimeException | UnrecognizedFormatException | IOException e) {
switch (driverSettings.initialDatabaseUnavailableMode()) {
Expand All @@ -179,7 +200,7 @@ private R doInitialRoot(Type rootType) {
throw new InitialRootFailureException("Unable to load initial state from MongoDB", e);
case DISCONNECT:
LOGGER.info("Unable to load initial root from database; will proceed with downstream.initialRoot", e);
quietlySetFormatDriver(new DisconnectedDriver<>(e));
setDisconnectedDriver(e);
root = callDownstreamInitialRoot(rootType);
break;
default:
Expand Down Expand Up @@ -275,33 +296,15 @@ public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identif
@Override
public void flush() throws IOException, InterruptedException {
try {
RetryableOperation<IOException, InterruptedException> flushOperation = this::doFlush;
try (MDCScope __ = beginDriverOperation("flush")) {
try {
flushOperation.run();
} catch (DisconnectedException e) {
LOGGER.debug("Driver is disconnected ({}); will wait and retry operation", e.getMessage());
waitAndRetry(flushOperation, "flush");
} catch (RevisionFieldDisruptedException e) {
// TODO: Really, at the moment the damage is noticed, we should probably make the receiver reboot; but we currently have no way to do so!
LOGGER.debug("Revision field has been disrupted; wait for receiver to notice something is wrong", e);
waitAndRetry(flushOperation, "flush");
} catch (FailedSessionException e) {
LOGGER.debug("Cannot open MongoDB session; will wait and retry flush", e);
waitAndRetry(flushOperation, "flush");
}
}
this.<InterruptedException, IOException>doRetryableDriverOperation(() -> {
formatDriver.flush();
}, "flush");
} catch (DisconnectedException | FailedSessionException e) {
// Callers are expecting a FlushFailureException in these cases
throw new FlushFailureException(e);
}
}

private void doFlush() throws IOException, InterruptedException {
try (var ___ = collection.newReadOnlySession()) {
formatDriver.flush();
}
}

@Override
public void refurbish() throws IOException {
doRetryableDriverOperation(() -> {
Expand Down Expand Up @@ -430,14 +433,14 @@ public void onConnectionFailed(Exception e) throws InterruptedException, Initial
public void onDisconnect(Throwable e) {
LOGGER.debug("onDisconnect({})", e.toString());
formatDriver.close();
quietlySetFormatDriver(new DisconnectedDriver<>(e));
setDisconnectedDriver(e);
}
}

private FormatDriver<R> newPreferredFormatDriver() {
DatabaseFormat preferred = driverSettings.preferredDatabaseFormat();
if (preferred.equals(SEQUOIA) || preferred instanceof PandoFormat) {
return newSingleDocFormatDriver(REVISION_ZERO.longValue(), preferred);
return newFormatDriver(REVISION_ZERO.longValue(), preferred);
}
throw new AssertionError("Unknown database format setting: " + preferred);
}
Expand Down Expand Up @@ -465,7 +468,7 @@ private FormatDriver<R> detectFormat() throws UninitializedCollectionException,
BsonInt64 revision = cursor
.next()
.getInt64(DocumentFields.revision.name(), REVISION_ZERO);
return newSingleDocFormatDriver(revision.longValue(), format);
return newFormatDriver(revision.longValue(), format);
} else {
// Note that this message is confusing if the user specified
// a preference for Pando but no manifest file exists, because
Expand All @@ -480,7 +483,7 @@ private FormatDriver<R> detectFormat() throws UninitializedCollectionException,
}
}

private FormatDriver<R> newSingleDocFormatDriver(long revisionAlreadySeen, DatabaseFormat format) {
private FormatDriver<R> newFormatDriver(long revisionAlreadySeen, DatabaseFormat format) {
if (format.equals(SEQUOIA)) {
return new SequoiaFormatDriver<>(
bosk,
Expand Down Expand Up @@ -526,7 +529,7 @@ private <X extends Exception, Y extends Exception> void doRetryableDriverOperati
operation.run();
session.commitTransactionIfAny();
} catch (FailedSessionException e) {
quietlySetFormatDriver(new DisconnectedDriver<>(e));
setDisconnectedDriver(e);
throw new DisconnectedException(e);
}
};
Expand Down Expand Up @@ -574,22 +577,25 @@ private <X extends Exception, Y extends Exception> void waitAndRetry(RetryableOp
}

/**
* Sets {@link #formatDriver} but does not signal threads waiting to retry,
* because there's very likely a better driver on its way.
* Sets {@link #formatDriver} but does not signal threads waiting on {@link #formatDriverChanged},
* because there's no point waking them to retry with {@link DisconnectedDriver}
* (which is bound to fail); they might as well keep waiting and hope for another
* better driver to arrive instead.
*/
void quietlySetFormatDriver(FormatDriver<R> newFormatDriver) {
LOGGER.debug("quietlySetFormatDriver({}) (was {})", newFormatDriver.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
void setDisconnectedDriver(Throwable reason) {
LOGGER.debug("quietlySetDisconnectedDriver({}) (previously {})", reason.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
try {
formatDriverLock.lock();
formatDriver.close();
formatDriver = newFormatDriver;
formatDriver = new DisconnectedDriver<>(reason);
} finally {
formatDriverLock.unlock();
}
}

/**
* Sets {@link #formatDriver} and also signals any threads waiting to retry.
* Sets {@link #formatDriver} and also signals any threads waiting on {@link #formatDriverChanged}
* to retry their operation.
*/
void publishFormatDriver(FormatDriver<R> newFormatDriver) {
LOGGER.debug("publishFormatDriver({}) (was {})", newFormatDriver.getClass().getSimpleName(), formatDriver.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ static final class MDCScope implements AutoCloseable {
@Override public void close() { MDC.put(MDC_KEY, oldValue); }
}

private static final String MDC_KEY = "MongoDriver";
private static final String MDC_KEY = "bosk.MongoDriver";
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ final class PandoFormatDriver<R extends StateTreeNode> implements FormatDriver<R
FlushLock flushLock,
BoskDriver<R> downstream
) {
this.description = PandoFormatDriver.class.getSimpleName() + ": " + driverSettings;
this.description = getClass().getSimpleName() + ": " + driverSettings;
this.settings = driverSettings;
this.format = format;
this.formatter = new Formatter(bosk, bsonPlugin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ final class SequoiaFormatDriver<R extends StateTreeNode> implements FormatDriver
FlushLock flushLock,
BoskDriver<R> downstream
) {
this.description = SequoiaFormatDriver.class.getSimpleName() + ": " + driverSettings;
this.description = getClass().getSimpleName() + ": " + driverSettings;
this.settings = driverSettings;
this.formatter = new Formatter(bosk, bsonPlugin);
this.collection = collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void close() {
MDC.put(MDC_KEY, oldMDC);
}

private static final String MDC_KEY = "MongoDriver.transaction";
private static final String MDC_KEY = "bosk.MongoDriver.transaction";
}

/**
Expand Down
Loading

0 comments on commit 5721cff

Please sign in to comment.