Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New lock with notification #16

Merged
merged 9 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ Originally conceived as a group of locks, then some synchronization primitives,
These include

* Synchronization: primitives to synchronize process and threads one with other
* Lock: exclusive locks. Normal locks, also interrupting locks and a java Lock implementation.
* Lock: exclusive locks. Normal locks, also interrupting locks and a java Lock implementation.
Also, notification locks, that uses a asynchronous notification system to know it another lock has released
the grip and they can proceed to get the lock, without poolling
* Semaphores
* CountDownLatch: count down to open the flood gates and allow all waiters to progress
* Collections: redis-backed implementation of Java collection interfaces, with all data stored on Redis, like
Expand All @@ -39,6 +41,8 @@ These include
* Cache: A simple cache with readthrougth and writethrougth operations
* Cycle: A list of elements that cycles for one to the next, and to the initial one; one result per call, in a cycle
* RateLimiter: temporal or bucket limited distributed rate
* StreamMessageSystem: a class that lets you send messages to a stream and receive from the same stream
(in a background thread of the class). One by one, and no messsage is lost (AT LEAST ONCE).
* More utils like
* SimplePubSub: a simple pub/sub that only consumes messages via a BiConsumer function

Expand All @@ -49,7 +53,7 @@ It's intended to make possible distributes locking and synchronization, share da

All classes have tests, unit and functional ones.
You can test the latter ones by activating them and configuring your own redis server, to test that all the classes work properly in theory and practice.
There are **more than 500 working tests**, so the code is pretty secure.
There are **more than 600 working tests**, so the code is pretty secure.


**See the [wiki](https://github.com/oscar-besga-panel/JedisExtraUtils/wiki) for more documentation**
Expand Down Expand Up @@ -85,6 +89,7 @@ Also, you will find a little Groovy and a docker composer to setup a testing red

| Library version | Jedis version | JDK Version |
|-----------------|---------------|-------------|
| 6.1.0 | 5.1.X | JDK11 |
| 6.0.0 | 5.0.X | JDK11 |
| 5.3.0 | 4.4.X | JDK11 |

Expand Down
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
projectVersion=6.0.0
projectVersion=6.1.0
#javaVersion=JavaVersion.VERSION_11
javaVersion=11

# https://github.com/johnrengelman/shadow
shadowVersion=7.1.2

# https://mvnrepository.com/artifact/redis.clients/jedis/4.4.3
jedisVersion=5.0.2
# https://mvnrepository.com/artifact/redis.clients/jedis/5.1.0
jedisVersion=5.1.0
# https://mvnrepository.com/artifact/org.javassist/javassist
javassistVersion=3.29.2-GA
# https://mvnrepository.com/artifact/org.slf4j/slf4j-api
Expand Down
186 changes: 186 additions & 0 deletions src/main/deprecated/notificationLock/NotificationLock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.oba.jedis.extra.utils.notificationLock;

import org.oba.jedis.extra.utils.utils.Named;
import org.oba.jedis.extra.utils.utils.ScriptEvalSha1;
import org.oba.jedis.extra.utils.utils.UniversalReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.params.SetParams;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class NotificationLock implements Named, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(NotificationLock.class);

public static final String SCRIPT_NAME = "semaphore.lua";
public static final String FILE_PATH = "./src/main/resources/semaphore.lua";

public static final String CLIENT_RESPONSE_OK = "OK";

private final NotificationLockFactory factory;
private final String lockName;
private final String uniqueToken;
private final ScriptEvalSha1 script;
private final Semaphore semaphore;

NotificationLock(NotificationLockFactory factory, String lockName, String uniqueToken) {
this.factory = factory;
this.lockName = lockName;
this.uniqueToken = uniqueToken;
this.script = new ScriptEvalSha1(factory.getJedisPool(), new UniversalReader().
withResoruce(SCRIPT_NAME).
withFile(FILE_PATH));
this.semaphore = new Semaphore(0);
}

@Override
public String getName() {
return lockName;
}

@Override
public void close() {
unlock();
}

public synchronized boolean tryLock() {
return redisLock();
}

public synchronized void lock() {
boolean locked = redisLock();
while (!locked) {
try {
semaphore.acquire();
locked = redisLock();
} catch (InterruptedException ie) {
LOGGER.debug("interrupted", ie);
}
}
}


public synchronized void lockInterruptibly() throws InterruptedException {
boolean locked = redisLock();
while (!locked) {
semaphore.acquire();
locked = redisLock();
}
}

public synchronized void unlock() {
redisUnlock();
}


public synchronized void underLock(Runnable task) {
try (NotificationLock nl = this) {
nl.lock();
task.run();
}
}

public synchronized <T> T underLock(Supplier<T> task) {
try (NotificationLock nl = this){
nl.lock();
return task.get();
}
}


void awake() {
semaphore.release();
}



/**
* If a leaseTime is set, it checks the leasetime and the timelimit
* Then it checks if remote redis has te same value as the lock
* If not, returns false
* @return true if the lock is remotely held
*/
private synchronized boolean redisCheckLock() {
return factory.withJedisPoolGet(this::redisCheckLockUnderPool);
}

/**
* If a leaseTime is set, it checks the leasetime and the timelimit
* Then it checks if remote redis has te same value as the lock
* If not, returns false
* @return true if the lock is remotely held
*/
private synchronized boolean redisCheckLockUnderPool(Jedis jedis) {
boolean check = false;
String currentValueRedis = jedis.get(lockName);
LOGGER.debug("checkLock >" + Thread.currentThread().getName() + "check value {} currentValueRedis {}", uniqueToken, currentValueRedis);
check = uniqueToken.equals(currentValueRedis);
return check;
}


/**
* Attempts to get the lock.
* It will try one time and return
* The leaseMoment and timeLimit are set if lock is obtained
* @return true if lock obtained, false otherwise
*/
private synchronized boolean redisLock() {
return factory.withJedisPoolGet(this::redisLockUnderPool);
}

private synchronized boolean redisLockUnderPool(Jedis jedis) {
LOGGER.debug("redisLockUnderPool");
SetParams setParams = new SetParams().nx();
Transaction t = jedis.multi();
Response<String> responseClientStatusCodeReply = t.set(lockName, uniqueToken,setParams);
Response<String> responseCurrentValueRedis = t.get(lockName);
t.exec();
String clientStatusCodeReply = responseClientStatusCodeReply.get();
String currentValueRedis = responseCurrentValueRedis.get();
LOGGER.debug("redisLockUnderPool clientStatusCodeReply {} currentValueRedis {} uniqueToken {}",
clientStatusCodeReply, currentValueRedis, uniqueToken);
return CLIENT_RESPONSE_OK.equalsIgnoreCase(clientStatusCodeReply) && uniqueToken.equals(currentValueRedis);
}

/**
* Attempts to unlock the lock
*/
private synchronized void redisUnlock() {
if (!redisCheckLock()) return;
List<String> keys = Collections.singletonList(lockName);
List<String> values = Collections.singletonList(uniqueToken);
Object response = script.evalSha(keys, values);
LOGGER.debug("redisUnlock response {}", response);
int num = 0;
if (response != null) {
LOGGER.debug("response {}", response);
num = Integer.parseInt(response.toString());
}
if (num > 0) {
factory.messageOnUnlock(this);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NotificationLock that = (NotificationLock) o;
return Objects.equals(factory, that.factory) && Objects.equals(lockName, that.lockName) && Objects.equals(uniqueToken, that.uniqueToken);
}

@Override
public int hashCode() {
return Objects.hash(factory, lockName, uniqueToken);
}

}
Loading