Skip to content

Commit

Permalink
update pusher and dependencies (#6659)
Browse files Browse the repository at this point in the history
handle crashes in apns & update deps
* also adds keepalive
  • Loading branch information
lazedo authored and jamesaimonetti committed Nov 4, 2020
1 parent be141bb commit 7aec432
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 89 deletions.
134 changes: 70 additions & 64 deletions applications/crossbar/src/api_resource.erl
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,6 @@ options(Req0, Context) ->
{'ok', Req0, Context}
end.

-type content_type_callbacks() :: [{{kz_term:ne_binary(), kz_term:ne_binary(), kz_term:proplist()}, atom()} |
{kz_term:ne_binary(), atom()}
].
-spec content_types_provided(cowboy_req:req(), cb_context:context()) ->
{content_type_callbacks(), cowboy_req:req(), cb_context:context()}.
content_types_provided(Req, Context0) ->
Expand All @@ -520,26 +517,30 @@ content_types_provided(Req, Context0) ->

content_types_provided(Req, Context1, cb_context:content_types_provided(Context1)).

-spec content_types_provided(cowboy_req:req(), cb_context:context(), crossbar_content_handlers()) ->
{content_type_callbacks(), cowboy_req:req(), cb_context:context()}.
content_types_provided(Req, Context, []) ->
Def = ?CONTENT_PROVIDED,
content_types_provided(Req, cb_context:set_content_types_provided(Context, Def), Def);
content_types_provided(Req, Context, CTPs) ->
CTP =
lists:foldr(fun({Fun, L}, Acc) ->
lists:foldr(fun({Type, SubType}, Acc1) ->
[{{Type, SubType, []}, Fun} | Acc1];
({_,_,_}=EncType, Acc1) ->
[ {EncType, Fun} | Acc1 ];
(CT, Acc1) when is_binary(CT) ->
[{CT, Fun} | Acc1]
end, Acc, L)
end
,[]
,CTPs
),
content_types_provided(Req, Context, ContentHandlers) ->
CTP = lists:foldr(fun content_handlers_to_cowboy_providers/2, [], ContentHandlers),
lager:debug("ctp: ~p", [CTP]),
{CTP, Req, Context}.

-spec content_handlers_to_cowboy_providers(crossbar_content_handler(), content_type_callbacks()) -> content_type_callbacks().
content_handlers_to_cowboy_providers({ToFun, ContentTypes}, Acc) ->
%% ToFun converts the response body to the requested content type
lists:foldr(fun({Type, SubType}, Acc1) ->
[{{Type, SubType, []}, ToFun} | Acc1];
({_,_,_}=EncType, Acc1) ->
[{EncType, ToFun} | Acc1];
(CT, Acc1) when is_binary(CT) ->
[{CT, ToFun} | Acc1]
end
,Acc
,ContentTypes
).

-spec content_types_accepted(cowboy_req:req(), cb_context:context()) ->
{content_type_callbacks(), cowboy_req:req(), cb_context:context()}.
content_types_accepted(Req0, Context0) ->
Expand All @@ -550,32 +551,31 @@ content_types_accepted(Req0, Context0) ->
Payload = [Context0 | Params],
Context1 = crossbar_bindings:fold(Event, Payload),

case cowboy_req:parse_header(<<"content-type">>, Req0) of
case api_util:get_content_type(Req0) of
'undefined' ->
%% Cowboy no longer allows empty content-type headers and will auto-respond with
%% a 415 if we return a content type.
lager:debug("no content type on request, checking defaults"),
default_content_types_accepted(Req0, Context1);
CT ->
lager:debug("checking content type '~p' against accepted", [CT]),
content_types_accepted(CT, Req0, Context1)
ClientCT ->
lager:debug("checking client content type '~p' against accepted", [ClientCT]),
content_types_accepted(ClientCT, Req0, Context1)
end.

-spec default_content_types_accepted(cowboy_req:req(), cb_context:context()) ->
{content_types_funs(), cowboy_req:req(), cb_context:context()}.
{content_type_callbacks(), cowboy_req:req(), cb_context:context()}.
default_content_types_accepted(Req, Context) ->
CTA = [{?CROSSBAR_DEFAULT_CONTENT_TYPE, Fun}
|| {Fun, L} <- cb_context:content_types_accepted(Context),
'ok' =:= lager:debug("f: ~p l: ~p", [Fun, L])
andalso lists:any(fun({Type, SubType}) ->
api_util:content_type_matches(?CROSSBAR_DEFAULT_CONTENT_TYPE
,{Type, SubType, '*'}
);
({_,_,_}=ModCT) ->
api_util:content_type_matches(?CROSSBAR_DEFAULT_CONTENT_TYPE, ModCT)
end
,L % check each type against the default
)
|| {Fun, ContentTypes} <- cb_context:content_types_accepted(Context),
lists:any(fun({Type, SubType}) ->
api_util:content_type_matches(?CROSSBAR_DEFAULT_CONTENT_TYPE
,{Type, SubType, '*'}
);
({_,_,_}=ModCT) ->
api_util:content_type_matches(?CROSSBAR_DEFAULT_CONTENT_TYPE, ModCT)
end
,ContentTypes % check each type against the default
)
],
lager:debug("default cta: ~p", [CTA]),
case CTA of
Expand All @@ -593,49 +593,55 @@ set_content_type_header(#{headers := Headers}=Req, ?NE_BINARY=CT) ->
set_content_type_header(#{headers := Headers}=Req, {Type, SubType, _}) ->
Req#{headers => maps:put(<<"content-type">>, <<Type/binary, "/", SubType/binary>>, Headers)}.

-spec content_types_accepted(kz_term:ne_binary(), cowboy_req:req(), cb_context:context()) ->
{content_type_callbacks(), cowboy_req:req(), cb_context:context()}.
content_types_accepted(ClientCT, Req, Context) ->
content_types_accepted(ClientCT, Req, Context, cb_context:content_types_accepted(Context)).

-type content_type_fun() :: {content_type(), atom()}.
-type content_types_funs() :: [content_type_fun()].

-spec content_types_accepted(content_type(), cowboy_req:req(), cb_context:context()) ->
{content_types_funs(), cowboy_req:req(), cb_context:context()}.
content_types_accepted(CT, Req, Context) ->
content_types_accepted(CT, Req, Context, cb_context:content_types_accepted(Context)).

-spec content_types_accepted(content_type(), cowboy_req:req(), cb_context:context(), crossbar_content_handlers()) ->
{content_types_funs(), cowboy_req:req(), cb_context:context()}.
content_types_accepted(CT, Req, Context, []) ->
lager:debug("no content-types accepted, using defaults"),
content_types_accepted(CT, Req, cb_context:set_content_types_accepted(Context, ?CONTENT_ACCEPTED));
content_types_accepted(CT, Req, Context, Accepted) ->
CTA = lists:foldl(fun(I, Acc) ->
content_types_accepted_fold(I, Acc, CT)
-spec content_types_accepted(kz_term:ne_binary(), cowboy_req:req(), cb_context:context(), crossbar_content_handlers()) ->
{content_type_callbacks(), cowboy_req:req(), cb_context:context()}.
content_types_accepted(ClientCT, Req, Context, []) ->
lager:debug("endpoint(s) specify no accepted content-types, using defaults"),
content_types_accepted(ClientCT, Req, cb_context:set_content_types_accepted(Context, ?CONTENT_ACCEPTED));
content_types_accepted(ClientCT, Req, Context, Accepted) ->
CTA = lists:foldr(fun(I, Acc) ->
content_types_accepted_fold(I, Acc, ClientCT)
end
,[]
,Accepted
),
lager:debug("cta: ~p", [CTA]),
lager:debug("endpoint(s) accepted content-types: ~p", [CTA]),
{CTA, Req, Context}.

-spec content_types_accepted_fold(crossbar_content_handler(), content_types_funs(), content_type()) ->
content_types_funs().
content_types_accepted_fold({Fun, L}, Acc, CT) ->
lists:foldl(fun(CTA, Acc1) ->
content_type_accepted_fold(CTA, Acc1, Fun, CT)
-spec content_types_accepted_fold(crossbar_content_handler(), content_type_callbacks(), kz_term:ne_binary()) ->
content_type_callbacks().
content_types_accepted_fold({Fun, ContentTypes}, Acc, ClientCT) ->
lists:foldr(fun(ContentType, Acc1) ->
content_type_accepted_fold(ContentType, Acc1, Fun, ClientCT)
end
,Acc
,L
,ContentTypes
).

-spec content_type_accepted_fold(any(), content_type_fun(), atom(), content_type()) ->
content_type_fun().
content_type_accepted_fold({Type, SubType}, Acc, Fun, CT) ->
case api_util:content_type_matches(CT, {Type, SubType, []}) of
'true' -> [{CT, Fun} | Acc];
'false' -> Acc
-spec content_type_accepted_fold(content_type(), content_type_callbacks(), content_conversion_fun(), kz_term:ne_binary()) ->
content_type_callbacks().
content_type_accepted_fold(<<ContentType/binary>>, Acc, FromFun, ClientCT) ->
[Type, SubType | _] = binary:split(ContentType, <<"/">>, ['global']),

case api_util:content_type_matches(ClientCT, {Type, SubType, []}) of
'true' ->
lager:debug("added accepted content-type: ~p(~p)", [{Type, SubType}, FromFun]),
[{{Type, SubType, '*'}, FromFun} | Acc];
'false' ->
lager:debug("skipping content-type: ~p", [{Type, SubType}]),
Acc
end;
content_type_accepted_fold({_,_,_}=EncType, Acc, Fun, _CT) ->
[{EncType, Fun} | Acc].
content_type_accepted_fold({Type, SubType}=EncType, Acc, FromFun, _ClientCT) ->
lager:debug("adding accepted content-type: ~p(~p)", [EncType, FromFun]),
[{{Type, SubType, '*'}, FromFun} | Acc];
content_type_accepted_fold({_,_,_}=EncType, Acc, FromFun, _ClientCT) ->
lager:debug("adding accepted content-type: ~p(~p)", [EncType, FromFun]),
[{EncType, FromFun} | Acc].

-spec languages_provided(cowboy_req:req(), cb_context:context()) ->
{kz_term:ne_binaries(), cowboy_req:req(), cb_context:context()}.
Expand Down
1 change: 1 addition & 0 deletions applications/crossbar/src/api_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
,get_http_verb/2
,get_auth_token/2
,get_pretty_print/2
,get_content_type/1
,is_authentic/2, is_early_authentic/2
,is_permitted/2
,is_known_content_type/2
Expand Down
3 changes: 3 additions & 0 deletions applications/crossbar/src/crossbar_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
-type media_value() :: {content_type(), non_neg_integer(), list()}.
-type media_values() :: [media_value()].

-type content_conversion_fun() :: atom().
-type content_type_callbacks() :: [{content_type(), content_conversion_fun()}].

-define(MEDIA_VALUE(Type, SubType, Weight, Options, Extensions)
,{{Type, SubType, Options}, Weight, Extensions}
).
Expand Down
74 changes: 65 additions & 9 deletions applications/pusher/src/modules/pm_apple.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,31 +50,69 @@ handle_cast({'push', JObj}, #state{tab=ETS}=State) ->
maybe_send_push_notification(get_apns(TokenApp, ETS), JObj),
{'noreply', State};
handle_cast('stop', State) ->
{'stop', 'normal', State}.
{'stop', 'normal', State};
handle_cast(_Msg, State) ->
lager:debug_unsafe("unhandled cast => ~p", [_Msg]),
{'ok', State}.

-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info(_Request, State) ->
handle_info({'DOWN', Ref, 'process', Pid, Reason}, #state{tab=ETS}=State) ->
_ = case ets:lookup(ETS, Ref) of
[{Ref, App}] ->
lager:warning("received down message for ~s / ~p / ~p => ~p", [App, Pid, Ref, Reason]),
ets:delete(ETS, Ref),
ets:delete(ETS, App),
erlang:send_after(?MILLISECONDS_IN_SECOND * 5, self(), {'reload', App});
_ ->
lager:critical("app not found")
end,
{'noreply', State};
handle_info({'reload', App}, #state{tab=ETS}=State) ->
_ = reload_apns(App, ETS),
{'noreply', State};
handle_info(_Msg, State) ->
lager:debug_unsafe("unhandled message => ~p", [_Msg]),
{'noreply', State}.

-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, #state{tab=ETS}) ->
apns:stop(),
ets:delete(ETS),
'ok'.

-spec code_change(any(), state(), any()) -> {'ok', state()}.
code_change(_OldVsn, State, _Extra) ->
{'ok', State}.

-spec maybe_send_push_notification(push_app(), kz_json:object()) -> any().
-spec join_headers(map()) -> binary().
join_headers(Headers) ->
kz_binary:join([list_to_binary([kz_term:to_binary(K), "=", kz_term:to_binary(V)])
|| {K, V} <- maps:to_list(Headers)
]
,<<",">>
).

-spec maybe_send_push_notification(push_app(), kz_json:object()) -> 'ok'.
maybe_send_push_notification('undefined', _) -> 'ok';
maybe_send_push_notification({Pid, ExtraHeaders}, JObj) ->
TokenID = kz_json:get_value(<<"Token-ID">>, JObj),
Topic = apns_topic(JObj),
Headers = kz_maps:merge(#{apns_topic => Topic}, ExtraHeaders),
Msg = build_payload(JObj),
lager:debug_unsafe("pushing topic ~s for token-id ~s : ~s", [Topic, TokenID, kz_json:encode(kz_json:from_map(Msg), ['pretty'])]),
{Result, _Props, _Ignore} = apns:push_notification(Pid, TokenID, Msg, Headers),
lager:debug("apns result for ~s : ~B", [Topic, Result]).
lager:debug_unsafe("pushing ~s for token-id ~s : ~s"
,[join_headers(Headers)
,TokenID
,kz_json:encode(kz_json:from_map(Msg), ['pretty'])
]
),
try
Result = apns:push_notification(Pid, TokenID, Msg, Headers),
lager:debug_unsafe("apns result for ~s : ~p", [Topic, Result])
catch
?CATCH(Ex, Msg, _ST) ->
lager:error_unsafe("PUBLISH ERROR => ~p / ~p", [Ex, Msg]),
?LOGSTACK(_ST)
end.

-spec build_payload(kz_json:object()) -> map().
build_payload(JObj) ->
Expand All @@ -88,6 +126,13 @@ map_key(K, V, JObj) ->
{_, K1} -> kz_json:set_value(K1, V, JObj)
end.

-spec reload_apns(kz_term:api_binary(), ets:tid()) -> 'ok' | reference().
reload_apns(App, ETS) ->
case get_apns(App, ETS) of
'undefined' -> erlang:send_after(?MILLISECONDS_IN_SECOND * 5, self(), {'reload', App});
_Push -> 'ok'
end.

-spec get_apns(kz_term:api_binary(), ets:tid()) -> push_app().
get_apns('undefined', _) -> 'undefined';
get_apns(App, ETS) ->
Expand Down Expand Up @@ -119,20 +164,31 @@ maybe_load_apns(App, ETS, CertBin, Host, Headers) ->
Connection = #{name => kz_term:to_atom(<<"apns_", App/binary>>, 'true')
,apple_host => kz_term:to_list(Host)
,apple_port => 443
,type => 'certdata'
,certdata => Cert
,keydata => Key
,timeout => 10000
,type => 'certdata'
,options => #{transport => 'tls'
,trace => 'false'
,http2_opts => #{keepalive => ?MILLISECONDS_IN_MINUTE * 15}
}
},
case apns:connect(Connection) of
try apns:connect(Connection) of
{'ok', Pid} ->
ets:insert(ETS, {App, {Pid, Headers}}),
Ref = erlang:monitor('process', Pid),
ets:insert(ETS, {Ref, App}),
{Pid, Headers};
{'error', {'already_started', Pid}} ->
apns:close_connection(Pid),
maybe_load_apns(App, ETS, CertBin, Host, Headers);
{'error', Reason} ->
lager:error("Error loading apns ~p", [Reason]),
lager:error("error loading apns ~p", [Reason]),
'undefined'
catch
?CATCH(_Er, _Ex,_ST) ->
lager:error("error loading apns ~p / ~p", [_Er, _Ex]),
?LOGSTACK(_ST),
'undefined'
end.

Expand Down
4 changes: 2 additions & 2 deletions applications/pusher/src/pusher.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application,pusher,
[{applications,[apns,fcm,kazoo,kazoo_amqp,kazoo_apps,kazoo_data,
kazoo_sip,kazoo_stdlib,kernel,lager,public_key,
stdlib]},
kazoo_documents,kazoo_sip,kazoo_stdlib,kazoo_web,
kernel,lager,public_key,stdlib]},
{description,"pusher - wake the dead"},
{env,[{is_kazoo_app,true}]},
{mod,{pusher_app,[]}},
Expand Down
12 changes: 12 additions & 0 deletions applications/pusher/src/pusher.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,17 @@

-type push_app() :: {kz_term:api_pid(), map()} | 'undefined'.

-ifdef(OTP_RELEASE).
%% >= OTP 21
-define(CATCH(Type, Reason, Stacktrace), Type:Reason:Stacktrace).
-define(LOGSTACK(Stacktrace), kz_util:log_stacktrace(Stacktrace)).
-define(STACK(Stacktrace), Stacktrace).
-else.
%% =< OTP 20
-define(CATCH(Type, Reason, Stacktrace), Type:Reason).
-define(LOGSTACK(Stacktrace), kz_util:log_stacktrace()).
-define(STACK(Stacktrace), erlang:get_stacktrace()).
-endif.

-define(PUSHER_HRL, 'true').
-endif.
Loading

0 comments on commit 7aec432

Please sign in to comment.