Skip to content

Commit

Permalink
Merge pull request #10 from PhonePe/oss
Browse files Browse the repository at this point in the history
Sonar fixes
  • Loading branch information
MISBMS authored Oct 23, 2024
2 parents 6df4dfe + 6678e21 commit 4b8c64c
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 147 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven-central-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
distribution: 'zulu'
server-id: central
server-username: MAVEN_USERNAME
server-password: MAVEN_PASSWORD
Expand Down
6 changes: 2 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@
<sonar.language>java</sonar.language>
<sonar.qualitygate.wait>true</sonar.qualitygate.wait>
<sonar.exclusions>
<!-- Excluding test classes from sonar smells -->
**/src/test/java/com/phonepe/dlm/**/*.java,
**/src/test/java/com/phonepe/dlm/*.java
**/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java
</sonar.exclusions>
</properties>

Expand Down Expand Up @@ -317,7 +315,7 @@
</executions>
<configuration>
<excludes>
<exclude>**/src/test/**</exclude>
<exclude>**/src/test/java/com/phonepe/dlm/util/HBaseTableStub.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/com/phonepe/dlm/exception/DLMException.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ public class DLMException extends RuntimeException {
private static final long serialVersionUID = 7310153558992797589L;
private final ErrorCode errorCode;

public DLMException(ErrorCode errorCode) {
super();
this.errorCode = errorCode;
}

@Builder
public DLMException(ErrorCode errorCode, String message, Throwable cause) {
super(message, cause);
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/phonepe/dlm/lock/base/LockBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
Expand Down Expand Up @@ -64,7 +63,6 @@ public void acquireLock(final Lock lock, final Duration duration) {
acquireLock(lock, duration, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS);
}

@SneakyThrows
@Override
public void acquireLock(final Lock lock, final Duration duration, final Duration timeout) {
final Timer timer = new Timer(System.currentTimeMillis(), timeout.getSeconds());
Expand All @@ -79,14 +77,15 @@ public void acquireLock(final Lock lock, final Duration duration, final Duration
throw e;
}
if (e.getErrorCode() == ErrorCode.LOCK_UNAVAILABLE) {
Thread.sleep(WAIT_TIME_FOR_NEXT_RETRY);
sleep();
continue;
}
throw e;
}
} while (!success.get());
}


