From ef815ea33c1d8181bf4abae519f838c6a3a2f299 Mon Sep 17 00:00:00 2001 From: Andrew Tropin Date: Wed, 1 May 2024 10:48:03 +0300 Subject: [PATCH] Add ares/spawn-reusable-thread to context and down the stack This is done to workaround fibers issue: https://github.com/wingo/fibers/issues/105 Fixes: https://todo.sr.ht/~abcdw/tickets/7 --- src/nrepl/bootstrap.scm | 17 ++++++++++++++++- src/nrepl/extensions/evaluation.scm | 19 ++++++++++++------- src/nrepl/server/evaluation.scm | 5 ++++- tests/integration-test.scm | 1 - 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/src/nrepl/bootstrap.scm b/src/nrepl/bootstrap.scm index 917c2b6..d2747bf 100644 --- a/src/nrepl/bootstrap.scm +++ b/src/nrepl/bootstrap.scm @@ -18,6 +18,7 @@ ;;; along with guile-ares-rs. If not, see . (define-module (nrepl bootstrap) + #:use-module (ares reusable-thread) #:use-module (fibers io-wakeup) #:use-module (fibers operations) #:use-module (ice-9 atomic) @@ -53,7 +54,21 @@ (define (initial-context initial-extensions) (let ((state (make-atomic-box '())) (handler (make-atomic-box (make-handler initial-extensions)))) - `((nrepl/state . ,state) + ;; Threads Manager thread is created outside of fibers, so all the + ;; threads created using threads-manager are not affected by + ;; https://github.com/wingo/fibers/issues/105 + (define threads-manager (make-reusable-thread)) + (define (spawn-reusable-thread ch) + (reusable-thread-discard-and-run + threads-manager + (lambda () + (make-reusable-thread ch))) + (assoc-ref + (reusable-thread-get-value threads-manager) + 'value)) + + `((ares/spawn-reusable-thread . ,spawn-reusable-thread) + (nrepl/state . ,state) (nrepl/handler . ,handler)))) (define (make-initial-context) diff --git a/src/nrepl/extensions/evaluation.scm b/src/nrepl/extensions/evaluation.scm index 3c77c06..72ba0fc 100644 --- a/src/nrepl/extensions/evaluation.scm +++ b/src/nrepl/extensions/evaluation.scm @@ -1,6 +1,6 @@ ;;; guile-ares-rs --- Asynchronous Reliable Extensible Sleek RPC Server ;;; -;;; Copyright © 2023 Andrew Tropin +;;; Copyright © 2023, 2024 Andrew Tropin ;;; ;;; This file is part of guile-ares-rs. ;;; @@ -28,16 +28,18 @@ #:use-module (srfi srfi-197) #:export (evaluation-extension)) -(define (make-evaluation-supervisor session) +(define (make-evaluation-supervisor session spawn-reusable-thread) (let ((control-channel (make-channel))) (spawn-fiber (evaluation-supervisor-thunk control-channel + #:spawn-reusable-thread spawn-reusable-thread #:shutdown-condition (assoc-ref session 'shutdown-condition))) control-channel)) -(define (create-evaluation-supervisor! session-atom) +(define (create-evaluation-supervisor! session-atom spawn-reusable-thread) (let* ((tmp-evaluation-supervisor - (make-evaluation-supervisor (atomic-box-ref session-atom))) + (make-evaluation-supervisor (atomic-box-ref session-atom) + spawn-reusable-thread)) (add-evaluation-supervisor (lambda (session) (let ((evaluation-supervisor @@ -54,22 +56,25 @@ (evaluation-supervisor-shutdown tmp-evaluation-supervisor)) new-evaluation-supervisor)) -(define (get-or-create-evaluation-supervisor! session-atom) +(define (get-or-create-evaluation-supervisor! + session-atom spawn-reusable-thread) (let ((evaluation-supervisor (assoc-ref (atomic-box-ref session-atom) 'evaluation-supervisor))) (if evaluation-supervisor evaluation-supervisor - (create-evaluation-supervisor! session-atom)))) + (create-evaluation-supervisor! session-atom spawn-reusable-thread)))) (define (process-message context) (let* ((state (assoc-ref context 'nrepl/state)) (message (assoc-ref context 'nrepl/message)) + (spawn-reusable-thread (assoc-ref context 'ares/spawn-reusable-thread)) (reply (assoc-ref context 'reply)) (session-id (assoc-ref message "session")) (session-atom (get-session state session-id))) (if session-id (evaluation-supervisor-process-nrepl-message - (get-or-create-evaluation-supervisor! session-atom) + (get-or-create-evaluation-supervisor! + session-atom spawn-reusable-thread) message reply) (reply `(("status" . #("error" "no-session-id-provided" "done"))))))) diff --git a/src/nrepl/server/evaluation.scm b/src/nrepl/server/evaluation.scm index 399c630..101b8f7 100644 --- a/src/nrepl/server/evaluation.scm +++ b/src/nrepl/server/evaluation.scm @@ -265,6 +265,7 @@ Stream managers waits until THUNK-FINISHED is signalled." (define* (evaluation-thread-manager-thunk command-channel #:key + (spawn-reusable-thread make-reusable-thread) ;; TODO: [Andrew Tropin, 2023-10-16] Implement shutdown (shutdown-condition (make-condition)) (terminate-condition (make-condition))) @@ -302,7 +303,7 @@ COMMAND-CHANNEL." (input-port (open-channel-input-port input-request-channel stdin-channel)) - (evaluation-rethread (make-reusable-thread result-channel))) + (evaluation-rethread (spawn-reusable-thread result-channel))) (let loop ((reply-channel #f) (evaluation-finished #f) (output-finished-condition #f)) @@ -399,6 +400,7 @@ COMMAND-CHANNEL." (define* (evaluation-supervisor-thunk control-channel #:key + (spawn-reusable-thread make-reusable-thread) ;; shutdown is graceful operation (shutdown-condition (make-condition)) (finished-condition (make-condition))) @@ -480,6 +482,7 @@ arrival or when evaluation is finished, #t and rest of the queue." (lambda () (spawn-fiber (evaluation-thread-manager-thunk evaluation-thread-command-channel + #:spawn-reusable-thread spawn-reusable-thread #:shutdown-condition evaluation-thread-shutdown-condition)) (let loop ((get-next-command-operation receive-command-operation) (evaluation-id #f) diff --git a/tests/integration-test.scm b/tests/integration-test.scm index 0e68605..111732e 100644 --- a/tests/integration-test.scm +++ b/tests/integration-test.scm @@ -82,7 +82,6 @@ (test-group "nonblocking socket" ;; https://todo.sr.ht/~abcdw/tickets/7 ;; Should return connection refused (system-error) - (test-expect-fail 1) (test-equal "create socket" "system-error" (assoc-ref (receive-message) "value")))