Skip to content

Commit

Permalink
Extra formatting of logstash reports as json data. (#13)
Browse files Browse the repository at this point in the history
* Extra formatting of logstash reports as json data.

Fallback to 'text' field in report if no 'msg' field available.

* Fix types, add newer OTP releases

* Update test

* Send newline after data
  • Loading branch information
mworrell authored Jul 15, 2024
1 parent e1813a2 commit 18c753c
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
otp: [22.3, 23.3, 24.2]
otp: [24.3, 25, 26]
rebar: [3.18.0]
steps:
- uses: actions/checkout@v2
Expand Down
27 changes: 0 additions & 27 deletions .github/workflows/hex.yaml

This file was deleted.

31 changes: 19 additions & 12 deletions src/logstasher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,33 @@ start_link() ->
%% Supervisor callbacks
%%==============================================================================

-spec init(term()) -> {ok, maps:map()} | {stop, maps:map()}.
-type state() :: #{
transport := module(),
host := string() | binary() | tuple(),
port := pos_integer(),
socket => gen_udp:socket() | gen_tcp:socket() | undefined
}.

-spec init(_) -> {ok, state()}.
init(_) ->
Transport = application:get_env(?MODULE, transport, ?LOGSTASH_TRANSPORT),
Host = application:get_env(?MODULE, host, ?LOGSTASH_HOST),
Port = application:get_env(?MODULE, port, ?LOGSTASH_PORT),
Opts = #{transport => Transport, host => Host, port => Port},
State = Opts#{socket => connect(Opts)},
Opts = #{ transport => Transport, host => Host, port => Port },
State = Opts#{ socket => connect(Opts) },
{ok, State}.

-spec handle_call({send, binary()}, any(), maps:map()) ->
{reply, ok | {error, atom() | {timeout, binary()}}, maps:map()}.
-spec handle_call({send, binary()}, _, state()) ->
{reply, ok | {error, atom() | {timeout, binary()}}, state()}.
handle_call({send, Data}, _, State) ->
Result = maybe_send(Data, State),
{reply, Result, State}.

-spec handle_cast(term(), maps:map()) -> {noreply, maps:map()}.
-spec handle_cast(_, state()) -> {noreply, state()}.
handle_cast(_, State) ->
{noreply, State}.

