Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgraded test containers & introduced duration for supplying seconds #4

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ its MVCC (MultiVersion Concurrency Control) capability.
```xml
<dependency>
<groupId>com.phonepe</groupId>
<artifactId>distributed-lock-manager</artifactId>
<version>3.0.6</version>
<artifactId>DLM</artifactId>
<version>1.0.0</version>
</dependency>
```

Expand Down
70 changes: 40 additions & 30 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
<modelVersion>4.0.0</modelVersion>

<groupId>com.phonepe</groupId>
<artifactId>distributed-lock-manager</artifactId>
<version>3.0.7-SNAPSHOT</version>
<artifactId>DLM</artifactId>
<version>1.0.0-SNAPSHOT</version>

<url>https://github.com/PhonePe/DLM</url>
<description>Distributed Lock Manager</description>
Expand Down Expand Up @@ -95,7 +95,7 @@

<!-- Test related properties -->
<mockito.version>4.3.1</mockito.version>
<junit.testcontainer.version>1.0.6</junit.testcontainer.version>
<junit.testcontainer.version>1.0.14</junit.testcontainer.version>
<jna.version>5.7.0</jna.version>
<awaitility.version>4.1.1</awaitility.version>
<junit.version>4.12</junit.version>
Expand All @@ -105,6 +105,7 @@
<jdk.target.version>17</jdk.target.version>
<jdk.release.version>17</jdk.release.version>

<sonar.projectKey>PhonePe_DLM</sonar.projectKey>
<sonar.organization>phonepe</sonar.organization>
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
</properties>
Expand Down Expand Up @@ -173,6 +174,12 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.appform.testcontainer</groupId>
<artifactId>junit-testcontainer-commons</artifactId>
<version>${junit.testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.appform.testcontainer</groupId>
<artifactId>junit-testcontainer-aerospike</artifactId>
Expand Down Expand Up @@ -258,33 +265,36 @@

<profiles>
<profile>
<id>coverage</id>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.maven.plugin.version}</version>
<executions>
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
</executions>
<configuration>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<id>coverage</id>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.maven.plugin.version}</version>
<executions>
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<goals>
<goal>report</goal>
</goals>
<configuration>
<formats>
<format>XML</format>
</formats>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>release</id>
<build>
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/phonepe/dlm/DistributedLockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.phonepe.dlm.lock.Lock;
import com.phonepe.dlm.lock.base.LockBase;
import com.phonepe.dlm.lock.level.LockLevel;

import java.time.Duration;

