forked from leverich/simucached
-
Notifications
You must be signed in to change notification settings - Fork 0
/
thread.cc
229 lines (193 loc) · 7.18 KB
/
thread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#include "config.h"
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/uio.h>
#include <unistd.h>
#include <string>
#include <sstream>
#include "log.h"
#include "simucached.h"
#include "thread.h"
#include "work.h"
#define MAX_EVENTS 2048
__thread char devnull[READ_CHUNK];
void* thread_main(void* data) {
Thread *td = (Thread *) data;
int efd = td->efd;
// We use an IOV for GET replies so we can cheaply insert the
// requested "key" without having to manipulate a full reply buffer.
//
// iov[0] = "VALUE "
// iov[1] = key
// iov[2] = " 0 <value length>\r\n<value>\r\nEND\r\n"
struct iovec iovs[3];
iovs[0].iov_base = (char*) "VALUE ";
iovs[0].iov_len = strlen((char*) iovs[0].iov_base);
iovs[1].iov_base = (char*) "key";
iovs[1].iov_len = strlen((char*) iovs[1].iov_base);
std::stringstream tailstream;
tailstream << " 0 " << args.value_size_arg << "\r\n";
tailstream << std::string().append(args.value_size_arg, 'f');
tailstream << "\r\nEND\r\n";
std::string tail = tailstream.str();
iovs[2].iov_base = (char*) tail.c_str();
iovs[2].iov_len = strlen(tail.c_str());
struct epoll_event events[MAX_EVENTS];
while (1) {
int n = epoll_wait(efd, events, MAX_EVENTS, -1);
if (n < 1) DIE("epoll_wait failed: %s", strerror(errno));
for (int i = 0; i < n; i++) {
Connection *conn = (Connection *) events[i].data.ptr;
int fd = conn->fd;
#if 0 // Lazily detect connection closure with read() below.
if (events[i].events & EPOLLHUP) {
close(fd);
delete conn;
continue;
}
#endif
// ***
// Zero-effort protocol parser. Assumes every command ends with
// a \r\n and replies with a generic GET reply.
// ***
if (args.no_parse_given) {
int ret = read(fd, conn->buffer, sizeof(conn->buffer));
if (ret <= 0) {
if (ret == EAGAIN) W("read() returned EAGAIN");
close(fd);
delete conn;
continue;
}
conn->buffer_idx = ret;
conn->buffer[conn->buffer_idx] = '\0';
char *start = conn->buffer;
// Locate a \r\n
char *crlf = NULL;
while (start < &conn->buffer[conn->buffer_idx]) {
crlf = strstr(start, "\r\n");
if (crlf == NULL) break; // No \r\n found.
int length = crlf - start;
if (writev(fd, iovs, 3) == EAGAIN) W("writev() returned EAGAIN");
start += length + 2;
}
continue;
}
// ***
// Minimal-effort protocol parser.
//
// Searches for a \r\n in the input buffer and then attempts to
// parse the given command. Incomplete commands (i.e. if no
// \r\n can be found) will be kept at the head of the
// connection's buffer. If the buffer becomes full (READ_CHUNK
// bytes), it is discarded; thus, commands will be clipped if
// they are bigger than a READ_CHUNK.
//
// After finding a \r\n, this parser understands GET and SET
// commands. Following a GET command, it replies with a fake
// VALUE for the given key. Following a SET command, the
// connection switches to "GOBBLE" state where it swallows X
// bytes from the socket (where X was the size specified in the
// SET command). After it finishes gobbling these bytes, it
// switches back to normal mode.
// ***
if (conn->state == Connection::GOBBLE) {
int ret = read(fd, devnull,
conn->bytes_to_eat > READ_CHUNK ?
READ_CHUNK : conn->bytes_to_eat);
if (ret <= 0) {
close(fd);
delete conn;
continue;
}
conn->bytes_to_eat -= ret;
if (conn->bytes_to_eat <= 0) {
conn->state = Connection::IDLE;
write(fd, "STORED\r\n", 8);
}
} else {
// Read into the connection's input buffer. buffer_idx points
// to the current tail of the buffer.
int ret = read(fd, &conn->buffer[conn->buffer_idx],
sizeof(conn->buffer) - conn->buffer_idx - 1);
if (ret <= 0) { // EOF or error.
close(fd);
delete conn;
continue;
}
conn->buffer_idx += ret;
// NUL-terminate to protect string operations below.
conn->buffer[conn->buffer_idx] = '\0';
char *start = conn->buffer;
// Search for \r\n (end of command).
char *crlf = NULL;
while (start < &conn->buffer[conn->buffer_idx]) {
crlf = strstr(start, "\r\n");
if (crlf == NULL) break; // No \r\n found, we're finished.
int length = crlf - start;
start[length] = '\0'; // Mark the end of a command.
if (!strncasecmp(start, "get", 3)) {
// Slice the key out of the command.
// FIXME: This won't parse "GET foo" correctly.
char *key = strchr(start, ' ');
if (key != NULL && *++key != '\0') {
char *end = strchr(key, ' ');
if (end != NULL) *end = '\0'; // Only take 1 key.
iovs[1].iov_base = key;
iovs[1].iov_len = strlen(key);
work();
writev(fd, iovs, 3);
} else {
W("Failed to parse GET command: %s", start);
write(fd, "ERROR\r\n", 7);
}
start += length + 2;
} else if (!strncasecmp(start, "set", 3)) {
int setsize = -1;
if (sscanf(start, "%*s %*s %*d %*d %d", &setsize) &&
setsize >= 0) {
start += length + 2;
int remaining = &conn->buffer[conn->buffer_idx] - start;
// Case 1: All of the SET data is in the buffer. Eat it
// immediately. Case 2: We don't have enough data in
// the buffer to complete the SET. Switch to GOBBLE
// state and start eating bytes.
if (setsize + 2 <= remaining) {
start += setsize + 2;
write(fd, "STORED\r\n", 8);
} else {
conn->state = Connection::GOBBLE;
conn->bytes_to_eat = setsize + 2 - remaining;
start = &conn->buffer[conn->buffer_idx];
break;
}
} else {
W("Failed to parse SET command: %s", start);
write(fd, "ERROR\r\n", 7);
start += length + 2;
}
} else {
D("Unknown command: %s", start);
write(fd, "ERROR\r\n", 7);
start += length + 2;
}
}
// Reset buffer_idx if we run out of buffer space or we've
// successfully parsed everything in the buffer.
if (((start == conn->buffer &&
conn->buffer_idx >= sizeof(conn->buffer) - 1)) ||
&conn->buffer[conn->buffer_idx] == start) {
conn->buffer_idx = 0;
} else {
// If there is any data left in the buffer (i.e. an
// incomplete command), move it to the front.
int shift = &conn->buffer[conn->buffer_idx] - start;
memmove(conn->buffer, start, shift);
conn->buffer_idx = shift;
}
}
}
}
}