Skip to content

Commit

Permalink
[4.3] HELP-19729: handle orphaned cf_exe processes (#6702)
Browse files Browse the repository at this point in the history
There is potential for a callflow executor leak when sending a default
error response.

In the field, we found inbound calls that were forwarded to a number
which canceled the request almost immediately. When this happens,
callflows gets a message back from stepswitch that the bridge was
unsuccessful, then looks up the children in the callflow to see if any
handle that specific hangup cause.

In this case (and most configurations) there are no special branches
so it performs a default action which is to send back a SIP error
message. However, while that process is happening, FreeSWITCH has
already forwarded the cancel from the B leg back to the A leg and out
to the carrier.

The particular carrier in this incident then immediately retries the
same call in another zone. When that call gets created in the other
zone, the ecallmgr call control usurps any other call control
processes for that call id, terminating the ecallmgr call control in
the first zone. It is only then that the first callflow sends the
command to publish the SIP error, but with no ecallmgr control process
in that zone anymore, nothing is there to process it. Callflows then
waits indefinitely to hear back if the SIP error message was
processed.

As a general avoidance of similar situations the all infinite wait
times in call command have also been reduced to one day by default,
however if the cluster is servicing things like turret phones they can
set it back to infinity.
  • Loading branch information
k-anderson authored and jamesaimonetti committed Feb 23, 2021
1 parent 7ad4b3d commit a1d4ef8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
2 changes: 1 addition & 1 deletion applications/callflow/src/cf_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ send_default_response(Cause, Call) ->
{'error', 'no_response'} ->
lager:debug("failed to send default response for ~s", [Cause]);
{'ok', NoopId} ->
_ = kapps_call_command:wait_for_noop(Call, NoopId),
_ = kapps_call_command:wait_for_noop(Call, NoopId, 2 * ?MILLISECONDS_IN_SECOND),
lager:debug("sent default response for ~s (~s)", [Cause, NoopId])
end
end.
Expand Down
5 changes: 5 additions & 0 deletions applications/crossbar/priv/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -29791,6 +29791,11 @@
"description": "call_command interdigit timeout",
"type": "integer"
},
"max_wait_ms": {
"default": 86400000,
"description": "call_command max_wait_ms",
"type": "integer"
},
"message_timeout": {
"default": 5000,
"description": "call_command message timeout",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
"description": "call_command interdigit timeout",
"type": "integer"
},
"max_wait_ms": {
"default": 86400000,
"description": "call_command max_wait_ms",
"type": "integer"
},
"message_timeout": {
"default": 5000,
"description": "call_command message timeout",
Expand Down
36 changes: 22 additions & 14 deletions core/kazoo_call/src/kapps_call_command.erl
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@
-export([wait_for_bridge/2, wait_for_bridge/3]).
-export([wait_for_channel_bridge/0, wait_for_channel_unbridge/0]).
-export([wait_for_dtmf/1]).
-export([wait_for_noop/2]).
-export([wait_for_noop/2
,wait_for_noop/3
]).
-export([wait_for_hangup/0, wait_for_hangup/1
,wait_for_unbridge/0, wait_for_unbridge/1
]).
Expand Down Expand Up @@ -237,6 +239,8 @@

-type store_fun() :: kz_term:ne_binary() | fun(() -> kz_term:ne_binary()).

-define(MAX_WAIT_MS, kapps_config:get_integer(?CONFIG_CAT, <<"max_wait_ms">>, ?MILLISECONDS_IN_DAY)).

-define(CONFIG_CAT, <<"call_command">>).

-define(DEFAULT_COLLECT_TIMEOUT, kapps_config:get_integer(?CONFIG_CAT, <<"collect_timeout">>, 5 * ?MILLISECONDS_IN_SECOND)).
Expand Down Expand Up @@ -1333,14 +1337,14 @@ hold_command(MOH, Call) ->

-spec b_hold(kapps_call:call()) ->
kapps_api_std_return().
b_hold(Call) -> b_hold('infinity', 'undefined', Call).
b_hold(Call) -> b_hold(?MAX_WAIT_MS, 'undefined', Call).

-spec b_hold(timeout() | kz_term:api_binary(), kapps_call:call()) ->
kapps_api_std_return().
b_hold(Timeout, Call) when is_integer(Timeout);
Timeout =:= 'infinity' ->
b_hold(Timeout, 'undefined', Call);
b_hold(MOH, Call) -> b_hold('infinity', MOH, Call).
b_hold(MOH, Call) -> b_hold(?MAX_WAIT_MS, MOH, Call).

-spec b_hold(timeout(), kz_term:api_binary(), kapps_call:call()) ->
kapps_api_std_return().
Expand Down Expand Up @@ -1692,7 +1696,7 @@ b_record(MediaName, Terminators, TimeLimit, SilenceThreshold, SilenceHits, Call)
,{<<"RECORD_START">>, <<"RECORD_STOP">>}
,<<"call_event">>
,fun(JObj) -> verify_media_name(JObj, MediaName) end
,'infinity'
,?MAX_WAIT_MS
).

-spec verify_media_name(kz_json:object(), kz_term:ne_binary()) -> boolean().
Expand Down Expand Up @@ -1755,13 +1759,13 @@ record_call(Media, Action, TimeLimit, Terminators, Call) ->
wait_for_headless_application_return().
b_record_call(MediaName, Call) ->
record_call(MediaName, Call),
wait_for_headless_application(<<"record">>, <<"RECORD_STOP">>, <<"call_event">>, 'infinity').
wait_for_headless_application(<<"record">>, <<"RECORD_STOP">>, <<"call_event">>, ?MAX_WAIT_MS).

