Skip to content

Commit

Permalink
Add ares/spawn-reusable-thread to context and down the stack
Browse files Browse the repository at this point in the history
This is done to workaround fibers issue:
wingo/fibers#105

Fixes: https://todo.sr.ht/~abcdw/tickets/7
  • Loading branch information
abcdw committed May 1, 2024
1 parent 902c3e6 commit ef815ea
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 10 deletions.
17 changes: 16 additions & 1 deletion src/nrepl/bootstrap.scm
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
;;; along with guile-ares-rs. If not, see <http://www.gnu.org/licenses/>.

(define-module (nrepl bootstrap)
#:use-module (ares reusable-thread)
#:use-module (fibers io-wakeup)
#:use-module (fibers operations)
#:use-module (ice-9 atomic)
Expand Down Expand Up @@ -53,7 +54,21 @@
(define (initial-context initial-extensions)
(let ((state (make-atomic-box '()))
(handler (make-atomic-box (make-handler initial-extensions))))
`((nrepl/state . ,state)
;; Threads Manager thread is created outside of fibers, so all the
;; threads created using threads-manager are not affected by
;; https://github.com/wingo/fibers/issues/105
(define threads-manager (make-reusable-thread))
(define (spawn-reusable-thread ch)
(reusable-thread-discard-and-run
threads-manager
(lambda ()
(make-reusable-thread ch)))
(assoc-ref
(reusable-thread-get-value threads-manager)
'value))

`((ares/spawn-reusable-thread . ,spawn-reusable-thread)
(nrepl/state . ,state)
(nrepl/handler . ,handler))))

(define (make-initial-context)
Expand Down
19 changes: 12 additions & 7 deletions src/nrepl/extensions/evaluation.scm
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
;;; guile-ares-rs --- Asynchronous Reliable Extensible Sleek RPC Server
;;;
;;; Copyright © 2023 Andrew Tropin <[email protected]>
;;; Copyright © 2023, 2024 Andrew Tropin <[email protected]>
;;;
;;; This file is part of guile-ares-rs.
;;;
Expand Down Expand Up @@ -28,16 +28,18 @@
#:use-module (srfi srfi-197)
#:export (evaluation-extension))

(define (make-evaluation-supervisor session)
(define (make-evaluation-supervisor session spawn-reusable-thread)
(let ((control-channel (make-channel)))
(spawn-fiber (evaluation-supervisor-thunk
control-channel
#:spawn-reusable-thread spawn-reusable-thread
#:shutdown-condition (assoc-ref session 'shutdown-condition)))
control-channel))

(define (create-evaluation-supervisor! session-atom)
(define (create-evaluation-supervisor! session-atom spawn-reusable-thread)
(let* ((tmp-evaluation-supervisor
(make-evaluation-supervisor (atomic-box-ref session-atom)))
(make-evaluation-supervisor (atomic-box-ref session-atom)
spawn-reusable-thread))
(add-evaluation-supervisor
(lambda (session)
(let ((evaluation-supervisor
Expand All @@ -54,22 +56,25 @@
(evaluation-supervisor-shutdown tmp-evaluation-supervisor))
new-evaluation-supervisor))

(define (get-or-create-evaluation-supervisor! session-atom)
(define (get-or-create-evaluation-supervisor!
session-atom spawn-reusable-thread)
(let ((evaluation-supervisor (assoc-ref (atomic-box-ref session-atom)
'evaluation-supervisor)))
(if evaluation-supervisor
evaluation-supervisor
(create-evaluation-supervisor! session-atom))))
(create-evaluation-supervisor! session-atom spawn-reusable-thread))))

(define (process-message context)
(let* ((state (assoc-ref context 'nrepl/state))
(message (assoc-ref context 'nrepl/message))
(spawn-reusable-thread (assoc-ref context 'ares/spawn-reusable-thread))
(reply (assoc-ref context 'reply))
(session-id (assoc-ref message "session"))
(session-atom (get-session state session-id)))
(if session-id
(evaluation-supervisor-process-nrepl-message
(get-or-create-evaluation-supervisor! session-atom)
(get-or-create-evaluation-supervisor!
session-atom spawn-reusable-thread)
message reply)
(reply `(("status" . #("error" "no-session-id-provided" "done")))))))

Expand Down
5 changes: 4 additions & 1 deletion src/nrepl/server/evaluation.scm
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ Stream managers waits until THUNK-FINISHED is signalled."
(define* (evaluation-thread-manager-thunk
command-channel
#:key
(spawn-reusable-thread make-reusable-thread)
;; TODO: [Andrew Tropin, 2023-10-16] Implement shutdown
(shutdown-condition (make-condition))
(terminate-condition (make-condition)))
Expand Down Expand Up @@ -302,7 +303,7 @@ COMMAND-CHANNEL."
(input-port (open-channel-input-port
input-request-channel
stdin-channel))
(evaluation-rethread (make-reusable-thread result-channel)))
(evaluation-rethread (spawn-reusable-thread result-channel)))
(let loop ((reply-channel #f)
(evaluation-finished #f)
(output-finished-condition #f))
Expand Down Expand Up @@ -399,6 +400,7 @@ COMMAND-CHANNEL."
(define* (evaluation-supervisor-thunk
control-channel
#:key
(spawn-reusable-thread make-reusable-thread)
;; shutdown is graceful operation
(shutdown-condition (make-condition))
(finished-condition (make-condition)))
Expand Down Expand Up @@ -480,6 +482,7 @@ arrival or when evaluation is finished, #t and rest of the queue."
(lambda ()
(spawn-fiber (evaluation-thread-manager-thunk
evaluation-thread-command-channel
#:spawn-reusable-thread spawn-reusable-thread
#:shutdown-condition evaluation-thread-shutdown-condition))
(let loop ((get-next-command-operation receive-command-operation)
(evaluation-id #f)
Expand Down
1 change: 0 additions & 1 deletion tests/integration-test.scm
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
(test-group "nonblocking socket"
;; https://todo.sr.ht/~abcdw/tickets/7
;; Should return connection refused (system-error)
(test-expect-fail 1)
(test-equal "create socket"
"system-error"
(assoc-ref (receive-message) "value")))
Expand Down

0 comments on commit ef815ea

Please sign in to comment.