diff --git a/README.md b/README.md index aa36ca7..feccd48 100644 --- a/README.md +++ b/README.md @@ -111,10 +111,10 @@ simple. It has just a couple of methods: after the client was last connected (in ms). * `{response_limit, integer()}` - the maximum size of a single http streaming response (in bytes). - * `{hib_timeout, integer() | hibernate}` - hibernate websocket - process after hib_timeout milliseconds of inactivity (5000 by - default) to reduce memory footprint. Set to 'hibernate' atom to - hibernate always (may be inefficient). (implementation is + * `{hib_timeout, integer() | hibernate | infinity}` - hibernate + websocket process after hib_timeout milliseconds of inactivity + (5000 by default) to reduce memory footprint. Set to 'hibernate' + atom to hibernate always (may be inefficient). (implementation is incomplete, see #15) * `{logger, fun/3}` - a function called on every request, used to print request to the logs (or on the screen by default). diff --git a/src/sockjs_action.erl b/src/sockjs_action.erl index 3f13beb..ceb4457 100644 --- a/src/sockjs_action.erl +++ b/src/sockjs_action.erl @@ -189,27 +189,8 @@ chunk_start(Req, Headers, ContentType) -> reply_loop(Req, SessionId, ResponseLimit, Fmt, Service) -> Req0 = sockjs_http:hook_tcp_close(Req), case sockjs_session:reply(SessionId) of - wait -> receive - %% In Cowboy we need to capture async - %% messages from the tcp connection - - %% ie: {active, once}. - {tcp_closed, _} -> - Req0; - %% In Cowboy we may in theory get real - %% http requests, this is bad. - {tcp, _S, Data} -> - error_logger:error_msg( - "Received unexpected data on a " - "long-polling http connection: ~p. " - "Connection aborted.~n", - [Data]), - Req1 = sockjs_http:abruptly_kill(Req), - Req1; - go -> - Req1 = sockjs_http:unhook_tcp_close(Req0), - reply_loop(Req1, SessionId, ResponseLimit, - Fmt, Service) - end; + {wait, hibernate} -> catch erlang:hibernate(?MODULE, reply_loop_wait, [Req0, SessionId, ResponseLimit, Fmt, Service, infinity]); + {wait, Timeout} -> reply_loop_wait(Req0, SessionId, ResponseLimit, Fmt, Service, Timeout); session_in_use -> Frame = sockjs_util:encode_frame({close, ?STILL_OPEN}), chunk_end(Req0, Frame, Fmt); {close, Frame} -> Frame1 = sockjs_util:encode_frame(Frame), @@ -222,6 +203,28 @@ reply_loop(Req, SessionId, ResponseLimit, Fmt, Service) -> Fmt, Service) end. +reply_loop_wait(Req0, SessionId, ResponseLimit, Fmt, Service, Timeout) -> + receive + %% In Cowboy we need to capture async + %% messages from the tcp connection - + %% ie: {active, once}. + {tcp_closed, _} -> Req0; + %% In Cowboy we may in theory get real + %% http requests, this is bad. + {tcp, _S, Data} -> + error_logger:error_msg( + "Received unexpected data on a " + "long-polling http connection: ~p. " + "Connection aborted.~n", + [Data]), + sockjs_http:abruptly_kill(Req0); + go -> + Req1 = sockjs_http:unhook_tcp_close(Req0), + reply_loop(Req1, SessionId, ResponseLimit, + Fmt, Service) + after + Timeout -> catch erlang:hibernate(?MODULE, reply_loop_wait, [Req0, SessionId, ResponseLimit, Fmt, Service, infinity]) + end. reply_loop0(Req, _SessionId, ResponseLimit, _Fmt, _Service) when ResponseLimit =< 0 -> chunk_end(Req); reply_loop0(Req, SessionId, ResponseLimit, Fmt, Service) -> diff --git a/src/sockjs_cowboy_handler.erl b/src/sockjs_cowboy_handler.erl index 630cdc8..d64d716 100644 --- a/src/sockjs_cowboy_handler.erl +++ b/src/sockjs_cowboy_handler.erl @@ -31,7 +31,7 @@ terminate(_Req, _Service) -> %% -------------------------------------------------------------------------- websocket_init(_TransportName, Req, - Service = #service{logger = Logger, hib_timeout = HibTimeout}) -> + Service = #service{logger = Logger}) -> Req0 = Logger(Service, {cowboy, Req}, websocket), {Info, Req1} = sockjs_handler:extract_info(Req0), @@ -43,43 +43,32 @@ websocket_init(_TransportName, Req, {WS, Req2} end, self() ! go, - mh({ok, Req3, {RawWebsocket, SessionPid, {undefined, HibTimeout}}}). + {ok, Req3, {RawWebsocket, SessionPid}}. -websocket_handle({text, Data}, Req, {RawWebsocket, SessionPid, _HT} = S) -> +websocket_handle({text, Data}, Req, {RawWebsocket, SessionPid} = S) -> case sockjs_ws_handler:received(RawWebsocket, SessionPid, Data) of - ok -> mh({ok, Req, S}); - shutdown -> {shutdown, Req, S} + {ok, hibernate} -> {ok, Req, S, hibernate}; + {ok, _Timeout} -> {ok, Req, S}; + shutdown -> {shutdown, Req, S} end; websocket_handle(_Unknown, Req, S) -> {shutdown, Req, S}. -websocket_info(go, Req, {RawWebsocket, SessionPid, _HT} = S) -> +websocket_info(go, Req, {RawWebsocket, SessionPid} = S) -> case sockjs_ws_handler:reply(RawWebsocket, SessionPid) of - wait -> mh({ok, Req, S}); - {ok, Data} -> self() ! go, - {reply, {text, Data}, Req, S}; - {close, <<>>} -> {shutdown, Req, S}; - {close, Data} -> self() ! shutdown, - {reply, {text, Data}, Req, S} + {wait, hibernate} -> {ok, Req, S, hibernate}; + {wait, _Timeout} -> {ok, Req, S}; + {ok, Data} -> self() ! go, + {reply, {text, Data}, Req, S}; + {close, <<>>} -> {shutdown, Req, S}; + {close, Data} -> self() ! shutdown, + {reply, {text, Data}, Req, S} end; websocket_info(shutdown, Req, S) -> {shutdown, Req, S}; websocket_info(hibernate_triggered, Req, S) -> {ok, Req, S, hibernate}. -websocket_terminate(_Reason, _Req, {RawWebsocket, SessionPid, _HT}) -> +websocket_terminate(_Reason, _Req, {RawWebsocket, SessionPid}) -> sockjs_ws_handler:close(RawWebsocket, SessionPid), ok. - -%% -------------------------------------------------------------------------- - -mh({ok, Req, {RawWebsocket, SessionPid, {TRef, hibernate}}}) -> - {ok, Req, {RawWebsocket, SessionPid, {TRef, hibernate}}, hibernate}; - -mh({ok, Req, {RawWebsocket, SessionPid, {TRef, HibTimeout}}}) -> - case TRef of - undefined -> ok; - _ -> sockjs_util:cancel_send_after(TRef, hibernate_triggered) - end, - TRef2 = erlang:send_after(HibTimeout, self(), hibernate_triggered), - {ok, Req, {RawWebsocket, SessionPid, {TRef2, HibTimeout}}}. diff --git a/src/sockjs_internal.hrl b/src/sockjs_internal.hrl index 6687400..87d9f08 100644 --- a/src/sockjs_internal.hrl +++ b/src/sockjs_internal.hrl @@ -15,7 +15,7 @@ disconnect_delay :: non_neg_integer(), heartbeat_delay :: non_neg_integer(), response_limit :: non_neg_integer(), - hib_timeout :: non_neg_integer() | hibernate, + hib_timeout :: non_neg_integer() | hibernate | infinity, logger :: logger() }). diff --git a/src/sockjs_session.erl b/src/sockjs_session.erl index 114f072..1827a7c 100644 --- a/src/sockjs_session.erl +++ b/src/sockjs_session.erl @@ -20,6 +20,8 @@ disconnect_delay = 5000 :: non_neg_integer(), heartbeat_tref :: reference() | triggered, heartbeat_delay = 25000 :: non_neg_integer(), + hibernate_tref :: reference(), + hibernate_delay :: non_neg_integer() | hibernate | infinity, ready_state = connecting :: connecting | open | closed, close_msg :: {non_neg_integer(), string()}, callback, @@ -56,8 +58,8 @@ maybe_create(SessionId, Service, Info) -> -spec received(list(iodata()), session_or_pid()) -> ok. received(Messages, SessionPid) when is_pid(SessionPid) -> case gen_server:call(SessionPid, {received, Messages}, infinity) of - ok -> ok; - error -> throw(no_session) + {ok, Timeout} -> {ok, Timeout}; + error -> throw(no_session) %% TODO: should we respond 404 when session is closed? end; received(Messages, SessionId) -> @@ -166,12 +168,27 @@ emit(What, State = #session{callback = Callback, ok -> State end. +mh(#session{hibernate_delay = hibernate} = State) -> {State, hibernate}; + +mh(#session{hibernate_delay = infinity} = State) -> {State, infinity}; + +mh(#session{hibernate_delay = HibTimeout, heartbeat_delay = HeartbeatDelay} = State) when HibTimeout >= HeartbeatDelay -> {State, infinity}; + +mh(#session{hibernate_delay = HibTimeout, hibernate_tref = TRef} = State) -> + case TRef of + undefined -> ok; + _ -> sockjs_util:cancel_send_after(TRef, hibernate_triggered) + end, + TRef2 = erlang:send_after(HibTimeout, self(), hibernate_triggered), + {State#session{hibernate_tref = TRef2}, infinity}. + %% -------------------------------------------------------------------------- --spec init({session_or_undefined(), service(), info()}) -> {ok, #session{}}. +-spec init({session_or_undefined(), service(), info()}) -> {ok, #session{}, infinity | hibernate}. init({SessionId, #service{callback = Callback, state = UserState, disconnect_delay = DisconnectDelay, + hib_timeout = HibTimeout, heartbeat_delay = HeartbeatDelay}, Info}) -> case SessionId of undefined -> ok; @@ -179,7 +196,7 @@ init({SessionId, #service{callback = Callback, end, process_flag(trap_exit, true), TRef = erlang:send_after(DisconnectDelay, self(), session_timeout), - {ok, #session{id = SessionId, + State = #session{id = SessionId, callback = Callback, state = UserState, response_pid = undefined, @@ -187,7 +204,12 @@ init({SessionId, #service{callback = Callback, disconnect_delay = DisconnectDelay, heartbeat_tref = undefined, heartbeat_delay = HeartbeatDelay, - handle = {?MODULE, {self(), Info}}}}. + hibernate_tref = undefined, + hibernate_delay = HibTimeout, + handle = {?MODULE, {self(), Info}}}, + {State2, Timeout} = mh(State), + {ok, State2, Timeout}. + handle_call({reply, Pid, _Multiple}, _From, State = #session{ @@ -212,10 +234,10 @@ handle_call({reply, Pid, _Multiple}, _From, State = #session{ {reply, session_in_use, State}; handle_call({reply, Pid, Multiple}, _From, State = #session{ - ready_state = open, - response_pid = RPid, - heartbeat_tref = HeartbeatTRef, - outbound_queue = Q}) + ready_state = open, + response_pid = RPid, + heartbeat_tref = HeartbeatTRef, + outbound_queue = Q}) when RPid == undefined orelse RPid == Pid -> {Messages, Q1} = case Multiple of true -> {queue:to_list(Q), queue:new()}; @@ -228,7 +250,8 @@ handle_call({reply, Pid, Multiple}, _From, State = #session{ {[], triggered} -> State1 = unmark_waiting(Pid, State), {reply, {ok, {heartbeat, nil}}, State1}; {[], _TRef} -> State1 = mark_waiting(Pid, State), - {reply, wait, State1}; + {State2, Timeout} = mh(State1), + {reply, {wait, Timeout}, State2, Timeout}; _More -> State1 = unmark_waiting(Pid, State), {reply, {ok, {data, Messages}}, State1#session{outbound_queue = Q1}} @@ -238,7 +261,8 @@ handle_call({received, Messages}, _From, State = #session{ready_state = open}) - State2 = lists:foldl(fun(Msg, State1) -> emit({recv, iolist_to_binary(Msg)}, State1) end, State, Messages), - {reply, ok, State2}; + {State3, Timeout} = mh(State2), + {reply, {ok, Timeout}, State3, Timeout}; handle_call({received, _Data}, _From, State = #session{ready_state = _Any}) -> {reply, error, State}; @@ -281,9 +305,17 @@ handle_info(force_shutdown, State) -> handle_info(session_timeout, State = #session{response_pid = undefined}) -> {stop, normal, State}; +handle_info(hibernate_triggered, #session{response_pid = RPid} = State) -> + case RPid of + undefined -> ok; + _ -> RPid ! hibernate_triggered + end, + {noreply, State#session{hibernate_tref = undefined}, hibernate}; + handle_info(heartbeat_triggered, State = #session{response_pid = RPid}) when RPid =/= undefined -> RPid ! go, - {noreply, State#session{heartbeat_tref = triggered}}; + {State2, Timeout} = mh(State), + {noreply, State2#session{heartbeat_tref = triggered}, Timeout}; handle_info(Info, State) -> {stop, {odd_info, Info}, State}. diff --git a/src/sockjs_ws_handler.erl b/src/sockjs_ws_handler.erl index bcf463d..88a0bf8 100644 --- a/src/sockjs_ws_handler.erl +++ b/src/sockjs_ws_handler.erl @@ -25,9 +25,9 @@ received(rawwebsocket, SessionPid, Data) -> session_received(Messages, SessionPid) -> try sockjs_session:received(Messages, SessionPid) of - ok -> ok + {ok, Timeout} -> {ok, Timeout} catch - no_session -> shutdown + no_session -> shutdown end. -spec reply(websocket|rawwebsocket, pid()) -> {close|open, binary()} | wait. @@ -36,8 +36,8 @@ reply(websocket, SessionPid) -> {W, Frame} when W =:= ok orelse W =:= close-> Frame1 = sockjs_util:encode_frame(Frame), {W, iolist_to_binary(Frame1)}; - wait -> - wait + {wait, Timeout} -> + {wait, Timeout} end; reply(rawwebsocket, SessionPid) -> case sockjs_session:reply(SessionPid, false) of @@ -48,8 +48,8 @@ reply(rawwebsocket, SessionPid) -> {data, [Msg]} -> {ok, iolist_to_binary(Msg)}; {heartbeat, nil} -> reply(rawwebsocket, SessionPid) end; - wait -> - wait + {wait, Timeout} -> + {wait, Timeout} end. -spec close(websocket|rawwebsocket, pid()) -> ok.