Skip to content

Commit

Permalink
feat: add option for more graceful request failures
Browse files Browse the repository at this point in the history
  • Loading branch information
savonarola committed Nov 13, 2023
1 parent 31370f2 commit e0e2093
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 51 deletions.
43 changes: 16 additions & 27 deletions .github/workflows/run_test_case.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,43 @@ jobs:

steps:
- uses: actions/checkout@v1
- uses: ilammy/msvc-dev-cmd@v1
- uses: gleam-lang/[email protected]
id: install_erlang
- uses: erlef/setup-beam@v1
with:
otp-version: 22.3
- name: Run tests
run: |
$env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH"
$rebar3 = $env:USERPROFILE + "\rebar3"
(New-Object System.Net.WebClient).DownloadFile('https://github.com/emqx/rebar3/releases/download/3.14.3-emqx-4/rebar3', $rebar3)
escript $rebar3 xref
escript $rebar3 dialyzer
escript $rebar3 eunit
escript $rebar3 ct
otp-version: '24'
rebar3-version: '3.16.1'
- run: rebar3 xref
- run: rebar3 dialyzer
- run: rebar3 eunit
- run: rebar3 ct

linux:
runs-on: ubuntu-latest

strategy:
matrix:
os:
- ubuntu20.04
- ubuntu18.04
- ubuntu16.04
- ubuntu14.04
- debian10
- debian9
- debian8
- opensuse
- centos7
- centos8
- centos6
- raspbian10
- raspbian9
- raspbian8

steps:
- uses: actions/checkout@v1
- name: Code analyze
env:
ERL_OTP: erl22.1
ERL_OTP: erl24.3.4.2-1
SYSTEM: ${{ matrix.os }}
run: |
version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g")
sudo docker run --rm --privileged multiarch/qemu-user-static:register --reset
sudo docker run --rm -i --name $SYSTEM -v $(pwd):/repos emqx/build-env:$ERL_OTP-$SYSTEM sh -c "cd /repos && make xref && make dialyzer"
- name: Run tests
env:
ERL_OTP: erl22.1
ERL_OTP: erl24.3.4.2-1
SYSTEM: ${{ matrix.os }}
run: |
version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g")
Expand All @@ -71,9 +59,10 @@ jobs:
- uses: actions/checkout@v1
- name: Code analyze
env:
ERL_OTP: erl22.1
run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine3.10-amd64 sh -c "cd /repos && make xref && make dialyzer"
ERL_OTP: erl24.3.4.2-1
run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine sh -c "cd /repos && make xref && make dialyzer"
- name: Run tests
env:
ERL_OTP: erl22.1
run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine3.10-amd64 sh -c "cd /repos && make eunit && make ct"
ERL_OTP: erl24.3.4.2-1
run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine sh -c "cd /repos && make eunit && make ct"

67 changes: 48 additions & 19 deletions src/client/grpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
%%
%% Default equal to timeout option
, connect_timeout => non_neg_integer()
%% Whether to exit the client process
%% when the connection process exits
%% Default is true, as in gen.erl and gen_server.erl
, exit_on_connection_crash => boolean()
}.

