-
Notifications
You must be signed in to change notification settings - Fork 3
/
coroutine.c
355 lines (288 loc) · 7.7 KB
/
coroutine.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
/*
* Copyright (C) 2011 MORITA Kazutaka <[email protected]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.0 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*
* This code is based on coroutine-ucontext.c and qemu-coroutine.c from QEMU:
* Copyright (C) 2006 Anthony Liguori <[email protected]>
* Copyright (C) 2011 Stefan Hajnoczi <[email protected]>
* Copyright (C) 2011 Kevin Wolf <[email protected]>
*/
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <setjmp.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ucontext.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

#include "util.h"
#include "coroutine.h"
/*
 * Value handed from one coroutine to another across a context switch.
 *
 * coroutine_switch() passes it to longjmp(), so it comes back as the return
 * value of the matching setjmp().  Both values are non-zero because
 * coroutine_switch() uses a zero setjmp() result to mean "context just saved".
 */
enum co_action {
/* Plain transfer of control; coroutine_swap() sends this on enter and yield */
COROUTINE_YIELD = 1,
/* Sent by coroutine_trampoline() after the entry function has returned;
 * coroutine_swap() reacts by calling coroutine_delete() on the sender */
COROUTINE_TERMINATE = 2,
};
/*
 * Maximum free pool size prevents holding too many freed coroutines.
 * coroutine_delete() stops pooling and frees the coroutine once the
 * per-thread pool already holds this many entries.  Builds with
 * COROUTINE_DEBUG defined limit the pool to a single entry.
 */
#ifdef COROUTINE_DEBUG
#define POOL_MAX_SIZE 1
#else
#define POOL_MAX_SIZE 1024
#endif
/* Size in bytes of the stack allocated for each coroutine in __coroutine_new() */
#define STACK_MAX_SIZE (1 << 16) /* 64 KB */
/*
 * Context-independent part of a coroutine.
 */
struct coroutine {
/* Function run by coroutine_trampoline(); set by coroutine_create() */
coroutine_entry_func_t *entry;
/* Argument passed to entry; set by coroutine_enter().  While the coroutine
 * is being created, __coroutine_new() temporarily stores a pointer to its
 * own jmp_buf here so the trampoline can jump back to it. */
void *entry_arg;
/* Coroutine that entered this one (set by coroutine_enter()); reset to NULL
 * by coroutine_yield() and when coroutine_delete() pools the coroutine.
 * coroutine_enter() aborts if this is already non-NULL. */
struct coroutine *caller;
/* Link in the per-thread free pool (struct co_thread_state.pool) */
struct list_head pool_next;
/* Not referenced in this file; presumably a queue link used by code
 * elsewhere -- TODO confirm */
struct list_head co_queue_next;
};
/*
 * A coroutine together with its execution context.
 *
 * `base` has to stay the first member: coroutine_thread_cleanup() passes the
 * address of `base` to free(), which is only valid while it equals the
 * address of the whole allocation.
 */
struct co_ucontext {
struct coroutine base;
/* Stack memory allocated in __coroutine_new() (STACK_MAX_SIZE bytes) */
void *stack;
/* Saved execution state; written by setjmp() and resumed by longjmp() */
jmp_buf env;
};
/**
 * Per-thread coroutine bookkeeping
 *
 * Allocated lazily by coroutine_get_thread_state() and stored under
 * thread_state_key; released by coroutine_thread_cleanup().
 */
struct co_thread_state{
/** Currently executing coroutine; updated by coroutine_switch() */
struct coroutine *current;
/** Free list to speed up creation (linked through coroutine.pool_next) */
struct list_head pool;
/** Number of coroutines currently on the free list */
unsigned int pool_size;
/** The default coroutine; `current` initially points at its base */
struct co_ucontext leader;
};
/*
 * Thread-specific-data key under which each thread keeps its
 * struct co_thread_state.  Created in coroutine_init() with
 * coroutine_thread_cleanup() as the destructor.
 */
static pthread_key_t thread_state_key;
/*
 * Forward declaration: coroutine_trampoline() needs to call
 * coroutine_switch() before its definition appears.
 */
