Skip to content

Commit

Permalink
More docs, add #:silent? option to start-worker, use less generic rea…
Browse files Browse the repository at this point in the history
…dy message
  • Loading branch information
Metaxal committed Dec 1, 2023
1 parent 0b3b500 commit eb618fa
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 14 deletions.
22 changes: 20 additions & 2 deletions jobsched/scribblings/jobsched.scrbl
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,20 @@ The bindings in this section are also exported by @racketmodname[jobsched].
@defproc[(worker? [v any/c]) boolean?]{}

@defproc[(make-scheduler [make-worker-command (-> nonnegative-integer? list?)]) scheduler?]{
Returns a scheduler which will use @racket[make-worker-command] to start the workers'
Racket processes.

See also @racket[make-racket-cmd].}

@defproc[(scheduler-add-job! [sched scheduler?] [#:data data readable?] [#:cost cost number? 0]) void?]{}
@defproc[(scheduler-add-job! [sched scheduler?] [#:data data readable?] [#:cost cost number? 0])
void?]{
Adds a job to the scheduler's queue.

The @racket[data] will be sent to the worker, who will receive it on its input port and will be
accessible via @racket[job-data].

The @racket[cost] is used for ordering the job in the priority queue, which is ordered by minimum
cost.}

@defproc[(scheduler-start [sched scheduler?]
[n-workers nonnegative-integer?]
Expand All @@ -136,12 +147,19 @@ Re-exported from @racketmodname[racket/future].}
@defmodule[jobsched/worker]
The bindings in this section are also exported by @racketmodname[jobsched].

@defproc[(start-worker [run-job (-> job? any)]) void?]{
@defproc[(start-worker [run-job (-> job? any)]
[silent? (#:silent? any/c #f)]) void?]{
Starts a worker which waits for jobs.
Each time a job is received, the @racket[run-job] procedure is called.
The data of the job can be retrieved with @racket[(job-data job)].
If @racket[silent?] is not @racket[#f], all output of @racket[run-job] to its output port is
suppressed---the error port remains untouched.

See example at the top.

NOTICE: Any output @emph{before} @racket[start-worker] is called is processed by the server,
who is waiting for a ready signal from the worker. It is advised to avoid any output before
@racket[start-worker].
}

@section[#:tag "utils"]{Utilities}
Expand Down
14 changes: 10 additions & 4 deletions jobsched/server.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ watch -n 3 "cat /proc/cpuinfo | grep MHz; sensors"

(let loop ()
;; Find a worker that has output a value.
;; The workers must start by sending out `'ready`.
;; The workers must start by sending out `ready-message`.
(define wk (apply sync workers))
(define res (receive-msg (worker-in wk)))
(define now (- (current-seconds) start-seconds))
Expand All @@ -142,10 +142,10 @@ watch -n 3 "cat /proc/cpuinfo | grep MHz; sensors"
(scheduler-add-job! sched #:data (job-data jb) #:cost (job-cost jb)))
;; Remove the worker and add a new one.
(set! workers (cons (new-worker) (remove wk workers)))]
[(eq? res 'ready)
[(eq? res ready-message)
(when-verb (printf "time: ~a; WORKER READY\n" now))
(void)]
[else
[(worker-job wk)
; Processing job result
(-- n-pending)
(define jb (worker-job wk))
Expand All @@ -156,7 +156,13 @@ watch -n 3 "cat /proc/cpuinfo | grep MHz; sensors"
now (worker-index wk) (job-index jb) (job-cost jb)
(- (job-stop-ms jb) (job-start-ms jb))))
(after-stop sched jb res) ; callback
(set-worker-job! wk #f)])
(set-worker-job! wk #f)]
[else
; The job is #f, which means that a worker sends a message before `start-worker`
; is able to deal with the worker's outputs.
(when-verb
(printf "WARNING: received unprocessed message (before `start-worker`?): ~v\n"
res))])

;; Send a new job to the worker
(define jb (scheduler-extract-min! sched))
Expand Down
2 changes: 2 additions & 0 deletions jobsched/utils.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ limitations under the License.|#
(display "jobsched: ")
body ...))

(define ready-message 'JOBSCHED:READY)

;; From:
;; https://github.com/racket/racket/blob/master/pkgs/racket-benchmarks/tests/
;; racket/benchmarks/places/place-processes.rkt#L63
Expand Down
25 changes: 17 additions & 8 deletions jobsched/worker.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.|#

(require racket/contract
racket/port
"utils.rkt"
"job.rkt"
define2)
Expand All @@ -22,12 +23,13 @@ limitations under the License.|#
(contract-out
[start-worker
(->* [(procedure-arity-includes/c 1)]
(#:silent? any/c)
any)]))

;; run-job : gbs-node? -> any/c
;; The result of `start-worker` must be writeable and readable.
(define (start-worker run-job)
(send-msg 'ready) ; This is important
(define (start-worker run-job #:? [silent? #f])
(send-msg ready-message) ; This is important

(let loop ()
;; If it's more than a few ms, something's wrong (IO on the server side?)
Expand All @@ -40,12 +42,19 @@ limitations under the License.|#
;; but frankly I don't understand why yet.
(define cust (make-custodian))
(define res
(parameterize ([current-custodian cust]
;; The output port is used for communication with the server,
;; and must thus be reserved for that, so we temporarily redirect the
;; output port to the error port
[current-output-port (current-error-port)])
(run-job nd)))
(if silent?
(parameterize ([current-custodian cust]
;; The output port is used for communication with the server,
;; and must thus be reserved for that, so we temporarily redirect the
;; output port to the error port
[current-output-port (open-output-nowhere)])
(run-job nd))
(parameterize ([current-custodian cust]
;; The output port is used for communication with the server,
;; and must thus be reserved for that, so we temporarily redirect the
;; output port to the error port
[current-output-port (current-error-port)])
(run-job nd))))
(custodian-shutdown-all cust)

(send-msg res)
Expand Down
3 changes: 3 additions & 0 deletions lts-cm/domains/stp/gui.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ limitations under the License.|#
#:spec->state list->stp))

(module+ main (main))

#; ; Example:
(play-stp (list->stp '(12 5 11 1 8 0 16 3 19 4 2 15 7 14 9 20 6 18 13 23 21 10 17 24 22)))

0 comments on commit eb618fa

Please sign in to comment.