Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/verify tool sample #722

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/arweave/include/ar_verify_chunks.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@
status = not_ready :: not_ready | running| done
}).

-record(sample_report, {
total = 0 :: non_neg_integer(),
success = 0 :: non_neg_integer(),
not_found = 0 :: non_neg_integer(),
failure = 0 :: non_neg_integer()
}).

-define(SAMPLE_CHUNK_COUNT, 1000).

-endif.
26 changes: 18 additions & 8 deletions apps/arweave/src/ar_data_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,17 @@ get_chunk(Offset, #{ packing := Packing } = Options) ->
ModuleIDs = [ar_storage_module:id(Module) || Module <- Modules],
RootRecords = [ets:lookup(sync_records, {ar_data_sync, ID})
|| ID <- ModuleIDs],
log_chunk_error(RequestOrigin, chunk_record_not_found,
[{modules_covering_offset, ModuleIDs},
{root_sync_records, RootRecords},
{seek_offset, SeekOffset},
{reply, io_lib:format("~p", [Reply])},
{is_recorded_unpacked, io_lib:format("~p", [UnpackedReply])}]),
case RequestOrigin of
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log_chunk_error should already skip logging for http and tx_data origins - is there an origin that I missed? I can add it to the matching clause

https://github.com/ArweaveTeam/arweave/pull/722/files#diff-2934ddad2ed46ac194a6be7abde4be6d440bc7565e1128bba234fad9fd2bd5d2L1489-L1494

miner ->
log_chunk_error(RequestOrigin, chunk_record_not_found,
[{modules_covering_offset, ModuleIDs},
{root_sync_records, RootRecords},
{seek_offset, SeekOffset},
{reply, io_lib:format("~p", [Reply])},
{is_recorded_unpacked, io_lib:format("~p", [UnpackedReply])}]);
_ ->
ok
end,
{error, chunk_not_found}
end.

Expand Down Expand Up @@ -1933,8 +1938,13 @@ validate_fetched_chunk(Args) ->
[{_, T}] = ets:lookup(ar_data_sync_state, disk_pool_threshold),
case Offset > T orelse not ar_node:is_joined() of
true ->
log_chunk_error(RequestOrigin, miner_requested_disk_pool_chunk,
[{disk_pool_threshold, T}, {end_offset, Offset}]),
case RequestOrigin of
miner ->
log_chunk_error(RequestOrigin, miner_requested_disk_pool_chunk,
[{disk_pool_threshold, T}, {end_offset, Offset}]);
_ ->
ok
end,
{true, none};
false ->
case ar_block_index:get_block_bounds(Offset - 1) of
Expand Down
144 changes: 137 additions & 7 deletions apps/arweave/src/ar_verify_chunks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
-export([start_link/2, name/1]).
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_consensus.hrl").
-include_lib("arweave/include/ar_verify_chunks.hrl").
-export([generate_sample_offsets/3]).

-include("../include/ar.hrl").
-include("../include/ar_consensus.hrl").
-include("../include/ar_verify_chunks.hrl").

-include_lib("eunit/include/eunit.hrl").

-record(state, {
store_id :: binary(),
packing :: binary(),
store_id :: string(),
packing :: term(),
start_offset :: non_neg_integer(),
end_offset :: non_neg_integer(),
cursor :: non_neg_integer(),
Expand All @@ -39,7 +42,7 @@ name(StoreID) ->
init(StoreID) ->
?LOG_INFO([{event, verify_chunk_storage_started}, {store_id, StoreID}]),
{StartOffset, EndOffset} = ar_storage_module:get_range(StoreID),
gen_server:cast(self(), verify),
gen_server:cast(self(), sample),
{ok, #state{
store_id = StoreID,
packing = ar_storage_module:get_packing(StoreID),
Expand All @@ -52,6 +55,21 @@ init(StoreID) ->
}
}}.

handle_cast(sample, #state{ready = false, end_offset = EndOffset} = State) ->
ar_util:cast_after(1000, self(), sample),
{noreply, State#state{ready = is_ready(EndOffset)}};
handle_cast(sample,
#state{cursor = Cursor, end_offset = EndOffset} = State) when Cursor >= EndOffset ->
ar:console("Done!~n"),
{noreply, State};
handle_cast(sample, State) ->
%% Sample ?SAMPLE_CHUNK_COUNT random chunks, read them, unpack them and verify them.
%% Report the collected statistics and continue with the "verify" procedure.
sample_random_chunks(?SAMPLE_CHUNK_COUNT, State#state.packing,
State#state.start_offset, State#state.end_offset, State#state.store_id),
gen_server:cast(self(), verify),
{noreply, State};

handle_cast(verify, #state{ready = false, end_offset = EndOffset} = State) ->
ar_util:cast_after(1000, self(), verify),
{noreply, State#state{ready = is_ready(EndOffset)}};
Expand Down Expand Up @@ -152,7 +170,7 @@ verify_proof(Metadata, State) ->

verify_packing(Metadata, State) ->
#state{packing=Packing, store_id=StoreID} = State,
{AbsoluteOffset, ChunkDataKey, TXRoot, _DataRoot, TXPath,
{AbsoluteOffset, _ChunkDataKey, _TXRoot, _DataRoot, _TXPath,
_TXRelativeOffset, ChunkSize} = Metadata,
PaddedOffset = ar_block:get_chunk_padded_offset(AbsoluteOffset),
StoredPackingCheck = ar_sync_record:is_recorded(AbsoluteOffset, ar_data_sync, StoreID),
Expand Down Expand Up @@ -278,6 +296,10 @@ check_interval({End, Start}) when Start > End ->
check_interval(Interval) ->
Interval.

%% Report the sample results by sending them to ar_verify_chunks_reporter.
report_sample(StoreID, #sample_report{} = SampleReport) ->
ar_verify_chunks_reporter:sample_update(StoreID, SampleReport).

report_progress(State) ->
#state{
store_id = StoreID, verify_report = Report, cursor = Cursor,
Expand All @@ -299,6 +321,59 @@ report_progress(State) ->
ar_verify_chunks_reporter:update(StoreID, Report2),
State#state{ verify_report = Report2 }.

%% Generate a list of Count random offsets in the range (Start, End]
%% (i.e. offsets strictly greater than Start and less than or equal to End)
%% such that if an offset is sampled, no other offsets are selected from the
%% open interval (Offset - ?DATA_CHUNK_SIZE, Offset + ?DATA_CHUNK_SIZE).
generate_sample_offsets(Start, End, Count) when is_integer(Start), is_integer(End) ->
Candidates = lists:seq(Start + 1, End, ?DATA_CHUNK_SIZE),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a full 3.6TB partition I think Candidates will be a list of length 14.4 million entries or have I misread?

Could that become a memory issue (e.g. if running verify on multiple storage modules concurrently)?

Count2 = min(Count, length(Candidates)),
pick_offsets(Candidates, Count2).

pick_offsets(_Candidates, 0) ->
[];
pick_offsets([], _Count) ->
[];
pick_offsets(Candidates, Count) ->
Offsets = pick_offsets_batch(min(Count, length(Candidates)), Candidates),
UniqueOffsets = lists:usort(Offsets),
NewCandidates = Candidates -- UniqueOffsets,
UniqueOffsets ++ pick_offsets(NewCandidates, Count - length(UniqueOffsets)).

pick_offsets_batch(Len, Candidates) ->
pick_offsets_batch(Len, Candidates, min(Len, 20)).

pick_offsets_batch(_Len, _Candidates, 0) ->
[];
pick_offsets_batch(Len, Candidates, BatchSize) ->
N = rand:uniform(Len),
[lists:nth(N, Candidates) | pick_offsets_batch(Len, Candidates, BatchSize - 1)].

%% Use generate_sample_offsets/3 to obtain offsets (with exclusion)
%% and then queries ar_data_sync:get_chunk/2 with options to trigger unpacking.
sample_random_chunks(Count, Packing, Start, End, StoreID) ->
Offsets = generate_sample_offsets(Start, End, Count),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than precalculate the list of offsets, what about doing one offset at a time sampled randomly from the range, and if that chunk exists on disk just adding that offset to a set so we never try it again? Maybe this will avoid the 14.4M offset list?

lists:foldl(fun(Offset, Acc) ->
report_sample(StoreID, Acc),
case ar_data_sync:get_chunk(Offset, #{pack => true, packing => Packing}) of
{ok, _Proof} ->
Acc#sample_report{
total = Acc#sample_report.total + 1,
success = Acc#sample_report.success + 1
};
{error, chunk_not_found} ->
Acc#sample_report{
total = Acc#sample_report.total + 1,
not_found = Acc#sample_report.not_found + 1
};
{error, _Reason} ->
Acc#sample_report{
total = Acc#sample_report.total + 1,
failure = Acc#sample_report.failure + 1
}
end
end, #sample_report{}, Offsets).

%% ar_chunk_storage does not store small chunks before strict_split_data_threshold
%% (before 30607159107830 = partitions 0-7 and a half of 8
%%
Expand Down Expand Up @@ -614,3 +689,58 @@ test_verify_chunk() ->
{Interval, not_found},
#state{ cursor = 0, packing = unpacked })),
ok.

%% Verify that generate_sample_offsets/3 samples without replacement.
sample_offsets_without_replacement_test() ->
ChunkSize = ?DATA_CHUNK_SIZE,
Count = 5,
Offsets = generate_sample_offsets(ChunkSize * 10, ChunkSize * 1000, Count),
%% Check that exactly Count offsets are produced.
?assertEqual(Count, length(Offsets)),
%% For every pair, ensure the absolute difference is at least ?DATA_CHUNK_SIZE.
lists:foreach(fun(A) ->
lists:foreach(fun(B) ->
case {A == B, abs(A - B) < ?DATA_CHUNK_SIZE} of
{true, _} -> ok;
{false, true} -> ?assert(false);
_ -> ok
end
end, Offsets)
end, Offsets),
Offsets2 = generate_sample_offsets(0, ChunkSize, Count),
%% We cannot sample more than one offset without replacement.
?assertEqual(1, length(Offsets2)).

%% Verify sample_random_chunks/4 aggregates outcomes correctly.
%%
%% We mock ar_data_sync:get_chunk/2 such that:
%% - The first call returns {error, chunk_not_found},
%% - The second call returns {ok, <<"valid_proof">>},
%% - The third call returns {error, invalid_chunk}.
%% Note: Using atoms for partition borders triggers the fallback in generate_sample_offsets/3.
sample_random_chunks_test_() ->
[
ar_test_node:test_with_mocked_functions(
[{ar_data_sync, get_chunk, fun(_Offset, _Opts) ->
%% Use process dictionary to simulate sequential responses.
Counter = case erlang:get(sample_counter) of
undefined -> 0;
C -> C
end,
erlang:put(sample_counter, Counter + 1),
case Counter of
0 -> {error, chunk_not_found};
1 -> {ok, <<"valid_proof">>};
2 -> {error, invalid_chunk}
end
end}],
fun test_sample_random_chunks/0)
].

test_sample_random_chunks() ->
%% Initialize counter.
erlang:put(sample_counter, 0),
Packing = unpacked,
Report = sample_random_chunks(3, Packing, 0, 262144 * 3, "test"),
ExpectedReport = #sample_report{total = 3, success = 1, not_found = 1, failure = 1},
?assertEqual(ExpectedReport, Report).
37 changes: 31 additions & 6 deletions apps/arweave/src/ar_verify_chunks_reporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@

-behaviour(gen_server).

-export([start_link/0, update/2]).
-export([start_link/0, update/2, sample_update/2]).
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_verify_chunks.hrl").
-include("../include/ar.hrl").
-include("../include/ar_verify_chunks.hrl").
-include_lib("eunit/include/eunit.hrl").

-record(state, {
reports = #{} :: #{binary() => #verify_report{}}
reports = #{} :: #{string() => #verify_report{}},
sample_reports = #{} :: #{string() => #sample_report{}}
}).

-define(REPORT_PROGRESS_INTERVAL, 10000).
Expand All @@ -24,10 +25,14 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

-spec update(binary(), #verify_report{}) -> ok.
-spec update(string(), #verify_report{}) -> ok.
update(StoreID, Report) ->
gen_server:cast(?MODULE, {update, StoreID, Report}).

-spec sample_update(string(), #sample_report{}) -> ok.
sample_update(StoreID, SampleReport) ->
gen_server:cast(?MODULE, {sample_update, StoreID, SampleReport}).

%%%===================================================================
%%% Generic server callbacks.
%%%===================================================================
Expand All @@ -49,6 +54,12 @@ handle_cast(report_progress, State) ->
ar_util:cast_after(?REPORT_PROGRESS_INTERVAL, self(), report_progress),
{noreply, State};

handle_cast({sample_update, StoreID, SampleReport}, State) ->
NewSampleReports = maps:put(StoreID, SampleReport, State#state.sample_reports),
print_sampling_header(),
print_sample_report(StoreID, SampleReport),
{noreply, State#state{sample_reports = NewSampleReports}};

handle_cast(Cast, State) ->
?LOG_WARNING([{event, unhandled_cast}, {module, ?MODULE}, {cast, Cast}]),
{noreply, State}.
Expand Down Expand Up @@ -102,4 +113,18 @@ print_report(StoreID, Report) ->
TotalErrorBytes / 1000000000, Rate / 1000000,
Status
]
).
).

print_sample_report(StoreID, #sample_report{
total = Total,
success = Success,
not_found = NotFound,
failure = Failure
}) ->
ar:console("| ~-65s | ~7B | ~7B | ~7B | ~8B |~n",
[StoreID, Total, Success, NotFound, Failure]).

print_sampling_header() ->
ar:console("|-------------------------------------------------------------------+---------+---------+---------+----------|~n", []),
ar:console("| Storage Module | Total | Success | Missing | Failure |~n", []),
ar:console("|-------------------------------------------------------------------+---------+---------+---------+----------|~n", []).