-
Notifications
You must be signed in to change notification settings - Fork 0
/
ExternalServiceConnectionPool.java
105 lines (88 loc) · 3.94 KB
/
ExternalServiceConnectionPool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.stefano.service;
import org.apache.log4j.Logger;
import java.util.Map;
import java.util.concurrent.*;
/**
* User: franzs
* Date: 26/02/14
* Time: 16:46
*/
public abstract class ExternalServiceConnectionPool<T extends AutoCloseable> {
private static final Logger logger = Logger.getLogger(ExternalServiceConnectionPool.class);
private final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
private final long milliesToWait;
private BlockingQueue<T> theServices = new LinkedBlockingQueue<>();
private ConcurrentHashMap<T, TakeServiceInfo> infoMap = new ConcurrentHashMap<>();
public ExternalServiceConnectionPool(int numberOfServices) {
this(numberOfServices, 30, TimeUnit.SECONDS);
}
public ExternalServiceConnectionPool(int numberOfServices, long timeout, TimeUnit timeoutUnit) {
for (int i = 0; i < numberOfServices; i++) {
T theService = initialValue();
infoMap.put(theService, TakeServiceInfo.AVAILABLE_SERVICE);
theServices.add(theService);
}
milliesToWait = timeoutUnit.toMillis(timeout);
EXECUTOR_SERVICE.scheduleWithFixedDelay(new CleaningRunnable(), timeout, timeout, timeoutUnit);
}
protected abstract T initialValue();
public void releaseService(T service) {
logger.info(Thread.currentThread() + " has released: " + service.toString());
TakeServiceInfo info;
if ((info = infoMap.replace(service, TakeServiceInfo.AVAILABLE_SERVICE)) != null && info != TakeServiceInfo.ABANDON_SERVICE){
theServices.offer(service);
}else{
infoMap.replace(service, TakeServiceInfo.AVAILABLE_SERVICE, TakeServiceInfo.ABANDON_SERVICE);
}
}
public T getService() {
try {
T service = theServices.take();
infoMap.put(service, new TakeServiceInfo());
logger.info(Thread.currentThread() + " took service: " + service);
return service;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void abandonService(T service) {
try {
if (infoMap.remove(service, TakeServiceInfo.ABANDON_SERVICE)) {
service.close();
T newService = initialValue();
infoMap.put(newService, TakeServiceInfo.AVAILABLE_SERVICE);
theServices.offer(newService);
}
} catch (Exception e) {
//NOTHING WE CAN DO NOW!
logger.info("Exception during close of service: ", e);
}
}
private static class TakeServiceInfo {
private static TakeServiceInfo ABANDON_SERVICE = new TakeServiceInfo();
private static TakeServiceInfo AVAILABLE_SERVICE = new TakeServiceInfo();
private Thread takingThread;
private long timeTaken;
private TakeServiceInfo() {
this.takingThread = Thread.currentThread();
this.timeTaken = System.currentTimeMillis();
}
}
private class CleaningRunnable implements Runnable {
@Override
public void run() {
for (Map.Entry<T, TakeServiceInfo> entry : infoMap.entrySet()) {
if (entry.getValue() != TakeServiceInfo.AVAILABLE_SERVICE && entry.getValue() != TakeServiceInfo.ABANDON_SERVICE) {
TakeServiceInfo info = entry.getValue();
if ((System.currentTimeMillis() - info.timeTaken) >= milliesToWait) {
logger.info("Expiring: " + entry.getKey() + " as it has been held by thread: " + info.takingThread + " for over the timeout value");
infoMap.replace(entry.getKey(), info, TakeServiceInfo.ABANDON_SERVICE);
}
}
if (infoMap.get(entry.getKey()) == TakeServiceInfo.ABANDON_SERVICE) {
abandonService(entry.getKey());
}
}
}
}
}