-spec terminate(term(), maps:map()) -> ok.
-spec terminate(_, state()) -> ok.
terminate(_, #{transport := tcp, socket := Socket}) ->
gen_tcp:close(Socket);
terminate(_, #{transport := udp, socket := Socket}) ->
Expand All @@ -90,7 +97,7 @@ terminate(_, #{transport := console}) ->
%% Internal functions
%%==============================================================================

-spec connect(maps:map()) -> gen_udp:socket() | gen_tcp:socket() | undefined.
-spec connect(state()) -> gen_udp:socket() | gen_tcp:socket() | undefined.
connect(#{transport := tcp, host := Host, port := Port}) ->
Opts = [binary, {active, false}, {keepalive, true}],
case gen_tcp:connect(Host, Port, Opts, ?TCP_CONNECT_TIMEOUT) of
Expand All @@ -112,7 +119,7 @@ connect(#{transport := udp}) ->
connect(#{transport := console}) ->
undefined.

-spec maybe_send(binary(), maps:map()) -> ok | {error, atom()}.
-spec maybe_send(binary(), map()) -> ok | {error, atom()}.
maybe_send(Data, #{transport := console} = State) ->
send(Data, State);
maybe_send(Data, #{socket := undefined} = State) ->
Expand All @@ -124,12 +131,12 @@ maybe_send(Data, State) ->
{error, _} = Error -> Error
end.

-spec send(binary(), maps:map()) -> ok | {error, atom()}.
-spec send(binary(), map()) -> ok | {error, atom()}.
send(Data, #{transport := console}) ->
io:put_chars([ Data, "\n"]);
send(_Data, #{socket := undefined}) ->
{error, closed};
send(Data, #{transport := tcp, socket := Socket}) ->
gen_tcp:send(Socket, Data);
gen_tcp:send(Socket, [ Data, "\n" ]);
send(Data, #{transport := udp, socket := Socket, host := Host, port := Port}) ->
gen_udp:send(Socket, Host, Port, Data).
gen_udp:send(Socket, Host, Port, [ Data, "\n" ]).
186 changes: 156 additions & 30 deletions src/logstasher_h.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
%% logger callbacks
-export([log/2]).

%% Testing
-export([log_data/1]).

%% Xref ignores
-ignore_xref([log/2]).
-ignore_xref([log/2, log_data/1]).

%% Truncate binary values beyond this size.
-define(LOG_BINARY_SIZE, 2000).

%%==============================================================================
%% API
Expand All @@ -14,61 +20,181 @@
log(#{level := info, meta := #{error_logger := #{type := progress}}}, _Config) ->
% Ignore supervisor progress reports
ok;
log(#{level := Level, msg := EventData, meta := Meta}, _Config) ->
{Msg, MsgFields} = format_msg(EventData),
Fields = [{severity, Level}] ++ safe_meta(Meta) ++ MsgFields,
Data = #{fields => Fields, '@timestamp' => format_timestamp(Meta), message => Msg},
_ = logstasher:send(jsx:encode(Data)),
ok.
log(LogEvent, _Config) ->
try
Data = log_data(LogEvent),
_ = logstasher:send(jsx:encode(Data)),
ok
catch
_:_ ->
% Ignore crashes on unexpected data, as that would remove the log
% handler from the logger and stop logging.
ok
end.

%%==============================================================================
%% Internal functions
%%==============================================================================

-spec format_msg(Data) -> {binary(), [{binary() | atom(), jsx:json_term()}]} when
Data :: {io:format(), [term()]}
| {report, logger:report()}
| {string, unicode:chardata()}.
log_data(#{level := Level, msg := EventData, meta := Meta}) ->
{Msg, MsgFields} = format_msg(EventData),
{Msg1, MsgFields1} = maybe_extract_message(Msg, MsgFields),
Fields = maps:merge(safe_meta(Meta), MsgFields1),
Fields1 = Fields#{ severity => Level },
#{
fields => Fields1,
'@timestamp' => format_timestamp(Meta),
message => Msg1
}.

%% @doc If there is no message, try to extract the 'text' fields from the message fields
%% and use that as the message.
maybe_extract_message(null, #{ text := Text } = MsgFields) when is_binary(Text) ->
{Text, maps:remove(text, MsgFields)};
maybe_extract_message(Msg, MsgFields) ->
{Msg, MsgFields}.


-spec format_msg(Data) -> {Message, #{ Key => Value } } when
Data :: {io:format(), [ term() ]}
| {report, logger:report()}
| {string, unicode:chardata()},
Message :: binary() | null,
Key :: binary() | atom(),
Value :: jsx:json_term().
format_msg({string, Message}) ->
{unicode:characters_to_binary(Message), []};
{unicode:characters_to_binary(Message), #{}};
format_msg({report, Report}) when is_map(Report) ->
format_msg({report, maps:to_list(Report)});
{maps:get(msg, Report, null), safe_fields(Report)};
format_msg({report, Report}) when is_list(Report) ->
{proplists:get_value(msg, Report, null), safe_fields(Report)};
format_msg({Format, Params}) ->
{unicode:characters_to_binary(io_lib:format(Format, Params)), []}.
format_msg({report, maps:from_list(Report)});
format_msg({"Error in process ~p on node ~p with exit value:~n~p~n", [_, _, {undef, Undef}]}) ->
format_undef(Undef);
format_msg({Format, Params}) when is_list(Format), is_list(Params) ->
{unicode:characters_to_binary(io_lib:format(Format, Params)), #{}};
format_msg(Other) ->
{unicode:characters_to_binary(io_lib:format("~p", [ Other ])), #{}}.

format_undef([ {Module, Function, Args, _} | _ ] = Stack) when is_list(Args) ->
Arity = length(Args),
Message = io_lib:format("Undefined function ~p:~p/~p", [Module, Function, Arity]),
Report = #{
result => error,
reason => undef,
module => Module,
function => Function,
args => Args,
stack => Stack
},
{unicode:characters_to_binary(Message), safe_fields(Report)}.

-spec format_timestamp(logger:metadata()) -> binary().
format_timestamp(#{time := Ts}) ->
list_to_binary(calendar:system_time_to_rfc3339(Ts, [{unit, microsecond}, {offset, "Z"}])).

-spec safe_meta(logger:metadata()) -> [{binary() | atom(), jsx:json_term()}].
-spec safe_meta(logger:metadata()) -> #{ Key => Term } when
Key :: binary() | atom(),
Term :: jsx:json_term().
safe_meta(Meta) ->
safe_fields(maps:to_list(Meta)).
safe_fields(Meta).

-spec safe_fields([{term(), term()}]) -> [{binary() | atom(), jsx:json_term()}].
-spec safe_fields(map()) -> map().
safe_fields(Terms) ->
lists:map(fun safe_field/1, Terms).
maps:fold(
fun(K, V, Acc) ->
{K1, V1} = safe_field(K, V),
Acc#{ K1 => V1 }
end,
#{},
Terms).

-spec safe_field({atom() | binary() | atom(), term()}) -> {atom() | binary(), jsx:json_term()}.
safe_field({Key, Value}) when is_atom(Key); is_binary(Key) ->
-spec safe_field(atom() | binary() | string(), term()) -> {atom() | binary(), jsx:json_term()}.
safe_field(stack, Stack) when is_list(Stack) ->
{stack, safe_stack(Stack)};
safe_field(file, Filename) when is_list(Filename) ->
{file, unicode:characters_to_binary(Filename)};
safe_field(Key, Value) when is_atom(Key); is_binary(Key) ->
{Key, safe_value(Value)};
safe_field({Key, Value}) when is_list(Key) ->
safe_field({list_to_binary(Key), Value}).
safe_field(Key, Value) when is_list(Key) ->
safe_field(unicode:characters_to_binary(Key), Value).

safe_stack(Stack) ->
lists:map(fun safe_stack_entry/1, Stack).

safe_stack_entry({Mod, Fun, Args, _}) when is_atom(Mod), is_atom(Fun), is_list(Args) ->
Arity = length(Args),
Function = io_lib:format("~p:~p/~p", [Mod, Fun, Arity]),
#{
function => unicode:characters_to_binary(Function)
};
safe_stack_entry({Mod, Fun, Arity, Loc}) when is_atom(Mod), is_atom(Fun), is_integer(Arity) ->
Function = io_lib:format("~p:~p/~p", [ Mod, Fun, Arity ]),
#{
function => unicode:characters_to_binary(Function),
at => unicode:characters_to_binary([stack_file(Loc), $:, integer_to_binary(stack_line(Loc))])
};
safe_stack_entry(Entry) ->
safe_value(Entry).

stack_file(Loc) when is_list(Loc) -> proplists:get_value(file, Loc, "");
stack_file({File, _}) -> File;
stack_file({File, _, _}) -> File;
stack_file(_) -> "".

stack_line([ {_, _} | _ ] = Loc) -> proplists:get_value(line, Loc, "");
stack_line({_, Line}) -> Line;
stack_line({_, Line, _}) -> Line;
stack_line(_) -> 0.

-spec safe_value(term()) -> jsx:json_term().
safe_value(Pid) when is_pid(Pid) ->
list_to_binary(pid_to_list(Pid));
safe_value([]) ->
[];
safe_value(List) when is_list(List) ->
case io_lib:char_list(List) of
true ->
list_to_binary(List);
case is_proplist(List) of
true -> safe_value(map_from_proplist(List));
false ->
lists:map(fun safe_value/1, List)
case is_ascii_list(List) of
true -> unicode:characters_to_binary(List);
false -> lists:map(fun safe_value/1, List)
end
end;
safe_value(Map) when is_map(Map) ->
safe_fields(Map);
safe_value(undefined) ->
null;
safe_value(Val) when is_binary(Val); is_atom(Val); is_integer(Val) ->
safe_value(Val) when is_atom(Val); is_number(Val) ->
Val;
safe_value(Val) when is_binary(Val) ->
maybe_truncate(Val);
safe_value(Val) ->
unicode:characters_to_binary(io_lib:format("~p", [Val])).
maybe_truncate(unicode:characters_to_binary(io_lib:format("~p", [Val]))).

% Map a proplists to a map
map_from_proplist(L) ->
lists:foldl(
fun
({K,V}, Acc) -> Acc#{ K => V };
(K, Acc) -> Acc#{ K => true }
end,
#{},
L).

% If something is a proplist, then we will display it as a map.
is_proplist([]) -> true;
is_proplist([ {K, _} | T ]) when is_atom(K); is_binary(K) -> is_proplist(T);
is_proplist([ K | T ]) when is_atom(K) -> is_proplist(T);
is_proplist(_) -> false.

% Simple ASCII character string, typically SQL statements, filenames or literal texts.
is_ascii_list([]) -> true;
is_ascii_list([ C | T ]) when C >= 32, C =< 127 -> is_ascii_list(T);
is_ascii_list([ C | T ]) when C =:= $\n; C =:= $\t -> is_ascii_list(T);
is_ascii_list(_) -> false.

maybe_truncate(Bin) when size(Bin) >= ?LOG_BINARY_SIZE ->
<<Truncated:?LOG_BINARY_SIZE/binary, _/binary>> = Bin,
<<Truncated/binary, "...">>;
maybe_truncate(Bin) ->
Bin.
33 changes: 29 additions & 4 deletions test/logstasher_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@

-export([all/0, groups/0, init_per_testcase/2, end_per_testcase/2]).

-export([logstasher_udp/1, logstasher_tcp/1]).
-export([logstasher_udp/1, logstasher_tcp/1, logstasher_message/1]).

-spec all() -> [ct_suite:ct_test_def(), ...].
all() ->
[{group, logstasher}].

-spec groups() -> [ct_suite:ct_group_def(), ...].
groups() ->
[{logstasher, [sequence], [logstasher_udp, logstasher_tcp]}].
[{logstasher, [sequence], [
logstasher_udp,
logstasher_tcp,
logstasher_message
]}].

-spec init_per_testcase(ct_suite:ct_testname(), ct_suite:ct_config()) ->
ct_suite:ct_config() | {fail, term()} | {skip, term()}.
init_per_testcase(_Name, Config) ->
ok = logger:add_handler(logstash, logstasher_h, #{level => info}),
ok = logger:update_primary_config(#{level => all}),
Config;
init_per_testcase(_Name, Config) ->
Config.

-spec end_per_testcase(ct_suite:ct_testname(), ct_suite:ct_config()) ->
Expand Down Expand Up @@ -105,3 +107,26 @@ logstasher_tcp(_Config) ->
end,

?assertEqual(list_to_binary(ErrorMsg), Msg).


-spec logstasher_message(ct_suite:ct_config()) -> ok | no_return().
logstasher_message(_Config) ->
{ok, _Started} = application:ensure_all_started(logstasher),
#{
message := <<"Hello">>,
fields := #{ msg := <<"Hello">>, severity := info }
} = logstasher_h:log_data(#{
level => info,
msg => {report, #{ msg => <<"Hello">> }},
meta => #{ time => 0 }
}),
#{
message := <<"Hello">>,
fields := Fields2
} = logstasher_h:log_data(#{
level => info,
msg => {report, #{ text => <<"Hello">> }},
meta => #{ time => 0 }
}),
false = maps:is_key(text, Fields2),
ok.

0 comments on commit 18c753c

Please sign in to comment.