@Override
public boolean releaseLock(final Lock lock) {
if (lock.getAcquiredStatus().get()) {
Expand All @@ -101,4 +100,14 @@ private void writeToStore(final Lock lock, final Duration ttlSeconds) {
lockStore.write(lock.getLockId(), lock.getLockLevel(), lock.getFarmId(), ttlSeconds);
lock.getAcquiredStatus().compareAndSet(false, true);
}

private static void sleep() {
try {
Thread.sleep(WAIT_TIME_FOR_NEXT_RETRY);
} catch (InterruptedException e) {
log.error("Error sleeping the thread", e);
Thread.currentThread().interrupt();
throw DLMException.propagate(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public void write(String lockId, LockLevel lockLevel, String farmId, Duration tt
.errorCode(ErrorCode.CONNECTION_ERROR)
.message(String.format("Error writing lock in HBase [id = %s]", lockId))
.build();
} catch (Exception e) {
throw DLMException.propagate(e);
}
}

Expand Down
71 changes: 48 additions & 23 deletions src/test/java/com/phonepe/dlm/DistributedLockWithAerospikeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.phonepe.dlm.util.TestUtils;
import io.appform.testcontainers.aerospike.AerospikeContainerConfiguration;
import io.appform.testcontainers.aerospike.container.AerospikeContainer;
import lombok.extern.slf4j.Slf4j;
import org.junit.*;
import org.junit.rules.ExpectedException;

Expand All @@ -46,6 +47,7 @@
/**
* @author shantanu.tiwari
*/
@Slf4j
public class DistributedLockWithAerospikeTest {
public static final String AEROSPIKE_HOST = "localhost";
public static final String AEROSPIKE_DOCKER_IMAGE = "aerospike/aerospike-server:6.4.0.23";
Expand All @@ -72,18 +74,7 @@ public void setUp() {
new Host(AEROSPIKE_DOCKER_CONTAINER.getHost(), AEROSPIKE_DOCKER_CONTAINER.getConnectionPort()));
aerospikeClient.truncate(aerospikeClient.getInfoPolicyDefault(), AEROSPIKE_NAMESPACE, null, null);

lockManager = DistributedLockManager.builder()
.clientId("CLIENT_ID")
.farmId("FA1")
.lockBase(LockBase.builder()
.mode(LockMode.EXCLUSIVE)
.lockStore(AerospikeStore.builder()
.aerospikeClient(aerospikeClient)
.namespace(AEROSPIKE_NAMESPACE)
.setSuffix("distributed_lock")
.build())
.build())
.build();
lockManager = getLockManager(LockMode.EXCLUSIVE);
lockManager.initialize();
}

Expand All @@ -93,7 +84,7 @@ public void destroy() {
}

@Test
public void lockTestPositiveSiloDC() {
public void lockPositiveSiloDCTest() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
Assert.assertTrue(lock.getAcquiredStatus()
Expand All @@ -110,7 +101,7 @@ public void lockTestPositiveSiloDC() {
}

@Test
public void lockTestPositiveXDC() {
public void lockPositiveXDCTest() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.XDC);
lockManager.tryAcquireLock(lock, Duration.ofSeconds(90));
Assert.assertTrue(lock.getAcquiredStatus()
Expand All @@ -128,26 +119,26 @@ public void lockTestPositiveXDC() {
}

@Test
public void testLockUnavailableForAcquireLock() {
public void lockUnavailableForAcquireLockTest() {
final Lock lock = lockManager.getLockInstance("NEW_LOCK_ID", LockLevel.DC);
lockManager.acquireLock(lock); // Wait and try acquiring the lock.
lockManager.acquireLock(lock, Duration.ofSeconds(30)); // Wait and try acquiring the lock.

exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE));
lockManager.acquireLock(lock, Duration.ofSeconds(2), Duration.ofSeconds(2)); // Wait for 2 seconds only for acquiring the lock
}

@Test
public void testLockUnavailableForTryAcquireLockWithSameLockInstance() {
public void lockUnavailableForTryAcquireLockWithSameLockInstanceTest() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
lockManager.acquireLock(lock, Duration.ofSeconds(30), Duration.ofSeconds(5));
Assert.assertTrue(lock.getAcquiredStatus().get());

exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE));
lockManager.tryAcquireLock(lock);
}

@Test
public void testLockUnavailableForTryAcquireLockWithDifferentLockInstance() {
public void lockUnavailableForTryAcquireLockWithDifferentLockInstanceTest() {
Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
Assert.assertTrue(lock.getAcquiredStatus().get());
Expand All @@ -158,7 +149,7 @@ public void testLockUnavailableForTryAcquireLockWithDifferentLockInstance() {
}

@Test
public void concurrentLockAttempt() {
public void concurrentLockAttemptTest() {
final int attempts = Runtime.getRuntime()
.availableProcessors();
final Map<String, AtomicInteger> trackers = Maps.newConcurrentMap();
Expand All @@ -184,7 +175,7 @@ public void concurrentLockAttempt() {
trackers.computeIfAbsent("FAILED_ACQUIRES", x -> new AtomicInteger(0))
.getAndIncrement();
} catch (Exception e) {
// ignore;
log.warn("Gracefully ignoring exception", e);
} finally {
boolean result = lockManager.releaseLock(lock);
Assert.assertFalse(lock.getAcquiredStatus()
Expand All @@ -206,8 +197,8 @@ public void concurrentLockAttempt() {
if (counter.decrementAndGet() <= 1) {
latch.countDown();
}
} catch (InterruptedException | ExecutionException e1) {
// ignore;
} catch (InterruptedException | ExecutionException e) {
log.warn("Gracefully ignoring exception", e);
}
});

Expand All @@ -226,8 +217,42 @@ public void concurrentLockAttempt() {
.get());
}

@Test(expected = DLMException.class)
public void exceptionInLockTest() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.acquireLock(lock, Duration.ofDays(7300));
}

@Test
public void interruptedExceptionInLockTest() {
try {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.acquireLock(lock);
Thread.currentThread().interrupt();
exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.INTERNAL_ERROR));
lockManager.acquireLock(lock, Duration.ofSeconds(30));
} finally {
Thread.interrupted(); // clearing thread interrupted status for isolation
}
}

@After
public void tearDown() {
aerospikeClient.truncate(aerospikeClient.getInfoPolicyDefault(), AEROSPIKE_NAMESPACE, null, null);
}

public DistributedLockManager getLockManager(final LockMode lockMode) {
return lockMode.accept(() -> DistributedLockManager.builder()
.clientId("CLIENT_ID")
.farmId("FA1")
.lockBase(LockBase.builder()
.mode(lockMode)
.lockStore(AerospikeStore.builder()
.aerospikeClient(aerospikeClient)
.namespace(AEROSPIKE_NAMESPACE)
.setSuffix("distributed_lock")
.build())
.build())
.build());
}
}
30 changes: 18 additions & 12 deletions src/test/java/com/phonepe/dlm/DistributedLockWithHBaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
package com.phonepe.dlm;

import com.google.common.collect.Maps;
import com.phonepe.dlm.exception.DLMException;
import com.phonepe.dlm.exception.ErrorCode;
import com.phonepe.dlm.lock.Lock;
import com.phonepe.dlm.lock.base.LockBase;
import com.phonepe.dlm.lock.level.LockLevel;
import com.phonepe.dlm.lock.mode.LockMode;
import com.phonepe.dlm.lock.storage.hbase.HBaseStore;
import com.phonepe.dlm.util.HBaseConnectionStub;
import com.phonepe.dlm.exception.DLMException;
import com.phonepe.dlm.exception.ErrorCode;
import com.phonepe.dlm.util.DLMExceptionMatcher;
import com.phonepe.dlm.util.HBaseConnectionStub;
import com.phonepe.dlm.util.TestUtils;
import lombok.extern.slf4j.Slf4j;
import org.junit.*;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
Expand All @@ -41,6 +42,7 @@

import static org.junit.Assert.assertEquals;

@Slf4j
@RunWith(MockitoJUnitRunner.class)
public class DistributedLockWithHBaseTest {
@Rule
Expand Down Expand Up @@ -101,28 +103,32 @@ public void lockTestPositiveXDC() {
Assert.assertFalse(released);
}

@Test(expected = DLMException.class)
@Test
public void lockTestNegative1() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
Assert.assertTrue(lock.getAcquiredStatus().get());

exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE));
lockManager.tryAcquireLock(lock);
}

@Test(expected = DLMException.class)
@Test
public void lockTestNegative2() {
Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
Assert.assertTrue(lock.getAcquiredStatus().get());

exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE));
lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
}

