From 0b2ee434f78c9569f8e55e6c2c3ffc3b5f20fd4e Mon Sep 17 00:00:00 2001 From: Brujo Benavides Date: Wed, 22 Apr 2020 15:20:33 +0200 Subject: [PATCH] [RTI-8010] Update dependencies and ensure the project works with OTP22 (#22) * [RTI-8010] Update dependencies and ensure the project works with OTP22 * [RTI-8010] Apply formatting and linting --- .travis.yml | 3 +- elvis.config | 34 ++ include/erlmld.hrl | 53 +-- rebar.config | 14 +- rebar.lock | 8 +- src/erlmld_app.erl | 17 +- src/erlmld_batch_processor.erl | 367 ++++++++-------- src/erlmld_flusher.erl | 27 +- src/erlmld_noisy_wrk.erl | 36 +- src/erlmld_runner.erl | 136 +++--- src/erlmld_sup.erl | 53 +-- src/erlmld_tcp_acceptor.erl | 37 +- src/erlmld_worker.erl | 38 +- src/erlmld_wrk_statem.erl | 737 +++++++++++++++------------------ src/erlmld_wrk_sup.erl | 17 +- src/kpl_agg.erl | 331 +++++++-------- 16 files changed, 905 insertions(+), 1003 deletions(-) create mode 100644 elvis.config diff --git a/.travis.yml b/.travis.yml index 8cfeddc..9aed5ab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,8 @@ os: - linux otp_release: - - 20.3 - 21.3 + - 22.3 notifications: email: rtb-team+travis@adroll.com @@ -15,4 +15,5 @@ script: - export PATH=/home/travis/.cache/rebar3/bin:$PATH - rebar3 --version - erl -version + - rebar3 format --verify - rebar3 test diff --git a/elvis.config b/elvis.config new file mode 100644 index 0000000..e06379c --- /dev/null +++ b/elvis.config @@ -0,0 +1,34 @@ +[ + { + elvis, + [ + {config, + [#{dirs => ["src"], + filter => "*.erl", + ruleset => erl_files, + ignore => ["src/kpl_agg_pb.erl"], + rules => [ + {elvis_style, line_length, #{limit => 120, skip_comments => whole_line}}, + {elvis_style, nesting_level, #{level => 4}}, + {elvis_style, dont_repeat_yourself, #{min_complexity => 20}}, + {elvis_style, invalid_dynamic_call, disable}, + {elvis_style, no_debug_call, #{ignore => [erlmld_noisy_wrk, erlmld_runner]}} + ] + }, + #{dirs => ["."], + filter => "*rebar.config", + ruleset => rebar_config, + rules => [ + %% Elixir deps use git@... + {elvis_project, protocol_for_deps_rebar, disable} + ] + }, + #{dirs => ["."], + filter => "elvis.config", + ruleset => elvis_config + } + ] + } + ] + } +]. diff --git a/include/erlmld.hrl b/include/erlmld.hrl index 993b3d9..106579e 100644 --- a/include/erlmld.hrl +++ b/include/erlmld.hrl @@ -1,35 +1,28 @@ -ifndef(ERLMLD_HRL). --define(ERLMLD_HRL, true). - --record(sequence_number, { - %% overall record sequence number: - base :: undefined | non_neg_integer() | atom(), - - %% record sub-sequence number for records using KPL aggregation: - sub :: undefined | non_neg_integer(), - - %% record sub-sequence number for records NOT using KPL aggregation. - %% erlmld supports its own KPL-like aggregation and will fill this - %% one when needed. For other use cases, your code is expected to - %% fake these when needed. - user_sub :: undefined | non_neg_integer(), - %% total number of records in aggregated KPL record: - %% (user_sub will range from 0 to user_total-1) - user_total :: undefined | non_neg_integer() - }). - --record(checkpoint, { - sequence_number :: undefined | sequence_number() - }). +-define(ERLMLD_HRL, true). --record(stream_record, { - partition_key :: binary(), - timestamp :: undefined | non_neg_integer(), % approximate arrival time (ms) - delay :: non_neg_integer(), % approximate delay between this record and tip of stream (ms) - sequence_number :: sequence_number(), - data :: term() - }). +-record(sequence_number, + {%% overall record sequence number: + base :: undefined | non_neg_integer() | atom(), + %% record sub-sequence number for records using KPL aggregation: + sub :: undefined | non_neg_integer(), + %% record sub-sequence number for records NOT using KPL aggregation. + %% erlmld supports its own KPL-like aggregation and will fill this + %% one when needed. For other use cases, your code is expected to + %% fake these when needed. + user_sub :: undefined | non_neg_integer(), + %% total number of records in aggregated KPL record: + %% (user_sub will range from 0 to user_total-1) + user_total :: undefined | non_neg_integer()}). +-record(checkpoint, {sequence_number :: undefined | sequence_number()}). +-record(stream_record, + {partition_key :: binary(), + timestamp :: undefined | non_neg_integer(), % approximate arrival time (ms) + delay :: + non_neg_integer(), % approximate delay between this record and tip of stream (ms) + sequence_number :: sequence_number(), + data :: term()}). -type worker_state() :: term(). -type shard_id() :: binary(). @@ -37,7 +30,6 @@ -type stream_record() :: #stream_record{}. -type shutdown_reason() :: terminate | zombie. -type checkpoint() :: #checkpoint{}. - %% Types used by the flusher behavior (see erlmld_flusher.erl). -type flusher_state() :: term(). -type flusher_token() :: term(). @@ -50,7 +42,6 @@ %% with version 1.1.1 of the dynamo streams adapter doesn't properly deaggregate (doesn't %% include subsequence numbers in the records we see). -define(KPL_AGG_MAGIC, <<16#00, 16#89, 16#9A, 16#C2>>). - %% magic number identifying deflate-compressed KPL record, compressed using %% zlib:compress/1. the KPL checksum trailer is included in the deflated data. -define(KPL_AGG_MAGIC_DEFLATED, <<16#01, 16#89, 16#9A, 16#C2>>). diff --git a/rebar.config b/rebar.config index db1d5b0..f8e54d6 100644 --- a/rebar.config +++ b/rebar.config @@ -1,14 +1,14 @@ % -*- mode: erlang -*- {erl_opts, [debug_info]}. -{deps, [ - {jiffy, "1.0.1"}, - {erlexec, "1.10.0"} - ]}. +{deps, [{jiffy, "1.0.4"}, {erlexec, "1.10.9"}]}. {plugins, [ - { rebar3_gpb_plugin, "2.8.2" } + {rebar3_gpb_plugin, "2.13.1"}, + rebar3_lint, + rebar3_format, + rebar3_hex ]}. {erl_opts, [{i, "./_build/default/plugins/gpb/include"}]}. @@ -50,4 +50,6 @@ {cover_enabled, true}. -{alias, [{test, [xref, dialyzer, eunit, cover]}]}. \ No newline at end of file +{alias, [{test, [format, lint, xref, dialyzer, eunit, cover]}]}. + +{format, [{files, ["src/*.erl", "include/*.hrl"]}]}. diff --git a/rebar.lock b/rebar.lock index 3117f12..6170ce8 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,8 +1,8 @@ {"1.1.0", -[{<<"erlexec">>,{pkg,<<"erlexec">>,<<"1.10.0">>},0}, - {<<"jiffy">>,{pkg,<<"jiffy">>,<<"1.0.1">>},0}]}. +[{<<"erlexec">>,{pkg,<<"erlexec">>,<<"1.10.9">>},0}, + {<<"jiffy">>,{pkg,<<"jiffy">>,<<"1.0.4">>},0}]}. [ {pkg_hash,[ - {<<"erlexec">>, <<"CBA7924CF526097D2082CEB0EC34E7DB6BCA2624B8F3867FB3FA89C4CF25D227">>}, - {<<"jiffy">>, <<"4F25639772CA41202F41BA9C8F6CA0933554283DD4742C90651E03471C55E341">>}]} + {<<"erlexec">>, <<"3CBB3476F942BFB8B68B85721C21C1835061CF6DD35F5285C2362E85B100DDC7">>}, + {<<"jiffy">>, <<"72ADEFF75C52A2FF07DE738F0813768ABE7CE158026CC1115A170340259C0CAA">>}]} ]. diff --git a/src/erlmld_app.erl b/src/erlmld_app.erl index a4aaf90..d97a4f2 100644 --- a/src/erlmld_app.erl +++ b/src/erlmld_app.erl @@ -2,9 +2,7 @@ -behaviour(application). --export([start/2, stop/1, - ensure_all_started/0]). - +-export([start/2, stop/1, ensure_all_started/0]). %% application callback. load default configuration from application environment, and %% start erlmd_sup with that configuration. @@ -12,15 +10,14 @@ start(_StartType, []) -> Opts = maps:from_list(application:get_all_env(erlmld)), erlmld_sup:start_link(Opts). - stop(_State) -> ok. - ensure_all_started() -> {ok, Deps} = application:get_key(erlmld, applications), - {ok, lists:append([begin - {ok, Started} = application:ensure_all_started(Dep), - Started - end - || Dep <- Deps])}. + {ok, + lists:append([begin + {ok, Started} = application:ensure_all_started(Dep), + Started + end + || Dep <- Deps])}. diff --git a/src/erlmld_batch_processor.erl b/src/erlmld_batch_processor.erl index bfa5a09..6bf475f 100644 --- a/src/erlmld_batch_processor.erl +++ b/src/erlmld_batch_processor.erl @@ -30,52 +30,43 @@ -behavior(erlmld_worker). --export([initialize/3, - ready/1, - process_record/2, - checkpointed/3, - shutdown/2]). +-export([initialize/3, ready/1, process_record/2, checkpointed/3, shutdown/2]). -include("erlmld.hrl"). --record(state, { - %% Handler module implementing the flusher behavior. - flusher_mod :: atom(), - - %% Flusher state (opaque to this module) passed to flusher_mod callbacks. - flusher_state :: flusher_state(), - - %% Optional callback to call each time process_record returns a checkpoint. - on_checkpoint :: fun((term(), shard_id()) -> term()), - - %% Optional, false by default. Tells whether to log or not every successful - %% checkpoint from the KCL worker. - log_checkpoints :: boolean(), - - description :: term(), - shard_id :: shard_id(), - count = 0, % non-ignored records seen - checkpoints = 0, % checkpoints requested - next_counter_checkpoint = 0, % counter value at which we can next checkpoint - last_flush_time = os:timestamp(), - last_checkpoint_time = os:timestamp(), - checkpointable = gb_trees:empty(), % {Count, SequenceNo} - - flush_interval_ms, - checkpoint_interval_ms, - watchdog_timeout_ms, - watchdog, - - enable_subsequence_checkpoints = false :: boolean() -}). - +-record(state, + {%% Handler module implementing the flusher behavior. + flusher_mod :: atom(), + %% Flusher state (opaque to this module) passed to flusher_mod callbacks. + flusher_state :: flusher_state(), + %% Optional callback to call each time process_record returns a checkpoint. + on_checkpoint :: fun((term(), shard_id()) -> term()), + %% Optional, false by default. Tells whether to log or not every successful + %% checkpoint from the KCL worker. + log_checkpoints :: boolean(), + description :: term(), + shard_id :: shard_id(), + count = 0, % non-ignored records seen + checkpoints = 0, % checkpoints requested + next_counter_checkpoint = 0, % counter value at which we can next checkpoint + last_flush_time = os:timestamp(), + last_checkpoint_time = os:timestamp(), + checkpointable = gb_trees:empty(), % {Count, SequenceNo} + flush_interval_ms, + checkpoint_interval_ms, + watchdog_timeout_ms, + watchdog, + enable_subsequence_checkpoints = false :: boolean()}). %%%=================================================================== %%% API %%%=================================================================== initialize(Opts, ShardId, ISN) -> - Defaults = #{on_checkpoint => fun(_, _) -> ok end, + Defaults = #{on_checkpoint => + fun (_, _) -> + ok + end, log_checkpoints => false, description => undefined, enable_subsequence_checkpoints => false}, @@ -87,99 +78,93 @@ initialize(Opts, ShardId, ISN) -> flush_interval_ms := FlushIntervalMs, checkpoint_interval_ms := CheckpointIntervalMs, watchdog_timeout_ms := WatchdogTimeoutMs, - enable_subsequence_checkpoints := EnableSubCP} = maps:merge(Defaults, Opts), - State = #state{ - flusher_mod = FlusherMod, - flusher_state = FlusherMod:init(ShardId, FlusherModData), - on_checkpoint = OnCheckpoint, - log_checkpoints = LogCheckpoints, - description = Description, - shard_id = ShardId, - flush_interval_ms = FlushIntervalMs, - checkpoint_interval_ms = CheckpointIntervalMs, - watchdog_timeout_ms = WatchdogTimeoutMs, - enable_subsequence_checkpoints = EnableSubCP - }, + enable_subsequence_checkpoints := EnableSubCP} = + maps:merge(Defaults, Opts), + State = #state{flusher_mod = FlusherMod, + flusher_state = FlusherMod:init(ShardId, FlusherModData), + on_checkpoint = OnCheckpoint, + log_checkpoints = LogCheckpoints, + description = Description, + shard_id = ShardId, + flush_interval_ms = FlushIntervalMs, + checkpoint_interval_ms = CheckpointIntervalMs, + watchdog_timeout_ms = WatchdogTimeoutMs, + enable_subsequence_checkpoints = EnableSubCP}, error_logger:info_msg("~p initialized for shard {~p,~p} at ~p~n", [self(), Description, ShardId, ISN]), {ok, update_watchdog(State)}. - ready(#state{flusher_mod = FMod, flusher_state = FState} = State) -> {ok, NFState, Tokens} = FMod:heartbeat(FState), NState = flusher_state(State, NFState), - NNState = - case Tokens of - [] -> - NState; - _ -> - note_success(note_flush(NState), Tokens) - end, + NNState = case Tokens of + [] -> + NState; + _ -> + note_success(note_flush(NState), Tokens) + end, maybe_checkpoint(update_watchdog(NNState)). - -process_record(#state{last_flush_time = LastFlush, - flush_interval_ms = FlushInterval} = State, +process_record(#state{last_flush_time = LastFlush, flush_interval_ms = FlushInterval} = + State, Record) -> {ok, NState} = case {add_record(State, Record), elapsed_ms(LastFlush)} of - {{ok, State1}, E} when E >= FlushInterval -> - {ok, flush(State1)}; - {{ok, State1}, _} -> - {ok, State1}; - {{error, full}, _} -> - add_record(flush(State), Record) - end, + {{ok, State1}, E} when E >= FlushInterval -> + {ok, flush(State1)}; + {{ok, State1}, _} -> + {ok, State1}; + {{error, full}, _} -> + add_record(flush(State), Record) + end, maybe_checkpoint(update_watchdog(NState)). checkpointed(#state{log_checkpoints = LogCheckpoints} = State, - SequenceNumber, - Checkpoint) -> + SequenceNumber, + Checkpoint) -> case LogCheckpoints of - true -> error_logger:info_msg("~p checkpointed at ~p (~p)~n", [State, Checkpoint, SequenceNumber]); - false -> ok + true -> + error_logger:info_msg("~p checkpointed at ~p (~p)~n", + [State, Checkpoint, SequenceNumber]); + false -> + ok end, {ok, State}. -shutdown(#state{description = Description, - shard_id = ShardId, - count = Count} = State, Reason) -> +shutdown(#state{description = Description, shard_id = ShardId, count = Count} = State, + Reason) -> error_logger:info_msg("{~p,~p} (~p) shutting down, reason: ~p~n", [Description, ShardId, Count, Reason]), case Reason of - zombie -> - %% we lost our lease, nothing else to do now. - ok; - - terminate -> - %% the shard is closing (e.g., as a result of a merge or split). we should - %% flush all outstanding data and checkpoint. - NState = flush_full(State), - case next_checkpoint(NState) of - - %% we flushed all items and are able to checkpoint. there shouldn't be - %% any more items left to checkpoint: - {CPState, CPSN} -> - error_logger:info_msg("{~p,~p} will checkpoint at ~p~n", - [Description, ShardId, CPSN]), - Remaining = checkpointable(CPState), - Remaining = gb_trees:empty(), - {ok, #checkpoint{sequence_number = CPSN}}; - - %% nothing was checkpointable. there shouldn't be any items left to - %% checkpoint: - undefined -> - error_logger:info_msg("{~p,~p} has nothing to checkpoint, will use latest~n", - [Description, ShardId]), - Remaining = checkpointable(NState), - Remaining = gb_trees:empty(), - %% we checkpoint with an empty sequence number. this will cause the - %% KCL to checkpoint at the most recent record it has seen on the - %% shard: - {ok, #checkpoint{}} - end + zombie -> + %% we lost our lease, nothing else to do now. + ok; + terminate -> + %% the shard is closing (e.g., as a result of a merge or split). we should + %% flush all outstanding data and checkpoint. + NState = flush_full(State), + case next_checkpoint(NState) of + %% we flushed all items and are able to checkpoint. there shouldn't be + %% any more items left to checkpoint: + {CPState, CPSN} -> + error_logger:info_msg("{~p,~p} will checkpoint at ~p~n", + [Description, ShardId, CPSN]), + Remaining = checkpointable(CPState), + Remaining = gb_trees:empty(), + {ok, #checkpoint{sequence_number = CPSN}}; + %% nothing was checkpointable. there shouldn't be any items left to + %% checkpoint: + undefined -> + error_logger:info_msg("{~p,~p} has nothing to checkpoint, will use latest~n", + [Description, ShardId]), + Remaining = checkpointable(NState), + Remaining = gb_trees:empty(), + %% we checkpoint with an empty sequence number. this will cause the + %% KCL to checkpoint at the most recent record it has seen on the + %% shard: + {ok, #checkpoint{}} + end end. - %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -187,37 +172,32 @@ shutdown(#state{description = Description, incr_count(#state{count = Count} = State) -> State#state{count = Count + 1}. - incr_checkpoints(#state{checkpoints = Count} = State) -> State#state{checkpoints = Count + 1}. - checkpointable(#state{checkpointable = CPT}) -> CPT. checkpointable(State, CPT) -> State#state{checkpointable = CPT}. - next_counter_checkpoint(State, N) -> State#state{next_counter_checkpoint = N}. - flusher_state(State, FState) -> State#state{flusher_state = FState}. - -add_record(#state{count = Count, - flusher_mod = FMod, - flusher_state = FState} = State, +add_record(#state{count = Count, flusher_mod = FMod, flusher_state = FState} = State, #stream_record{sequence_number = SN} = Record) -> case FMod:add_record(FState, Record, {Count, SN}) of - {ok, NFState} -> {ok, incr_count(flusher_state(State, NFState))}; - {ignored, NFState} -> {ok, flusher_state(State, NFState)}; - {error, full} -> {error, full} + {ok, NFState} -> + {ok, incr_count(flusher_state(State, NFState))}; + {ignored, NFState} -> + {ok, flusher_state(State, NFState)}; + {error, full} -> + {error, full} end. - flush(State) -> flush(State, partial). @@ -229,7 +209,6 @@ flush(#state{flusher_mod = FMod, flusher_state = FState} = State, Kind) -> NState = flusher_state(State, NFState), note_success(note_flush(NState), Tokens). - note_success(#state{checkpointable = CPT} = State, Tokens) -> %% The items with the given Tokens have been successfully processed. %% Update the 'checkpointable' gb_tree, which we are using as a priority @@ -242,28 +221,27 @@ note_success(#state{checkpointable = CPT} = State, Tokens) -> %% fixme; when adding a new element, also pop all smaller elements which have %% contiguous keys with the new element (only need to track gaps). NCPT = lists:foldl(fun ({Counter, SeqNum}, Acc) -> - gb_trees:insert(Counter, SeqNum, Acc) - end, CPT, Tokens), + gb_trees:insert(Counter, SeqNum, Acc) + end, + CPT, + Tokens), checkpointable(State, NCPT). - maybe_checkpoint(#state{last_checkpoint_time = LastCheckpoint, - checkpoint_interval_ms = CheckpointInterval} = State) -> + checkpoint_interval_ms = CheckpointInterval} = + State) -> case elapsed_ms(LastCheckpoint) of - N when N >= CheckpointInterval -> - case next_checkpoint(State) of - {NState, SequenceNumber} -> - {ok, - note_checkpoint(NState), - #checkpoint{sequence_number = SequenceNumber}}; - undefined -> - {ok, State} - end; - _ -> - {ok, State} + N when N >= CheckpointInterval -> + case next_checkpoint(State) of + {NState, SequenceNumber} -> + {ok, note_checkpoint(NState), #checkpoint{sequence_number = SequenceNumber}}; + undefined -> + {ok, State} + end; + _ -> + {ok, State} end. - %% given a state, return undefined if nothing is ready to be checkpointed. otherwise, %% return {NState, SeqNo}, where SeqNo is a #sequence_number{} at which we can checkpoint %% (i.e., all items below that SN have been successfully processed), and NState represents @@ -271,10 +249,10 @@ maybe_checkpoint(#state{last_checkpoint_time = LastCheckpoint, next_checkpoint(State) -> CPT = checkpointable(State), case gb_trees:is_empty(CPT) of - true -> - undefined; - false -> - next_checkpoint(State, gb_trees:take_smallest(CPT), undefined) + true -> + undefined; + false -> + next_checkpoint(State, gb_trees:take_smallest(CPT), undefined) end. next_checkpoint(State, {SmallestCount, _, _} = SmallestItem, LatestFinished) -> @@ -284,70 +262,66 @@ next_checkpoint(State, {SmallestCount, _, _} = SmallestItem, LatestFinished) -> %% to the `next_counter_checkpoint` field; otherwise, we can't checkpoint yet (we have an %% initial gap). next_checkpoint(#state{next_counter_checkpoint = N}, FirstSmallestCount, _, _) - when FirstSmallestCount > N -> + when FirstSmallestCount > N -> undefined; %% if subsequence checkpointing is disabled, we can only checkpoint at a record which has %% been fully processed. while advancing through contiguous items, we keep track of the %% highest-numbered item corresponding to a completely-processed record (which may be %% undefined). next_checkpoint(#state{enable_subsequence_checkpoints = EnableSubCP} = State, - FirstSmallestCount, {SmallestCount, SmallestSN, CPT}, LatestFinished) -> + FirstSmallestCount, + {SmallestCount, SmallestSN, CPT}, + LatestFinished) -> %% can checkpoint at SmallestSN if subsequence checkpointing is enabled, or if it %% corresponds to the last subrecord in an aggregate record, or if it corresponds to a %% non-aggregate record: - CanCheckpoint = EnableSubCP - orelse (not is_sub_record(SmallestSN)), + CanCheckpoint = EnableSubCP orelse not is_sub_record(SmallestSN), %% if we can checkpoint at the current SN, use it as the latest 'finished' value: NLatestFinished = case CanCheckpoint of - true -> - {next_counter_checkpoint(checkpointable(State, CPT), - SmallestCount + 1), - SmallestSN}; - false -> - LatestFinished + true -> + {next_counter_checkpoint(checkpointable(State, CPT), SmallestCount + 1), + SmallestSN}; + false -> + LatestFinished end, case gb_trees:is_empty(CPT) of - true -> - NLatestFinished; - - false -> - NextContiguous = SmallestCount + 1, - case gb_trees:take_smallest(CPT) of - {NextContiguous, _, _} = Next -> - next_checkpoint(State, FirstSmallestCount, Next, NLatestFinished); - _ -> - NLatestFinished - end + true -> + NLatestFinished; + false -> + NextContiguous = SmallestCount + 1, + case gb_trees:take_smallest(CPT) of + {NextContiguous, _, _} = Next -> + next_checkpoint(State, FirstSmallestCount, Next, NLatestFinished); + _ -> + NLatestFinished + end end. - note_checkpoint(#state{description = Description, shard_id = ShardId, - on_checkpoint = OnCheckpoint} = State) -> + on_checkpoint = OnCheckpoint} = + State) -> OnCheckpoint(Description, ShardId), incr_checkpoints(State#state{last_checkpoint_time = os:timestamp()}). - note_flush(State) -> State#state{last_flush_time = os:timestamp()}. - %% given an erlang timestamp, return the elapsed duration in milliseconds. elapsed_ms(When) -> - trunc(timer:now_diff(os:timestamp(), When)/1.0e3). - + trunc(timer:now_diff(os:timestamp(), When) / 1.0e3). %% start a watchdog timer, cancelling any which is outstanding. if the timer fires, it %% will result in the current process exiting with a reason of 'watchdog_timeout' update_watchdog(#state{watchdog_timeout_ms = undefined} = State) -> State; -update_watchdog(#state{watchdog_timeout_ms = WatchdogTimeout, - watchdog = Watchdog} = State) -> +update_watchdog(#state{watchdog_timeout_ms = WatchdogTimeout, watchdog = Watchdog} = + State) -> case Watchdog of - undefined -> - ok; - _ -> - timer:cancel(Watchdog) + undefined -> + ok; + _ -> + timer:cancel(Watchdog) end, {ok, Ref} = timer:exit_after(WatchdogTimeout, watchdog_timeout), State#state{watchdog = Ref}. @@ -358,32 +332,31 @@ update_watchdog(#state{watchdog_timeout_ms = WatchdogTimeout, %% will be the original KCL sent (0, possible) so it can't be really used to %% know if this is a subrecord or not. is_sub_record(#sequence_number{user_sub = Sub, user_total = Total}) - when is_integer(Sub) andalso is_integer(Total) - andalso Sub < Total - 1 -> + when is_integer(Sub) andalso is_integer(Total) andalso Sub < Total - 1 -> true; is_sub_record(_) -> false. - %%%=================================================================== %%% TESTS %%%=================================================================== -ifdef(TEST). + -include_lib("eunit/include/eunit.hrl"). equal_cpt(A, B) -> - maps:from_list(gb_trees:to_list(checkpointable(A))) - == maps:from_list(gb_trees:to_list(checkpointable(B))). + maps:from_list(gb_trees:to_list(checkpointable(A))) == + maps:from_list(gb_trees:to_list(checkpointable(B))). checkpointing_test() -> State = #state{}, ?assertEqual(undefined, next_checkpoint(State)), %% items 0, 1, and 3 completed and checkpointable: - CPT0 = gb_trees:insert(0, 0, - gb_trees:insert(1, 1, - gb_trees:insert(3, 3, gb_trees:empty()))), + CPT0 = gb_trees:insert(0, + 0, + gb_trees:insert(1, 1, gb_trees:insert(3, 3, gb_trees:empty()))), Expected1 = checkpointable(State, CPT0), State1 = note_success(State, [{0, 0}, {1, 1}, {3, 3}]), ?assert(equal_cpt(Expected1, State1)), @@ -400,7 +373,9 @@ checkpointing_test() -> Expected2 = checkpointable(AfterCheckpoint1, CPT2), State2 = note_success(AfterCheckpoint1, [{2, 2}]), ?assert(equal_cpt(Expected2, State2)), - AfterCheckpoint2 = next_counter_checkpoint(checkpointable(AfterCheckpoint1, gb_trees:empty()), 4), + AfterCheckpoint2 = next_counter_checkpoint(checkpointable(AfterCheckpoint1, + gb_trees:empty()), + 4), ?assertEqual({AfterCheckpoint2, 3}, next_checkpoint(State2)). checkpointing_subrecord_test() -> @@ -438,25 +413,23 @@ watchdog_test() -> process_flag(trap_exit, true), State = update_watchdog(#state{watchdog_timeout_ms = 200}), receive - {'EXIT', _, watchdog_timeout} -> - error("unexpected watchdog trigger") - after 100 -> - ok + {'EXIT', _, watchdog_timeout} -> + error("unexpected watchdog trigger") + after 100 -> + ok end, update_watchdog(State), receive - {'EXIT', _, watchdog_timeout} -> - ok - after 400 -> - error("watchdog failed to trigger") + {'EXIT', _, watchdog_timeout} -> + ok + after 400 -> + error("watchdog failed to trigger") end. - is_sub_record_test() -> - ?assertEqual(true, is_sub_record(#sequence_number{user_sub = 0, - user_total = 2})), - ?assertEqual(false, is_sub_record(#sequence_number{user_sub = 1, - user_total = 2})), - ?assertEqual(false, is_sub_record(#sequence_number{user_sub = undefined, - user_total = undefined})). + ?assertEqual(true, is_sub_record(#sequence_number{user_sub = 0, user_total = 2})), + ?assertEqual(false, is_sub_record(#sequence_number{user_sub = 1, user_total = 2})), + ?assertEqual(false, + is_sub_record(#sequence_number{user_sub = undefined, user_total = undefined})). + -endif. diff --git a/src/erlmld_flusher.erl b/src/erlmld_flusher.erl index aa6fc1c..94cf68b 100644 --- a/src/erlmld_flusher.erl +++ b/src/erlmld_flusher.erl @@ -48,18 +48,15 @@ -include("erlmld.hrl"). --callback init(shard_id(), term()) -> - flusher_state(). - --callback add_record(flusher_state(), stream_record(), flusher_token()) -> - {ok, flusher_state()} - | {ignored, flusher_state()} - | {error, full | term()}. - --callback flush(flusher_state(), partial | full) -> - {ok, flusher_state(), list(flusher_token())} - | {error, term()}. - --callback heartbeat(flusher_state()) -> - {ok, flusher_state(), list(flusher_token())} - | {error, term()}. +-callback init(shard_id(), term()) -> flusher_state(). +-callback add_record(flusher_state(), stream_record(), flusher_token()) -> {ok, + flusher_state()} | + {ignored, + flusher_state()} | + {error, full | term()}. +-callback flush(flusher_state(), partial | full) -> {ok, + flusher_state(), + [flusher_token()]} | + {error, term()}. +-callback heartbeat(flusher_state()) -> {ok, flusher_state(), [flusher_token()]} | + {error, term()}. diff --git a/src/erlmld_noisy_wrk.erl b/src/erlmld_noisy_wrk.erl index b148d90..fa0719e 100644 --- a/src/erlmld_noisy_wrk.erl +++ b/src/erlmld_noisy_wrk.erl @@ -11,50 +11,42 @@ -behavior(erlmld_worker). --export([initialize/3, - ready/1, - process_record/2, - checkpointed/3, - shutdown/2]). +-export([initialize/3, ready/1, process_record/2, checkpointed/3, shutdown/2]). -include("erlmld.hrl"). --record(state, {shard_id, - count = 0}). - +-record(state, {shard_id, count = 0}). initialize(_Opaque, ShardId, ISN) -> State = #state{shard_id = ShardId}, io:format("~p initialized for shard ~p at ~p~n", [self(), ShardId, ISN]), {ok, State}. - ready(State) -> {ok, State}. - process_record(#state{shard_id = ShardId, count = Count} = State, #stream_record{sequence_number = SN} = Record) -> io:format("~p (~p) got record ~p~n", [ShardId, Count, Record]), case Count >= 10 of - true -> - {ok, State#state{count = 0}, #checkpoint{sequence_number = SN}}; - false -> - {ok, State#state{count = Count + 1}} + true -> + {ok, State#state{count = 0}, #checkpoint{sequence_number = SN}}; + false -> + {ok, State#state{count = Count + 1}} end. checkpointed(#state{shard_id = ShardId, count = Count} = State, - SequenceNumber, - Checkpoint -) -> - io:format("~p (~p) checkpointed at ~p (~p)~n", [ShardId, Count, Checkpoint, SequenceNumber]), + SequenceNumber, + Checkpoint) -> + io:format("~p (~p) checkpointed at ~p (~p)~n", + [ShardId, Count, Checkpoint, SequenceNumber]), {ok, State}. shutdown(#state{shard_id = ShardId, count = Count}, Reason) -> io:format("~p (~p) shutting down, reason: ~p~n", [ShardId, Count, Reason]), case Reason of - terminate -> - {ok, #checkpoint{}}; - _ -> - ok + terminate -> + {ok, #checkpoint{}}; + _ -> + ok end. diff --git a/src/erlmld_runner.erl b/src/erlmld_runner.erl index 2bdddeb..12e636d 100644 --- a/src/erlmld_runner.erl +++ b/src/erlmld_runner.erl @@ -12,86 +12,74 @@ -module(erlmld_runner). --export([start_link/3, - run/3, - build_properties/1]). - +-export([start_link/3, run/3, build_properties/1]). start_link(Regname, Pathname, StreamType) -> {ok, spawn_link(?MODULE, run, [Regname, Pathname, StreamType])}. - run(Regname, Pathname, StreamType) -> register(Regname, self()), {Exe, CWD} = runner_params(StreamType), process_flag(trap_exit, true), {ok, ErlPid, OsPid} = exec:run_link([Exe, Pathname], - [{cd, CWD}, - {group, 0}, - kill_group, - stdout, stderr]), + [{cd, CWD}, {group, 0}, kill_group, stdout, stderr]), error_logger:info_msg("~p launched ~p (pid ~p)~n", [Regname, Exe, OsPid]), {ok, SpamMP} = spam_mp(), loop(Regname, ErlPid, OsPid, SpamMP). - loop(Regname, ErlPid, OsPid, SpamMP) -> receive - {stdout, OsPid, _Data} -> - ok; - - {stderr, OsPid, Data} -> - case application:get_env(erlmld, log_kcl_spam, undefined) of - true -> - io:format("~p: ~s", [Regname, Data]); - false -> - ok; - undefined -> - ok; - {LagerMod, LagerSink} -> - [LagerMod:log(LagerSink, debug, LagerMod:md(), "~p: ~s", [Regname, Line]) - || Line <- binary:split(Data, <<"\n">>, [global]), - not is_spam(SpamMP, Line)], - ok - end; - - {'EXIT', ErlPid, Reason} -> - exit({child_exited, OsPid, Reason}) + {stdout, OsPid, _Data} -> + ok; + {stderr, OsPid, Data} -> + case application:get_env(erlmld, log_kcl_spam, undefined) of + true -> + io:format("~p: ~s", [Regname, Data]); + false -> + ok; + undefined -> + ok; + {LagerMod, LagerSink} -> + [LagerMod:log(LagerSink, debug, LagerMod:md(), "~p: ~s", [Regname, Line]) + || Line <- binary:split(Data, <<"\n">>, [global]), not is_spam(SpamMP, Line)], + ok + end; + {'EXIT', ErlPid, Reason} -> + exit({child_exited, OsPid, Reason}) end, loop(Regname, ErlPid, OsPid, SpamMP). - runner_params(StreamType) -> Runner = io_lib:format("run_~p.sh", [StreamType]), Pathname = priv_path(Runner), {Pathname, filename:dirname(Pathname)}. - priv_path(Filename) -> Priv = code:priv_dir(erlmld), lists:flatten(filename:join(Priv, Filename)). - tempdir_path(Filename) -> filename:join(os:getenv("TMPDIR", "/tmp"), [erlmld, $/, Filename]). - %% given a map of option values, populate the MLD properties template, creating a file %% like "$TMPDIR/erlmld/erlmld.X.properties", where X is either "default" or the %% app_suffix value, and return that populated pathname. build_properties(#{app_suffix := AppSuffix} = Opts) -> Input = priv_path("mld.properties.in"), - Output = tempdir_path("erlmld." ++ atom_to_list(case AppSuffix of - undefined -> default; - _ -> AppSuffix - end) ++ ".properties"), + Output = tempdir_path("erlmld." ++ + atom_to_list(case AppSuffix of + undefined -> + default; + _ -> + AppSuffix + end) + ++ ".properties"), {ok, Template} = file:read_file(Input), {ok, Result} = apply_substitutions(Template, Opts), ok = filelib:ensure_dir(Output), ok = file:write_file(Output, Result), {ok, Output}. - %% given a binary template and a map of options, use the map to apply substitutions into %% the template. the map of options is used to populate shell-like variable references in %% the template file as in the following example: @@ -114,53 +102,55 @@ build_properties(#{app_suffix := AppSuffix} = Opts) -> %% Integers, atoms, and iodata are converted to binaries before being substituted. %% Other types are unsupported and ignored. apply_substitutions(Template, Opts) -> - Data = maps:fold( - fun (_, V, Acc) when is_tuple(V); - is_map(V) -> - Acc; - (K, V, Acc) -> - Var = iolist_to_binary("${" ++ string:to_upper(atom_to_list(K)) ++ "}"), - Val = case V of - undefined -> - <<>>; - V when is_integer(V) -> - integer_to_binary(V); - V when is_atom(V) -> - atom_to_binary(V, utf8); - V when is_list(V) -> - iolist_to_binary(V); - V when is_binary(V) -> - V; - _ -> - <<>> - end, - binary:replace(Acc, Var, Val, [global]) - end, Template, Opts), + Data = maps:fold(fun (_, V, Acc) when is_tuple(V); is_map(V) -> + Acc; + (K, V, Acc) -> + Var = iolist_to_binary("${" ++ + string:to_upper(atom_to_list(K)) ++ "}"), + Val = case V of + undefined -> + <<>>; + V when is_integer(V) -> + integer_to_binary(V); + V when is_atom(V) -> + atom_to_binary(V, utf8); + V when is_list(V) -> + iolist_to_binary(V); + V when is_binary(V) -> + V; + _ -> + <<>> + end, + binary:replace(Acc, Var, Val, [global]) + end, + Template, + Opts), case re:run(Data, <<"\\${([^}]+)}">>, [global]) of - {match, Groups} -> - {error, {unknown_variables, - sets:to_list( - lists:foldl(fun ([_, {Start, Size}], Acc) -> - Name = binary:part(Data, {Start, Size}), - sets:add_element(Name, Acc) - end, sets:new(), Groups))}}; - nomatch -> - {ok, Data} + {match, Groups} -> + {error, + {unknown_variables, + sets:to_list(lists:foldl(fun ([_, {Start, Size}], Acc) -> + Name = binary:part(Data, {Start, Size}), + sets:add_element(Name, Acc) + end, + sets:new(), + Groups))}}; + nomatch -> + {ok, Data} end. - spam_mp() -> Spammy = [<<"^INFO: Received response ">>, <<"^INFO: Starting: Reading next message from STDIN ">>, <<"^INFO: Writing ProcessRecordsMessage to child process ">>, <<"^INFO: Message size == (40|63) bytes for shard ">>, - <<"com.amazonaws.services.kinesis.multilang.MultiLangProtocol validateStatusMessage$">>, + <<"com.amazonaws.services.kinesis.multilang.MultiLangProtocol " + "validateStatusMessage$">>, <<"com.amazonaws.services.kinesis.multilang.MessageWriter writeMessage$">>, <<"com.amazonaws.services.kinesis.multilang.MessageWriter call$">>, <<"com.amazonaws.services.kinesis.multilang.LineReaderTask call$">>], re:compile(lists:join(<<"|">>, Spammy)). - is_spam(_, <<>>) -> true; is_spam(MP, Bin) -> diff --git a/src/erlmld_sup.erl b/src/erlmld_sup.erl index c998c41..16de24f 100644 --- a/src/erlmld_sup.erl +++ b/src/erlmld_sup.erl @@ -4,14 +4,9 @@ %% API -export([start_link/1]). - %% Supervisor callbacks -export([init/1]). --define(WORKER_SUP, erlmld_wrk_sup). --define(ACCEPTOR, erlmld_tcp_acceptor). --define(RUNNER, erlmld_runner). - %%==================================================================== %% API functions %%==================================================================== @@ -24,43 +19,52 @@ start_link(Opts) -> %% Supervisor callbacks %%==================================================================== -init([Regname, #{record_processor := RecordProcessor, - record_processor_data := RecordProcessorData, - listen_ip := ListenIP, - listen_port := ListenPort, - stream_type := StreamType} = Opts]) -> - WorkerSupName = regname(?WORKER_SUP, Opts), - AcceptorName = regname(?ACCEPTOR, Opts), - RunnerName = regname(?RUNNER, Opts), - - {ok, ListenSocket, ActualPort} = ?ACCEPTOR:listen(ListenIP, ListenPort), +init([Regname, + #{record_processor := RecordProcessor, + record_processor_data := RecordProcessorData, + listen_ip := ListenIP, + listen_port := ListenPort, + stream_type := StreamType} = + Opts]) -> + WorkerSupName = regname(erlmld_wrk_sup, Opts), + AcceptorName = regname(erlmld_tcp_acceptor, Opts), + RunnerName = regname(erlmld_runner, Opts), + + {ok, ListenSocket, ActualPort} = erlmld_tcp_acceptor:listen(ListenIP, ListenPort), error_logger:info_msg("~p listening on ~p~n", [Regname, ActualPort]), %% prepare MLD .properties file: - {ok, PropertiesPathname} = ?RUNNER:build_properties(maps:put(port, ActualPort, Opts)), + {ok, PropertiesPathname} = erlmld_runner:build_properties(maps:put(port, + ActualPort, + Opts)), - SupFlags = #{strategy => rest_for_one, - intensity => 10, - period => 10}, + SupFlags = #{strategy => rest_for_one, intensity => 10, period => 10}, StartWorker = fun (AcceptedSocket) -> - ?WORKER_SUP:start_worker(WorkerSupName, AcceptedSocket) + erlmld_wrk_sup:start_worker(WorkerSupName, AcceptedSocket) end, WorkerSup = #{id => wrk_sup, type => supervisor, shutdown => infinity, - start => {?WORKER_SUP, start_link, [WorkerSupName, RecordProcessor, RecordProcessorData]}}, + start => + {erlmld_wrk_sup, + start_link, + [WorkerSupName, RecordProcessor, RecordProcessorData]}}, TcpAcceptor = #{id => tcp_acceptor, type => worker, shutdown => brutal_kill, - start => {?ACCEPTOR, start_link, [AcceptorName, ListenSocket, StartWorker]}}, + start => + {erlmld_tcp_acceptor, + start_link, + [AcceptorName, ListenSocket, StartWorker]}}, MLDRunner = #{id => mld_runner, type => worker, shutdown => brutal_kill, - start => {?RUNNER, start_link, [RunnerName, PropertiesPathname, StreamType]}}, + start => + {erlmld_runner, start_link, [RunnerName, PropertiesPathname, StreamType]}}, ChildSpecs = [WorkerSup, TcpAcceptor, MLDRunner], @@ -73,6 +77,7 @@ init([Regname, #{record_processor := RecordProcessor, regname(Prefix, #{app_suffix := Suffix}) when Suffix /= undefined -> binary_to_atom(<<(atom_to_binary(Prefix, utf8))/binary, "_", - (atom_to_binary(Suffix, utf8))/binary>>, utf8); + (atom_to_binary(Suffix, utf8))/binary>>, + utf8); regname(Prefix, _) -> Prefix. diff --git a/src/erlmld_tcp_acceptor.erl b/src/erlmld_tcp_acceptor.erl index 1c2a57a..779fdb1 100644 --- a/src/erlmld_tcp_acceptor.erl +++ b/src/erlmld_tcp_acceptor.erl @@ -12,10 +12,7 @@ %%%------------------------------------------------------------------- -module(erlmld_tcp_acceptor). --export([start_link/3, - run/3, - listen/2]). - +-export([start_link/3, run/3, listen/2]). start_link(Regname, Socket, AcceptCallback) -> {ok, spawn_link(?MODULE, run, [Regname, Socket, AcceptCallback])}. @@ -26,27 +23,27 @@ run(Regname, ListenSocket, AcceptCallback) -> loop(ListenSocket, AcceptCallback) -> case gen_tcp:accept(ListenSocket) of - {ok, AcceptedSocket} -> - ok = inet:setopts(AcceptedSocket, [{delay_send, true}, - {send_timeout, 10000}, - {send_timeout_close, true}]), - AcceptCallback(AcceptedSocket), - loop(ListenSocket, AcceptCallback); - - {error, Error} -> - exit({accept_failed, Error}) + {ok, AcceptedSocket} -> + ok = inet:setopts(AcceptedSocket, + [{delay_send, true}, + {send_timeout, 10000}, + {send_timeout_close, true}]), + AcceptCallback(AcceptedSocket), + loop(ListenSocket, AcceptCallback); + {error, Error} -> + exit({accept_failed, Error}) end. - %% given an IP specification and listen port, listen on that IP and port, returning the %% socket and actual port we're listening on. ip may be 'loopback' (listen on loopback %% address) and port may be 0 (listen on a random port). listen(ListenIP, ListenPort) -> - {ok, Socket} = gen_tcp:listen(ListenPort, [binary, - {ip, ListenIP}, - {packet, raw}, - {reuseaddr, true}, - {active, false}, - {backlog, 128}]), + {ok, Socket} = gen_tcp:listen(ListenPort, + [binary, + {ip, ListenIP}, + {packet, raw}, + {reuseaddr, true}, + {active, false}, + {backlog, 128}]), {ok, ActualPort} = inet:port(Socket), {ok, Socket, ActualPort}. diff --git a/src/erlmld_worker.erl b/src/erlmld_worker.erl index 6c84068..3a5d4f3 100644 --- a/src/erlmld_worker.erl +++ b/src/erlmld_worker.erl @@ -51,26 +51,18 @@ -include("erlmld.hrl"). - --callback initialize(term(), shard_id(), sequence_number() | undefined) -> - {ok, worker_state()} - | {error, term()}. - --callback ready(worker_state()) -> - {ok, worker_state()} - | {ok, worker_state(), checkpoint()} - | {error, term()}. - --callback process_record(worker_state(), stream_record()) -> - {ok, worker_state()} - | {ok, worker_state(), checkpoint()} - | {error, term()}. - --callback checkpointed(worker_state(), sequence_number(), checkpoint()) -> - {ok, worker_state()} - | {error, term()}. - --callback shutdown(worker_state(), shutdown_reason()) -> - ok - | {ok, checkpoint()} - | {error, term()}. +-callback initialize(term(), shard_id(), sequence_number() | undefined) -> {ok, + worker_state()} | + {error, term()}. +-callback ready(worker_state()) -> {ok, worker_state()} | + {ok, worker_state(), checkpoint()} | + {error, term()}. +-callback process_record(worker_state(), stream_record()) -> {ok, worker_state()} | + {ok, worker_state(), checkpoint()} | + {error, term()}. +-callback checkpointed(worker_state(), sequence_number(), checkpoint()) -> {ok, + worker_state()} | + {error, term()}. +-callback shutdown(worker_state(), shutdown_reason()) -> ok | + {ok, checkpoint()} | + {error, term()}. diff --git a/src/erlmld_wrk_statem.erl b/src/erlmld_wrk_statem.erl index 7a250cd..66e0c98 100644 --- a/src/erlmld_wrk_statem.erl +++ b/src/erlmld_wrk_statem.erl @@ -107,39 +107,31 @@ %% API -export([start_link/2, accept/2]). - %% gen_statem callbacks -export([init/1, callback_mode/0, terminate/3, code_change/4, handle_event/4]). --record(data, { - %% name of handler module implementing erlmld_worker behavior: - handler_module :: atom(), - - %% opaque term used as first argument to handler_module's initialize/3: - handler_data :: term(), - - %% connected socket owned by this process: - socket :: undefined | gen_tcp:socket(), - - %% input buffer; responses are small and we need no output buffer: - buf = [] :: [binary()], - - %% worker state returned from handler module init: - worker_state :: undefined | term(), - - %% if true, the MLD made a processRecords call with the V2 format (supplied - %% millisBehindLatest), so we will checkpoint using the V2 checkpoint format: - is_v2 = false :: boolean(), - - %% most recent action name from the peer: - last_request :: undefined | binary(), - - %% last attempted checkpoint: - last_checkpoint :: undefined | checkpoint() - }). +-record(data, + {%% name of handler module implementing erlmld_worker behavior: + handler_module :: atom(), + %% opaque term used as first argument to handler_module's initialize/3: + handler_data :: term(), + %% connected socket owned by this process: + socket :: undefined | gen_tcp:socket(), + %% input buffer; responses are small and we need no output buffer: + buf = [] :: [binary()], + %% worker state returned from handler module init: + worker_state :: undefined | term(), + %% if true, the MLD made a processRecords call with the V2 format (supplied + %% millisBehindLatest), so we will checkpoint using the V2 checkpoint format: + is_v2 = false :: boolean(), + %% most recent action name from the peer: + last_request :: undefined | binary(), + %% last attempted checkpoint: + last_checkpoint :: undefined | checkpoint()}). + +-type data() :: #data{}. -define(INTERNAL, internal). - %% state atoms. some states are 2-tuples of these atoms. -define(INIT, init). -define(SHUTDOWN, shutdown). @@ -151,7 +143,6 @@ -define(PROCESS_RECORDS, process_records). -define(PEER_WRITE, peer_write). -define(ABORT, abort). - -define(RETRY_SLEEP, application:get_env(erlmld, checkpoint_retry_sleep, 10000)). -include("erlmld.hrl"). @@ -176,8 +167,7 @@ callback_mode() -> handle_event_function. init([HandlerMod, HandlerData]) -> - {ok, ?INIT, #data{handler_module = HandlerMod, - handler_data = HandlerData}}. + {ok, ?INIT, #data{handler_module = HandlerMod, handler_data = HandlerData}}. terminate(_Reason, _State, #data{socket = Socket}) -> gen_tcp:close(Socket), @@ -186,173 +176,157 @@ terminate(_Reason, _State, #data{socket = Socket}) -> code_change(_OldVsn, State, Data, _Extra) -> {ok, State, Data}. - %% a connection has been accepted. start reading requests from it. handle_event({call, From}, {accepted, Socket}, ?INIT, Data) -> - {next_state, {?PEER_READ, ?REQUEST}, activate(Data#data{socket = Socket}), + {next_state, + {?PEER_READ, ?REQUEST}, + activate(Data#data{socket = Socket}), [{reply, From, ok}]}; - - %% connection was closed, and we expected it to be closed. this will occur when the MLD %% has instructed us to shut down, and we've returned a "success" response for the %% shutdown action (MLD will read our response, then close its stream to us, then await %% our exit). handle_event(info, {tcp_closed, _}, {?PEER_READ, ?SHUTDOWN}, _) -> stop; - %% connection was closed, but we didn't expect it. handle_event(info, {tcp_closed, _}, _State, _) -> {stop, {error, unexpected_close}}; - - %% we are trying to read some data, and some data has been received. if we've received a %% complete line (action), we dispatch it to be handled according to its kind (i.e., a new %% request or a response to a checkpoint attempt). handle_event(info, {tcp, Socket, Bin}, {?PEER_READ, NextReadKind}, Data) -> case next_action(Bin, Data) of - {undefined, NData, <<>>} -> - {keep_state, activate(NData)}; - - {{error, _} = Error, _, _} -> - {stop, Error}; - - {#{<<"action">> := Action} = ActionData, NData, Rest} -> - {next_state, {?DISPATCH, NextReadKind}, - case NextReadKind of - ?REQUEST -> NData#data{last_request = Action}; - _ -> NData - end, - [{next_event, ?INTERNAL, ActionData} - | case Rest of - <<>> -> []; - _ -> [{next_event, info, {tcp, Socket, Rest}}] - end]} + {undefined, NData, <<>>} -> + {keep_state, activate(NData)}; + {{error, _} = Error, _, _} -> + {stop, Error}; + {#{<<"action">> := Action} = ActionData, NData, Rest} -> + {next_state, + {?DISPATCH, NextReadKind}, + case NextReadKind of + ?REQUEST -> + NData#data{last_request = Action}; + _ -> + NData + end, + [{next_event, ?INTERNAL, ActionData} | case Rest of + <<>> -> + []; + _ -> + [{next_event, info, {tcp, Socket, Rest}}] + end]} end; - - %% MLD is initializing us, and we haven't been initialized yet. -handle_event(?INTERNAL, #{<<"action">> := <<"initialize">>, - <<"shardId">> := ShardId} = R, +handle_event(?INTERNAL, + #{<<"action">> := <<"initialize">>, <<"shardId">> := ShardId} = R, {?DISPATCH, ?REQUEST}, #data{handler_module = HandlerMod, handler_data = HandlerData, - worker_state = undefined} = Data) -> + worker_state = undefined} = + Data) -> ISN = sequence_number(R), case HandlerMod:initialize(HandlerData, ShardId, ISN) of - {ok, WorkerState} -> - success(R, worker_state(Data, WorkerState), ?REQUEST); - - {error, _} = Error -> - {stop, Error} + {ok, WorkerState} -> + success(R, worker_state(Data, WorkerState), ?REQUEST); + {error, _} = Error -> + {stop, Error} end; - - %% MLD is instructing us to shut down after we've been initialized. if the reason is %% TERMINATE, we should finish processing and checkpoint (shard is being closed). if the %% reason is not TERMINATE (i.e., ZOMBIE), we should abort any outstanding work and stop %% without checkpointing. in both cases, we should return a success response and stop %% (resulting in connection closure). -handle_event(?INTERNAL, #{<<"action">> := <<"shutdown">>, - <<"reason">> := ReasonBin} = R, +handle_event(?INTERNAL, + #{<<"action">> := <<"shutdown">>, <<"reason">> := ReasonBin} = R, {?DISPATCH, State}, - #data{handler_module = Mod, - worker_state = {ok, WorkerState}} = Data) when State == ?REQUEST; - State == ?SHUTDOWN -> + #data{handler_module = Mod, worker_state = {ok, WorkerState}} = Data) + when State == ?REQUEST; State == ?SHUTDOWN -> Reason = case ReasonBin of - <<"TERMINATE">> -> terminate; - <<"ZOMBIE">> -> zombie; - _ -> unknown + <<"TERMINATE">> -> + terminate; + <<"ZOMBIE">> -> + zombie; + _ -> + unknown end, case {Reason, Mod:shutdown(WorkerState, Reason)} of - {terminate, {ok, Checkpoint}} -> - %% shard terminating. worker supplied a checkpoint. checkpoint and then - %% return a success response for the shutdown if the checkpoint was - %% successful. - shutdown_checkpoint(Data, Checkpoint); - - {terminate, ok} -> - %% worker should have checkpointed for this TERMINATE shutdown. - error_logger:error_msg("~p did not checkpoint after TERMINATE~n", [WorkerState]), - {stop, {error, expected_checkpoint}}; - - {_, ok} -> - %% non-terminate shutdown, worker did not checkpoint; normal exit. - success(R, Data, ?SHUTDOWN); - - {_, {ok, _Checkpoint}} -> - %% worker should only checkpoint during shutdown for a TERMINATE shutdown. - error_logger:error_msg("~p attempted to checkpoint during ~p shutdown~n", - [WorkerState, ReasonBin]), - {stop, {error, unexpected_checkpoint}}; - - {_, {error, _} = Error} -> - {stop, Error} + {terminate, {ok, Checkpoint}} -> + %% shard terminating. worker supplied a checkpoint. checkpoint and then + %% return a success response for the shutdown if the checkpoint was + %% successful. + shutdown_checkpoint(Data, Checkpoint); + {terminate, ok} -> + %% worker should have checkpointed for this TERMINATE shutdown. + error_logger:error_msg("~p did not checkpoint after TERMINATE~n", [WorkerState]), + {stop, {error, expected_checkpoint}}; + {_, ok} -> + %% non-terminate shutdown, worker did not checkpoint; normal exit. + success(R, Data, ?SHUTDOWN); + {_, {ok, _Checkpoint}} -> + %% worker should only checkpoint during shutdown for a TERMINATE shutdown. + error_logger:error_msg("~p attempted to checkpoint during ~p shutdown~n", + [WorkerState, ReasonBin]), + {stop, {error, unexpected_checkpoint}}; + {_, {error, _} = Error} -> + {stop, Error} end; - - %% MLD is gracefully stopping all workers, which can elect to checkpoint or not. right %% now we treat this the same way as a "zombie" shutdown (lost lease). need to add a %% non-zombie, non-terminate reason to worker behavior so we can support checkpointing %% here. -handle_event(?INTERNAL, #{<<"action">> := <<"shutdownRequested">>} = R, +handle_event(?INTERNAL, + #{<<"action">> := <<"shutdownRequested">>} = R, {?DISPATCH, ?REQUEST}, - #data{handler_module = Mod, - worker_state = WS} = Data) -> + #data{handler_module = Mod, worker_state = WS} = Data) -> case WS of - {ok, WorkerState} -> - case Mod:shutdown(WorkerState, zombie) of - ok -> - %% non-terminate shutdown, worker did not checkpoint; normal exit. - success(R, Data, ?SHUTDOWN); - - {ok, _Checkpoint} -> - %% worker should only checkpoint during shutdown for a TERMINATE shutdown; - %% this isn't supported yet. - error_logger:error_msg("~p attempted to checkpoint during shutdownRequest shutdown~n", - [WorkerState]), - {stop, {error, unexpected_checkpoint}}; - - {_, {error, _} = Error} -> - {stop, Error} - end; - _ -> - %% not initialized yet. - success(R, Data, ?SHUTDOWN) + {ok, WorkerState} -> + case Mod:shutdown(WorkerState, zombie) of + ok -> + %% non-terminate shutdown, worker did not checkpoint; normal exit. + success(R, Data, ?SHUTDOWN); + {ok, _Checkpoint} -> + %% worker should only checkpoint during shutdown for a TERMINATE shutdown; + %% this isn't supported yet. + error_logger:error_msg("~p attempted to checkpoint during shutdownRequest shutdown~n", + [WorkerState]), + {stop, {error, unexpected_checkpoint}}; + {_, {error, _} = Error} -> + {stop, Error} + end; + _ -> + %% not initialized yet. + success(R, Data, ?SHUTDOWN) end; - - %% MLD is providing some records to be processed. We will call the worker's ready/1 %% callback, then deaggregate them all at once if they are in KPL format, and then provide %% each in turn to the handler module, which will have the opportunity to checkpoint after %% each record. the MLD should wait for our "status" response before sending any %% additional records or other requests. if the worker returned a checkpoint response, %% checkpoint before processing the records. -handle_event(?INTERNAL, #{<<"action">> := <<"processRecords">>, - <<"records">> := Records} = R, +handle_event(?INTERNAL, + #{<<"action">> := <<"processRecords">>, <<"records">> := Records} = R, {?DISPATCH, ?REQUEST}, - #data{handler_module = Mod, - worker_state = {ok, WorkerState}} = Data) -> + #data{handler_module = Mod, worker_state = {ok, WorkerState}} = Data) -> case Mod:ready(WorkerState) of - {error, _} = Error -> - {stop, Error}; - - Ready -> - {NWorkerState, Checkpoint} = case Ready of - {ok, S} -> {S, undefined}; - {ok, S, C} -> {S, C} - end, - NData = worker_state(Data#data{is_v2 = maps:is_key(<<"millisBehindLatest">>, R)}, - NWorkerState), - DeaggregatedRecords = deaggregate_kpl_records(R, Records), - case Checkpoint of - undefined -> - process_records(R, NData, DeaggregatedRecords); - _ -> - checkpoint(R, NData, Checkpoint, DeaggregatedRecords) - end + {error, _} = Error -> + {stop, Error}; + Ready -> + {NWorkerState, Checkpoint} = case Ready of + {ok, S} -> + {S, undefined}; + {ok, S, C} -> + {S, C} + end, + NData = worker_state(Data#data{is_v2 = maps:is_key(<<"millisBehindLatest">>, R)}, + NWorkerState), + DeaggregatedRecords = deaggregate_kpl_records(R, Records), + case Checkpoint of + undefined -> + process_records(R, NData, DeaggregatedRecords); + _ -> + checkpoint(R, NData, Checkpoint, DeaggregatedRecords) + end end; - - %% MLD is returning a checkpoint response. if a fatal error occurred, we shouldn't %% process any more data and should exit. with the current version of the MLD, this will %% cause all other workers managed by the MLD to be killed (because the MLD will exit). @@ -373,136 +347,122 @@ handle_event(?INTERNAL, #{<<"action">> := <<"processRecords">>, %% MLD was throttled when performing a checkpoint. we retry forever after a random %% sleep. %% -handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>, - <<"error">> := <<"ShutdownException">>}, +handle_event(?INTERNAL, + #{<<"action">> := <<"checkpoint">>, <<"error">> := <<"ShutdownException">>}, {?DISPATCH, ?CHECKPOINT}, #data{worker_state = {ok, WorkerState}} = Data) -> %% we were processing records and we attempted to checkpoint, but another worker stole %% our lease. abort processing. we should receive a ZOMBIE shutdown reason and exit %% normally. - error_logger:warning_msg("~p received shutdown exception during checkpoint; aborting processing~n", + error_logger:warning_msg("~p received shutdown exception during checkpoint; aborting " + "processing~n", [WorkerState]), {next_state, ?ABORT, Data}; - -handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>, - <<"error">> := <<"ShutdownException">>}, +handle_event(?INTERNAL, + #{<<"action">> := <<"checkpoint">>, <<"error">> := <<"ShutdownException">>}, {?DISPATCH, ?SHUTDOWN_CHECKPOINT}, - #data{worker_state = {ok, WorkerState}, - last_request = LastAction} = Data) -> + #data{worker_state = {ok, WorkerState}, last_request = LastAction} = Data) -> %% we were shutting down and attempted to checkpoint, but another worker stole our %% lease during the shutdown. return a 'success' response to the shutdown command. %% we should exit normally. error_logger:warning_msg("~p received shutdown exception during shutdown checkpoint~n", [WorkerState]), success(LastAction, Data, ?SHUTDOWN); - -handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>, - <<"error">> := <<"ThrottlingException">>}, +handle_event(?INTERNAL, + #{<<"action">> := <<"checkpoint">>, <<"error">> := <<"ThrottlingException">>}, {?DISPATCH, CheckpointState}, - #data{worker_state = {ok, WorkerState}, - last_checkpoint = Checkpoint} = Data) - when CheckpointState == ?CHECKPOINT; - CheckpointState == ?SHUTDOWN_CHECKPOINT -> + #data{worker_state = {ok, WorkerState}, last_checkpoint = Checkpoint} = Data) + when CheckpointState == ?CHECKPOINT; CheckpointState == ?SHUTDOWN_CHECKPOINT -> %% we attempted to checkpoint, but were throttled. this may mean the dynamodb state %% table used by the KCL is underprovisioned, or workers are checkpointing in %% lockstep. retry forever after a random sleep of up to ?RETRY_SLEEP ms. - error_logger:warning_msg("~p throttled during ~p; consider increasing state table write capacity~n", + error_logger:warning_msg("~p throttled during ~p; consider increasing state table write " + "capacity~n", [WorkerState, CheckpointState]), timer:sleep(rand:uniform(?RETRY_SLEEP)), do_checkpoint(Data, Checkpoint, CheckpointState, []); - -handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>, - <<"error">> := CheckpointError}, +handle_event(?INTERNAL, + #{<<"action">> := <<"checkpoint">>, <<"error">> := CheckpointError}, {?DISPATCH, CheckpointState}, - _Data) when (CheckpointState == ?CHECKPOINT orelse - CheckpointState == ?SHUTDOWN_CHECKPOINT), - CheckpointError /= undefined -> + _Data) + when CheckpointState == ?CHECKPOINT orelse CheckpointState == ?SHUTDOWN_CHECKPOINT, + CheckpointError /= undefined -> {stop, {error, {checkpoint_failure, CheckpointError}}}; - -handle_event(?INTERNAL, #{<<"action">> := <<"checkpoint">>} = R, +handle_event(?INTERNAL, + #{<<"action">> := <<"checkpoint">>} = R, {?DISPATCH, CheckpointState}, #data{handler_module = Mod, worker_state = {ok, WorkerState}, last_checkpoint = Checkpoint, - last_request = LastAction} = Data) - when CheckpointState == ?CHECKPOINT; - CheckpointState == ?SHUTDOWN_CHECKPOINT -> + last_request = LastAction} = + Data) + when CheckpointState == ?CHECKPOINT; CheckpointState == ?SHUTDOWN_CHECKPOINT -> SN = sequence_number(R), case Mod:checkpointed(WorkerState, SN, Checkpoint) of - {ok, NWorkerState} -> - NData = worker_state(Data, NWorkerState), - case CheckpointState of - ?CHECKPOINT -> - {next_state, ?PROCESS_RECORDS, NData}; - ?SHUTDOWN_CHECKPOINT -> - success(LastAction, NData, ?SHUTDOWN) + {ok, NWorkerState} -> + NData = worker_state(Data, NWorkerState), + case CheckpointState of + ?CHECKPOINT -> + {next_state, ?PROCESS_RECORDS, NData}; + ?SHUTDOWN_CHECKPOINT -> + success(LastAction, NData, ?SHUTDOWN) end; - {error, _} = Error -> - {stop, Error} + {error, _} = Error -> + {stop, Error} end; - %% we were processing records and we attempted to checkpoint, but failed because another %% worker stole our lease. abort record processing, return a 'success' response for %% the current command, and read the next request, which should be a shutdown command. handle_event(?INTERNAL, {process, R, _}, ?ABORT, Data) -> success(R, Data, ?REQUEST); - %% we were processing records, and have now processed them all. return a 'success' %% response for the current command, and read the next command. handle_event(?INTERNAL, {process, R, []}, ?PROCESS_RECORDS, Data) -> success(R, Data, ?REQUEST); - %% we're processing records. process the next record, update the worker state according %% to the return value, and possibly perform a checkpoint if desired before processing any %% subsequent records. -handle_event(?INTERNAL, {process, R, [Record | Records]}, +handle_event(?INTERNAL, + {process, R, [Record | Records]}, ?PROCESS_RECORDS, - #data{handler_module = Mod, - worker_state = {ok, WorkerState}} = Data) -> + #data{handler_module = Mod, worker_state = {ok, WorkerState}} = Data) -> case Mod:process_record(WorkerState, Record) of - {ok, NWorkerState} -> - process_records(R, worker_state(Data, NWorkerState), Records); - - {ok, NWorkerState, Checkpoint} -> - checkpoint(R, worker_state(Data, NWorkerState), Checkpoint, Records); - - {error, _} = Error -> - {stop, Error} + {ok, NWorkerState} -> + process_records(R, worker_state(Data, NWorkerState), Records); + {ok, NWorkerState, Checkpoint} -> + checkpoint(R, worker_state(Data, NWorkerState), Checkpoint, Records); + {error, _} = Error -> + {stop, Error} end; - %% we suspend record processing while awaiting a checkpoint response. this event will be %% seen again after changing states. handle_event(?INTERNAL, {process, _, _}, {?PEER_READ, ?CHECKPOINT}, _) -> {keep_state_and_data, postpone}; - - %% we've been given something (json data iolist) to write; we write it and then read & %% handle the next action according to the specified read kind. we expect data we write %% to be small (i.e., action responses and checkpoint requests). while waiting for the %% next action of kind NextReadKind, we do nothing. -handle_event(?INTERNAL, {write, IoData}, {?PEER_WRITE, NextReadKind}, #data{socket = Socket, - buf = Buf} = Data) -> +handle_event(?INTERNAL, + {write, IoData}, + {?PEER_WRITE, NextReadKind}, + #data{socket = Socket, buf = Buf} = Data) -> %% there should be no unprocessed/buffered request data at this point: case Buf of - [] -> ok; - _ -> - throw({stop, {error, {unprocessed_request_data, Buf}}}) + [] -> + ok; + _ -> + throw({stop, {error, {unprocessed_request_data, Buf}}}) end, ok = gen_tcp:send(Socket, [IoData, "\n"]), {next_state, {?PEER_READ, NextReadKind}, activate(Data)}; - - %% some tcp data was received, but couldn't be handled in the current state. this event %% will be seen again after changing states. handle_event(info, {tcp, _Socket, _Bin}, _State, _Data) -> {keep_state_and_data, postpone}; - - handle_event(info, Message, _State, #data{worker_state = WorkerState}) -> error_logger:error_msg("~p ignoring unexpected message ~p~n", [WorkerState, Message]), keep_state_and_data. - %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -512,136 +472,128 @@ activate(#data{socket = Socket} = Data) -> inet:setopts(Socket, [{active, once}]), Data. - %% given an input (request) action (a map) or an action name, cause a "success" response %% for that action to be written, changing state according to NextState after writing. success(#{<<"action">> := Action}, Data, NextState) -> success(Action, Data, NextState); success(Action, Data, NextState) when is_binary(Action) -> - IoData = jiffy:encode(#{<<"action">> => <<"status">>, - <<"responseFor">> => Action}), - {next_state, {?PEER_WRITE, NextState}, Data, - [{next_event, ?INTERNAL, {write, IoData}}]}. - + IoData = jiffy:encode(#{<<"action">> => <<"status">>, <<"responseFor">> => Action}), + {next_state, {?PEER_WRITE, NextState}, Data, [{next_event, ?INTERNAL, {write, IoData}}]}. %% given a list of records, change state if needed, and send a single {process, ...} event %% with those records. process_records(R, Data, Records) -> - {next_state, ?PROCESS_RECORDS, Data, - [{next_event, ?INTERNAL, {process, R, Records}}]}. - + {next_state, ?PROCESS_RECORDS, Data, [{next_event, ?INTERNAL, {process, R, Records}}]}. %% checkpoint, then continue processing the remaining records after receiving the %% checkpoint response. checkpoint(R, Data, Checkpoint, Records) -> - do_checkpoint(Data, Checkpoint, ?CHECKPOINT, + do_checkpoint(Data, + Checkpoint, + ?CHECKPOINT, [{next_event, ?INTERNAL, {process, R, Records}}]). %% checkpoint, then shutdown after receiving the checkpoint response. shutdown_checkpoint(Data, Checkpoint) -> do_checkpoint(Data, Checkpoint, ?SHUTDOWN_CHECKPOINT, []). - %% cause an attempt to checkpoint at the given sequence number, or the latest sequence %% number seen by the MLD on the stream if undefined. then continue processing according %% to the supplied next state, also appending the given events. -do_checkpoint(Data, #checkpoint{sequence_number = SN} = Checkpoint, NextState, NextEvents) -> +do_checkpoint(Data, + #checkpoint{sequence_number = SN} = Checkpoint, + NextState, + NextEvents) -> Enc = jiffy:encode(maps:put(<<"action">>, <<"checkpoint">>, checkpoint_spec(Data, SN))), - {next_state, {?PEER_WRITE, NextState}, Data#data{last_checkpoint = Checkpoint}, + {next_state, + {?PEER_WRITE, NextState}, + Data#data{last_checkpoint = Checkpoint}, [{next_event, ?INTERNAL, {write, Enc}} | NextEvents]}. - checkpoint_spec(#data{is_v2 = true}, #sequence_number{base = Base, sub = Sub}) -> #{<<"sequenceNumber">> => encode_seqno_base(Base), - <<"subSequenceNumber">> => case Sub of - undefined -> null; - _ -> Sub - end}; + <<"subSequenceNumber">> => + case Sub of + undefined -> + null; + _ -> + Sub + end}; checkpoint_spec(#data{is_v2 = true}, undefined) -> - #{<<"sequenceNumber">> => null, - <<"subSequenceNumber">> => null}; - + #{<<"sequenceNumber">> => null, <<"subSequenceNumber">> => null}; checkpoint_spec(#data{is_v2 = false}, #sequence_number{base = Base}) -> #{<<"checkpoint">> => encode_seqno_base(Base)}; checkpoint_spec(#data{is_v2 = false}, undefined) -> #{<<"checkpoint">> => null}. - %% given a binary, strip the last byte if it is a carriage return. -spec strip_cr(binary()) -> binary(). strip_cr(<<>>) -> <<>>; strip_cr(Bin) -> case [binary:at(Bin, size(Bin) - 1)] of - "\r" -> - binary:part(Bin, {0, size(Bin) - 1}); - _ -> - Bin + "\r" -> + binary:part(Bin, {0, size(Bin) - 1}); + _ -> + Bin end. - -%% given a binary corresponding to some input data and a #data{}, return a 3-tuple of the +%% given a binary corresponding to some input data and a data(), return a 3-tuple of the %% next line (if any), next state, and remaining data. if the input binary contains a %% newline, return as the first element a binary consisting of the already-buffered data %% followed by that binary. otherwise, add it to the buffer. the buffer never contains %% newlines. --spec next_line(binary(), #data{}) -> {binary() | undefined, #data{}, binary()}. +-spec next_line(binary(), data()) -> {binary() | undefined, data(), binary()}. next_line(Bin, #data{buf = Buf} = Data) -> case binary:split(Bin, <<"\n">>) of - [_] -> - {undefined, Data#data{buf = [Bin | Buf]}, <<>>}; - - [EOL, Rest] -> - Line = iolist_to_binary(lists:reverse([strip_cr(EOL) | Buf])), - {Line, Data#data{buf = []}, Rest} + [_] -> + {undefined, Data#data{buf = [Bin | Buf]}, <<>>}; + [EOL, Rest] -> + Line = iolist_to_binary(lists:reverse([strip_cr(EOL) | Buf])), + {Line, Data#data{buf = []}, Rest} end. - -%% given a binary and #data{}, return a 3-tuple of the next action (if any), next state, +%% given a binary and data(), return a 3-tuple of the next action (if any), next state, %% and remaining data. an "action" is a line which should have been a json-encoded map %% containing an "action" key. if decoding fails with a thrown error, that error is %% returned as the decoded value. --spec next_action(binary(), #data{}) -> {map() | undefined | {error, term()}, #data{}, binary()}. +-spec next_action(binary(), data()) -> {map() | undefined | {error, term()}, + data(), + binary()}. next_action(Bin, Data) -> case next_line(Bin, Data) of - {undefined, NData, Rest} -> - {undefined, NData, Rest}; - - {<<>>, NData, Rest} -> - {undefined, NData, Rest}; - - {Line, NData, Rest} -> - Dec = try - jiffy:decode(Line, [return_maps, - {null_term, undefined}]) - catch - throw:{error, Error} -> - {error, Error} - end, - {Dec, NData, Rest} + {undefined, NData, Rest} -> + {undefined, NData, Rest}; + {<<>>, NData, Rest} -> + {undefined, NData, Rest}; + {Line, NData, Rest} -> + Dec = try + jiffy:decode(Line, [return_maps, {null_term, undefined}]) + catch + {error, Error} -> + {error, Error} + end, + {Dec, NData, Rest} end. - %% given a value which is possibly a map containing a `sequenceNumber` key with a binary %% string value possibly denoting an integer, and possibly a `subSequenceNumber` key with %% an integer value, return a sequence_number() corresponding to those values, otherwise %% undefined. sequence_number(#{<<"sequenceNumber">> := SN} = M) when is_binary(SN) -> SubSeq = maps:get(<<"subSequenceNumber">>, M, undefined), - #sequence_number{base = decode_seqno_base(SN), - sub = SubSeq, - user_sub = SubSeq}; - + #sequence_number{base = decode_seqno_base(SN), sub = SubSeq, user_sub = SubSeq}; sequence_number(#{<<"sequenceNumber">> := undefined}) -> #sequence_number{}; - sequence_number(_) -> undefined. % A version that takes the sequence numbers as args directly. sequence_number(SN, OriSSN, SSN, Total) when is_binary(SN) -> - #sequence_number{base = decode_seqno_base(SN), sub = OriSSN, user_sub = SSN, user_total = Total}; - + #sequence_number{base = decode_seqno_base(SN), + sub = OriSSN, + user_sub = SSN, + user_total = Total}; sequence_number(undefined, _, _, _) -> undefined. @@ -652,43 +604,37 @@ non_kpl_sequence_number(#{<<"sequenceNumber">> := SN} = M) when is_binary(SN) -> partition_key(#{<<"partitionKey">> := PK} = _Record) -> PK. - worker_state(Data, WorkerState) -> Data#data{worker_state = {ok, WorkerState}}. - stream_record(Record, PartitionKey, Data, SequenceNumber) -> {Timestamp, Delay} = case Record of - #{<<"action">> := <<"record">>} -> - {maps:get(<<"approximateArrivalTimestamp">>, Record, undefined), - maps:get(<<"millisBehindLatest">>, Record, undefined)}; - _ -> - {undefined, undefined} + #{<<"action">> := <<"record">>} -> + {maps:get(<<"approximateArrivalTimestamp">>, Record, undefined), + maps:get(<<"millisBehindLatest">>, Record, undefined)}; + _ -> + {undefined, undefined} end, - #stream_record{ - partition_key = PartitionKey, - data = Data, - sequence_number = SequenceNumber, - timestamp = Timestamp, - delay = Delay - }. - + #stream_record{partition_key = PartitionKey, + data = Data, + sequence_number = SequenceNumber, + timestamp = Timestamp, + delay = Delay}. deaggregate_kpl_records(R, Records) -> Base64Decoder = application:get_env(erlmld, base64_decoder, {base64, decode}), lists:flatmap(fun (#{<<"data">> := RecordData} = Record) -> deaggregate_kpl_record(R, Record, b64decode(Base64Decoder, RecordData)) - end, Records). - + end, + Records). deaggregate_kpl_record(_R, Record, <> = _DecodedData) - when Magic == ?KPL_AGG_MAGIC; - Magic == ?KPL_AGG_MAGIC_DEFLATED -> + when Magic == ?KPL_AGG_MAGIC; Magic == ?KPL_AGG_MAGIC_DEFLATED -> %% This is an aggregated/deflated V1 record, or a deflated aggregated V2 record. %% See https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md %% fixme; move this to kpl_agg. also, make user-supplied decode functions configurable AggData = case Magic of - ?KPL_AGG_MAGIC_DEFLATED -> + ?KPL_AGG_MAGIC_DEFLATED -> zlib:uncompress(Data); _ -> Data @@ -698,38 +644,38 @@ deaggregate_kpl_record(_R, Record, <> = _DecodedDat Checksum = binary:part(AggData, L, -16), check_md5(ProtoMsg, Checksum), decode_kpl_protobuf_message(Record, ProtoMsg); - deaggregate_kpl_record(R, Record, DecodedData) -> %% This is a non-aggregated record. [stream_record(R, partition_key(Record), DecodedData, non_kpl_sequence_number(Record))]. - check_md5(Data, Checksum) -> case application:get_env(erlmld, check_md5_of_agg_data, true) of - true -> - case crypto:hash(md5, Data) of - Checksum -> ok; - _ -> error("md5 checksum failed for aggregated record") - end; - _ -> ok + true -> + case crypto:hash(md5, Data) of + Checksum -> + ok; + _ -> + error("md5 checksum failed for aggregated record") + end; + _ -> + ok end. - decode_kpl_protobuf_message(#{<<"sequenceNumber">> := SN} = AggRecord, ProtoMsg) -> - #'AggregatedRecord'{partition_key_table = PKsList, - records = Records} = kpl_agg_pb:decode_msg(ProtoMsg, 'AggregatedRecord'), + #'AggregatedRecord'{partition_key_table = PKsList, records = Records} = + kpl_agg_pb:decode_msg(ProtoMsg, 'AggregatedRecord'), PKs = list_to_tuple(PKsList), SubSeq = maps:get(<<"subSequenceNumber">>, AggRecord, undefined), - [stream_record(AggRecord, element(PKIndex + 1, PKs), Data, sequence_number(SN, SubSeq, SSN, length(Records))) - || {#'Record'{partition_key_index = PKIndex, data = Data} = _Record, SSN} + [stream_record(AggRecord, + element(PKIndex + 1, PKs), + Data, + sequence_number(SN, SubSeq, SSN, length(Records))) + || {#'Record'{partition_key_index = PKIndex, data = Data} = _Record, SSN} <- lists:zip(Records, lists:seq(0, length(Records) - 1))]. - - %% decode a sequence number string to an integer if possible, or an atom. this is done to %% save space. -decode_seqno_base(<> = Bin) - when C >= $0, C =< $9 -> +decode_seqno_base(<> = Bin) when C >= $0, C =< $9 -> binary_to_integer(Bin); decode_seqno_base(Bin) -> %% note: the set of values which can be taken is small. @@ -740,11 +686,9 @@ encode_seqno_base(X) when is_integer(X) -> encode_seqno_base(X) when is_atom(X), X /= undefined -> atom_to_binary(X, utf8). - b64decode({M, F}, Data) -> M:F(Data). - %%%=================================================================== %%% TESTS %%%=================================================================== @@ -753,7 +697,6 @@ b64decode({M, F}, Data) -> -include_lib("eunit/include/eunit.hrl"). - deaggregate_kpl_record_v1_noagg_test() -> %% Test a non-aggregated record. R = #{}, @@ -762,144 +705,152 @@ deaggregate_kpl_record_v1_noagg_test() -> [Result] = deaggregate_kpl_record(R, NonAggRecord, NonAggData), ?assertEqual(<<"PK">>, Result#stream_record.partition_key), ?assertEqual(<<"whatever">>, Result#stream_record.data), - ?assertEqual(666, Result#stream_record.sequence_number#sequence_number.base), - ?assertEqual(undefined, Result#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(undefined, Result#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(undefined, Result#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(undefined, (Result#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(undefined, (Result#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(undefined, + (Result#stream_record.sequence_number)#sequence_number.user_total), ok. - deaggregate_kpl_record_v1_agg_test() -> %% Test an aggregated record. %% The data is the same as in kpl_agg:simple_aggregation_test. R = #{}, - AggRecord = #{<<"partitionKey">> => <<"ignored">>, <<"sequenceNumber">> => <<"666">>, <<"subSequenceNumber">> => 0}, - AggData = <<(?KPL_AGG_MAGIC)/binary,10,3,112,107,49,10,3,112,107,50,18,4,101,104, - 107,49,18,4,101,104,107,50,26,11,8,0,16,0,26,5,100,97,116,97, - 49,26,11,8,1,16,1,26,5,100,97,116,97,50,244,41,93,155,173,190, - 58,30,240,223,216,8,26,205,86,4>>, + AggRecord = #{<<"partitionKey">> => <<"ignored">>, + <<"sequenceNumber">> => <<"666">>, + <<"subSequenceNumber">> => 0}, + AggData = <>, [Result1, Result2] = deaggregate_kpl_record(R, AggRecord, AggData), ?assertEqual(<<"pk1">>, Result1#stream_record.partition_key), ?assertEqual(<<"data1">>, Result1#stream_record.data), - ?assertEqual(666, Result1#stream_record.sequence_number#sequence_number.base), - ?assertEqual(0, Result1#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(0, Result1#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(2, Result1#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result1#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(0, (Result1#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(0, (Result1#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(2, (Result1#stream_record.sequence_number)#sequence_number.user_total), ?assertEqual(<<"pk2">>, Result2#stream_record.partition_key), ?assertEqual(<<"data2">>, Result2#stream_record.data), - ?assertEqual(666, Result2#stream_record.sequence_number#sequence_number.base), - ?assertEqual(1, Result2#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(0, Result2#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(2, Result2#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result2#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(1, (Result2#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(0, (Result2#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(2, (Result2#stream_record.sequence_number)#sequence_number.user_total), ok. - deaggregate_kpl_records_v1_noagg_test() -> R = #{}, - Records = [ - #{<<"partitionKey">> => <<"pk1">>, <<"sequenceNumber">> => <<"666">>, <<"data">> => base64:encode(<<"data1">>)}, - #{<<"partitionKey">> => <<"pk2">>, <<"sequenceNumber">> => <<"667">>, <<"data">> => base64:encode(<<"data2">>)} - ], + Records = [#{<<"partitionKey">> => <<"pk1">>, + <<"sequenceNumber">> => <<"666">>, + <<"data">> => base64:encode(<<"data1">>)}, + #{<<"partitionKey">> => <<"pk2">>, + <<"sequenceNumber">> => <<"667">>, + <<"data">> => base64:encode(<<"data2">>)}], [Result1, Result2] = deaggregate_kpl_records(R, Records), ?assertEqual(<<"pk1">>, Result1#stream_record.partition_key), ?assertEqual(<<"data1">>, Result1#stream_record.data), - ?assertEqual(666, Result1#stream_record.sequence_number#sequence_number.base), - ?assertEqual(undefined, Result1#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(undefined, Result1#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(undefined, Result1#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result1#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(undefined, (Result1#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(undefined, (Result1#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(undefined, + (Result1#stream_record.sequence_number)#sequence_number.user_total), ?assertEqual(<<"pk2">>, Result2#stream_record.partition_key), ?assertEqual(<<"data2">>, Result2#stream_record.data), - ?assertEqual(667, Result2#stream_record.sequence_number#sequence_number.base), - ?assertEqual(undefined, Result2#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(undefined, Result1#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(undefined, Result2#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(667, (Result2#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(undefined, (Result2#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(undefined, (Result1#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(undefined, + (Result2#stream_record.sequence_number)#sequence_number.user_total), ok. - deaggregate_kpl_records_v1_agg_test() -> R = #{}, - Records = [ - #{<<"partitionKey">> => <<"pk1">>, <<"sequenceNumber">> => <<"666">>, <<"subSequenceNumber">> => 0, <<"data">> => base64:encode( - <<(?KPL_AGG_MAGIC)/binary,10,3,112,107,49,10,3,112,107,50,18,4,101,104, - 107,49,18,4,101,104,107,50,26,11,8,0,16,0,26,5,100,97,116,97, - 49,26,11,8,1,16,1,26,5,100,97,116,97,50,244,41,93,155,173,190, - 58,30,240,223,216,8,26,205,86,4>>)}, - #{<<"partitionKey">> => <<"pk2">>, <<"sequenceNumber">> => <<"667">>, <<"subSequenceNumber">> => 0, <<"data">> => base64:encode( - <<(?KPL_AGG_MAGIC)/binary,10,3,112,107,51,10,3,112,107,52,18,4,101,104, - 107,51,18,4,101,104,107,52,26,11,8,0,16,0,26,5,100,97,116,97, - 51,26,11,8,1,16,1,26,5,100,97,116,97,52,96,124,151,102,57,163, - 206,141,67,25,76,61,196,252,78,12>>)} - ], + Records = [#{<<"partitionKey">> => <<"pk1">>, + <<"sequenceNumber">> => <<"666">>, + <<"subSequenceNumber">> => 0, + <<"data">> => + base64:encode(<>)}, + #{<<"partitionKey">> => <<"pk2">>, + <<"sequenceNumber">> => <<"667">>, + <<"subSequenceNumber">> => 0, + <<"data">> => + base64:encode(<>)}], [Result1, Result2, Result3, Result4] = deaggregate_kpl_records(R, Records), ?assertEqual(<<"pk1">>, Result1#stream_record.partition_key), ?assertEqual(<<"data1">>, Result1#stream_record.data), - ?assertEqual(666, Result1#stream_record.sequence_number#sequence_number.base), - ?assertEqual(0, Result1#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(0, Result1#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(2, Result1#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result1#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(0, (Result1#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(0, (Result1#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(2, (Result1#stream_record.sequence_number)#sequence_number.user_total), ?assertEqual(<<"pk2">>, Result2#stream_record.partition_key), ?assertEqual(<<"data2">>, Result2#stream_record.data), - ?assertEqual(666, Result2#stream_record.sequence_number#sequence_number.base), - ?assertEqual(1, Result2#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(0, Result2#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(2, Result2#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result2#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(1, (Result2#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(0, (Result2#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(2, (Result2#stream_record.sequence_number)#sequence_number.user_total), ?assertEqual(<<"pk3">>, Result3#stream_record.partition_key), ?assertEqual(<<"data3">>, Result3#stream_record.data), - ?assertEqual(667, Result3#stream_record.sequence_number#sequence_number.base), - ?assertEqual(0, Result3#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(0, Result2#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(2, Result3#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(667, (Result3#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(0, (Result3#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(0, (Result2#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(2, (Result3#stream_record.sequence_number)#sequence_number.user_total), ?assertEqual(<<"pk4">>, Result4#stream_record.partition_key), ?assertEqual(<<"data4">>, Result4#stream_record.data), - ?assertEqual(667, Result4#stream_record.sequence_number#sequence_number.base), - ?assertEqual(1, Result4#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(0, Result2#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(2, Result4#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(667, (Result4#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(1, (Result4#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(0, (Result2#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(2, (Result4#stream_record.sequence_number)#sequence_number.user_total), ok. - deaggregate_kpl_records_v2_noagg_test() -> R = #{}, - Records = [ - #{<<"action">> => <<"record">>, - <<"partitionKey">> => <<"pk1">>, - <<"sequenceNumber">> => <<"666">>, - <<"subSequenceNumber">> => 123, - <<"data">> => base64:encode(<<"data1">>)}, - #{<<"action">> => <<"record">>, - <<"partitionKey">> => <<"pk2">>, - <<"sequenceNumber">> => <<"666">>, - <<"subSequenceNumber">> => 124, - <<"data">> => base64:encode(<<"data2">>)} - ], + Records = [#{<<"action">> => <<"record">>, + <<"partitionKey">> => <<"pk1">>, + <<"sequenceNumber">> => <<"666">>, + <<"subSequenceNumber">> => 123, + <<"data">> => base64:encode(<<"data1">>)}, + #{<<"action">> => <<"record">>, + <<"partitionKey">> => <<"pk2">>, + <<"sequenceNumber">> => <<"666">>, + <<"subSequenceNumber">> => 124, + <<"data">> => base64:encode(<<"data2">>)}], [Result1, Result2] = deaggregate_kpl_records(R, Records), ?assertEqual(<<"pk1">>, Result1#stream_record.partition_key), ?assertEqual(<<"data1">>, Result1#stream_record.data), - ?assertEqual(666, Result1#stream_record.sequence_number#sequence_number.base), - ?assertEqual(123, Result1#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(undefined, Result2#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(undefined, Result1#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result1#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(123, (Result1#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(undefined, (Result2#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(undefined, + (Result1#stream_record.sequence_number)#sequence_number.user_total), ?assertEqual(<<"pk2">>, Result2#stream_record.partition_key), ?assertEqual(<<"data2">>, Result2#stream_record.data), - ?assertEqual(666, Result2#stream_record.sequence_number#sequence_number.base), - ?assertEqual(124, Result2#stream_record.sequence_number#sequence_number.sub), - ?assertEqual(undefined, Result2#stream_record.sequence_number#sequence_number.user_sub), - ?assertEqual(undefined, Result2#stream_record.sequence_number#sequence_number.user_total), + ?assertEqual(666, (Result2#stream_record.sequence_number)#sequence_number.base), + ?assertEqual(124, (Result2#stream_record.sequence_number)#sequence_number.sub), + ?assertEqual(undefined, (Result2#stream_record.sequence_number)#sequence_number.user_sub), + ?assertEqual(undefined, + (Result2#stream_record.sequence_number)#sequence_number.user_total), ok. - -endif. diff --git a/src/erlmld_wrk_sup.erl b/src/erlmld_wrk_sup.erl index 10d7686..933ec9f 100644 --- a/src/erlmld_wrk_sup.erl +++ b/src/erlmld_wrk_sup.erl @@ -14,14 +14,10 @@ -behaviour(supervisor). %% API --export([start_link/3, - start_worker/2]). - +-export([start_link/3, start_worker/2]). %% Supervisor callbacks -export([init/1]). --define(WORKER, erlmld_wrk_statem). - %%%=================================================================== %%% API functions %%%=================================================================== @@ -31,7 +27,7 @@ start_link(Regname, RecordProcessor, RecordProcessorData) -> start_worker(SupRef, AcceptedSocket) -> {ok, Pid} = Result = supervisor:start_child(SupRef, []), - ok = ?WORKER:accept(Pid, AcceptedSocket), + ok = erlmld_wrk_statem:accept(Pid, AcceptedSocket), Result. %%%=================================================================== @@ -39,13 +35,10 @@ start_worker(SupRef, AcceptedSocket) -> %%%=================================================================== init([RecordProcessor, RecordProcessorData]) -> + SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1}, - SupFlags = #{strategy => simple_one_for_one, - intensity => 0, - period => 1}, - - Worker = #{id => ?WORKER, - start => {?WORKER, start_link, [RecordProcessor, RecordProcessorData]}, + Worker = #{id => erlmld_wrk_statem, + start => {erlmld_wrk_statem, start_link, [RecordProcessor, RecordProcessorData]}, restart => temporary, shutdown => brutal_kill}, diff --git a/src/kpl_agg.erl b/src/kpl_agg.erl index 098fbc3..73a348d 100644 --- a/src/kpl_agg.erl +++ b/src/kpl_agg.erl @@ -55,33 +55,25 @@ -include("kpl_agg_pb.hrl"). %% A set of keys, mapping each key to a unique index. --record(keyset, { - rev_keys = [] :: list(binary()), %% list of known keys in reverse order - rev_keys_length = 0 :: non_neg_integer(), %% length of the rev_keys list - key_to_index = maps:new() :: map() %% maps each known key to a 0-based index -}). - +-record(keyset, + {rev_keys = [] :: [binary()], %% list of known keys in reverse order + rev_keys_length = 0 :: non_neg_integer(), %% length of the rev_keys list + key_to_index = maps:new() :: map()}). %% maps each known key to a 0-based index %% Internal state of a record aggregator. It stores an aggregated record that %% is "in progress", i.e. it is possible to add more user records to it. --record(state, { - num_user_records = 0 :: non_neg_integer(), - agg_size_bytes = 0 :: non_neg_integer(), - - %% The aggregated record's PartitionKey and ExplicitHashKey are the - %% PartitionKey and ExplicitHashKey of the first user record added. - agg_partition_key = undefined :: undefined | binary(), - agg_explicit_hash_key = undefined :: undefined | binary(), - - %% Keys seen in the user records added so far. - partition_keyset = #keyset{} :: #keyset{}, - explicit_hash_keyset = #keyset{} :: #keyset{}, - - %% List if user records added so far, in reverse order. - rev_records = [] :: [#'Record'{}], - - should_deflate = false -}). - +-record(state, + {num_user_records = 0 :: non_neg_integer(), + agg_size_bytes = 0 :: non_neg_integer(), + %% The aggregated record's PartitionKey and ExplicitHashKey are the + %% PartitionKey and ExplicitHashKey of the first user record added. + agg_partition_key = undefined :: undefined | binary(), + agg_explicit_hash_key = undefined :: undefined | binary(), + %% Keys seen in the user records added so far. + partition_keyset = #keyset{} :: #keyset{}, + explicit_hash_keyset = #keyset{} :: #keyset{}, + %% List if user records added so far, in reverse order. + rev_records = [] :: [#'Record'{}], + should_deflate = false}). %%%=================================================================== %%% API @@ -89,68 +81,69 @@ new() -> new(false). + new(ShouldDeflate) -> #state{should_deflate = ShouldDeflate}. count(#state{num_user_records = Num} = _State) -> Num. -size_bytes(#state{agg_size_bytes = Size, - agg_partition_key = PK} = _State) -> - byte_size(?KPL_AGG_MAGIC) - + Size - + ?MD5_DIGEST_BYTES - + (case PK of - undefined -> 0; - _ -> byte_size(PK) - end) - + byte_size(kpl_agg_pb:encode_msg(#'AggregatedRecord'{})). +size_bytes(#state{agg_size_bytes = Size, agg_partition_key = PK} = _State) -> + byte_size(?KPL_AGG_MAGIC) + Size + ?MD5_DIGEST_BYTES + + case PK of + undefined -> + 0; + _ -> + byte_size(PK) + end + + byte_size(kpl_agg_pb:encode_msg(#'AggregatedRecord'{})). finish(#state{num_user_records = 0} = State) -> {undefined, State}; - -finish(#state{agg_partition_key = AggPK, agg_explicit_hash_key = AggEHK, should_deflate = ShouldDeflate} = State) -> +finish(#state{agg_partition_key = AggPK, + agg_explicit_hash_key = AggEHK, + should_deflate = ShouldDeflate} = + State) -> AggRecord = {AggPK, serialize_data(State, ShouldDeflate), AggEHK}, {AggRecord, new(ShouldDeflate)}. - add(State, {PartitionKey, Data} = _Record) -> add(State, {PartitionKey, Data, create_explicit_hash_key(PartitionKey)}); - add(State, {PartitionKey, Data, ExplicitHashKey} = _Record) -> case {calc_record_size(State, PartitionKey, Data, ExplicitHashKey), size_bytes(State)} of - {RecSize, _} when RecSize > ?KINESIS_MAX_BYTES_PER_RECORD -> - error("input record too large to fit in a single Kinesis record"); - {RecSize, CurSize} when RecSize + CurSize > ?KINESIS_MAX_BYTES_PER_RECORD -> - {FullRecord, State1} = finish(State), - State2 = add_record(State1, PartitionKey, Data, ExplicitHashKey, RecSize), - {FullRecord, State2}; - {RecSize, _} -> - State1 = add_record(State, PartitionKey, Data, ExplicitHashKey, RecSize), - %% fixme; make size calculations more accurate - case size_bytes(State1) > ?KINESIS_MAX_BYTES_PER_RECORD - 64 of - true -> - %% size estimate is almost the limit, finish & retry: - {FullRecord, State2} = finish(State), - State3 = add_record(State2, PartitionKey, Data, ExplicitHashKey, RecSize), - {FullRecord, State3}; - false -> - {undefined, State1} - end + {RecSize, _} when RecSize > ?KINESIS_MAX_BYTES_PER_RECORD -> + error("input record too large to fit in a single Kinesis record"); + {RecSize, CurSize} when RecSize + CurSize > ?KINESIS_MAX_BYTES_PER_RECORD -> + {FullRecord, State1} = finish(State), + State2 = add_record(State1, PartitionKey, Data, ExplicitHashKey, RecSize), + {FullRecord, State2}; + {RecSize, _} -> + State1 = add_record(State, PartitionKey, Data, ExplicitHashKey, RecSize), + %% fixme; make size calculations more accurate + case size_bytes(State1) > ?KINESIS_MAX_BYTES_PER_RECORD - 64 of + true -> + %% size estimate is almost the limit, finish & retry: + {FullRecord, State2} = finish(State), + State3 = add_record(State2, PartitionKey, Data, ExplicitHashKey, RecSize), + {FullRecord, State3}; + false -> + {undefined, State1} + end end. - add_all(State, Records) -> - {RevAggRecords, NState} = lists:foldl( - fun(Record, {RevAggRecords, Agg}) -> - case add(Agg, Record) of - {undefined, NewAgg} -> {RevAggRecords, NewAgg}; - {AggRecord, NewAgg} -> {[AggRecord | RevAggRecords], NewAgg} - end - end, {[], State}, Records), + {RevAggRecords, NState} = lists:foldl(fun (Record, {RevAggRecords, Agg}) -> + case add(Agg, Record) of + {undefined, NewAgg} -> + {RevAggRecords, NewAgg}; + {AggRecord, NewAgg} -> + {[AggRecord | RevAggRecords], NewAgg} + end + end, + {[], State}, + Records), {lists:reverse(RevAggRecords), NState}. - %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -162,41 +155,48 @@ create_explicit_hash_key(_PartitionKey) -> %% implementation [2]. But we don't care about EHKs anyway. %% [1] https://github.com/awslabs/kinesis-aggregation/blob/db92620e435ad9924356cda7d096e3c888f0f72f/python/aws_kinesis_agg/aggregator.py#L447-L458 %% [2] https://github.com/awslabs/amazon-kinesis-producer/blob/ea1e49218e1a11f1b462662a1db4cc06ddad39bb/aws/kinesis/core/user_record.cc#L36-L45 - %% FIXME: Implement the actual algorithm from KPL. undefined. - %% Calculate how many extra bytes the given user record would take, when added %% to the current aggregated record. This calculation has to know about KPL and %% Protobuf internals. calc_record_size(#state{partition_keyset = PartitionKeySet, - explicit_hash_keyset = ExplicitHashKeySet} = _State, - PartitionKey, Data, ExplicitHashKey) -> - + explicit_hash_keyset = ExplicitHashKeySet} = + _State, + PartitionKey, + Data, + ExplicitHashKey) -> %% How much space we need for the PK: PKLength = byte_size(PartitionKey), PKSize = case is_key(PartitionKey, PartitionKeySet) of - true -> 0; - false -> 1 + varint_size(PKLength) + PKLength - end, + true -> + 0; + false -> + 1 + varint_size(PKLength) + PKLength + end, %% How much space we need for the EHK: EHKSize = case ExplicitHashKey of - undefined -> 0; - _ -> - EHKLength = byte_size(ExplicitHashKey), - case is_key(ExplicitHashKey, ExplicitHashKeySet) of - true -> 0; - false -> 1 + varint_size(EHKLength) + EHKLength - end + undefined -> + 0; + _ -> + EHKLength = byte_size(ExplicitHashKey), + case is_key(ExplicitHashKey, ExplicitHashKeySet) of + true -> + 0; + false -> + 1 + varint_size(EHKLength) + EHKLength + end end, %% How much space we need for the inner record: PKIndexSize = 1 + varint_size(potential_index(PartitionKey, PartitionKeySet)), EHKIndexSize = case ExplicitHashKey of - undefined -> 0; - _ -> 1 + varint_size(potential_index(ExplicitHashKey, ExplicitHashKeySet)) + undefined -> + 0; + _ -> + 1 + varint_size(potential_index(ExplicitHashKey, ExplicitHashKeySet)) end, DataLength = byte_size(Data), DataSize = 1 + varint_size(DataLength) + DataLength, @@ -205,7 +205,6 @@ calc_record_size(#state{partition_keyset = PartitionKeySet, %% How much space we need for the entire record: PKSize + EHKSize + 1 + varint_size(InnerSize) + InnerSize. - %% Calculate how many bytes are needed to represent the given integer in a %% Protobuf message. varint_size(Integer) when Integer >= 0 -> @@ -213,11 +212,11 @@ varint_size(Integer) when Integer >= 0 -> (NumBits + 6) div 7. %% Recursively compute the number of bits needed to represent an integer. -num_bits(0, Acc) -> Acc; +num_bits(0, Acc) -> + Acc; num_bits(Integer, Acc) when Integer >= 0 -> num_bits(Integer bsr 1, Acc + 1). - %% Helper for add; do not use directly. add_record(#state{partition_keyset = PKSet, explicit_hash_keyset = EHKSet, @@ -225,50 +224,48 @@ add_record(#state{partition_keyset = PKSet, num_user_records = NumUserRecords, agg_size_bytes = AggSize, agg_partition_key = AggPK, - agg_explicit_hash_key = AggEHK} = State, - PartitionKey, Data, ExplicitHashKey, NewRecordSize) -> + agg_explicit_hash_key = AggEHK} = + State, + PartitionKey, + Data, + ExplicitHashKey, + NewRecordSize) -> {PKIndex, NewPKSet} = get_or_add_key(PartitionKey, PKSet), {EHKIndex, NewEHKSet} = get_or_add_key(ExplicitHashKey, EHKSet), - NewRecord = #'Record'{ - partition_key_index = PKIndex, - explicit_hash_key_index = EHKIndex, - data = Data - }, - State#state{ - partition_keyset = NewPKSet, - explicit_hash_keyset = NewEHKSet, - rev_records = [NewRecord | RevRecords], - num_user_records = 1 + NumUserRecords, - agg_size_bytes = NewRecordSize + AggSize, - agg_partition_key = first_defined(AggPK, PartitionKey), - agg_explicit_hash_key = first_defined(AggEHK, ExplicitHashKey) - }. - - -first_defined(undefined, Second) -> Second; -first_defined(First, _) -> First. - + NewRecord = #'Record'{partition_key_index = PKIndex, + explicit_hash_key_index = EHKIndex, + data = Data}, + State#state{partition_keyset = NewPKSet, + explicit_hash_keyset = NewEHKSet, + rev_records = [NewRecord | RevRecords], + num_user_records = 1 + NumUserRecords, + agg_size_bytes = NewRecordSize + AggSize, + agg_partition_key = first_defined(AggPK, PartitionKey), + agg_explicit_hash_key = first_defined(AggEHK, ExplicitHashKey)}. + +first_defined(undefined, Second) -> + Second; +first_defined(First, _) -> + First. serialize_data(#state{partition_keyset = PKSet, explicit_hash_keyset = EHKSet, - rev_records = RevRecords} = _State, ShouldDeflate) -> - ProtobufMessage = #'AggregatedRecord'{ - partition_key_table = key_list(PKSet), - explicit_hash_key_table = key_list(EHKSet), - records = lists:reverse(RevRecords) - }, + rev_records = RevRecords} = + _State, + ShouldDeflate) -> + ProtobufMessage = #'AggregatedRecord'{partition_key_table = key_list(PKSet), + explicit_hash_key_table = key_list(EHKSet), + records = lists:reverse(RevRecords)}, SerializedData = kpl_agg_pb:encode_msg(ProtobufMessage), Checksum = crypto:hash(md5, SerializedData), case ShouldDeflate of - true -> - <>))/binary>>; - - false -> - <> + true -> + <>))/binary>>; + false -> + <> end. - %%%=================================================================== %%% Internal functions for keysets %%%=================================================================== @@ -276,34 +273,33 @@ serialize_data(#state{partition_keyset = PKSet, is_key(Key, #keyset{key_to_index = KeyToIndex} = _KeySet) -> maps:is_key(Key, KeyToIndex). - get_or_add_key(undefined, KeySet) -> {undefined, KeySet}; -get_or_add_key(Key, #keyset{rev_keys = RevKeys, rev_keys_length = Length, key_to_index = KeyToIndex} = KeySet) -> +get_or_add_key(Key, + #keyset{rev_keys = RevKeys, rev_keys_length = Length, key_to_index = KeyToIndex} = + KeySet) -> case maps:get(Key, KeyToIndex, not_found) of - not_found -> - NewKeySet = KeySet#keyset{ - rev_keys = [Key | RevKeys], - rev_keys_length = Length + 1, - key_to_index = maps:put(Key, Length, KeyToIndex) - }, - {Length, NewKeySet}; - Index -> - {Index, KeySet} + not_found -> + NewKeySet = KeySet#keyset{rev_keys = [Key | RevKeys], + rev_keys_length = Length + 1, + key_to_index = maps:put(Key, Length, KeyToIndex)}, + {Length, NewKeySet}; + Index -> + {Index, KeySet} end. - -potential_index(Key, #keyset{rev_keys_length = Length, key_to_index = KeyToIndex} = _KeySet) -> +potential_index(Key, + #keyset{rev_keys_length = Length, key_to_index = KeyToIndex} = _KeySet) -> case maps:get(Key, KeyToIndex, not_found) of - not_found -> Length; - Index -> Index + not_found -> + Length; + Index -> + Index end. - key_list(#keyset{rev_keys = RevKeys} = _KeySet) -> lists:reverse(RevKeys). - %%%=================================================================== %%% TESTS %%%=================================================================== @@ -323,7 +319,6 @@ varint_size_test() -> ?assertEqual(6, varint_size(999999999999)), ok. - keyset_test() -> KeySet0 = #keyset{}, ?assertEqual([], key_list(KeySet0)), @@ -346,7 +341,6 @@ keyset_test() -> ok. - empty_aggregator_test() -> Agg = new(), ?assertEqual(0, count(Agg)), @@ -354,7 +348,6 @@ empty_aggregator_test() -> {undefined, Agg} = finish(Agg), ok. - simple_aggregation_test() -> Agg0 = new(), {undefined, Agg1} = add(Agg0, {<<"pk1">>, <<"data1">>, <<"ehk1">>}), @@ -364,44 +357,41 @@ simple_aggregation_test() -> %% Reference values obtained using priv/kpl_agg_tests_helper.py. RefPK = <<"pk1">>, RefEHK = <<"ehk1">>, - RefData = <<(?KPL_AGG_MAGIC)/binary,10,3,112,107,49,10,3,112,107,50,18,4,101,104, - 107,49,18,4,101,104,107,50,26,11,8,0,16,0,26,5,100,97,116,97, - 49,26,11,8,1,16,1,26,5,100,97,116,97,50,244,41,93,155,173,190, - 58,30,240,223,216,8,26,205,86,4>>, + RefData = <>, ?assertEqual({RefPK, RefData, RefEHK}, AggRecord), ok. - aggregate_many(Records) -> {AggRecords, Agg} = add_all(new(), Records), case finish(Agg) of - {undefined, _} -> AggRecords; - {LastAggRecord, _} -> AggRecords ++ [LastAggRecord] + {undefined, _} -> + AggRecords; + {LastAggRecord, _} -> + AggRecords ++ [LastAggRecord] end. - shared_keys_test() -> - [AggRecord] = aggregate_many([ - {<<"alpha">>, <<"data1">>, <<"zulu">>}, - {<<"beta">>, <<"data2">>, <<"yankee">>}, - {<<"alpha">>, <<"data3">>, <<"xray">>}, - {<<"charlie">>, <<"data4">>, <<"yankee">>}, - {<<"beta">>, <<"data5">>, <<"zulu">>} - ]), + [AggRecord] = aggregate_many([{<<"alpha">>, <<"data1">>, <<"zulu">>}, + {<<"beta">>, <<"data2">>, <<"yankee">>}, + {<<"alpha">>, <<"data3">>, <<"xray">>}, + {<<"charlie">>, <<"data4">>, <<"yankee">>}, + {<<"beta">>, <<"data5">>, <<"zulu">>}]), %% Reference values obtained using priv/kpl_agg_tests_helper.py. RefPK = <<"alpha">>, RefEHK = <<"zulu">>, - RefData = <<(?KPL_AGG_MAGIC)/binary,10,5,97,108,112,104,97,10,4,98,101,116,97,10, - 7,99,104,97,114,108,105,101,18,4,122,117,108,117,18,6,121,97, - 110,107,101,101,18,4,120,114,97,121,26,11,8,0,16,0,26,5,100, - 97,116,97,49,26,11,8,1,16,1,26,5,100,97,116,97,50,26,11,8,0, - 16,2,26,5,100,97,116,97,51,26,11,8,2,16,1,26,5,100,97,116,97, - 52,26,11,8,1,16,0,26,5,100,97,116,97,53,78,67,160,206,22,1, - 33,154,3,6,110,235,9,229,53,100>>, + RefData = <>, ?assertEqual({RefPK, RefData, RefEHK}, AggRecord), ok. - record_fullness_test() -> Data1 = list_to_binary(["X" || _ <- lists:seq(1, 500000)]), Data2 = list_to_binary(["Y" || _ <- lists:seq(1, 600000)]), @@ -425,17 +415,16 @@ record_fullness_test() -> %?assertEqual(RefChecksum2, crypto:hash(md5, AggData2)), ok. - full_record_test() -> - Fill = fun F (Acc) -> + Fill = fun F(Acc) -> PK = integer_to_binary(rand:uniform(1000)), Data = << <<(integer_to_binary(rand:uniform(128)))/binary>> - || _ <- lists:seq(1, 1 + rand:uniform(1000)) >>, + || _ <- lists:seq(1, 1 + rand:uniform(1000)) >>, case add(Acc, {PK, Data}) of - {undefined, NAcc} -> - F(NAcc); - {Full, _} -> - Full + {undefined, NAcc} -> + F(NAcc); + {Full, _} -> + Full end end, {PK, Data, _} = Fill(new()), @@ -443,7 +432,6 @@ full_record_test() -> ?assert(Total =< ?KINESIS_MAX_BYTES_PER_RECORD), ?assert(Total >= ?KINESIS_MAX_BYTES_PER_RECORD - 2048). - deflate_test() -> Agg0 = new(true), {undefined, Agg1} = add(Agg0, {<<"pk1">>, <<"data1">>, <<"ehk1">>}), @@ -458,5 +446,4 @@ deflate_test() -> #'Record'{data = RecordData} = R1, ?assertEqual(<<"data1">>, RecordData). - -endif.