From 43982fe166ceed60d4f9d3d570efce5f7b076a45 Mon Sep 17 00:00:00 2001
From: James Piechota
Date: Sun, 19 Jan 2025 22:29:24 +0000
Subject: [PATCH] feature: expose the packing_workers and replica_2_9_workers flags

---
 apps/arweave/include/ar_config.hrl            |  5 ++++-
 apps/arweave/src/ar.erl                       | 19 ++++++++++++++-----
 apps/arweave/src/ar_config.erl                |  4 +++-
 apps/arweave/src/ar_device_lock.erl           | 10 +++++++---
 apps/arweave/src/ar_packing_server.erl        | 12 +++++-------
 apps/arweave/test/ar_config_tests.erl         |  3 ++-
 .../test/ar_config_tests_config_fixture.json  |  2 ++
 apps/arweave/test/ar_test_node.erl            |  5 +----
 8 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/apps/arweave/include/ar_config.hrl b/apps/arweave/include/ar_config.hrl
index 4bfad48c3..e4a2dc128 100644
--- a/apps/arweave/include/ar_config.hrl
+++ b/apps/arweave/include/ar_config.hrl
@@ -126,6 +126,9 @@
 -define(DEFAULT_REPLICA_2_9_WORKERS, 8).
 -endif.
 
+%% The default number of packing workers.
+-define(DEFAULT_PACKING_WORKERS, erlang:system_info(dirty_cpu_schedulers_online)).
+
 %% @doc Startup options with default values.
 -record(config, {
     init = false,
@@ -195,7 +198,6 @@
         get_tx => ?MAX_PARALLEL_GET_TX_REQUESTS
     },
     disk_cache_size = ?DISK_CACHE_SIZE,
-    packing_rate,
     max_nonce_limiter_validation_thread_count
         = ?DEFAULT_MAX_NONCE_LIMITER_VALIDATION_THREAD_COUNT,
     max_nonce_limiter_last_step_validation_thread_count
@@ -222,6 +224,7 @@
     pool_server_address = not_set,
     pool_api_key = not_set,
     pool_worker_name = not_set,
+    packing_workers = ?DEFAULT_PACKING_WORKERS,
     replica_2_9_workers = ?DEFAULT_REPLICA_2_9_WORKERS,
     %% Undocumented/unsupported options
     chunk_storage_file_size = ?CHUNK_GROUP_SIZE,
diff --git a/apps/arweave/src/ar.erl b/apps/arweave/src/ar.erl
index 1173d68b0..1c13b32c2 100644
--- a/apps/arweave/src/ar.erl
+++ b/apps/arweave/src/ar.erl
@@ -266,8 +266,16 @@ show_help() ->
             )
         )},
         {"packing_rate",
-            "The maximum number of chunks per second to pack or unpack. "
-            "The default value is determined based on the number of CPU cores."},
+            "DEPRECATED. This option is ignored. Use packing_workers instead."},
+        {"packing_workers (num)",
+            "The number of packing workers to spawn. The default is the number of "
+            "logical CPU cores."},
+        {"replica_2_9_workers (num)", io_lib:format(
+            "The number of replica 2.9 workers to spawn. Replica 2.9 workers are used "
+            "to generate entropy for the replica.2.9 format. At most one worker will be "
+            "active per physical disk at a time. Default: ~B",
+            [?DEFAULT_REPLICA_2_9_WORKERS]
+        )},
         {"max_vdf_validation_thread_count", io_lib:format("\tThe maximum number "
                 "of threads used for VDF validation. Default: ~B",
                 [?DEFAULT_MAX_NONCE_LIMITER_VALIDATION_THREAD_COUNT])},
@@ -565,8 +573,10 @@ parse_cli_args(["max_disk_pool_data_root_buffer_mb", Num | Rest], C) ->
     parse_cli_args(Rest, C#config{ max_disk_pool_data_root_buffer_mb = list_to_integer(Num) });
 parse_cli_args(["disk_cache_size_mb", Num | Rest], C) ->
     parse_cli_args(Rest, C#config{ disk_cache_size = list_to_integer(Num) });
-parse_cli_args(["packing_rate", Num | Rest], C) ->
-    parse_cli_args(Rest, C#config{ packing_rate = list_to_integer(Num) });
+parse_cli_args(["packing_rate", _Num | Rest], C) ->
+    ?LOG_WARNING("Deprecated option found 'packing_rate': "
+        "this option has been removed and is now a no-op.", []),
+    parse_cli_args(Rest, C);
 parse_cli_args(["max_vdf_validation_thread_count", Num | Rest], C) ->
     parse_cli_args(Rest, C#config{
             max_nonce_limiter_validation_thread_count = list_to_integer(Num) });
@@ -931,7 +941,6 @@ start_for_tests(TestType, Config) ->
         data_dir = ".tmp/data_" ++ atom_to_list(TestType) ++ "_main_" ++ UniqueName,
         port = ar_test_node:get_unused_port(),
         disable = [randomx_jit],
-        packing_rate = 20,
         auto_join = false
     },
     start(TestConfig).
diff --git a/apps/arweave/src/ar_config.erl b/apps/arweave/src/ar_config.erl
index 0f2357a9c..afe1ee3b7 100644
--- a/apps/arweave/src/ar_config.erl
+++ b/apps/arweave/src/ar_config.erl
@@ -518,7 +518,9 @@ parse_options([{<<"disk_cache_size_mb">>, D} | Rest], Config) when is_integer(D)
     parse_options(Rest, Config#config{ disk_cache_size = D });
 
 parse_options([{<<"packing_rate">>, D} | Rest], Config) when is_integer(D) ->
-    parse_options(Rest, Config#config{ packing_rate = D });
+    ?LOG_WARNING("Deprecated option found 'packing_rate': "
+        "this option has been removed and is now a no-op.", []),
+    parse_options(Rest, Config);
 
 parse_options([{<<"max_nonce_limiter_validation_thread_count">>, D} | Rest],
         Config) when is_integer(D) ->
diff --git a/apps/arweave/src/ar_device_lock.erl b/apps/arweave/src/ar_device_lock.erl
index 22c605099..9aa0da202 100644
--- a/apps/arweave/src/ar_device_lock.erl
+++ b/apps/arweave/src/ar_device_lock.erl
@@ -14,7 +14,8 @@
 -record(state, {
     store_id_to_device = #{},
     device_locks = #{},
-    initialized = false
+    initialized = false,
+    num_replica_2_9_workers = 0
 }).
 
 -type device_mode() :: prepare | sync | repack.
@@ -83,7 +84,10 @@ start_link() ->
 
 init([]) ->
     gen_server:cast(self(), initialize_state),
-    {ok, #state{}}.
+    {ok, Config} = application:get_env(arweave, config),
+    ?LOG_INFO([{event, starting_device_lock_server},
+        {num_replica_2_9_workers, Config#config.replica_2_9_workers}]),
+    {ok, #state{num_replica_2_9_workers = Config#config.replica_2_9_workers}}.
 
 handle_call(get_state, _From, State) ->
     {reply, State, State};
@@ -170,10 +174,10 @@ get_system_device(StorageModule) ->
     end.
 
 do_acquire_lock(Mode, StoreID, State) ->
+    MaxPrepareLocks = State#state.num_replica_2_9_workers,
     Device = maps:get(StoreID, State#state.store_id_to_device),
     DeviceLock = maps:get(Device, State#state.device_locks, sync),
     PrepareLocks = count_prepare_locks(State),
-    MaxPrepareLocks = 8,
     {Acquired, NewDeviceLock} = case Mode of
         sync ->
             %% Can only aquire a sync lock if the device is in sync mode
diff --git a/apps/arweave/src/ar_packing_server.erl b/apps/arweave/src/ar_packing_server.erl
index 329bab4da..69f103576 100644
--- a/apps/arweave/src/ar_packing_server.erl
+++ b/apps/arweave/src/ar_packing_server.erl
@@ -273,14 +273,12 @@ init([]) ->
     H1String = io_lib:format("~.3f", [H1 / 1000]),
     ar:console("Hashing benchmark~nH0: ~s ms~nH1/H2: ~s ms~n", [H0String, H1String]),
     ?LOG_INFO([{event, hash_benchmark}, {h0_ms, H0String}, {h1_ms, H1String}]),
-    Schedulers = erlang:system_info(dirty_cpu_schedulers_online),
-    SpawnSchedulers = Schedulers,
-    ar:console("~nStarting ~B packing threads.~n", [SpawnSchedulers]),
-    ?LOG_INFO([{event, starting_packing_threads}, {num_threads, SpawnSchedulers}]),
+    NumWorkers = Config#config.packing_workers,
+    ar:console("~nStarting ~B packing threads.~n", [NumWorkers]),
+    ?LOG_INFO([{event, starting_packing_threads}, {num_threads, NumWorkers}]),
     Workers = queue:from_list(
-        [spawn_link(fun() -> worker(PackingState) end) || _ <- lists:seq(1, SpawnSchedulers)]),
+        [spawn_link(fun() -> worker(PackingState) end) || _ <- lists:seq(1, NumWorkers)]),
     ets:insert(?MODULE, {buffer_size, 0}),
-    {ok, Config} = application:get_env(arweave, config),
     MaxSize =
         case Config#config.packing_cache_size_limit of
             undefined ->
@@ -296,7 +294,7 @@ init([]) ->
     ets:insert(?MODULE, {buffer_size_limit, MaxSize}),
     timer:apply_interval(200, ?MODULE, record_buffer_size_metric, []),
     {ok, #state{
-        workers = Workers, num_workers = SpawnSchedulers }}.
+        workers = Workers, num_workers = NumWorkers }}.
 
 handle_call(Request, _From, State) ->
     ?LOG_WARNING([{event, unhandled_call}, {module, ?MODULE}, {request, Request}]),
diff --git a/apps/arweave/test/ar_config_tests.erl b/apps/arweave/test/ar_config_tests.erl
index a745b7c1e..f5956c6c0 100644
--- a/apps/arweave/test/ar_config_tests.erl
+++ b/apps/arweave/test/ar_config_tests.erl
@@ -69,6 +69,8 @@ test_parse_config() ->
         tx_validators = 3,
         post_tx_timeout = 50,
         max_emitters = 4,
+        replica_2_9_workers = 16,
+        packing_workers = 25,
         tx_propagation_parallelization = undefined,
         sync_jobs = 10,
         header_sync_jobs = 1,
@@ -117,7 +119,6 @@ test_parse_config() ->
             gateway_arql := 3,
             get_sync_record := 10
         },
-        packing_rate = 20,
         max_nonce_limiter_validation_thread_count = 2,
         max_nonce_limiter_last_step_validation_thread_count = 3,
         nonce_limiter_server_trusted_peers = ["127.0.0.1", "2.3.4.5", "6.7.8.9:1982"],
diff --git a/apps/arweave/test/ar_config_tests_config_fixture.json b/apps/arweave/test/ar_config_tests_config_fixture.json
index b97f9680e..7ad38aa08 100644
--- a/apps/arweave/test/ar_config_tests_config_fixture.json
+++ b/apps/arweave/test/ar_config_tests_config_fixture.json
@@ -100,6 +100,8 @@
         "gateway_arql": 3
     },
     "packing_rate": 20,
+    "replica_2_9_workers": 16,
+    "packing_workers": 25,
     "max_nonce_limiter_validation_thread_count": 2,
     "max_nonce_limiter_last_step_validation_thread_count": 3,
     "vdf_server_trusted_peer": "127.0.0.1",
diff --git a/apps/arweave/test/ar_test_node.erl b/apps/arweave/test/ar_test_node.erl
index 53b0f8ab9..982b90a57 100644
--- a/apps/arweave/test/ar_test_node.erl
+++ b/apps/arweave/test/ar_test_node.erl
@@ -139,7 +139,7 @@ try_boot_peer(TestType, Node, Retries) ->
     Cmd = io_lib:format(
         "erl +S ~B:~B -pa ~s -config config/sys.config -noshell " ++
         "-name ~s -setcookie ~s -run ar main debug port ~p " ++
-        "data_dir .tmp/data_~s_~s no_auto_join packing_rate 20 " ++
+        "data_dir .tmp/data_~s_~s no_auto_join " ++
         "> ~s-~s.out 2>&1 &",
         [Schedulers, Schedulers, string:join(Paths, " "), NodeName, Cookie, Port,
         atom_to_list(TestType), NodeName, Node, get_node_namespace()]),
@@ -226,7 +226,6 @@ update_config(Config) ->
         auto_join = Config#config.auto_join,
         mining_addr = Config#config.mining_addr,
         sync_jobs = Config#config.sync_jobs,
-        packing_rate = Config#config.packing_rate,
         disk_pool_jobs = Config#config.disk_pool_jobs,
         header_sync_jobs = Config#config.header_sync_jobs,
         enable = Config#config.enable ++ BaseConfig#config.enable,
@@ -322,7 +321,6 @@ base_cm_config(Peers) ->
         auto_join = true,
         mining_addr = RewardAddr,
         sync_jobs = 2,
-        packing_rate = 20,
         disk_pool_jobs = 2,
         header_sync_jobs = 2,
         enable = [search_in_rocksdb_when_mining, serve_tx_data_without_limits,
@@ -598,7 +596,6 @@ start(B0, RewardAddr, Config, StorageModules) ->
         storage_modules = StorageModules,
         disk_space_check_frequency = 1000,
         sync_jobs = 2,
-        packing_rate = 20,
         disk_pool_jobs = 2,
         header_sync_jobs = 2,
         enable = [search_in_rocksdb_when_mining, serve_tx_data_without_limits,
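
For reviewers, a minimal sketch of how an operator would set the two new options in a node's JSON config file. The key names are taken from the updated test fixture above; the values here are illustrative only, not recommendations:

    {
        "packing_workers": 16,
        "replica_2_9_workers": 4
    }

If either key is omitted, the node falls back to the defaults introduced in ar_config.hrl: ?DEFAULT_PACKING_WORKERS (the number of online dirty CPU schedulers) and ?DEFAULT_REPLICA_2_9_WORKERS (8). Per the updated show_help/0 text, the same options can also be passed on the command line as packing_workers (num) and replica_2_9_workers (num), replacing the removed packing_rate option.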