@Test(expected = DLMException.class)
@Test
public void lockTestNegative3() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock);
Assert.assertTrue(lock.getAcquiredStatus().get());

exceptionThrown.expect(DLMExceptionMatcher.hasCode(ErrorCode.LOCK_UNAVAILABLE));
final Lock lock1 = lockManager.getLockInstance("LOCK_ID", LockLevel.DC);
lockManager.tryAcquireLock(lock1);
}
Expand Down Expand Up @@ -202,8 +208,8 @@ public void concurrentLockAttempt() {
} catch (DLMException e) {
trackers.computeIfAbsent("FAILED_ACQUIRES", x -> new AtomicInteger(0))
.getAndIncrement();
} catch (Exception ignored) {
// ignore;
} catch (Exception e) {
log.warn("Gracefully ignoring exception", e);
} finally {
boolean result = lockManager.releaseLock(lock);
if (result) {
Expand All @@ -223,8 +229,8 @@ public void concurrentLockAttempt() {
if (counter.decrementAndGet() <= 1) {
latch.countDown();
}
} catch (InterruptedException | ExecutionException e1) {
// ignore;
} catch (InterruptedException | ExecutionException e) {
log.warn("Gracefully ignoring exception", e);
}
});

Expand Down
Loading

0 comments on commit 4b8c64c

Please sign in to comment.