diff --git a/.github/workflows/maven-central-publish.yml b/.github/workflows/maven-central-publish.yml index a9008fd..8d41b91 100644 --- a/.github/workflows/maven-central-publish.yml +++ b/.github/workflows/maven-central-publish.yml @@ -11,7 +11,7 @@ jobs: uses: actions/setup-java@v4 with: java-version: '17' - distribution: 'temurin' + distribution: 'zulu' server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD diff --git a/pom.xml b/pom.xml index 568a14e..89a170e 100644 --- a/pom.xml +++ b/pom.xml @@ -109,9 +109,7 @@ java true - - **/src/test/java/com/phonepe/dlm/**/*.java, - **/src/test/java/com/phonepe/dlm/*.java + **/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java @@ -317,7 +315,7 @@ - **/src/test/** + **/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java diff --git a/src/main/java/com/phonepe/dlm/exception/DLMException.java b/src/main/java/com/phonepe/dlm/exception/DLMException.java index 9763f2a..481db8c 100644 --- a/src/main/java/com/phonepe/dlm/exception/DLMException.java +++ b/src/main/java/com/phonepe/dlm/exception/DLMException.java @@ -30,11 +30,6 @@ public class DLMException extends RuntimeException { private static final long serialVersionUID = 7310153558992797589L; private final ErrorCode errorCode; - public DLMException(ErrorCode errorCode) { - super(); - this.errorCode = errorCode; - } - @Builder public DLMException(ErrorCode errorCode, String message, Throwable cause) { super(message, cause); diff --git a/src/main/java/com/phonepe/dlm/lock/base/LockBase.java b/src/main/java/com/phonepe/dlm/lock/base/LockBase.java index 5c32480..43bcd70 100644 --- a/src/main/java/com/phonepe/dlm/lock/base/LockBase.java +++ b/src/main/java/com/phonepe/dlm/lock/base/LockBase.java @@ -26,7 +26,6 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.time.Duration; @@ -64,7 +63,6 @@ public void acquireLock(final Lock lock, final Duration duration) { acquireLock(lock, duration, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS); } - @SneakyThrows @Override public void acquireLock(final Lock lock, final Duration duration, final Duration timeout) { final Timer timer = new Timer(System.currentTimeMillis(), timeout.getSeconds()); @@ -79,7 +77,7 @@ public void acquireLock(final Lock lock, final Duration duration, final Duration throw e; } if (e.getErrorCode() == ErrorCode.LOCK_UNAVAILABLE) { - Thread.sleep(WAIT_TIME_FOR_NEXT_RETRY); + sleep(); continue; } throw e; @@ -87,6 +85,7 @@ public void acquireLock(final Lock lock, final Duration duration, final Duration } while (!success.get()); } + @Override public boolean releaseLock(final Lock lock) { if (lock.getAcquiredStatus().get()) { @@ -101,4 +100,14 @@ private void writeToStore(final Lock lock, final Duration ttlSeconds) { lockStore.write(lock.getLockId(), lock.getLockLevel(), lock.getFarmId(), ttlSeconds); lock.getAcquiredStatus().compareAndSet(false, true); } + + private static void sleep() { + try { + Thread.sleep(WAIT_TIME_FOR_NEXT_RETRY); + } catch (InterruptedException e) { + log.error("Error sleeping the thread", e); + Thread.currentThread().interrupt(); + throw DLMException.propagate(e); + } + } } diff --git a/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java b/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java index 99d8d8b..b0e6172 100644 --- a/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java +++ b/src/main/java/com/phonepe/dlm/lock/storage/hbase/HBaseStore.java @@ -92,6 +92,8 @@ public void write(String lockId, LockLevel lockLevel, String farmId, Duration tt .errorCode(ErrorCode.CONNECTION_ERROR) .message(String.format("Error writing lock in HBase [id = %s]", lockId)) .build(); + } catch (Exception e) { + throw DLMException.propagate(e); } } diff --git a/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java b/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java index bc883ac..d00093d 100644 --- a/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java +++ b/src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java @@ -31,6 +31,7 @@ import com.phonepe.dlm.util.TestUtils; import io.appform.testcontainers.aerospike.AerospikeContainerConfiguration; import io.appform.testcontainers.aerospike.container.AerospikeContainer; +import lombok.extern.slf4j.Slf4j; import org.junit.*; import org.junit.rules.ExpectedException; @@ -46,6 +47,7 @@ /** * @author shantanu.tiwari */ +@Slf4j public class DistributedLockWithAerospikeTest { public static final String AEROSPIKE_HOST = "localhost"; public static final String AEROSPIKE_DOCKER_IMAGE = "aerospike/aerospike-server:6.4.0.23"; @@ -72,18 +74,7 @@ public void setUp() { new Host(AEROSPIKE_DOCKER_CONTAINER.getHost(), AEROSPIKE_DOCKER_CONTAINER.getConnectionPort())); aerospikeClient.truncate(aerospikeClient.getInfoPolicyDefault(), AEROSPIKE_NAMESPACE, null, null); - lockManager = DistributedLockManager.builder() - .clientId("CLIENT_ID") - .farmId("FA1") - .lockBase(LockBase.builder() - .mode(LockMode.EXCLUSIVE) - .lockStore(AerospikeStore.builder() - .aerospikeClient(aerospikeClient) - .namespace(AEROSPIKE_NAMESPACE) - .setSuffix("distributed_lock") - .build()) - .build()) - .build(); + lockManager = getLockManager(LockMode.EXCLUSIVE); lockManager.initialize(); } @@ -93,7 +84,7 @@ public void destroy() { } @Test - public void lockTestPositiveSiloDC() { + public void lockPositiveSiloDCTest() { final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock); Assert.assertTrue(lock.getAcquiredStatus() @@ -110,7 +101,7 @@ public void lockTestPositiveSiloDC() { } @Test - public void lockTestPositiveXDC() { + public void lockPositiveXDCTest() { final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.XDC); lockManager.tryAcquireLock(lock, Duration.ofSeconds(90)); Assert.assertTrue(lock.getAcquiredStatus() @@ -128,18 +119,18 @@ public void lockTestPositiveXDC() { } @Test - public void testLockUnavailableForAcquireLock() { + public void lockUnavailableForAcquireLockTest() { final Lock lock = lockManager.getLockInstance("NEW_LOCK_ID", LockLevel.DC); - lockManager.acquireLock(lock); // Wait and try acquiring the lock. + lockManager.acquireLock(lock, Duration.ofSeconds(30)); // Wait and try acquiring the lock. exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE)); lockManager.acquireLock(lock, Duration.ofSeconds(2), Duration.ofSeconds(2)); // Wait for 2 seconds only for acquiring the lock } @Test - public void testLockUnavailableForTryAcquireLockWithSameLockInstance() { + public void lockUnavailableForTryAcquireLockWithSameLockInstanceTest() { final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); - lockManager.tryAcquireLock(lock); + lockManager.acquireLock(lock, Duration.ofSeconds(30), Duration.ofSeconds(5)); Assert.assertTrue(lock.getAcquiredStatus().get()); exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE)); @@ -147,7 +138,7 @@ public void testLockUnavailableForTryAcquireLockWithSameLockInstance() { } @Test - public void testLockUnavailableForTryAcquireLockWithDifferentLockInstance() { + public void lockUnavailableForTryAcquireLockWithDifferentLockInstanceTest() { Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock); Assert.assertTrue(lock.getAcquiredStatus().get()); @@ -158,7 +149,7 @@ public void testLockUnavailableForTryAcquireLockWithDifferentLockInstance() { } @Test - public void concurrentLockAttempt() { + public void concurrentLockAttemptTest() { final int attempts = Runtime.getRuntime() .availableProcessors(); final Map trackers = Maps.newConcurrentMap(); @@ -184,7 +175,7 @@ public void concurrentLockAttempt() { trackers.computeIfAbsent("FAILED_ACQUIRES", x -> new AtomicInteger(0)) .getAndIncrement(); } catch (Exception e) { - // ignore; + log.warn("Gracefully ignoring exception", e); } finally { boolean result = lockManager.releaseLock(lock); Assert.assertFalse(lock.getAcquiredStatus() @@ -206,8 +197,8 @@ public void concurrentLockAttempt() { if (counter.decrementAndGet() <= 1) { latch.countDown(); } - } catch (InterruptedException | ExecutionException e1) { - // ignore; + } catch (InterruptedException | ExecutionException e) { + log.warn("Gracefully ignoring exception", e); } }); @@ -226,8 +217,42 @@ public void concurrentLockAttempt() { .get()); } + @Test(expected = DLMException.class) + public void exceptionInLockTest() { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.acquireLock(lock, Duration.ofDays(7300)); + } + + @Test + public void interruptedExceptionInLockTest() { + try { + final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); + lockManager.acquireLock(lock); + Thread.currentThread().interrupt(); + exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.INTERNAL_ERROR)); + lockManager.acquireLock(lock, Duration.ofSeconds(30)); + } finally { + Thread.interrupted(); // clearing thread interrupted status for isolation + } + } + @After public void tearDown() { aerospikeClient.truncate(aerospikeClient.getInfoPolicyDefault(), AEROSPIKE_NAMESPACE, null, null); } + + public DistributedLockManager getLockManager(final LockMode lockMode) { + return lockMode.accept(() -> DistributedLockManager.builder() + .clientId("CLIENT_ID") + .farmId("FA1") + .lockBase(LockBase.builder() + .mode(lockMode) + .lockStore(AerospikeStore.builder() + .aerospikeClient(aerospikeClient) + .namespace(AEROSPIKE_NAMESPACE) + .setSuffix("distributed_lock") + .build()) + .build()) + .build()); + } } \ No newline at end of file diff --git a/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java b/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java index 2f51790..56831cc 100644 --- a/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java +++ b/src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java @@ -17,16 +17,17 @@ package com.phonepe.dlm; import com.google.common.collect.Maps; +import com.phonepe.dlm.exception.DLMException; +import com.phonepe.dlm.exception.ErrorCode; import com.phonepe.dlm.lock.Lock; import com.phonepe.dlm.lock.base.LockBase; import com.phonepe.dlm.lock.level.LockLevel; import com.phonepe.dlm.lock.mode.LockMode; import com.phonepe.dlm.lock.storage.hbase.HBaseStore; -import com.phonepe.dlm.util.HBaseConnectionStub; -import com.phonepe.dlm.exception.DLMException; -import com.phonepe.dlm.exception.ErrorCode; import com.phonepe.dlm.util.DLMExceptionMatcher; +import com.phonepe.dlm.util.HBaseConnectionStub; import com.phonepe.dlm.util.TestUtils; +import lombok.extern.slf4j.Slf4j; import org.junit.*; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; @@ -41,6 +42,7 @@ import static org.junit.Assert.assertEquals; +@Slf4j @RunWith(MockitoJUnitRunner.class) public class DistributedLockWithHBaseTest { @Rule @@ -101,28 +103,32 @@ public void lockTestPositiveXDC() { Assert.assertFalse(released); } - @Test(expected = DLMException.class) + @Test public void lockTestNegative1() { final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock); Assert.assertTrue(lock.getAcquiredStatus().get()); + + exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE)); lockManager.tryAcquireLock(lock); } - @Test(expected = DLMException.class) + @Test public void lockTestNegative2() { Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock); - Assert.assertTrue(lock.getAcquiredStatus().get()); + + exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE)); lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock); } - @Test(expected = DLMException.class) + @Test public void lockTestNegative3() { final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock); - Assert.assertTrue(lock.getAcquiredStatus().get()); + + exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE)); final Lock lock1 = lockManager.getLockInstance("LOCK_ID", LockLevel.DC); lockManager.tryAcquireLock(lock1); } @@ -202,8 +208,8 @@ public void concurrentLockAttempt() { } catch (DLMException e) { trackers.computeIfAbsent("FAILED_ACQUIRES", x -> new AtomicInteger(0)) .getAndIncrement(); - } catch (Exception ignored) { - // ignore; + } catch (Exception e) { + log.warn("Gracefully ignoring exception", e); } finally { boolean result = lockManager.releaseLock(lock); if (result) { @@ -223,8 +229,8 @@ public void concurrentLockAttempt() { if (counter.decrementAndGet() <= 1) { latch.countDown(); } - } catch (InterruptedException | ExecutionException e1) { - // ignore; + } catch (InterruptedException | ExecutionException e) { + log.warn("Gracefully ignoring exception", e); } }); diff --git a/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java b/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java index 3720b2a..e1b252c 100644 --- a/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java +++ b/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; import org.apache.hadoop.hbase.util.Bytes; @@ -71,8 +70,8 @@ public class HBaseTableStub implements Table { Bytes.BYTES_COMPARATOR); private static List toCell(byte[] row, - NavigableMap>> rowdata, - int maxVersions) { + NavigableMap>> rowdata, + int maxVersions) { return toCell(row, rowdata, 0, Long.MAX_VALUE, maxVersions); } @@ -150,10 +149,10 @@ public Result append(Append append) { } private static List toCell(byte[] row, - NavigableMap>> rowdata, - long timestampStart, - long timestampEnd, - int maxVersions) { + NavigableMap>> rowdata, + long timestampStart, + long timestampEnd, + int maxVersions) { List ret = new ArrayList<>(); byte putType = KeyValue.Type.Put.getCode(); for (byte[] family : rowdata.keySet()) @@ -188,7 +187,7 @@ public boolean exists(Get get) throws IOException { } @Override - public boolean[] exists(List list) throws IOException { + public boolean[] exists(List list) { return new boolean[0]; } @@ -202,14 +201,14 @@ public boolean[] existsAll(List gets) throws IOException { } @Override - public void batch(List list, Object[] objects) throws IOException, InterruptedException { - + public void batch(List list, Object[] objects) { + // Not implemented } /** * {@inheritDoc} */ - public void batch(List actions) throws IOException { + public void batch(List actions) { batch(actions); } @@ -419,14 +418,6 @@ public ResultScanner getScanner(Scan scan) throws IOException { } } } -// skip filter implementation -// if (filter != null) { -// kvs = filter(filter, kvs); -// // Check for early out optimization -// if (filter.filterAllRemaining()) { -// break; -// } -// } if (!kvs.isEmpty()) { kvs.sort(new CellComparator() { @Override @@ -492,10 +483,12 @@ public Comparator getSimpleComparator() { return new ResultScanner() { private final Iterator iterator = ret.iterator(); + @Override public Iterator iterator() { return iterator; } + @Override public Result[] next(int nbRows) { ArrayList resultSets = new ArrayList<>(nbRows); for (int i = 0; i < nbRows; i++) { @@ -509,6 +502,7 @@ public Result[] next(int nbRows) { return resultSets.toArray(new Result[0]); } + @Override public Result next() { try { return iterator().next(); @@ -611,9 +605,6 @@ public void put(Put put) { new TreeMap<>(Bytes.BYTES_COMPARATOR)); for (byte[] family : put.getFamilyCellMap() .keySet()) { -// if (!columnFamilies.contains(new String(family))) { -// throw new RuntimeException("Not Exists columnFamily : " + new String(family)); -// } NavigableMap> familyData = forceFind(rowData, family, new TreeMap<>(Bytes.BYTES_COMPARATOR)); @@ -675,21 +666,21 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] v @Override public boolean checkAndPut(byte[] row, - byte[] family, - byte[] qualifier, - CompareFilter.CompareOp compareOp, - byte[] value, - Put put) { + byte[] family, + byte[] qualifier, + CompareFilter.CompareOp compareOp, + byte[] value, + Put put) { return false; } @Override public boolean checkAndPut(byte[] bytes, - byte[] bytes1, - byte[] bytes2, - CompareOperator compareOperator, - byte[] bytes3, - Put put) throws IOException { + byte[] bytes1, + byte[] bytes2, + CompareOperator compareOperator, + byte[] bytes3, + Put put) throws IOException { return false; } @@ -756,24 +747,23 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[ return false; } - // TODO: Implement? @Override public boolean checkAndDelete(byte[] row, - byte[] family, - byte[] qualifier, - CompareFilter.CompareOp compareOp, - byte[] value, - Delete delete) { + byte[] family, + byte[] qualifier, + CompareFilter.CompareOp compareOp, + byte[] value, + Delete delete) { throw new RuntimeException(this.getClass() + " does NOT implement this method."); } @Override public boolean checkAndDelete(byte[] bytes, - byte[] bytes1, - byte[] bytes2, - CompareOperator compareOperator, - byte[] bytes3, - Delete delete) throws IOException { + byte[] bytes1, + byte[] bytes2, + CompareOperator compareOperator, + byte[] bytes3, + Delete delete) throws IOException { return false; } @@ -783,31 +773,26 @@ public CheckAndMutateBuilder checkAndMutate(byte[] bytes, byte[] bytes1) { @Override public CheckAndMutateBuilder ifMatches(CompareOperator arg0, byte[] arg1) { - // TODO Auto-generated method stub return this; } @Override public CheckAndMutateBuilder ifNotExists() { - // TODO Auto-generated method stub return this; } @Override public CheckAndMutateBuilder qualifier(byte[] arg0) { - // TODO Auto-generated method stub return this; } @Override public boolean thenDelete(Delete arg0) throws IOException { - // TODO Auto-generated method stub return false; } @Override public boolean thenMutate(RowMutations arg0) throws IOException { - // TODO Auto-generated method stub return false; } @@ -822,7 +807,6 @@ public boolean thenPut(Put arg0) throws IOException { @Override public CheckAndMutateBuilder timeRange(TimeRange arg0) { - // TODO Auto-generated method stub return this; } }; @@ -861,7 +845,6 @@ public Result increment(Increment increment) { return Result.create(cells); } - // TODO: Implement? /** * {@inheritDoc} */ @@ -903,8 +886,7 @@ public Class aClass, byte[] bytes, byte[] bytes1, - Batch.Call call) throws ServiceException, - Throwable { + Batch.Call call) { return null; } @@ -914,9 +896,8 @@ public byte[] bytes, byte[] bytes1, Batch.Call call, - Batch.Callback callback) throws ServiceException, - Throwable { - + Batch.Callback callback) { + // Not implemented } @Override @@ -925,8 +906,7 @@ public Ma org.apache.hadoop.hbase.shaded.com.google.protobuf.Message message, byte[] bytes, byte[] bytes1, - R r) throws ServiceException, - Throwable { + R r) { return null; } @@ -937,55 +917,27 @@ public vo byte[] bytes, byte[] bytes1, R r, - Batch.Callback callback) throws ServiceException, - Throwable { + Batch.Callback callback) { } - /** - * {@inheritDoc} - */ -// @Override -// public long getWriteBufferSize() { -// throw new RuntimeException(this.getClass() + " does NOT implement this method."); -// } - - /** - * {@inheritDoc} - */ -// @Override -// public void setWriteBufferSize(long writeBufferSize) { -// throw new RuntimeException(this.getClass() + " does NOT implement this method."); -// } -// -// @Override -// public Map batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) { -// throw new RuntimeException(this.getClass() + " does NOT implement this method."); -// } -// -// @Override -// public void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback callback) { -// throw new RuntimeException(this.getClass() + " does NOT implement this method."); -// } - - // TODO: Implement? @Override public boolean checkAndMutate(byte[] row, - byte[] family, - byte[] qualifier, - CompareFilter.CompareOp compareOp, - byte[] value, - RowMutations mutation) { + byte[] family, + byte[] qualifier, + CompareFilter.CompareOp compareOp, + byte[] value, + RowMutations mutation) { throw new RuntimeException(this.getClass() + " does NOT implement this method."); } @Override public boolean checkAndMutate(byte[] bytes, - byte[] bytes1, - byte[] bytes2, - CompareOperator compareOperator, - byte[] bytes3, - RowMutations rowMutations) throws IOException { + byte[] bytes1, + byte[] bytes2, + CompareOperator compareOperator, + byte[] bytes3, + RowMutations rowMutations) { return false; } @@ -1021,7 +973,7 @@ public int getReadRpcTimeout() { @Override public void setReadRpcTimeout(int i) { - + // Not implemented } @Override @@ -1036,7 +988,7 @@ public int getWriteRpcTimeout() { @Override public void setWriteRpcTimeout(int i) { - + // Not implemented } @Override @@ -1132,7 +1084,7 @@ private static void put(HBaseTableStub table, String row, String column, String * @return {"family", "qualifier"} */ private static String[] split(String column) { - return new String[] { column.substring(0, column.indexOf(':')), column.substring(column.indexOf(':') + 1) }; + return new String[]{column.substring(0, column.indexOf(':')), column.substring(column.indexOf(':') + 1)}; } /**