-type def() ::
Expand Down Expand Up @@ -162,13 +166,12 @@ start_link(Pool, Id, Server, Opts) when is_map(Opts) ->
| {error, term()}.
%% @doc Unary function call
unary(Def, Req, Metadata, Options) ->
Timeout = maps:get(timeout, Options, infinity),
case open(Def, Metadata, Options) of
{ok, GStream} ->
_ = send(GStream, Req, fin),
case recv(GStream, Timeout) of
_ = send(GStream, Req, fin, Options),
case recv(GStream, Options) of
{ok, [Resp]} when is_map(Resp) ->
{ok, [{eos, Trailers}]} = recv(GStream, Timeout),
{ok, [{eos, Trailers}]} = recv(GStream, Options),
{ok, Resp, Trailers};
{ok, [{eos, Trailers}]} ->
%% Not data responed, only error trailers
Expand All @@ -189,7 +192,8 @@ open(Def, Metadata, Options) ->
maps:get(channel, Options, undefined),
maps:get(key_dispatch, Options, self())
),
case call(ClientPid, {open, Def, Metadata, Options}, connect_timeout(Options)) of
ConnectTimeout = connect_timeout(Options),
case call(ClientPid, {open, Def, Metadata, Options}, Options#{timeout => ConnectTimeout}) of
{ok, StreamRef} ->
{ok, #{stream_ref => StreamRef, client_pid => ClientPid, def => Def}};
{error, _} = Error -> Error
Expand All @@ -199,21 +203,32 @@ connect_timeout(Options) ->
maps:get(
connect_timeout,
Options,
maps:get(timeout, Options, infinity)
timeout(Options)
).

timeout(Options) ->
maps:get(timeout, Options, infinity).

exit_on_connection_crash(Options) ->
maps:get(exit_on_connection_crash, Options, true).

-spec send(grpcstream(), request()) -> ok.
send(GStream, Req) ->
send(GStream, Req, nofin).

-spec send(grpcstream(), request(), fin | nofin) -> ok | no_return().
send(GStream, Req, IsFin) ->
send(GStream, Req, IsFin, #{}).

-spec send(grpcstream(), request(), fin | nofin, options()) -> ok.
send(_GStream = #{
def := Def,
client_pid := ClientPid,
stream_ref := StreamRef
}, Req, IsFin) ->
}, Req, IsFin, Options) ->
#{marshal := Marshal} = Def,
Bytes = Marshal(Req),
case call(ClientPid, {send, StreamRef, Bytes, IsFin}, infinity) of
case call(ClientPid, {send, StreamRef, Bytes, IsFin}, Options) of
ok -> ok;
{error, R} -> error(R)
end.
Expand All @@ -222,20 +237,23 @@ send(_GStream = #{
-> {ok, [response() | eos_msg()]}
| {error, term()}.
recv(GStream) ->
recv(GStream, infinity).
recv(GStream, #{}).

-spec recv(grpcstream(), timeout())
-spec recv(grpcstream(), timeout() | options())
-> {ok, [response() | eos_msg()]}
| {error, term()}.
| {error, term()} | no_return().
recv(GStream, Timeout) when is_integer(Timeout) ->
recv(GStream, #{timeout => Timeout});
recv(#{def := Def,
client_pid := ClientPid,
stream_ref := StreamRef}, Timeout) ->
stream_ref := StreamRef}, Options) ->
Timeout = timeout(Options),
Unmarshal = maps:get(unmarshal, Def),
Endts = case Timeout of
infinity -> infinity;
_ -> erlang:system_time(millisecond) + Timeout
end,
case call(ClientPid, {read, StreamRef, Endts}, Timeout) of
case call(ClientPid, {read, StreamRef, Endts}, Options) of
{error, _} = E -> E;
{IsMore, Frames} ->
Msgs = lists:map(fun({eos, Trailers}) -> {eos, Trailers};
Expand Down Expand Up @@ -747,27 +765,38 @@ format_stream(#{st := St, recvbuff := Buff, mqueue := MQueue}) ->
[St, byte_size(Buff), MQueue]).

%% copied from gen.erl and gen_server.erl
call(Process, Request, Timeout) ->
Mref = erlang:monitor(process, Process),
call(Process, Request, Options) ->
Timeout = timeout(Options),
ExitOnCrash = exit_on_connection_crash(Options),
Mref = erlang:monitor(process, Process, [{alias, reply_demonitor}]),

%% OTP-21:
%% Auto-connect is asynchronous. But we still use 'noconnect' to make sure
%% we send on the monitored connection, and not trigger a new auto-connect.
%%
erlang:send(Process, {'$gen_call', {self(), Mref}, Request}, [noconnect]),
erlang:send(Process, {'$gen_call', {Mref, Mref}, Request}, [noconnect]),

receive
{Mref, Reply} ->
erlang:demonitor(Mref, [flush]),
Reply;
{'DOWN', Mref, _, _, Reason} ->
exit(Reason)
call_result(ExitOnCrash, Reason)
after Timeout ->
erlang:demonitor(Mref, [flush]),
{error, {grpc_utils:codename(?GRPC_STATUS_DEADLINE_EXCEEDED),
<<"Waiting for response timeout">>}}
receive
{Mref, Reply} -> Reply
after 0 ->
{error, {grpc_utils:codename(?GRPC_STATUS_DEADLINE_EXCEEDED),
<<"Waiting for response timeout">>}}
end
end.

call_result(_ExitOnCrash = true, Reason) ->
exit(Reason);
call_result(_ExitOnCrash = false, Reason) ->
error({exit, Reason}).

pick(ChannName, Key) ->
gproc_pool:pick_worker(ChannName, Key).

Expand Down
14 changes: 9 additions & 5 deletions src/grpc.appup.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
%% -*- mode: erlang -*-
{"0.6.12",
[{"0.6.11",
[{load_module, grpc_client_sup, brutal_purge, soft_purge, []},
{"0.6.13",
[{"0.6.12", [{load_module, grpc_client, brutal_purge, soft_purge, []}]},
{"0.6.11",
[{load_module, grpc_client, brutal_purge, soft_purge, []},
{load_module, grpc_client_sup, brutal_purge, soft_purge, []},
{load_module, grpc, brutal_purge, soft_purge, []}]},
{"0.6.10",
[{load_module, grpc_client, brutal_purge, soft_purge, []},
Expand Down Expand Up @@ -42,8 +44,10 @@
{load_module, grpc_client_sup, brutal_purge, soft_purge, []},
{load_module, grpc, brutal_purge, soft_purge, []}]},
{<<".*">>, []}],
[{"0.6.11",
[{load_module, grpc_client_sup, brutal_purge, soft_purge, []},
[{"0.6.12", [{load_module, grpc_client, brutal_purge, soft_purge, []}]},
{"0.6.11",
[{load_module, grpc_client, brutal_purge, soft_purge, []},
{load_module, grpc_client_sup, brutal_purge, soft_purge, []},
{load_module, grpc, brutal_purge, soft_purge, []}]},
{"0.6.10",
[{load_module, grpc_client, brutal_purge, soft_purge, []},
Expand Down

0 comments on commit e0e2093

Please sign in to comment.