Skip to content

Add unit test for multiple WAL consumers concurrently do synchronization #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 72 additions & 12 deletions src/test/java/ir/sahab/walconsumer/WalConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

import ir.sahab.uncaughtexceptionrule.UncaughtExceptionRule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.tuple.Pair;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.hibernate.SessionFactory;
import org.hibernate.cfg.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -23,7 +27,7 @@ public class WalConsumerTest {

private volatile boolean returnByFalse = Boolean.FALSE;
private volatile boolean throwIoException = Boolean.FALSE;
private final Map<Long, String> model = new HashMap<>();
private final Map<Long, String> synchronizationTarget = new HashMap<>();

@Rule
public UncaughtExceptionRule uncaughtExceptionRule = new UncaughtExceptionRule();
Expand All @@ -33,6 +37,9 @@ public class WalConsumerTest {

@Before
public void setUp() {
WalConsumer.setSleepMillisWhenWalIsEmpty(1);
WalConsumer.setSleepMillisOnIoFailure(1);

// Read hibernate general properties from resource file.
Configuration configuration = new Configuration();
configuration.configure("hibernate.cfg.xml");
Expand All @@ -50,10 +57,17 @@ public void setUp() {
walEntityRepository = new TestWalEntityRepository(sessionFactory);
}

@After
public void tearDown() {
synchronizationTarget.clear();
}

/**
* In this test, we do all kind of operations (i.e., 'add', 'update', 'delete') in relational db. We expect the WAl
* consumer to synchronize them.
*/
@Test
public void testWalConsumer() throws Exception {
WalConsumer.setSleepMillisWhenWalIsEmpty(1);
WalConsumer.setSleepMillisOnIoFailure(1);
public void testWalConsuming() throws Exception {
try (WalConsumer walConsumer =
new WalConsumer(TestWalEntity.class, this::synchronizeModel, sessionFactory, "my_app")) {
// Start the WAL consumer
Expand All @@ -71,7 +85,7 @@ public void testWalConsumer() throws Exception {
addWalRecordAndCheck(1L, "updated-name", Operation.UPDATE);

// Add another entity but configure the callback to throw IOException. Then check that WAL consumer retries
// calling the callback until no exception is thrown and finally we can see it is synchronized.
// calling the callback until no exception is thrown, and finally we can see it is synchronized.
throwIoException = Boolean.TRUE;
addWalRecordAndCheck(3L, "name3", Operation.ADD);

Expand All @@ -80,6 +94,52 @@ public void testWalConsumer() throws Exception {
}
}

/**
* In this test, we use more than one instances of wal consumers concurrently. According to our design, it is
* guaranteed that each item will be processed just once, and they will be processed sequentially (i.e., in any
* point in time there is just one consumer who is processing the head item from WAL).
*/
@Test
public void testWalConsumingMultipleThreads() throws Exception {
final int numConsumers = 10;
final double numEntities = 100;

// Start WAL consumers.
WalConsumer[] consumers = new WalConsumer[numConsumers];
for (int i = 0; i < numConsumers; i++) {
consumers[i] = new WalConsumer(TestWalEntity.class, this::synchronizeModel, sessionFactory, "my_app");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not check the order using HashMap, so for this test, it is better to use a Vector or CopyOnWriteArrayList (which are thread-safe) and check the exact order on them. So please define two methods: synchronizeMap (for the first test) and synchronizeVector (for the second test). Also rename the data structures to targetMap and targetVector.

consumers[i].start();
}

try {
List<Pair<Long, String>> expectedRows = new ArrayList<>();
for (long id = 0; id <= numEntities; id++) {
// Add a new entity
saveTestWalEntity(id, "name" + id, Operation.ADD);

// Keep the last state of the entity to be verified in the next step.
expectedRows.add(Pair.of(id, "name" + id));
}

// Check the entities from target. They should be there and be in the expected state.
// Because it is async, it will be visited with delay.
for (Pair<Long, String> expectedRow : expectedRows) {
Awaitility.await()
.atMost(Durations.ONE_SECOND)
.with()
.pollInterval(Durations.ONE_HUNDRED_MILLISECONDS)
.until(() -> synchronizationTarget.containsKey(expectedRow.getKey())
&& synchronizationTarget.get(expectedRow.getKey())
.equals(expectedRow.getValue()));
}

} finally {
for (int i = 0; i < numConsumers; i++) {
consumers[i].close();
}
}
}

private void addWalRecordAndCheck(long id, String name, Operation operation) {
saveTestWalEntity(id, name, operation);
Awaitility.await()
Expand All @@ -88,9 +148,9 @@ private void addWalRecordAndCheck(long id, String name, Operation operation) {
.pollInterval(Durations.ONE_HUNDRED_MILLISECONDS)
.until(() -> walEntityRepository.count() == 0);
if (operation.equals(Operation.DELETE)) {
assertNull(model.get(id));
assertNull(synchronizationTarget.get(id));
} else {
assertEquals(name, model.get(id));
assertEquals(name, synchronizationTarget.get(id));
}
}

Expand All @@ -112,15 +172,15 @@ private boolean synchronizeModel(WalEntity entity) throws IOException {
Operation operation = entity.getOperation();
switch (operation) {
case DELETE:
model.remove(entity.getEntityId());
synchronizationTarget.remove(entity.getEntityId());
break;
case ADD:
assertFalse(model.containsKey(entity.getEntityId()));
model.put(entity.getEntityId(), name);
assertFalse(synchronizationTarget.containsKey(entity.getEntityId()));
synchronizationTarget.put(entity.getEntityId(), name);
break;
case UPDATE:
assertTrue(model.containsKey(entity.getEntityId()));
model.put(entity.getEntityId(), name);
assertTrue(synchronizationTarget.containsKey(entity.getEntityId()));
synchronizationTarget.put(entity.getEntityId(), name);
break;
default:
throw new AssertionError("Invalid wal entity operation!");
Expand Down