static enum co_action coroutine_switch(struct coroutine *from,
struct coroutine *to,
enum co_action action);
/*
 * va_args to makecontext() must be type 'int', so passing
 * the pointer we need may require several int args.  This
 * union is a quick hack to let us do that.
 *
 * __coroutine_new() stores the pointer in `p` and passes i[0] and i[1];
 * coroutine_trampoline() writes them back and reads `p`.  This only works
 * while a pointer is no larger than two ints.
 */
union cc_arg {
void *p;
int i[2];
};
/*
 * Return the calling thread's coroutine state, creating it on first use.
 *
 * A freshly created state has an empty free pool and its leader coroutine
 * selected as the current one.  Aborts if the allocation fails.
 */
static struct co_thread_state *coroutine_get_thread_state(void)
{
	struct co_thread_state *state = pthread_getspecific(thread_state_key);

	/* Fast path: this thread has already been set up */
	if (state)
		return state;

	state = zalloc(sizeof(*state));
	if (!state)
		abort();

	INIT_LIST_HEAD(&state->pool);
	state->current = &state->leader.base;
	pthread_setspecific(thread_state_key, state);

	return state;
}
/*
 * Destructor registered for thread_state_key.
 *
 * Releases every coroutine still sitting in the thread's free pool (stack
 * first, then the coroutine itself) and finally the thread state.
 *
 * @opaque: the struct co_thread_state stored under thread_state_key
 */
static void coroutine_thread_cleanup(void *opaque)
{
	struct co_thread_state *state = opaque;
	struct coroutine *pooled;
	struct coroutine *next;

	list_for_each_entry_safe(pooled, next, &state->pool, pool_next) {
		struct co_ucontext *uc =
			container_of(pooled, struct co_ucontext, base);

		free(uc->stack);
		/* base is the first member, so uc is the allocation itself */
		free(uc);
	}

	free(state);
}
/*
 * Create the thread-specific-data key used to find each thread's
 * struct co_thread_state.  Marked as a constructor so it runs automatically
 * when the program or library is loaded.
 *
 * coroutine_thread_cleanup() is registered as the key's destructor.
 * Aborts if the key cannot be created.
 */
static void __attribute__((constructor)) coroutine_init(void)
{
	int ret = pthread_key_create(&thread_state_key,
				     coroutine_thread_cleanup);

	if (ret != 0) {
		/*
		 * pthread_key_create() reports failure through its return
		 * value and does not set errno, so the error text has to be
		 * derived from ret rather than from "%m".
		 */
		fprintf(stderr, "unable to create leader key: %s\n",
			strerror(ret));
		abort();
	}
}
/*
 * First function executed on a new coroutine's stack; installed through
 * makecontext() in __coroutine_new().
 *
 * @i0, @i1: the two int halves of the struct co_ucontext pointer, as split
 *           by union cc_arg
 *
 * Never returns: after the initial hand-back to the creator it loops forever,
 * running one entry function per iteration.
 */
static void coroutine_trampoline(int i0, int i1)
{
union cc_arg arg;
struct co_ucontext *self;
struct coroutine *co;
/* Reassemble the pointer that makecontext() could only pass as ints */
arg.i[0] = i0;
arg.i[1] = i1;
self = arg.p;
co = &self->base;
/* Initialize longjmp environment and switch back the caller */
/* At this point entry_arg still holds the creator's jmp_buf, stashed there
 * by __coroutine_new().  setjmp() returns non-zero later, when the coroutine
 * is entered for the first time via longjmp(self->env, ...). */
if (!setjmp(self->env))
longjmp(*(jmp_buf *)co->entry_arg, 1);
/* Each iteration serves one use of this coroutine.  After the TERMINATE
 * switch the coroutine may be put into the free pool; if it is later handed
 * out again and entered, coroutine_switch() returns here and the loop runs
 * the newly assigned entry function. */
for (;;) {
co->entry(co->entry_arg);
coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
}
}
#ifdef COROUTINE_DEBUG
/* Pattern written over a fresh stack by init_stack(); get_stack_size() looks
 * for the first word that no longer holds it */
#define MAGIC_NUMBER 0x1234567890123456
/*
 * Debug helper: fill the whole stack of @co with MAGIC_NUMBER, one 64-bit
 * word at a time, so that get_stack_size() can later tell which words have
 * been overwritten.
 */
