forked from franzinc/aserve
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.cl
79 lines (64 loc) · 2.56 KB
/
queue.cl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
;;
;; See the file LICENSE for the full license governing this code.
(in-package :net.aserve)
(eval-when (compile) (declaim (optimize (speed 3))))
#+(version= 8 2)
(eval-when (:compile-toplevel :load-toplevel :execute)
(handler-case (dequeue (make-instance 'mp:queue) :timeout 0 :empty-queue-result :foo)
(error ()
(pushnew 'queue-does-not-timeout *features*))))
#-(and (version>= 8 2) (not net.aserve::queue-does-not-timeout))
(progn
(defclass queue-with-timeout ()
((items :initform nil :accessor items-of)
(gate :initform (mp:make-gate nil) :reader gate-of)
(dequeue-lock :initform (mp:make-process-lock) :reader dequeue-lock-of)))
(defun make-queue-with-timeout ()
(make-instance 'queue-with-timeout))
(defun enqueue (queue thing)
(mp:with-process-lock ((dequeue-lock-of queue))
(push thing (items-of queue))
(mp:open-gate (gate-of queue)))
thing)
(defun dequeue (queue &key (wait t))
(flet ((dequeue-without-waiting ()
(mp:with-process-lock ((dequeue-lock-of queue))
(unless (null (items-of queue))
(return-from dequeue
(multiple-value-prog1 (values (pop (items-of queue)) t)
(if* (null (items-of queue))
then (mp:close-gate (gate-of queue)))))))))
(if* wait
then (let* ((timeout (and (numberp wait) wait))
(started-at (get-internal-real-time))
(wait-until (if* timeout
then (+ started-at (* timeout internal-time-units-per-second))))
(timeout-remaining timeout))
(dequeue-without-waiting)
(while (or (null timeout-remaining)
(> timeout-remaining 0.08))
(if* timeout
then (mp:process-wait-with-timeout "Waiting for gate on potentially timeoutable queue"
timeout-remaining
#'mp:gate-open-p
(gate-of queue))
(setf timeout-remaining (max 0 (/ (- wait-until (get-internal-real-time)) internal-time-units-per-second)))
else (mp:process-wait "Waiting for gate on potentially timeoutable queue"
#'mp:gate-open-p
(gate-of queue)))
(dequeue-without-waiting))
(values nil nil))
else (dequeue-without-waiting)))))
#+(and (version>= 8 2) (not net.aserve::queue-does-not-timeout))
(progn
(defun make-queue-with-timeout ()
(make-instance 'mp:queue))
(defun enqueue (queue thing)
(mp:enqueue queue thing))
(defun dequeue (queue &key (wait t))
(let* ((failure '#:failure)
(result (mp:dequeue queue :wait wait :empty-queue-result failure
:whostate "Waiting on potentially timeoutable queue")))
(if* (eql result failure)
then (values nil nil)
else (values result t)))))