Skip to content

Commit

Permalink
Telemetry code refactor. (#3948)
Browse files Browse the repository at this point in the history
* code refactor.

Signed-off-by: sreepuramsudheer <[email protected]>
  • Loading branch information
sreepuramsudheer authored Nov 26, 2024
1 parent d3241c8 commit 37a069c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 48 deletions.
116 changes: 77 additions & 39 deletions src/oc_erchef/apps/chef_telemetry/src/chef_telemetry_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
]).

-record(current_scan, {
total_nodes = 0,
active_nodes = 0,
company_name = <<"">>,
fqdns = [],
license_id,
scan_start_time,
scan_end_time,
total_nodes,
active_nodes,
company_name
scan_end_time
}).

-record(state, {
Expand Down Expand Up @@ -91,6 +93,10 @@ handle_cast(send_data, State) ->
try send_data(State) of
State1 -> State1
catch
_:_ when State#state.ctl_command /= "Hab infra server" andalso State#state.ctl_command /= "chef-server-ctl" ->
timer:apply_after(60 * 1000, gen_server, cast, [self(), send_data]),
sqerl:execute(<<"delete from telemetry where property = 'last_send'">>),
State;
_:_ ->
State
end,
Expand Down Expand Up @@ -145,16 +151,17 @@ send_data(State) ->
case chef_telemetry:is_enabled() of
true ->
State1 = init_req(State),
Hostname = get_fqdn(),
case check_send(Hostname) of
NodeName = to_binary("NODE:" ++ binary:bin_to_list(envy:get(oc_chef_wm, actions_fqdn, <<"">>, binary))),
case check_send(NodeName) of
true ->
[{_Server, ServerVersion, _, _}] = release_handler:which_releases(permanent),
State2 = get_nodes(State1),
State3 = get_company_name(State2),
State4 = get_api_fqdn(State3),
Req = generate_request(ServerVersion, State4),
send_req(Req, State3),
State3;
Funs = [fun get_total_nodes/1, fun get_active_nodes/1, fun get_company_name/1, fun get_api_fqdn/1, fun determine_license_id/1],
Pid = self(),
Res = [ erlang:spawn_monitor(runner(Pid, State1, Fun)) || Fun <- Funs ],
Current_scan = gather_res(Res, State1#state.current_scan, length(Funs)),
Req = generate_request(ServerVersion, State1#state{current_scan = Current_scan}),
send_req(Req, State1),
State1;
_ ->
State1
end;
Expand All @@ -163,15 +170,15 @@ send_data(State) ->
end,
State6.

get_api_fqdn(State) ->
sqerl:execute(<<"delete from telemetry where property like 'FQDN:%' and event_timestamp < (current_timestamp - interval '86700')">>),
case sqerl:execute(<<"select trim(property) as property from telemetry where property like 'FQDN:%'">>) of
get_api_fqdn(_State) ->
sqerl:execute(<<"delete from telemetry where property like 'NODE:%' and event_timestamp < (current_timestamp - interval '86700')">>),
case sqerl:execute(<<"select trim(property) as property from telemetry where property like 'NODE:%'">>) of
{ok, Rows} when is_list(Rows) ->
FQDNs = [binary:part(FQDN, 5, size(FQDN) -5) || [{<<"property">>, FQDN}] <- Rows],
FQDNs1 = mask(FQDNs),
State#state{fqdns = FQDNs1};
FQDNs1;
_ ->
State
[]
end.

get_org_nodes(OrgName, Query1, ReqId, DbContext) ->
Expand Down Expand Up @@ -208,7 +215,7 @@ get_license_company_name()->
{_Lic, _Type, _GracePeriod, _ExpDate, _Msg, CN,_LID} = chef_license:get_license(),
CN.

determine_license_id()->
determine_license_id(_State)->
{_Lic, _Type, _GracePeriod, _ExpDate, _Msg, _CN, LicenseID} = chef_license:get_license(),
case LicenseID of
undefined ->
Expand All @@ -221,8 +228,7 @@ determine_license_id()->
LicenseID
end.


get_company_name(State) ->
get_company_name(_State) ->
CompanyName =
case get_license_company_name() of
CN when CN =:= undefined; CN=:= <<"">>; CN =:= "" ->
Expand Down Expand Up @@ -250,9 +256,7 @@ get_company_name(State) ->
end;
CN -> CN
end,
CurrentScan = State#state.current_scan,
State#state{
current_scan = CurrentScan#current_scan{company_name = CompanyName}}.
CompanyName.

get_most_occuring(List) ->
FirstElement = lists:nth(1, List),
Expand All @@ -274,17 +278,20 @@ get_most_occuring(List) ->
Res1 = maps:fold(Fun1, {FirstElement, 0}, Map1),
element(1, Res1).

get_nodes(#state{req_id = ReqId, db_context = DbContext} = State) ->
get_total_nodes(State) ->
Count =
case chef_db:count_nodes(State#state.db_context) of
Count1 when is_integer(Count1) -> Count1;
Error -> throw({db_error, Error})
end,
Count.

get_active_nodes(#state{req_id = ReqId, db_context = DbContext} = State) ->
CurrentScan = State#state.current_scan,
ScanStartTime = CurrentScan#current_scan.scan_start_time,
ScanEndTime = CurrentScan#current_scan.scan_end_time,
QueryString = lists:flatten(io_lib:format("ohai_time:{~p TO ~p}", [ScanStartTime, ScanEndTime])),
Query1 = chef_index:query_from_params("node", QueryString, "0", "10000"),
Count =
case chef_db:count_nodes(DbContext) of
Count1 when is_integer(Count1) -> Count1;
Error -> throw({db_error, Error})
end,
Orgs =
case chef_db:list(#oc_chef_organization{}, DbContext) of
Orgs1 when is_list(Orgs1) -> Orgs1;
Expand All @@ -295,15 +302,12 @@ get_nodes(#state{req_id = ReqId, db_context = DbContext} = State) ->
Sum + Nodes
end,
ActiveNodes = lists:foldl(Fun, 0, Stats),
State#state{
current_scan = CurrentScan#current_scan{
total_nodes = Count,
active_nodes = ActiveNodes}}.
ActiveNodes.

generate_request(ServerVersion, State) ->
CurrentScan = State#state.current_scan,
Res = jiffy:encode({[
{<<"licenseId">>, determine_license_id()},
{<<"licenseId">>, CurrentScan#current_scan.license_id},
{<<"customerName">>, State#state.current_scan#current_scan.company_name},
{<<"periods">>, [
{[
Expand Down Expand Up @@ -337,7 +341,7 @@ generate_request(ServerVersion, State) ->
{<<"Infra Server">>, {[
{<<"deploymentType">>, <<"">>},
{<<"instanceId">>, <<"">>},
{<<"fqdn">>, State#state.fqdns},
{<<"fqdn">>, CurrentScan#current_scan.fqdns},
{<<"config_location">>, to_binary(State#state.running_file)},
{<<"binary_location">>, to_binary(State#state.ctl_command)}
]}}
Expand Down Expand Up @@ -373,10 +377,6 @@ check_send(Hostname) ->
Error
end.

get_fqdn() ->
HostName = binary:bin_to_list(envy:get(oc_chef_wm, actions_fqdn, <<"">>, binary)),
to_binary("FQDN:" ++ HostName).

mask(FQDNs) ->
Join = fun(Elements, Separator) ->
[H | T] = Elements,
Expand Down Expand Up @@ -439,3 +439,41 @@ mask(FQDNs) ->
end
end,
lists:map(Fun, FQDNs).

runner(Parent, State, Fun) ->
fun() ->
Res = Fun(State),
Parent ! {result, self(), Res}
end.

gather_res(_Ids, Res, Count) when Count =< 0->
Res;

gather_res(Ids, Res, Count) ->
Fun = fun(Id) ->
fun({Id1,_}) ->
Id =/= Id1
end
end,
receive
{result, Id, Ret} ->
case lists:search(Fun(Id), Ids) of
{value, _} ->
Res1 = erlang:setelement(length(lists:takewhile(Fun(Id), Ids)) + 2, Res, Ret),
gather_res(Ids, Res1, Count - 1);
_ ->
gather_res(Ids, Res, Count)
end;
{'DOWN', _Ref, process, _Id, normal} ->
gather_res(Ids, Res, Count);
{'DOWN', _Ref, process, Id, _Reason} ->
case lists:search(Fun(Id), Ids) of
{value, _} ->
gather_res(Ids, Res, Count - 1);
_ ->
gather_res(Ids, Res, Count)
end
after
60000 ->
Res
end.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ enable_flag_test() ->
execute(State, Expected, Env) ->
set_env([{chef_telemetry, reporting_url, "http://127.0.0.1:9001/esi/payload:io"}] ++ Env),
application:start(ibrowse),
put(state, State),
ets:new(telemetry_worker_test, [set, public, named_table]),
ets:insert(telemetry_worker_test, {state, State}),
setup(),
chef_telemetry_test_utils:start_server([]),
register(telemetry_mock_consumer, self()),
Expand All @@ -91,28 +92,28 @@ setup() ->
meck:expect(release_handler, which_releases, fun(_) -> [{"chef_server", "15.9.38", [], []}] end),
meck:expect(stats_hero, ctime, fun(_, _, Fun) -> Fun() end).

get_execute(<<"select trim(property) as property from telemetry where property like 'FQDN:%'">>) ->
State = get(state),
get_execute(<<"select trim(property) as property from telemetry where property like 'NODE:%'">>) ->
[{state, State}] = ets:lookup(telemetry_worker_test, state),
State#state.fqdn_select;

get_execute(<<"select telemetry_check_send('", _/binary>>) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
State#state.should_send;

get_execute(_) ->
ok.

adhoc_select([<<"email">>], <<"users">>, all) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
{ok, State#state.user_emails}.

count_nodes(_Context) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
State#state.nodes_count.

chef_db_list(Record, _context) ->
RecordName = element(1, Record),
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
case RecordName of
oc_chef_organization -> State#state.organizations;
_ -> []
Expand All @@ -123,10 +124,10 @@ org_metadata(_context, OrgName) ->
{OrgName1, OrgName1}.

index_search(_) ->
State = get(state),
[{state, State}] = ets:lookup(telemetry_worker_test, state),
[Nodes | Rest] = State#state.index_search,
State1 = State#state{index_search = Rest},
put(state, State1),
ets:insert(telemetry_worker_test, {state, State1}),
{ok, 0, length(Nodes), Nodes}.

trigger_send_data() ->
Expand Down

0 comments on commit 37a069c

Please sign in to comment.