import com.phonepe.dlm.exception.DLMException;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down Expand Up @@ -56,7 +59,7 @@ public void tryAcquireLock(final Lock lock) {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired
*/
public void tryAcquireLock(final Lock lock, final int duration) {
public void tryAcquireLock(final Lock lock, final Duration duration) {
lockBase.tryAcquireLock(lock, duration);
}

Expand Down Expand Up @@ -86,7 +89,7 @@ public void acquireLock(final Lock lock) {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
public void acquireLock(final Lock lock, final int duration) {
public void acquireLock(final Lock lock, final Duration duration) {
lockBase.acquireLock(lock, duration);
}

Expand All @@ -101,7 +104,7 @@ public void acquireLock(final Lock lock, final int duration) {
* @param timeout The timeout(wait duration in seconds) for a lock to become available
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
public void acquireLock(final Lock lock, final int duration, final int timeout) {
public void acquireLock(final Lock lock, final Duration duration, final Duration timeout) {
lockBase.acquireLock(lock, duration, timeout);
}

Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/phonepe/dlm/lock/ILockable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import com.phonepe.dlm.exception.ErrorCode;
import com.phonepe.dlm.lock.base.LockBase;

import java.time.Duration;

import com.phonepe.dlm.exception.DLMException;

public interface ILockable {
Expand All @@ -41,7 +44,7 @@ public interface ILockable {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired
*/
void tryAcquireLock(final Lock lock, final int duration);
void tryAcquireLock(final Lock lock, final Duration duration);

/**
* This method tries to acquire the lock, and if the lock is currently held by another thread,
Expand All @@ -67,7 +70,7 @@ public interface ILockable {
* @param duration The lock duration in seconds for which lock will be held
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
void acquireLock(final Lock lock, final int duration);
void acquireLock(final Lock lock, final Duration duration);

/**
* This method attempts to acquire the lock and waits for a limited time for the lock to become available.
Expand All @@ -80,7 +83,7 @@ public interface ILockable {
* @param timeout The timeout(wait duration in seconds) for a lock to become available
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
*/
void acquireLock(final Lock lock, final int duration, final int timeout);
void acquireLock(final Lock lock, final Duration duration, final Duration timeout);

/**
* This method releases the acquired lock, allowing other threads to acquire it.
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/com/phonepe/dlm/lock/base/LockBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@AllArgsConstructor
@Builder
@Getter
public class LockBase implements ILockable {
public static final int DEFAULT_LOCK_TTL_SECONDS = 90;
public static final int DEFAULT_WAIT_FOR_LOCK_IN_SECONDS = 90;
public static final Duration DEFAULT_LOCK_TTL_SECONDS = Duration.ofSeconds(90);
public static final Duration DEFAULT_WAIT_FOR_LOCK_IN_SECONDS = Duration.ofSeconds(90);
public static final int WAIT_TIME_FOR_NEXT_RETRY = 1000; // 1 second

private final ILockStore lockStore;
Expand All @@ -49,7 +50,7 @@ public void tryAcquireLock(final Lock lock) {
}

@Override
public void tryAcquireLock(final Lock lock, final int duration) {
public void tryAcquireLock(final Lock lock, final Duration duration) {
writeToStore(lock, duration);
}

Expand All @@ -59,14 +60,14 @@ public void acquireLock(final Lock lock) {
}

@Override
public void acquireLock(final Lock lock, final int duration) {
public void acquireLock(final Lock lock, final Duration duration) {
acquireLock(lock, duration, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS);
}

@SneakyThrows
@Override
public void acquireLock(final Lock lock, final int duration, final int timeout) {
final Timer timer = new Timer(System.currentTimeMillis(), timeout);
public void acquireLock(final Lock lock, final Duration duration, final Duration timeout) {
final Timer timer = new Timer(System.currentTimeMillis(), timeout.getSeconds());
final AtomicBoolean success = new AtomicBoolean(false);
do {
try {
Expand Down Expand Up @@ -96,7 +97,7 @@ public boolean releaseLock(final Lock lock) {
return false;
}

private void writeToStore(final Lock lock, final int ttlSeconds) {
private void writeToStore(final Lock lock, final Duration ttlSeconds) {
lockStore.write(lock.getLockId(), lock.getLockLevel(), lock.getFarmId(), ttlSeconds);
lock.getAcquiredStatus().compareAndSet(false, true);
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/phonepe/dlm/lock/storage/ILockStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.phonepe.dlm.lock.storage;

import java.time.Duration;

import com.phonepe.dlm.lock.level.LockLevel;

public interface ILockStore {
void initialize();

void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds);
void write(String lockId, LockLevel lockLevel, String farmId, Duration ttlSeconds);

void remove(String lockId, LockLevel lockLevel, String farmId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.Builder;
import lombok.Data;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -52,11 +53,11 @@ public void initialize() {
}

@Override
public void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds) {
public void write(String lockId, LockLevel lockLevel, String farmId, Duration ttlSeconds) {
final WritePolicy writePolicy = new WritePolicy(aerospikeClient.getWritePolicyDefault());
writePolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
writePolicy.generation = 0;
writePolicy.expiration = ttlSeconds;
writePolicy.expiration = Long.valueOf(ttlSeconds.getSeconds()).intValue(); // as only int is supported
writePolicy.commitLevel = CommitLevel.COMMIT_MASTER; // Committing to master only, as there is no read required so there is no chance of dirty reads.
try {
final List<Bin> binList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.time.Duration;

@Data
@Builder
Expand Down Expand Up @@ -70,14 +71,14 @@ public void initialize() {
}

@Override
public void write(String lockId, LockLevel lockLevel, String farmId, int ttlSeconds) {
public void write(String lockId, LockLevel lockLevel, String farmId, Duration ttlSeconds) {
final byte[] normalisedRowKey = getNormalisedRowKey(lockId, lockLevel, farmId);

try (final Table table = getTable()) {
final boolean result = table.checkAndMutate(normalisedRowKey, COLUMN_FAMILY)
.qualifier(COLUMN_NAME)
.ifNotExists()
.thenPut(new Put(normalisedRowKey, System.currentTimeMillis()).setTTL(ttlSeconds * 1_000L)
.thenPut(new Put(normalisedRowKey, System.currentTimeMillis()).setTTL(ttlSeconds.getSeconds() * 1_000L)
.addColumn(COLUMN_FAMILY, COLUMN_NAME, COLUMN_DATA));
if (!result) {
throw DLMException.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void lockTestPositiveSiloDC() {
@Test
public void lockTestPositiveXDC() {
final Lock lock = lockManager.getLockInstance("LOCK_ID", LockLevel.XDC);
lockManager.tryAcquireLock(lock, 90);
lockManager.tryAcquireLock(lock, Duration.ofSeconds(90));
Assert.assertTrue(lock.getAcquiredStatus()
.get());

Expand All @@ -127,7 +128,7 @@ public void lockTestPositiveXDC() {
@Test
public void testAcquireLockWithWait() {
final Lock lock = lockManager.getLockInstance("NEW_LOCK_ID", LockLevel.DC);
lockManager.acquireLock(lock, 2); // Lock acquired for 1 seconds
lockManager.acquireLock(lock, Duration.ofSeconds(2)); // Lock acquired for 2 seconds
Assert.assertTrue(lock.getAcquiredStatus().get());

try {
Expand All @@ -140,7 +141,7 @@ public void testAcquireLockWithWait() {
Assert.assertTrue(lock.getAcquiredStatus().get());

try {
lockManager.acquireLock(lock, 2, 2); // Wait for 2 seconds only for acquiring the lock
lockManager.acquireLock(lock, Duration.ofSeconds(2), Duration.ofSeconds(2)); // Wait for 2 seconds only for acquiring the lock
Assert.fail("Flow should not have reached here");
} catch (DLMException e) {
Assert.assertEquals(ErrorCode.LOCK_UNAVAILABLE, e.getErrorCode()); // As it won't be released for next 90 secs default
Expand Down
Loading