diff --git a/src/test/java/ir/sahab/walconsumer/WalConsumerTest.java b/src/test/java/ir/sahab/walconsumer/WalConsumerTest.java index 697649b..4aa2bca 100644 --- a/src/test/java/ir/sahab/walconsumer/WalConsumerTest.java +++ b/src/test/java/ir/sahab/walconsumer/WalConsumerTest.java @@ -8,13 +8,17 @@ import ir.sahab.uncaughtexceptionrule.UncaughtExceptionRule; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.commons.lang3.tuple.Pair; import org.awaitility.Awaitility; import org.awaitility.Durations; import org.hibernate.SessionFactory; import org.hibernate.cfg.Configuration; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -23,7 +27,7 @@ public class WalConsumerTest { private volatile boolean returnByFalse = Boolean.FALSE; private volatile boolean throwIoException = Boolean.FALSE; - private final Map model = new HashMap<>(); + private final Map synchronizationTarget = new HashMap<>(); @Rule public UncaughtExceptionRule uncaughtExceptionRule = new UncaughtExceptionRule(); @@ -33,6 +37,9 @@ public class WalConsumerTest { @Before public void setUp() { + WalConsumer.setSleepMillisWhenWalIsEmpty(1); + WalConsumer.setSleepMillisOnIoFailure(1); + // Read hibernate general properties from resource file. Configuration configuration = new Configuration(); configuration.configure("hibernate.cfg.xml"); @@ -50,10 +57,17 @@ public void setUp() { walEntityRepository = new TestWalEntityRepository(sessionFactory); } + @After + public void tearDown() { + synchronizationTarget.clear(); + } + + /** + * In this test, we do all kind of operations (i.e., 'add', 'update', 'delete') in relational db. We expect the WAl + * consumer to synchronize them. + */ @Test - public void testWalConsumer() throws Exception { - WalConsumer.setSleepMillisWhenWalIsEmpty(1); - WalConsumer.setSleepMillisOnIoFailure(1); + public void testWalConsuming() throws Exception { try (WalConsumer walConsumer = new WalConsumer(TestWalEntity.class, this::synchronizeModel, sessionFactory, "my_app")) { // Start the WAL consumer @@ -71,7 +85,7 @@ public void testWalConsumer() throws Exception { addWalRecordAndCheck(1L, "updated-name", Operation.UPDATE); // Add another entity but configure the callback to throw IOException. Then check that WAL consumer retries - // calling the callback until no exception is thrown and finally we can see it is synchronized. + // calling the callback until no exception is thrown, and finally we can see it is synchronized. throwIoException = Boolean.TRUE; addWalRecordAndCheck(3L, "name3", Operation.ADD); @@ -80,6 +94,52 @@ public void testWalConsumer() throws Exception { } } + /** + * In this test, we use more than one instances of wal consumers concurrently. According to our design, it is + * guaranteed that each item will be processed just once, and they will be processed sequentially (i.e., in any + * point in time there is just one consumer who is processing the head item from WAL). + */ + @Test + public void testWalConsumingMultipleThreads() throws Exception { + final int numConsumers = 10; + final double numEntities = 100; + + // Start WAL consumers. + WalConsumer[] consumers = new WalConsumer[numConsumers]; + for (int i = 0; i < numConsumers; i++) { + consumers[i] = new WalConsumer(TestWalEntity.class, this::synchronizeModel, sessionFactory, "my_app"); + consumers[i].start(); + } + + try { + List> expectedRows = new ArrayList<>(); + for (long id = 0; id <= numEntities; id++) { + // Add a new entity + saveTestWalEntity(id, "name" + id, Operation.ADD); + + // Keep the last state of the entity to be verified in the next step. + expectedRows.add(Pair.of(id, "name" + id)); + } + + // Check the entities from target. They should be there and be in the expected state. + // Because it is async, it will be visited with delay. + for (Pair expectedRow : expectedRows) { + Awaitility.await() + .atMost(Durations.ONE_SECOND) + .with() + .pollInterval(Durations.ONE_HUNDRED_MILLISECONDS) + .until(() -> synchronizationTarget.containsKey(expectedRow.getKey()) + && synchronizationTarget.get(expectedRow.getKey()) + .equals(expectedRow.getValue())); + } + + } finally { + for (int i = 0; i < numConsumers; i++) { + consumers[i].close(); + } + } + } + private void addWalRecordAndCheck(long id, String name, Operation operation) { saveTestWalEntity(id, name, operation); Awaitility.await() @@ -88,9 +148,9 @@ private void addWalRecordAndCheck(long id, String name, Operation operation) { .pollInterval(Durations.ONE_HUNDRED_MILLISECONDS) .until(() -> walEntityRepository.count() == 0); if (operation.equals(Operation.DELETE)) { - assertNull(model.get(id)); + assertNull(synchronizationTarget.get(id)); } else { - assertEquals(name, model.get(id)); + assertEquals(name, synchronizationTarget.get(id)); } } @@ -112,15 +172,15 @@ private boolean synchronizeModel(WalEntity entity) throws IOException { Operation operation = entity.getOperation(); switch (operation) { case DELETE: - model.remove(entity.getEntityId()); + synchronizationTarget.remove(entity.getEntityId()); break; case ADD: - assertFalse(model.containsKey(entity.getEntityId())); - model.put(entity.getEntityId(), name); + assertFalse(synchronizationTarget.containsKey(entity.getEntityId())); + synchronizationTarget.put(entity.getEntityId(), name); break; case UPDATE: - assertTrue(model.containsKey(entity.getEntityId())); - model.put(entity.getEntityId(), name); + assertTrue(synchronizationTarget.containsKey(entity.getEntityId())); + synchronizationTarget.put(entity.getEntityId(), name); break; default: throw new AssertionError("Invalid wal entity operation!");