Skip to content

Commit

Permalink
refactor request queue mechanics
Browse files Browse the repository at this point in the history
This is a prelude to #159 which introduces upgrade requests, with a few
major changes in `Server_connection`.

The goals here is to try to make queue management easier to reason about
by folding bits of logic from `advance_request_queue_if_necessary` into
`next_read_operation` and `next_write_operation` such that we only
perform side-effects when the operation in question demands it.

One of the ways I tried to make this easier to reason about was to make
the `next_<read|write>_operation` functions very parallel. Getting the
read operation starts out with a short-circuit for shutting down when
the server can no longer make progress (reader is closed and queue is
empty). This doesn't feel like it belongs here. Perhaps this check
should be part of `advance_request_queue` with some extra logic
triggering in `shutdown_reader`? After that, the next-operation
functions use some very simple probing of the input/output state of
`Reqd` to determine what to do next. Only in the case of `Complete` do
we move into a separate function (to make it easier to read):
`_final_<read|write>_operation`.

In these functions, we decide if we should shutdown the respective
reader/writer or consider the `reqd` complete and move it off the queue.
What's happening is that we don't know if the write action or read
action will be last, so each function checks the state of the other to
see if they're both complete. When we do shift it off, we recursively
ask for the next operation given the new queue state.

In the case of the writer triggering the advancing, before we return the
result, we wakeup the reader so that it can evaluate the next operation
given the new queue state.

Note that in the case of a non-persistent connection, the queue is never
advanced and the connection is shut down when both sides are done.

Though on the surface, these pieces feel fairly straightforward, there
are still a slew of re-entrancy bugs to consider. I think there are two
things that we can do to make this drastically easier to manage:

1. We call `t.request_handler` in two places, and this is mostly because
   we want to keep the invariant that the head of the request queue has
   already been passed off to the handler. I feel like splitting this up
   into a simple queue of unhandled requests and a [Reqd.t option] that
   represents the current request would be easier to manage.

2. It would be nice to schedule calls. Things like waking up the writer
   before you let the read loop know its next operation just immediately
   makes my mind fall apart and lose track of state. There's a fairly
   obvious solution of asking for a `schedule : (unit -> unit) -> unit`
   function from the runtime that promises to not call the thunk
   synchronously, but rather waits until it is outside of the read and
   write loops. But maybe we can solve it using what we have now, like
   establishing a contract that when the reader/writer is woken up, they
   must schedule their work for a fresh call stack and not immediately
   ask for operations.
  • Loading branch information
dpatti committed Apr 3, 2021
1 parent e8e8f89 commit c6a3750
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 55 deletions.
9 changes: 0 additions & 9 deletions lib/reqd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,6 @@ let output_state t : Output_state.t =
| Waiting -> Waiting
;;

let is_complete t =
match input_state t with
| Ready -> false
| Complete ->
(match output_state t with
| Waiting | Ready -> false
| Complete -> true)
;;

let flush_request_body t =
let request_body = request_body t in
if Body.has_pending_output request_body
Expand Down
110 changes: 64 additions & 46 deletions lib/server_connection.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ let current_reqd_exn t =

let yield_reader t k =
if is_closed t
then failwith "on_wakeup_reader on closed conn"
then failwith "yield_reader on closed conn"
else if Optional_thunk.is_some t.wakeup_reader
then failwith "yield_reader: only one callback can be registered at a time"
else t.wakeup_reader <- Optional_thunk.some k
Expand Down Expand Up @@ -155,6 +155,7 @@ let error_code t =
else None

let shutdown t =
Queue.clear t.request_queue;
shutdown_reader t;
shutdown_writer t;
wakeup_reader t;
Expand Down Expand Up @@ -182,53 +183,44 @@ let set_error_and_handle ?request t error =
let report_exn t exn =
set_error_and_handle t (`Exn exn)

let advance_request_queue_if_necessary t =
if is_active t then begin
let reqd = current_reqd_exn t in
if Reqd.persistent_connection reqd then begin
if Reqd.is_complete reqd then begin
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (current_reqd_exn t);
wakeup_reader t;
end
end else begin
(* Take the head of the queue, close the remaining request bodies, clear
* the queue, and push the head back on. We do not plan on processing any
* more requests after the current one. *)
ignore (Queue.take t.request_queue);
Queue.iter Reqd.close_request_body t.request_queue;
Queue.clear t.request_queue;
Queue.push reqd t.request_queue;
if Reqd.is_complete reqd
then shutdown t
else
match Reqd.input_state reqd with
| Ready -> ()
| Complete -> shutdown_reader t
end
end else if Reader.is_closed t.reader
then shutdown t

let _next_read_operation t =
advance_request_queue_if_necessary t;
if is_active t
then (
let advance_request_queue t =
ignore (Queue.take t.request_queue);
if not (Queue.is_empty t.request_queue)
then t.request_handler (Queue.peek_exn t.request_queue);
;;

let rec _next_read_operation t =
if not (is_active t) then (
if Reader.is_closed t.reader
then shutdown t;
Reader.next t.reader
) else (
let reqd = current_reqd_exn t in
match Reqd.input_state reqd with
| Ready -> Reader.next t.reader
| Complete ->
if Reqd.persistent_connection reqd
then `Yield
else (
shutdown_reader t;
Reader.next t.reader)
| Complete -> _final_read_operation_for t reqd
)
else Reader.next t.reader

and _final_read_operation_for t reqd =
let next =
if not (Reqd.persistent_connection reqd) then (
shutdown_reader t;
Reader.next t.reader;
) else (
match Reqd.output_state reqd with
| Waiting | Ready -> `Yield
| Complete ->
advance_request_queue t;
_next_read_operation t;
)
in
wakeup_writer t;
next
;;

let next_read_operation t =
match _next_read_operation t with
(* XXX(dpatti): These two [`Error _] constructors are never returned *)
| `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close
| `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close
| (`Read | `Yield | `Close) as operation -> operation
Expand Down Expand Up @@ -259,13 +251,39 @@ let read t bs ~off ~len =
let read_eof t bs ~off ~len =
read_with_more t bs ~off ~len Complete

let next_write_operation t =
advance_request_queue_if_necessary t;
if is_active t
then (
let rec _next_write_operation t =
if not (is_active t) then (
if Reader.is_closed t.reader
then shutdown t;
Writer.next t.writer
) else (
let reqd = current_reqd_exn t in
Reqd.flush_response_body reqd);
Writer.next t.writer
match Reqd.output_state reqd with
| Waiting -> `Yield
| Ready ->
Reqd.flush_response_body reqd;
Writer.next t.writer
| Complete -> _final_write_operation_for t reqd
)

and _final_write_operation_for t reqd =
let next =
if not (Reqd.persistent_connection reqd) then (
shutdown_writer t;
Writer.next t.writer;
) else (
match Reqd.input_state reqd with
| Ready -> assert false
| Complete ->
advance_request_queue t;
_next_write_operation t;
)
in
wakeup_reader t;
next
;;

let next_write_operation t = _next_write_operation t

let report_write_result t result =
Writer.report_result t.writer result

0 comments on commit c6a3750

Please sign in to comment.