From e0e209338374f0f7e7c907abc130ed7df11b05f7 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 13 Nov 2023 22:43:44 +0300 Subject: [PATCH] feat: add option for more graceful request failures --- .github/workflows/run_test_case.yaml | 43 +++++++----------- src/client/grpc_client.erl | 67 ++++++++++++++++++++-------- src/grpc.appup.src | 14 +++--- 3 files changed, 73 insertions(+), 51 deletions(-) diff --git a/.github/workflows/run_test_case.yaml b/.github/workflows/run_test_case.yaml index 971fdde..532943c 100644 --- a/.github/workflows/run_test_case.yaml +++ b/.github/workflows/run_test_case.yaml @@ -9,22 +9,14 @@ jobs: steps: - uses: actions/checkout@v1 - - uses: ilammy/msvc-dev-cmd@v1 - - uses: gleam-lang/setup-erlang@v1.1.0 - id: install_erlang + - uses: erlef/setup-beam@v1 with: - otp-version: 22.3 - - name: Run tests - run: | - $env:PATH = "${{ steps.install_erlang.outputs.erlpath }}\bin;$env:PATH" - - $rebar3 = $env:USERPROFILE + "\rebar3" - (New-Object System.Net.WebClient).DownloadFile('https://github.com/emqx/rebar3/releases/download/3.14.3-emqx-4/rebar3', $rebar3) - - escript $rebar3 xref - escript $rebar3 dialyzer - escript $rebar3 eunit - escript $rebar3 ct + otp-version: '24' + rebar3-version: '3.16.1' + - run: rebar3 xref + - run: rebar3 dialyzer + - run: rebar3 eunit + - run: rebar3 ct linux: runs-on: ubuntu-latest @@ -32,24 +24,20 @@ jobs: strategy: matrix: os: + - ubuntu20.04 - ubuntu18.04 - ubuntu16.04 - - ubuntu14.04 - debian10 - debian9 - - debian8 - - opensuse - - centos7 + - centos8 - centos6 - raspbian10 - - raspbian9 - - raspbian8 steps: - uses: actions/checkout@v1 - name: Code analyze env: - ERL_OTP: erl22.1 + ERL_OTP: erl24.3.4.2-1 SYSTEM: ${{ matrix.os }} run: | version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g") @@ -57,7 +45,7 @@ jobs: sudo docker run --rm -i --name $SYSTEM -v $(pwd):/repos emqx/build-env:$ERL_OTP-$SYSTEM sh -c "cd /repos && make xref && make dialyzer" - name: Run tests env: - ERL_OTP: erl22.1 + ERL_OTP: erl24.3.4.2-1 SYSTEM: ${{ matrix.os }} run: | version=$(echo ${{ github.ref }} | sed -r "s .*/.*/(.*) \1 g") @@ -71,9 +59,10 @@ jobs: - uses: actions/checkout@v1 - name: Code analyze env: - ERL_OTP: erl22.1 - run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine3.10-amd64 sh -c "cd /repos && make xref && make dialyzer" + ERL_OTP: erl24.3.4.2-1 + run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine sh -c "cd /repos && make xref && make dialyzer" - name: Run tests env: - ERL_OTP: erl22.1 - run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine3.10-amd64 sh -c "cd /repos && make eunit && make ct" + ERL_OTP: erl24.3.4.2-1 + run: docker run --rm -i --name alpine -v $(pwd):/repos emqx/build-env:$ERL_OTP-alpine sh -c "cd /repos && make eunit && make ct" + diff --git a/src/client/grpc_client.erl b/src/client/grpc_client.erl index b08f555..7d44881 100644 --- a/src/client/grpc_client.erl +++ b/src/client/grpc_client.erl @@ -91,6 +91,10 @@ %% %% Default equal to timeout option , connect_timeout => non_neg_integer() + %% Whether to exit the client process + %% when the connection process exits + %% Default is true, as in gen.erl and gen_server.erl + , exit_on_connection_crash => boolean() }. -type def() :: @@ -162,13 +166,12 @@ start_link(Pool, Id, Server, Opts) when is_map(Opts) -> | {error, term()}. %% @doc Unary function call unary(Def, Req, Metadata, Options) -> - Timeout = maps:get(timeout, Options, infinity), case open(Def, Metadata, Options) of {ok, GStream} -> - _ = send(GStream, Req, fin), - case recv(GStream, Timeout) of + _ = send(GStream, Req, fin, Options), + case recv(GStream, Options) of {ok, [Resp]} when is_map(Resp) -> - {ok, [{eos, Trailers}]} = recv(GStream, Timeout), + {ok, [{eos, Trailers}]} = recv(GStream, Options), {ok, Resp, Trailers}; {ok, [{eos, Trailers}]} -> %% Not data responed, only error trailers @@ -189,7 +192,8 @@ open(Def, Metadata, Options) -> maps:get(channel, Options, undefined), maps:get(key_dispatch, Options, self()) ), - case call(ClientPid, {open, Def, Metadata, Options}, connect_timeout(Options)) of + ConnectTimeout = connect_timeout(Options), + case call(ClientPid, {open, Def, Metadata, Options}, Options#{timeout => ConnectTimeout}) of {ok, StreamRef} -> {ok, #{stream_ref => StreamRef, client_pid => ClientPid, def => Def}}; {error, _} = Error -> Error @@ -199,21 +203,32 @@ connect_timeout(Options) -> maps:get( connect_timeout, Options, - maps:get(timeout, Options, infinity) + timeout(Options) ). +timeout(Options) -> + maps:get(timeout, Options, infinity). + +exit_on_connection_crash(Options) -> + maps:get(exit_on_connection_crash, Options, true). + -spec send(grpcstream(), request()) -> ok. send(GStream, Req) -> send(GStream, Req, nofin). +-spec send(grpcstream(), request(), fin | nofin) -> ok | no_return(). +send(GStream, Req, IsFin) -> + send(GStream, Req, IsFin, #{}). + +-spec send(grpcstream(), request(), fin | nofin, options()) -> ok. send(_GStream = #{ def := Def, client_pid := ClientPid, stream_ref := StreamRef - }, Req, IsFin) -> + }, Req, IsFin, Options) -> #{marshal := Marshal} = Def, Bytes = Marshal(Req), - case call(ClientPid, {send, StreamRef, Bytes, IsFin}, infinity) of + case call(ClientPid, {send, StreamRef, Bytes, IsFin}, Options) of ok -> ok; {error, R} -> error(R) end. @@ -222,20 +237,23 @@ send(_GStream = #{ -> {ok, [response() | eos_msg()]} | {error, term()}. recv(GStream) -> - recv(GStream, infinity). + recv(GStream, #{}). --spec recv(grpcstream(), timeout()) +-spec recv(grpcstream(), timeout() | options()) -> {ok, [response() | eos_msg()]} - | {error, term()}. + | {error, term()} | no_return(). +recv(GStream, Timeout) when is_integer(Timeout) -> + recv(GStream, #{timeout => Timeout}); recv(#{def := Def, client_pid := ClientPid, - stream_ref := StreamRef}, Timeout) -> + stream_ref := StreamRef}, Options) -> + Timeout = timeout(Options), Unmarshal = maps:get(unmarshal, Def), Endts = case Timeout of infinity -> infinity; _ -> erlang:system_time(millisecond) + Timeout end, - case call(ClientPid, {read, StreamRef, Endts}, Timeout) of + case call(ClientPid, {read, StreamRef, Endts}, Options) of {error, _} = E -> E; {IsMore, Frames} -> Msgs = lists:map(fun({eos, Trailers}) -> {eos, Trailers}; @@ -747,27 +765,38 @@ format_stream(#{st := St, recvbuff := Buff, mqueue := MQueue}) -> [St, byte_size(Buff), MQueue]). %% copied from gen.erl and gen_server.erl -call(Process, Request, Timeout) -> - Mref = erlang:monitor(process, Process), +call(Process, Request, Options) -> + Timeout = timeout(Options), + ExitOnCrash = exit_on_connection_crash(Options), + Mref = erlang:monitor(process, Process, [{alias, reply_demonitor}]), %% OTP-21: %% Auto-connect is asynchronous. But we still use 'noconnect' to make sure %% we send on the monitored connection, and not trigger a new auto-connect. %% - erlang:send(Process, {'$gen_call', {self(), Mref}, Request}, [noconnect]), + erlang:send(Process, {'$gen_call', {Mref, Mref}, Request}, [noconnect]), receive {Mref, Reply} -> erlang:demonitor(Mref, [flush]), Reply; {'DOWN', Mref, _, _, Reason} -> - exit(Reason) + call_result(ExitOnCrash, Reason) after Timeout -> erlang:demonitor(Mref, [flush]), - {error, {grpc_utils:codename(?GRPC_STATUS_DEADLINE_EXCEEDED), - <<"Waiting for response timeout">>}} + receive + {Mref, Reply} -> Reply + after 0 -> + {error, {grpc_utils:codename(?GRPC_STATUS_DEADLINE_EXCEEDED), + <<"Waiting for response timeout">>}} + end end. +call_result(_ExitOnCrash = true, Reason) -> + exit(Reason); +call_result(_ExitOnCrash = false, Reason) -> + error({exit, Reason}). + pick(ChannName, Key) -> gproc_pool:pick_worker(ChannName, Key). diff --git a/src/grpc.appup.src b/src/grpc.appup.src index db76852..ae1947d 100644 --- a/src/grpc.appup.src +++ b/src/grpc.appup.src @@ -1,7 +1,9 @@ %% -*- mode: erlang -*- -{"0.6.12", - [{"0.6.11", - [{load_module, grpc_client_sup, brutal_purge, soft_purge, []}, +{"0.6.13", + [{"0.6.12", [{load_module, grpc_client, brutal_purge, soft_purge, []}]}, + {"0.6.11", + [{load_module, grpc_client, brutal_purge, soft_purge, []}, + {load_module, grpc_client_sup, brutal_purge, soft_purge, []}, {load_module, grpc, brutal_purge, soft_purge, []}]}, {"0.6.10", [{load_module, grpc_client, brutal_purge, soft_purge, []}, @@ -42,8 +44,10 @@ {load_module, grpc_client_sup, brutal_purge, soft_purge, []}, {load_module, grpc, brutal_purge, soft_purge, []}]}, {<<".*">>, []}], - [{"0.6.11", - [{load_module, grpc_client_sup, brutal_purge, soft_purge, []}, + [{"0.6.12", [{load_module, grpc_client, brutal_purge, soft_purge, []}]}, + {"0.6.11", + [{load_module, grpc_client, brutal_purge, soft_purge, []}, + {load_module, grpc_client_sup, brutal_purge, soft_purge, []}, {load_module, grpc, brutal_purge, soft_purge, []}]}, {"0.6.10", [{load_module, grpc_client, brutal_purge, soft_purge, []},