-
Notifications
You must be signed in to change notification settings - Fork 0
/
HelloUDPNonblockingServer.java
151 lines (135 loc) · 4.79 KB
/
HelloUDPNonblockingServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package hello;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.CharacterCodingException;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

import static hello.Util.closeNullable;
import static hello.Util.log;
/**
 * Implementation of {@link HelloServer} on top of a non-blocking {@link DatagramChannel}.
 * <p>
 * {@link #receiver()} drives a {@link Selector} loop: readable events pull a datagram into a
 * pooled buffer and hand it to {@code workers}; each worker task builds a {@link Response},
 * queues it and enables {@link SelectionKey#OP_WRITE}; writable events drain the queue one
 * response at a time and return the buffer to the pool.
 *
 * @author Pavel Sharaev ([email protected])
 */
public class HelloUDPNonblockingServer extends AbstractHelloServer {
    /** Pool of reusable buffers, each sized to the channel's {@code SO_RCVBUF}. */
    private ByteBufferPool byteBufferPool;
    /** Selector the server channel is registered with. */
    private Selector selector;
    /** Responses produced by worker tasks and waiting to be sent by {@link #writeKey}. */
    private BlockingQueue<Response> responses;
    /** The bound, non-blocking UDP channel. */
    private DatagramChannel serverChannel;

    /**
     * see {@link AbstractHelloServer#main(String[], Supplier)}
     *
     * @param args {@link AbstractHelloServer#MAIN_USAGE}
     */
    public static void main(final String[] args) {
        main(args, HelloUDPNonblockingServer::new);
    }

    /**
     * Opens the selector and the channel, binds the channel to {@code port}, allocates the
     * response queue and the buffer pool, and then starts the workers and the receiver.
     * <p>
     * If any I/O step fails the error is logged, everything opened so far is closed and the
     * method returns without starting anything.
     *
     * @param port    local UDP port to bind to
     * @param threads multiplied by {@code TASKS_CAPACITY_FOR_THREAD} to size the response queue
     *                and the buffer pool; also passed to {@code startWorkersAndReceiver}
     */
    @Override
    public void start(final int port, final int threads) {
        try {
            selector = Selector.open();
            serverChannel = DatagramChannel.open();
            serverChannel.bind(new InetSocketAddress(port));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_READ);
            final int receiveBufferSize = serverChannel.getOption(StandardSocketOptions.SO_RCVBUF);
            responses = new ArrayBlockingQueue<>(threads * TASKS_CAPACITY_FOR_THREAD);
            byteBufferPool = new ByteBufferPool(threads * TASKS_CAPACITY_FOR_THREAD, receiveBufferSize);
        } catch (final IOException e) {
            log("Fail start server", e);
            // Do not leak a half-initialised channel/selector (e.g. when the port is busy).
            closeNullable("server channel", serverChannel);
            closeNullable("selector", selector);
            return;
        }
        startWorkersAndReceiver(threads);
    }

    /**
     * Selector loop: waits for readiness events and dispatches them to {@link #readKey} and
     * {@link #writeKey} until the current thread is interrupted or the selector is closed.
     */
    @Override
    protected void receiver() {
        while (!Thread.currentThread().isInterrupted() && selector.isOpen()) {
            final Iterator<SelectionKey> i;
            try {
                selector.select();
                i = selector.selectedKeys().iterator();
            } catch (final IOException e) {
                log("Fail select", e);
                return;
            } catch (final ClosedSelectorException e) {
                // close() closed the selector while we were selecting: regular shutdown.
                return;
            }
            while (i.hasNext()) {
                final SelectionKey key = i.next();
                try {
                    if (key.isValid() && key.isReadable()) {
                        readKey(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        writeKey(key);
                    }
                } catch (final CancelledKeyException e) {
                    // The channel was closed between the validity check and the key access;
                    // the loop condition will notice the shutdown.
                } finally {
                    i.remove();
                }
            }
        }
    }

    /**
     * Receives one datagram into a pooled buffer and submits it to {@code workers}.
     * <p>
     * The buffer is returned to the pool on every path that does not hand it to a worker task.
     *
     * @param key selection key of the server channel
     */
    private void readKey(final SelectionKey key) {
        final DatagramChannel channel = (DatagramChannel) key.channel();
        final ByteBuffer receiveBuffer = byteBufferPool.poll();
        if (receiveBuffer == null) {
            // NOTE(review): the datagram stays unread, so the key remains readable and the
            // selector loop spins until some buffer is released.
            return;
        }
        final SocketAddress address;
        try {
            address = channel.receive(receiveBuffer);
        } catch (final IOException e) {
            log("Failed receive buffer", e);
            byteBufferPool.release(receiveBuffer);
            return;
        }
        if (address == null) {
            // Non-blocking receive found no datagram: nothing to process.
            byteBufferPool.release(receiveBuffer);
            return;
        }
        try {
            workers.submit(() -> handleRequest(key, address, receiveBuffer));
        } catch (final RejectedExecutionException e) {
            log("Fail submit task", e);
            byteBufferPool.release(receiveBuffer);
        }
    }

    /**
     * Worker task: turns a received datagram into a {@link Response}, queues it and asks the
     * selector loop to send it.
     *
     * @param key           selection key of the server channel
     * @param address       sender of the datagram
     * @param receiveBuffer pooled buffer holding the datagram, still in write mode
     */
    private void handleRequest(final SelectionKey key, final SocketAddress address,
                               final ByteBuffer receiveBuffer) {
        final Response response;
        try {
            receiveBuffer.flip();
            response = processRequest(address, receiveBuffer);
        } catch (final CharacterCodingException e) {
            log("Bad request", e);
            byteBufferPool.release(receiveBuffer);
            return;
        }
        try {
            responses.add(response);
        } catch (final IllegalStateException e) {
            log("Failed add response", e);
            // The response will never reach writeKey, so give its buffer back here.
            byteBufferPool.release(response.getResponse());
            return;
        }
        key.interestOpsOr(SelectionKey.OP_WRITE);
        selector.wakeup();
    }

    /**
     * Sends one queued response, or stops listening for writability when the queue is empty.
     *
     * @param key selection key of the server channel
     */
    private void writeKey(final SelectionKey key) {
        final DatagramChannel channel = (DatagramChannel) key.channel();
        final Response response = responses.poll();
        if (response == null) {
            key.interestOpsAnd(~SelectionKey.OP_WRITE);
            // A worker may have queued a response and set OP_WRITE after poll() returned null
            // but before the bit was cleared above; re-arm so that response is not stranded.
            if (!responses.isEmpty()) {
                key.interestOpsOr(SelectionKey.OP_WRITE);
            }
            return;
        }
        try {
            channel.send(response.getResponse(), response.getAddress());
        } catch (final IOException e) {
            log("Failed send buffer", e);
        } finally {
            byteBufferPool.release(response.getResponse());
        }
    }

    /**
     * Closes the server channel and the selector, then shuts the workers down.
     */
    @Override
    public void close() {
        closeNullable("server channel", serverChannel);
        closeNullable("selector", selector);
        closeWorkers();
    }
}