Skip to content

Commit

Permalink
feature: expose the packing_worker and replica_2_9_workers flags
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesPiechota committed Jan 19, 2025
1 parent 64415ac commit 43982fe
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 22 deletions.
5 changes: 4 additions & 1 deletion apps/arweave/include/ar_config.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@
-define(DEFAULT_REPLICA_2_9_WORKERS, 8).
-endif.

%% The number of packing workers.
-define(DEFAULT_PACKING_WORKERS, erlang:system_info(dirty_cpu_schedulers_online)).

%% @doc Startup options with default values.
-record(config, {
init = false,
Expand Down Expand Up @@ -195,7 +198,6 @@
get_tx => ?MAX_PARALLEL_GET_TX_REQUESTS
},
disk_cache_size = ?DISK_CACHE_SIZE,
packing_rate,
max_nonce_limiter_validation_thread_count
= ?DEFAULT_MAX_NONCE_LIMITER_VALIDATION_THREAD_COUNT,
max_nonce_limiter_last_step_validation_thread_count
Expand All @@ -222,6 +224,7 @@
pool_server_address = not_set,
pool_api_key = not_set,
pool_worker_name = not_set,
packing_workers = ?DEFAULT_PACKING_WORKERS,
replica_2_9_workers = ?DEFAULT_REPLICA_2_9_WORKERS,
%% Undocumented/unsupported options
chunk_storage_file_size = ?CHUNK_GROUP_SIZE,
Expand Down
19 changes: 14 additions & 5 deletions apps/arweave/src/ar.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,16 @@ show_help() ->
)
)},
{"packing_rate",
"The maximum number of chunks per second to pack or unpack. "
"The default value is determined based on the number of CPU cores."},
"DEPRECATED. Does not affect anything. Use packing_workers instead."},
{"packing_workers (num)",
"The number of packing workers to spawn. The default is the number of "
"logical CPU cores."},
{"replica_2_9_workers (num)", io_lib:format(
"The number of replica 2.9 workers to spawn. Replica 2.9 workers are used "
"to generate entropy the replica.2.9 format. At most one worker will be "
"active per physical disk at a time. Default: ~B",
[?DEFAULT_REPLICA_2_9_WORKERS]
)},
{"max_vdf_validation_thread_count", io_lib:format("\tThe maximum number "
"of threads used for VDF validation. Default: ~B",
[?DEFAULT_MAX_NONCE_LIMITER_VALIDATION_THREAD_COUNT])},
Expand Down Expand Up @@ -565,8 +573,10 @@ parse_cli_args(["max_disk_pool_data_root_buffer_mb", Num | Rest], C) ->
parse_cli_args(Rest, C#config{ max_disk_pool_data_root_buffer_mb = list_to_integer(Num) });
parse_cli_args(["disk_cache_size_mb", Num | Rest], C) ->
parse_cli_args(Rest, C#config{ disk_cache_size = list_to_integer(Num) });
parse_cli_args(["packing_rate", Num | Rest], C) ->
parse_cli_args(Rest, C#config{ packing_rate = list_to_integer(Num) });
parse_cli_args(["packing_rate", _Num | Rest], C) ->
?LOG_WARNING("Deprecated option found 'packing_rate': "
" this option has been removed and is now a no-op.", []),
parse_cli_args(Rest, C#config{ });
parse_cli_args(["max_vdf_validation_thread_count", Num | Rest], C) ->
parse_cli_args(Rest,
C#config{ max_nonce_limiter_validation_thread_count = list_to_integer(Num) });
Expand Down Expand Up @@ -931,7 +941,6 @@ start_for_tests(TestType, Config) ->
data_dir = ".tmp/data_" ++ atom_to_list(TestType) ++ "_main_" ++ UniqueName,
port = ar_test_node:get_unused_port(),
disable = [randomx_jit],
packing_rate = 20,
auto_join = false
},
start(TestConfig).
Expand Down
4 changes: 3 additions & 1 deletion apps/arweave/src/ar_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,9 @@ parse_options([{<<"disk_cache_size_mb">>, D} | Rest], Config) when is_integer(D)
parse_options(Rest, Config#config{ disk_cache_size = D });

parse_options([{<<"packing_rate">>, D} | Rest], Config) when is_integer(D) ->
parse_options(Rest, Config#config{ packing_rate = D });
?LOG_WARNING("Deprecated option found 'packing_rate': "
" this option has been removed and is a no-op.", []),
parse_options(Rest, Config);

parse_options([{<<"max_nonce_limiter_validation_thread_count">>, D} | Rest], Config)
when is_integer(D) ->
Expand Down
10 changes: 7 additions & 3 deletions apps/arweave/src/ar_device_lock.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
-record(state, {
store_id_to_device = #{},
device_locks = #{},
initialized = false
initialized = false,
num_replica_2_9_workers = 0
}).

-type device_mode() :: prepare | sync | repack.
Expand Down Expand Up @@ -83,7 +84,10 @@ start_link() ->

init([]) ->
gen_server:cast(self(), initialize_state),
{ok, #state{}}.
{ok, Config} = application:get_env(arweave, config),
?LOG_INFO([{event, starting_device_lock_server},
{num_replica_2_9_workers, Config#config.replica_2_9_workers}]),
{ok, #state{num_replica_2_9_workers = Config#config.replica_2_9_workers}}.

handle_call(get_state, _From, State) ->
{reply, State, State};
Expand Down Expand Up @@ -170,10 +174,10 @@ get_system_device(StorageModule) ->
end.

do_acquire_lock(Mode, StoreID, State) ->
MaxPrepareLocks = State#state.num_replica_2_9_workers,
Device = maps:get(StoreID, State#state.store_id_to_device),
DeviceLock = maps:get(Device, State#state.device_locks, sync),
PrepareLocks = count_prepare_locks(State),
MaxPrepareLocks = 8,
{Acquired, NewDeviceLock} = case Mode of
sync ->
%% Can only aquire a sync lock if the device is in sync mode
Expand Down
12 changes: 5 additions & 7 deletions apps/arweave/src/ar_packing_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,12 @@ init([]) ->
H1String = io_lib:format("~.3f", [H1 / 1000]),
ar:console("Hashing benchmark~nH0: ~s ms~nH1/H2: ~s ms~n", [H0String, H1String]),
?LOG_INFO([{event, hash_benchmark}, {h0_ms, H0String}, {h1_ms, H1String}]),
Schedulers = erlang:system_info(dirty_cpu_schedulers_online),
SpawnSchedulers = Schedulers,
ar:console("~nStarting ~B packing threads.~n", [SpawnSchedulers]),
?LOG_INFO([{event, starting_packing_threads}, {num_threads, SpawnSchedulers}]),
NumWorkers = Config#config.packing_workers,
ar:console("~nStarting ~B packing threads.~n", [NumWorkers]),
?LOG_INFO([{event, starting_packing_threads}, {num_threads, NumWorkers}]),
Workers = queue:from_list(
[spawn_link(fun() -> worker(PackingState) end) || _ <- lists:seq(1, SpawnSchedulers)]),
[spawn_link(fun() -> worker(PackingState) end) || _ <- lists:seq(1, NumWorkers)]),
ets:insert(?MODULE, {buffer_size, 0}),
{ok, Config} = application:get_env(arweave, config),
MaxSize =
case Config#config.packing_cache_size_limit of
undefined ->
Expand All @@ -296,7 +294,7 @@ init([]) ->
ets:insert(?MODULE, {buffer_size_limit, MaxSize}),
timer:apply_interval(200, ?MODULE, record_buffer_size_metric, []),
{ok, #state{
workers = Workers, num_workers = SpawnSchedulers }}.
workers = Workers, num_workers = NumWorkers }}.

handle_call(Request, _From, State) ->
?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
Expand Down
3 changes: 2 additions & 1 deletion apps/arweave/test/ar_config_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ test_parse_config() ->
tx_validators = 3,
post_tx_timeout = 50,
max_emitters = 4,
replica_2_9_workers = 16,
packing_workers = 25,
tx_propagation_parallelization = undefined,
sync_jobs = 10,
header_sync_jobs = 1,
Expand Down Expand Up @@ -117,7 +119,6 @@ test_parse_config() ->
gateway_arql := 3,
get_sync_record := 10
},
packing_rate = 20,
max_nonce_limiter_validation_thread_count = 2,
max_nonce_limiter_last_step_validation_thread_count = 3,
nonce_limiter_server_trusted_peers = ["127.0.0.1", "2.3.4.5", "6.7.8.9:1982"],
Expand Down
2 changes: 2 additions & 0 deletions apps/arweave/test/ar_config_tests_config_fixture.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
"gateway_arql": 3
},
"packing_rate": 20,
"replica_2_9_workers": 16,
"packing_workers": 25,
"max_nonce_limiter_validation_thread_count": 2,
"max_nonce_limiter_last_step_validation_thread_count": 3,
"vdf_server_trusted_peer": "127.0.0.1",
Expand Down
5 changes: 1 addition & 4 deletions apps/arweave/test/ar_test_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ try_boot_peer(TestType, Node, Retries) ->
Cmd = io_lib:format(
"erl +S ~B:~B -pa ~s -config config/sys.config -noshell " ++
"-name ~s -setcookie ~s -run ar main debug port ~p " ++
"data_dir .tmp/data_~s_~s no_auto_join packing_rate 20 " ++
"data_dir .tmp/data_~s_~s no_auto_join " ++
"> ~s-~s.out 2>&1 &",
[Schedulers, Schedulers, string:join(Paths, " "), NodeName, Cookie, Port,
atom_to_list(TestType), NodeName, Node, get_node_namespace()]),
Expand Down Expand Up @@ -226,7 +226,6 @@ update_config(Config) ->
auto_join = Config#config.auto_join,
mining_addr = Config#config.mining_addr,
sync_jobs = Config#config.sync_jobs,
packing_rate = Config#config.packing_rate,
disk_pool_jobs = Config#config.disk_pool_jobs,
header_sync_jobs = Config#config.header_sync_jobs,
enable = Config#config.enable ++ BaseConfig#config.enable,
Expand Down Expand Up @@ -322,7 +321,6 @@ base_cm_config(Peers) ->
auto_join = true,
mining_addr = RewardAddr,
sync_jobs = 2,
packing_rate = 20,
disk_pool_jobs = 2,
header_sync_jobs = 2,
enable = [search_in_rocksdb_when_mining, serve_tx_data_without_limits,
Expand Down Expand Up @@ -598,7 +596,6 @@ start(B0, RewardAddr, Config, StorageModules) ->
storage_modules = StorageModules,
disk_space_check_frequency = 1000,
sync_jobs = 2,
packing_rate = 20,
disk_pool_jobs = 2,
header_sync_jobs = 2,
enable = [search_in_rocksdb_when_mining, serve_tx_data_without_limits,
Expand Down

0 comments on commit 43982fe

Please sign in to comment.