-spec b_record_call(kz_term:proplist(), kz_term:ne_binary(), kapps_call:call()) ->
wait_for_headless_application_return().
b_record_call(MediaName, Action, Call) ->
record_call(MediaName, Action, Call),
wait_for_headless_application(<<"record">>, <<"RECORD_STOP">>, <<"call_event">>, 'infinity').
wait_for_headless_application(<<"record">>, <<"RECORD_STOP">>, <<"call_event">>, ?MAX_WAIT_MS).

-spec b_record_call(kz_term:proplist(), kz_term:ne_binary(), kz_term:api_binary() | pos_integer(), kapps_call:call()) ->
wait_for_headless_application_return().
Expand All @@ -1772,7 +1776,7 @@ b_record_call(MediaName, Action, TimeLimit, Call) ->
wait_for_headless_application_return().
b_record_call(MediaName, Action, TimeLimit, Terminators, Call) ->
record_call(MediaName, Action, TimeLimit, Terminators, Call),
wait_for_headless_application(<<"record">>, <<"RECORD_STOP">>, <<"call_event">>, 'infinity').
wait_for_headless_application(<<"record">>, <<"RECORD_STOP">>, <<"call_event">>, ?MAX_WAIT_MS).

%%------------------------------------------------------------------------------
%% @doc Produces the low level AMQP request to store the file.
Expand Down Expand Up @@ -2227,7 +2231,7 @@ b_say(Say, Type, Method, Language, Gender, Call) ->

-spec wait_for_say(kapps_call:call()) -> kapps_api_std_return().
wait_for_say(Call) ->
wait_for_message(Call, <<"say">>, <<"CHANNEL_EXECUTE_COMPLETE">>, <<"call_event">>, 'infinity').
wait_for_message(Call, <<"say">>, <<"CHANNEL_EXECUTE_COMPLETE">>, <<"call_event">>, ?MAX_WAIT_MS).

%%------------------------------------------------------------------------------
%% @doc Produces the low level AMQP request to bridge a caller
Expand Down Expand Up @@ -2803,7 +2807,7 @@ wait_for_bridge(Timeout, Fun, Call, Start, {'ok', JObj}) ->
'false' -> 'ok';
'true' -> Fun(JObj)
end,
wait_for_bridge('infinity', Fun, Call);
wait_for_bridge(?MAX_WAIT_MS, Fun, Call);
{<<"call_event">>, <<"CHANNEL_DESTROY">>, _} ->
%% TODO: reduce log level if no issue is found with
%% basing the Result on Disposition
Expand All @@ -2826,7 +2830,11 @@ wait_for_bridge(Timeout, Fun, Call, Start, {'ok', JObj}) ->
%%------------------------------------------------------------------------------
-spec wait_for_noop(kapps_call:call(), kz_term:api_binary()) -> kapps_api_std_return().
wait_for_noop(Call, NoopId) ->
case wait_for_message(Call, <<"noop">>, <<"CHANNEL_EXECUTE_COMPLETE">>, <<"call_event">>, 'infinity') of
wait_for_noop(Call, NoopId, ?MAX_WAIT_MS).

-spec wait_for_noop(kapps_call:call(), kz_term:api_binary(), timeout()) -> kapps_api_std_return().
wait_for_noop(Call, NoopId, Timeout) ->
case wait_for_message(Call, <<"noop">>, <<"CHANNEL_EXECUTE_COMPLETE">>, <<"call_event">>, Timeout) of
{'ok', JObj}=OK ->
case kz_json:get_value(<<"Application-Response">>, JObj) of
NoopId when is_binary(NoopId), NoopId =/= <<>> -> OK;
Expand Down Expand Up @@ -2858,7 +2866,7 @@ wait_for_channel_unbridge() ->
%%------------------------------------------------------------------------------
-spec wait_for_channel_bridge() -> {'ok', kz_json:object()}.
wait_for_channel_bridge() ->
case receive_event('infinity') of
case receive_event(?MAX_WAIT_MS) of
{'ok', JObj}=Ok ->
case kz_util:get_event_type(JObj) of
{<<"call_event">>, <<"CHANNEL_BRIDGE">>} -> Ok;
Expand All @@ -2876,7 +2884,7 @@ wait_for_channel_bridge() ->
-spec wait_for_hangup() -> {'ok', 'channel_hungup'} |
{'error', 'timeout'}.
wait_for_hangup() ->
wait_for_hangup('infinity').
wait_for_hangup(?MAX_WAIT_MS).

-spec wait_for_hangup(timeout()) ->
{'ok', 'channel_hungup'} |
Expand Down Expand Up @@ -2906,7 +2914,7 @@ wait_for_hangup(Timeout) ->
-spec wait_for_unbridge() -> {'ok', 'leg_hungup'} |
{'error', 'timeout'}.
wait_for_unbridge() ->
wait_for_unbridge('infinity').
wait_for_unbridge(?MAX_WAIT_MS).

-spec wait_for_unbridge(timeout()) ->
{'ok', 'leg_hungup'} |
Expand Down Expand Up @@ -2967,7 +2975,7 @@ wait_for_fax(Timeout) ->
lager:debug("channel execution error while waiting for fax: ~s", [kz_json:encode(JObj)]),
{'error', JObj};
{<<"call_event">>, <<"CHANNEL_EXECUTE">>, <<"receive_fax">>} ->
wait_for_fax('infinity');
wait_for_fax(?MAX_WAIT_MS);
{<<"call_event">>, <<"CHANNEL_EXECUTE_COMPLETE">>, <<"receive_fax">>} ->
{'ok', kz_json:set_value(<<"Fax-Success">>, 'true', JObj)};
{<<"call_event">>, <<"CHANNEL_DESTROY">>, _} ->
Expand Down

0 comments on commit a1d4ef8

Please sign in to comment.