static void init_stack(struct co_ucontext *co)
{
	uint64_t *word = co->stack;
	uint64_t *const end = word + STACK_MAX_SIZE / sizeof(*word);

	while (word < end)
		*word++ = MAGIC_NUMBER;
}
/*
 * Debug helper: estimate how many bytes of @co's stack have been used.
 *
 * Counts the 64-bit words at the start (lowest addresses) of the stack that
 * still hold MAGIC_NUMBER and returns STACK_MAX_SIZE minus their size.  The
 * scan direction presumes the stack is consumed from the high end downwards.
 *
 * If even the very first word has been overwritten the stack is considered
 * overflowed: a message is printed to stderr and the process aborts.
 */
static int get_stack_size(struct co_ucontext *co)
{
	const uint64_t *words = co->stack;
	const size_t nr_words = STACK_MAX_SIZE / sizeof(words[0]);
	size_t untouched = 0;

	while (untouched < nr_words && words[untouched] == MAGIC_NUMBER)
		untouched++;

	if (untouched == 0) {
		fprintf(stderr, "stack overflow\n");
		fflush(stderr);
		abort();
	}

	return STACK_MAX_SIZE - untouched * sizeof(words[0]);
}
#endif
/*
 * Allocate a brand-new coroutine with its own stack and prime its jump
 * environment.
 *
 * Returns the embedded struct coroutine.  `entry` is left unset, and
 * `entry_arg` still points at this function's (by then dead) local jmp_buf;
 * coroutine_create() and coroutine_enter() fill them in before the coroutine
 * runs.
 * Aborts if getcontext() or either allocation fails.
 */
static struct coroutine *__coroutine_new(void)
{
const size_t stack_size = STACK_MAX_SIZE;
struct co_ucontext *co;
ucontext_t old_uc, uc;
jmp_buf old_env;
union cc_arg arg = {0};
/* The ucontext functions preserve signal masks which incurs a
 * system call overhead. setjmp()/longjmp() does not preserve
 * signal masks but only works on the current stack. Since we
 * need a way to create and switch to a new stack, use the
 * ucontext functions for that but setjmp()/longjmp() for
 * everything else.
 */
/* uc serves as the template that makecontext() modifies below */
if (getcontext(&uc) == -1)
abort();
co = zalloc(sizeof(*co));
if (!co)
abort();
co->stack = zalloc(stack_size);
if (!co->stack)
abort();
#ifdef COROUTINE_DEBUG
init_stack(co);
#endif
co->base.entry_arg = &old_env; /* stash away our jmp_buf */
/* old_uc is filled in by swapcontext() below.  The trampoline never returns,
 * so the uc_link continuation is not expected to be taken. */
uc.uc_link = &old_uc;
uc.uc_stack.ss_sp = co->stack;
uc.uc_stack.ss_size = stack_size;
uc.uc_stack.ss_flags = 0;
/* Split the pointer into the int arguments makecontext() can carry */
arg.p = co;
makecontext(&uc, (void (*)(void))coroutine_trampoline,
2, arg.i[0], arg.i[1]);
/* swapcontext() in, longjmp() back out */
/* The trampoline saves its own environment in co->env and then longjmp()s to
 * old_env, which makes this setjmp() return 1 and skips the swapcontext().
 * NOTE(review): the return value of swapcontext() is not checked. */
if (!setjmp(old_env))
swapcontext(&old_uc, &uc);
return &co->base;
}
/*
 * Obtain a coroutine for the calling thread.
 *
 * Reuses the first entry of the thread's free pool when one is available;
 * otherwise builds a new coroutine with __coroutine_new().
 */
static struct coroutine *coroutine_new(void)
{
	struct co_thread_state *state = coroutine_get_thread_state();
	struct coroutine *recycled;

	if (list_empty(&state->pool))
		return __coroutine_new();

	recycled = list_first_entry(&state->pool, struct coroutine, pool_next);
	list_del(&recycled->pool_next);
	state->pool_size--;

	return recycled;
}
/*
 * Dispose of a terminated coroutine.
 *
 * While the calling thread's free pool holds fewer than POOL_MAX_SIZE
 * entries the coroutine is parked there (with its caller cleared) for reuse;
 * otherwise its stack and the coroutine itself are freed.
 *
 * With COROUTINE_DEBUG defined the measured stack usage is printed first.
 */
