Skip to content

Commit

Permalink
Fix disconnected events (venasolutions#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle authored Jan 23, 2024
2 parents 5721cff + 91d66ae commit 3c59dbb
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private void connectionLoop() {
}
} catch (UnprocessableEventException|UnexpectedEventProcessingException e) {
addContextToException(e);
LOGGER.warn("Unable to process MongoDB change event; reconnecting: {}", e.getMessage(), e);
LOGGER.warn("Unable to process MongoDB change event; reconnecting ({})", e.getMessage(), e);
listener.onDisconnect(e);
// Reconnection will skip this event, so it's safe to try it right away
continue;
Expand Down Expand Up @@ -157,6 +157,10 @@ private void connectionLoop() {
LOGGER.warn("Timed out waiting for bosk state to initialize; will wait and retry", e);
listener.onDisconnect(e);
return;
} catch (DisconnectedException e) {
addContextToException(e);
LOGGER.warn("Driver is disconnected; will wait and retry", e);
return;
} catch (RuntimeException | Error e) {
addContextToException(e);
LOGGER.warn("Unexpected exception after connecting to MongoDB; will wait and retry", e);
Expand Down Expand Up @@ -223,6 +227,9 @@ private void eventLoop(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument
processEvent(event);
}
}
} catch (DisconnectedException e) {
LOGGER.trace("connectionLoop can handle this exception; rethrow it", e);
throw e;
} catch (RuntimeException e) {
addContextToException(e);
LOGGER.debug("Unexpected {} while processing events", e.getClass().getSimpleName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void close() {

@Override
public void onEvent(ChangeStreamDocument<BsonDocument> event) {
LOGGER.debug("Already disconnected; ignoring event ({})", event.getOperationType().getValue());
throw disconnected();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ private <X extends Exception, Y extends Exception> void doRetryableDriverOperati
} catch (DisconnectedException e) {
LOGGER.debug("Driver is disconnected ({}); will wait and retry operation", e.getMessage());
waitAndRetry(operationInSession, description, args);
} finally {
LOGGER.debug("Finished operation " + description, args);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.vena.bosk.annotations.ReferencePath;
import io.vena.bosk.drivers.mongo.MongoDriverSettings.MongoDriverSettingsBuilder;
import io.vena.bosk.drivers.state.TestEntity;
import io.vena.bosk.drivers.state.TestValues;
import io.vena.bosk.exceptions.InvalidTypeException;
import java.lang.reflect.Method;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -124,6 +125,11 @@ public TestEntity initialRoot(Bosk<TestEntity> testEntityBosk) throws InvalidTyp
));
}

public TestEntity initialRootWithValues(Bosk<TestEntity> testEntityBosk) throws InvalidTypeException {
return initialRootWithEmptyCatalog(testEntityBosk)
.withValues(Optional.of(TestValues.blank()));
}

public TestEntity initialRootWithEmptyCatalog(Bosk<TestEntity> testEntityBosk) throws InvalidTypeException {
Refs refs = testEntityBosk.buildReferences(Refs.class);
return new TestEntity(rootID,
Expand Down Expand Up @@ -160,6 +166,7 @@ public interface Refs {
@ReferencePath("/catalog/-child-/catalog") CatalogReference<TestEntity> childCatalog(Identifier child);
@ReferencePath("/listing") ListingReference<TestEntity> listing();
@ReferencePath("/listing/-entity-") Reference<ListingEntry> listingEntry(Identifier entity);
@ReferencePath("/values") Reference<TestValues> values();
@ReferencePath("/values/string") Reference<String> valuesString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Stream;
import lombok.With;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
Expand Down Expand Up @@ -298,7 +299,7 @@ void initialStateHasNonexistentFields_ignored() throws InvalidTypeException {
setLogging(ERROR, BsonPlugin.class);

// Upon creating bosk, the initial value will be saved to MongoDB
new Bosk<TestEntity>("Newer", TestEntity.class, this::initialRootWithEmptyCatalog, driverFactory);
new Bosk<TestEntity>("Newer", TestEntity.class, this::initialRootWithValues, driverFactory);

// Upon creating prevBosk, the state in the database will be loaded into the local.
Bosk<OldEntity> prevBosk = new Bosk<OldEntity>(
Expand Down Expand Up @@ -332,18 +333,18 @@ void updateHasNonexistentFields_ignored() throws InvalidTypeException, IOExcepti
bosk.driver().submitReplacement(bosk.rootReference(),
initialRoot
.withString("replacementString")
.withListing(Listing.of(initialRoot.listing().domain(), Identifier.from("newEntry"))));
.withValues(Optional.of(TestValues.blank())));

prevBosk.driver().flush();

OldEntity expected = OldEntity.withString("replacementString", prevBosk);
OldEntity oldEntity = OldEntity.withString("replacementString", prevBosk);

OldEntity actual;
try (var __ = prevBosk.readContext()) {
actual = prevBosk.rootReference().value();
}

assertEquals(expected, actual);
assertEquals(oldEntity, actual);
}

@ParametersByName
Expand All @@ -358,15 +359,14 @@ void updateNonexistentField_ignored() throws InvalidTypeException, IOException,
(b) -> { throw new AssertionError("prevBosk should use the state from MongoDB"); },
createDriverFactory());

ListingReference<TestEntity> listingRef = bosk.rootReference().thenListing(TestEntity.class, TestEntity.Fields.listing);

TestEntity initialRoot = initialRootWithEmptyCatalog(bosk);
bosk.driver().submitReplacement(listingRef,
Listing.of(initialRoot.listing().domain(), Identifier.from("newEntry")));
Refs refs = bosk.buildReferences(Refs.class);
bosk.driver().submitReplacement(refs.values(),
TestValues.blank());

prevBosk.driver().flush();

OldEntity expected = OldEntity.withString(rootID.toString(), prevBosk); // unchanged
OldEntity expected = OldEntity // unchanged from before
.withString(rootID.toString(), prevBosk);

OldEntity actual;
try (var __ = prevBosk.readContext()) {
Expand Down Expand Up @@ -428,27 +428,62 @@ void updateInsidePolyfill_works() throws IOException, InterruptedException, Inva
void deleteNonexistentField_ignored() throws InvalidTypeException, IOException, InterruptedException {
setLogging(ERROR, MainDriver.class.getPackage()); // Need a big hammer because FormatDrivers complain

Bosk<TestEntity> bosk = new Bosk<TestEntity>("Newer", TestEntity.class, this::initialRootWithEmptyCatalog, driverFactory);
Bosk<TestEntity> newerBosk = new Bosk<TestEntity>("Newer", TestEntity.class, this::initialRootWithEmptyCatalog, driverFactory);
Bosk<OldEntity> prevBosk = new Bosk<OldEntity>(
"Prev",
OldEntity.class,
(b) -> { throw new AssertionError("prevBosk should use the state from MongoDB"); },
createDriverFactory());

ListingReference<TestEntity> listingRef = bosk.rootReference().thenListing(TestEntity.class, TestEntity.Fields.listing);

bosk.driver().submitDeletion(listingRef.then(entity123));
Refs refs = newerBosk.buildReferences(Refs.class);
newerBosk.driver().submitDeletion(refs.values());

prevBosk.driver().flush();

OldEntity expected = OldEntity.withString(rootID.toString(), prevBosk); // unchanged
OldEntity oldEntity = OldEntity.withString(rootID.toString(), prevBosk); // unchanged

OldEntity actual;
try (var __ = prevBosk.readContext()) {
actual = prevBosk.rootReference().value();
}

assertEquals(expected, actual);
assertEquals(oldEntity, actual);
}

@ParametersByName
@UsesMongoService
void databaseMissingField_fallsBackToDefaultState() throws InvalidTypeException, IOException, InterruptedException {
LOGGER.debug("Set up database with entity that has no string field");
Bosk<OptionalEntity> setupBosk = new Bosk<OptionalEntity>("Setup", OptionalEntity.class, b -> OptionalEntity.withString(Optional.empty(), b), createDriverFactory());

LOGGER.debug("Connect another bosk where the string field is mandatory");
Bosk<TestEntity> testBosk = new Bosk<TestEntity>("Test", TestEntity.class, this::initialRoot, driverFactory);
TestEntity expected1 = initialRoot(testBosk); // NOT what was put there by the setup bosk!
TestEntity actual1;
try (var __ = testBosk.readContext()) {
actual1 = testBosk.rootReference().value();
}

assertEquals(expected1, actual1, "Disconnected bosk should use the default initial root");

LOGGER.debug("Repair the bosk by writing the string value");
setupBosk.driver().submitReplacement(
setupBosk.rootReference().then(String.class, "string"),
"stringValue");

LOGGER.debug("Flush testBosk to get the state from the database");
testBosk.driver().flush();

Refs refs = testBosk.buildReferences(Refs.class);
TestEntity expected2 = TestEntity.empty(Identifier.from("optionalEntity"), refs.catalog())
.withString("stringValue");

TestEntity actual2;
try (var __ = testBosk.readContext()) {
actual2 = testBosk.rootReference().value();
}

assertEquals(expected2, actual2, "Reconnected bosk should see the state from the database");
}

@ParametersByName
Expand Down Expand Up @@ -657,6 +692,7 @@ private BsonString rootDocumentID() {
/**
* Represents an earlier version of the entity before some fields were added.
*/
@With
public record OldEntity(
Identifier id,
String string,
Expand All @@ -665,18 +701,19 @@ public record OldEntity(
SideTable<OldEntity, OldEntity> sideTable
) implements Entity {
public static OldEntity withString(String value, Bosk<OldEntity> bosk) throws InvalidTypeException {
Reference<Catalog<OldEntity>> catalogRef = bosk.rootReference().then(Classes.catalog(OldEntity.class), "catalog");
return new OldEntity(
rootID,
value,
Catalog.empty(),
SideTable.empty(bosk.rootReference().then(Classes.catalog(OldEntity.class), "catalog"))
SideTable.empty(catalogRef)
);
}
}

/**
* A version of {@link TestEntity} where the {@link Optional} {@link TestEntity#values()}
* field has a polyfill (and some other fields have been deleted).
* field has a polyfill.
*/
public record UpgradeableEntity(
Identifier id,
Expand All @@ -690,5 +727,30 @@ public record UpgradeableEntity(
static final TestValues DEFAULT_VALUES = TestValues.blank();
}

/**
* A version of {@link TestEntity} where all the fields are {@link Optional} so we
* have full control over what fields we set.
*/
@With
public record OptionalEntity(
Identifier id,
Optional<String> string,
Optional<Catalog<TestEntity>> catalog,
Optional<Listing<TestEntity>> listing,
Optional<SideTable<TestEntity, TestEntity>> sideTable,
Optional<TestValues> values
) implements Entity {
static OptionalEntity withString(Optional<String> string, Bosk<OptionalEntity> bosk) throws InvalidTypeException {
CatalogReference<TestEntity> domain = bosk.rootReference().thenCatalog(TestEntity.class, "catalog");
return new OptionalEntity(
Identifier.from("optionalEntity"),
string,
Optional.of(Catalog.empty()),
Optional.of(Listing.empty(domain)),
Optional.of(SideTable.empty(domain)),
Optional.empty());
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDriverSpecialTest.class);
}

0 comments on commit 3c59dbb

Please sign in to comment.