static void coroutine_delete(struct coroutine *co_)
{
	struct co_thread_state *state = coroutine_get_thread_state();
	struct co_ucontext *uc = container_of(co_, struct co_ucontext, base);

#ifdef COROUTINE_DEBUG
	fprintf(stdout, "%d bytes are consumed\n", get_stack_size(uc));
#endif

	/* Pool is full: release the memory instead of caching it */
	if (state->pool_size >= POOL_MAX_SIZE) {
		free(uc->stack);
		free(uc);
		return;
	}

	uc->base.caller = NULL;
	list_add(&uc->base.pool_next, &state->pool);
	state->pool_size++;
}
/*
 * Transfer control from one coroutine to another.
 *
 * @from_:  coroutine giving up control; its context is saved in its env
 * @to_:    coroutine to resume; becomes the thread's current coroutine
 * @action: value delivered to @to_ as the result of its pending setjmp()
 *
 * Returns only when some other coroutine later switches back to @from_; the
 * return value is the action that coroutine passed.
 */
static enum co_action coroutine_switch(struct coroutine *from_,
struct coroutine *to_,
enum co_action action)
{
struct co_ucontext *from = container_of(from_, struct co_ucontext, base);
struct co_ucontext *to = container_of(to_, struct co_ucontext, base);
struct co_thread_state *s = coroutine_get_thread_state();
int ret;
/* Update the bookkeeping before jumping: the longjmp() does not come back */
s->current = to_;
/* NOTE(review): ISO C only defines setjmp() in a few expression contexts and
 * plain assignment is not one of them; this form relies on the compiler
 * accepting it -- confirm for any new toolchain. */
ret = setjmp(from->env);
/* 0 means the context was just saved; non-zero is an action sent to us */
if (ret == 0)
longjmp(to->env, action);
return ret;
}
struct coroutine *coroutine_self(void)
{
struct co_thread_state *s = coroutine_get_thread_state();
return s->current;
}
/*
 * Tell whether the calling thread is currently running inside a coroutine
 * that was entered by another one.
 *
 * Returns 1 if the thread has coroutine state and its current coroutine has
 * a caller, 0 otherwise.  Unlike coroutine_self(), this never allocates the
 * thread state.
 */
int in_coroutine(void)
{
	struct co_thread_state *state = pthread_getspecific(thread_state_key);

	if (!state)
		return 0;

	return state->current->caller != NULL;
}
/*
 * Create a coroutine that will run @entry once it is entered.
 *
 * The coroutine comes from the calling thread's free pool when possible.
 * It does not start running until coroutine_enter() is called on it.
 */
struct coroutine *coroutine_create(coroutine_entry_func_t *entry)
{
	struct coroutine *fresh = coroutine_new();

	fresh->entry = entry;

	return fresh;
}
static void coroutine_swap(struct coroutine *from, struct coroutine *to)
{
enum co_action ret;
ret = coroutine_switch(from, to, COROUTINE_YIELD);
switch (ret) {
case COROUTINE_YIELD:
return;
case COROUTINE_TERMINATE:
coroutine_delete(to);
return;
default:
abort();
}
}
/*
 * Transfer control to @co, passing @opaque as the argument of its entry
 * function.
 *
 * The calling coroutine is recorded as @co's caller, which is where
 * coroutine_yield() (or termination) will return control to.
 * Aborts if @co already has a caller, i.e. it is being re-entered.
 */
void coroutine_enter(struct coroutine *co, void *opaque)
{
	struct coroutine *parent = coroutine_self();

	if (unlikely(co->caller != NULL)) {
		fprintf(stderr, "Co-routine re-entered recursively\n");
		abort();
	}

	co->entry_arg = opaque;
	co->caller = parent;

	coroutine_swap(parent, co);
}
/*
 * Suspend the current coroutine and hand control back to whoever entered it.
 *
 * The caller link is cleared before switching, so the coroutine can be
 * entered again later.  Aborts if the current coroutine has no caller.
 */
void coroutine_yield(void)
{
	struct coroutine *me = coroutine_self();
	struct coroutine *resume_target = me->caller;

	if (unlikely(resume_target == NULL)) {
		fprintf(stderr, "Co-routine is yielding to no one\n");
		abort();
	}

	me->caller = NULL;
	coroutine_swap(me, resume_target);
}