diff --git a/.github/workflows/parallel_test.yml b/.github/workflows/parallel_test.yml
index 88fd99e63..4118bc28f 100644
--- a/.github/workflows/parallel_test.yml
+++ b/.github/workflows/parallel_test.yml
@@ -54,9 +54,9 @@ jobs:
           version: '1.10'
       - uses: julia-actions/cache@v1
       - run: |
-          MPILIBPATH=$(find /opt/homebrew/Cellar/open-mpi/ -name libmpi.dylib)
+          export MPILIBPATH=$(find /opt/homebrew/Cellar/open-mpi/ -name libmpi.dylib)
           touch Project.toml
-          julia --project -O3 --check-bounds=no -e 'import Pkg; Pkg.add(["MPI", "MPIPreferences"]); using MPIPreferences; MPIPreferences.use_system_binary(library_names="/opt/homebrew/Cellar/open-mpi/5.0.3/lib/libmpi.dylib")'
+          julia --project -O3 --check-bounds=no -e "import Pkg; Pkg.add([\"MPI\", \"MPIPreferences\"]); using MPIPreferences; MPIPreferences.use_system_binary(library_names=\"$MPILIBPATH\")"
           julia --project -O3 --check-bounds=no -e 'import Pkg; Pkg.add(["NCDatasets", "Random", "SpecialFunctions", "Test"]); Pkg.develop(path="moment_kinetics/")'
           julia --project -O3 --check-bounds=no -e 'import Pkg; Pkg.precompile()'
           # Need to use openmpi so that the following arguments work:
diff --git a/moment_kinetics/src/array_allocation.jl b/moment_kinetics/src/array_allocation.jl
index 7dc3bc5d3..b265b6879 100644
--- a/moment_kinetics/src/array_allocation.jl
+++ b/moment_kinetics/src/array_allocation.jl
@@ -9,6 +9,8 @@ using ..communication
 using ..debugging
 @debug_initialize_NaN using ..communication: block_rank, _block_synchronize
 
+using MPI
+
 """
 allocate array with dimensions given by dims and entries of type Bool
 """
@@ -55,14 +57,27 @@ function allocate_shared_float(dims...; comm=nothing)
     array = allocate_shared(mk_float, dims; comm=comm)
     @debug_initialize_NaN begin
         # Initialize as NaN to try and catch use of uninitialized values
-        if block_rank[] == 0
+        if comm === nothing
+            comm_rank = block_rank[]
+            this_comm = comm_block[]
+        elseif comm == MPI.COMM_NULL
+            comm_rank = -1
+            this_comm = nothing
+        else
+            # Get MPI.Comm_rank when comm is not nothing
+            comm_rank = MPI.Comm_rank(comm)
+            this_comm = comm
+        end
+        if comm_rank == 0
             array .= NaN
             @debug_track_initialized begin
                 # Track initialization as if the array was not initialized to NaN
                 array.is_initialized .= false
             end
         end
-        _block_synchronize()
+        if this_comm !== nothing
+            MPI.Barrier(this_comm)
+        end
     end
     return array
 end
@@ -85,14 +100,27 @@ function allocate_shared_complex(dims...; comm=nothing)
     array = allocate_shared(Complex{mk_float}, dims; comm=comm)
     @debug_initialize_NaN begin
         # Initialize as NaN to try and catch use of uninitialized values
-        if block_rank[] == 0
+        if comm === nothing
+            comm_rank = block_rank[]
+            this_comm = comm_block[]
+        elseif comm == MPI.COMM_NULL
+            comm_rank = -1
+            this_comm = nothing
+        else
+            # Get MPI.Comm_rank when comm is not nothing
+            comm_rank = MPI.Comm_rank(comm)
+            this_comm = comm
+        end
+        if comm_rank == 0
             array .= NaN
             @debug_track_initialized begin
                 # Track initialization as if the array was not initialized to NaN
                 array.is_initialized .= false
             end
         end
-        _block_synchronize()
+        if this_comm !== nothing
+            MPI.Barrier(this_comm)
+        end
     end
     return array
 end
diff --git a/moment_kinetics/src/communication.jl b/moment_kinetics/src/communication.jl
index ee1b84f3a..2f88c8cd9 100644
--- a/moment_kinetics/src/communication.jl
+++ b/moment_kinetics/src/communication.jl
@@ -100,7 +100,7 @@ const anyv_subblock_size = Ref{mk_int}()
 
 """
 """
-const anyv_isubblock_index = Ref{mk_int}()
+const anyv_isubblock_index = Ref{Union{mk_int,Nothing}}()
 
 """
 """
@@ -531,6 +531,11 @@ end
     # instances, so their is_read and is_written members can be checked and
     # reset by _block_synchronize()
     const global_debugmpisharedarray_store = Vector{DebugMPISharedArray}(undef, 0)
+    # 'anyv' regions require a separate array store, because within an anyv region,
+    # processes in the same shared memory block may still not be synchronized if they are
+    # in different anyv sub-blocks, so debug checks within an anyv region should only
+    # consider the anyv-specific arrays.
+    const global_anyv_debugmpisharedarray_store = Vector{DebugMPISharedArray}(undef, 0)
 end
 
 """
@@ -567,6 +572,19 @@ Array{mk_float}
 function allocate_shared(T, dims; comm=nothing, maybe_debug=true)
     if comm === nothing
         comm = comm_block[]
+    elseif comm == MPI.COMM_NULL
+        # If `comm` is a null communicator (on this process), then this array is just a
+        # dummy that will not be used.
+        array = Array{T}(undef, (0 for _ ∈ dims)...)
+
+        @debug_shared_array begin
+            # If @debug_shared_array is active, create DebugMPISharedArray instead of Array
+            if maybe_debug
+                array = DebugMPISharedArray(array, comm)
+            end
+        end
+
+        return array
     end
     br = MPI.Comm_rank(comm)
     bs = MPI.Comm_size(comm)
@@ -633,7 +651,11 @@ function allocate_shared(T, dims; comm=nothing, maybe_debug=true)
         # If @debug_shared_array is active, create DebugMPISharedArray instead of Array
         if maybe_debug
             debug_array = DebugMPISharedArray(array, comm)
-            push!(global_debugmpisharedarray_store, debug_array)
+            if comm == comm_anyv_subblock[]
+                push!(global_anyv_debugmpisharedarray_store, debug_array)
+            else
+                push!(global_debugmpisharedarray_store, debug_array)
+            end
             return debug_array
         end
     end
@@ -762,11 +784,17 @@ end
     Raises an error if any array has been accessed incorrectly since the previous call
     to _block_synchronize()
 
-    Can be added when debugging to help in down where an error occurs.
+    Can be added when debugging to help pin down where an error occurs.
     """
-    function debug_check_shared_memory(; kwargs...)
-        for (arraynum, array) ∈ enumerate(global_debugmpisharedarray_store)
-            debug_check_shared_array(array; kwargs...)
+    function debug_check_shared_memory(; comm=comm_block[], kwargs...)
+        if comm == comm_anyv_subblock[]
+            for (arraynum, array) ∈ enumerate(global_anyv_debugmpisharedarray_store)
+                debug_check_shared_array(array; comm=comm, kwargs...)
+            end
+        else
+            for (arraynum, array) ∈ enumerate(global_debugmpisharedarray_store)
+                debug_check_shared_array(array; comm=comm, kwargs...)
+            end
         end
         return nothing
     end
@@ -909,6 +937,10 @@ sub-blocks, depending on how the species and spatial dimensions are split up.
 `_anyv_subblock_synchronize()`.
 """
 function _anyv_subblock_synchronize()
+    if comm_anyv_subblock[] == MPI.COMM_NULL
+        # No synchronization to do for a null communicator
+        return nothing
+    end
     MPI.Barrier(comm_anyv_subblock[])
 
@@ -943,7 +975,7 @@ function _anyv_subblock_synchronize()
     # * If an element is written to, only the rank that writes to it should read it.
     #
     @debug_detect_redundant_block_synchronize previous_was_unnecessary = true
-    for (arraynum, array) ∈ enumerate(global_debugmpisharedarray_store)
+    for (arraynum, array) ∈ enumerate(global_anyv_debugmpisharedarray_store)
 
         debug_check_shared_array(array; comm=comm_anyv_subblock[])
 
@@ -1057,6 +1089,7 @@ end
 """
 function free_shared_arrays()
     @debug_shared_array resize!(global_debugmpisharedarray_store, 0)
+    @debug_shared_array resize!(global_anyv_debugmpisharedarray_store, 0)
 
     for w ∈ global_Win_store
         MPI.free(w)
diff --git a/moment_kinetics/src/fokker_planck.jl b/moment_kinetics/src/fokker_planck.jl
index 0352f9a1f..1f520e9f6 100644
--- a/moment_kinetics/src/fokker_planck.jl
+++ b/moment_kinetics/src/fokker_planck.jl
@@ -664,7 +664,9 @@ function conserving_corrections!(CC,pdf_in,vpa,vperp,dummy_vpavperp)
 
     # Broadcast x0, x1, x2 to all processes in the 'anyv' subblock
     param_vec = [x0, x1, x2, upar]
-    MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
+    if comm_anyv_subblock[] != MPI.COMM_NULL
+        MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
+    end
     (x0, x1, x2, upar) = param_vec
 
     # correct CC
@@ -695,7 +697,9 @@ function density_conserving_correction!(CC,pdf_in,vpa,vperp,dummy_vpavperp)
 
     # Broadcast x0 to all processes in the 'anyv' subblock
     param_vec = [x0]
-    MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
+    if comm_anyv_subblock[] != MPI.COMM_NULL
+        MPI.Bcast!(param_vec, 0, comm_anyv_subblock[])
+    end
     x0 = param_vec[1]
 
     # correct CC
diff --git a/moment_kinetics/src/looping.jl b/moment_kinetics/src/looping.jl
index 8dd4b4c7a..a6c1ca880 100644
--- a/moment_kinetics/src/looping.jl
+++ b/moment_kinetics/src/looping.jl
@@ -471,14 +471,19 @@ eval(quote
         anyv_subblock_size[] = anyv_split[end]
         number_of_anyv_blocks = prod(anyv_split[1:end-1])
         anyv_subblock_index = block_rank[] ÷ anyv_subblock_size[]
-        anyv_rank_within_subblock = block_rank[] % anyv_subblock_size[]
+        if anyv_subblock_index ≥ number_of_anyv_blocks
+            anyv_subblock_index = nothing
+            anyv_rank_within_subblock = -1
+        else
+            anyv_rank_within_subblock = block_rank[] % anyv_subblock_size[]
+        end
 
         # Create communicator for the anyv subblock. OK to do this here as
         # communication.setup_distributed_memory_MPI() must have already been called
         # to set block_size[] and block_rank[]
         comm_anyv_subblock[] = MPI.Comm_split(comm_block[], anyv_subblock_index,
                                               anyv_rank_within_subblock)
-        anyv_subblock_rank[] = MPI.Comm_rank(comm_anyv_subblock[])
+        anyv_subblock_rank[] = anyv_rank_within_subblock
         anyv_isubblock_index[] = anyv_subblock_index
         anyv_nsubblocks_per_block[] = number_of_anyv_blocks
         anyv_rank0 = (anyv_subblock_rank[] == 0)
diff --git a/moment_kinetics/test/fokker_planck_tests.jl b/moment_kinetics/test/fokker_planck_tests.jl
index 6911f951b..c5b192bac 100644
--- a/moment_kinetics/test/fokker_planck_tests.jl
+++ b/moment_kinetics/test/fokker_planck_tests.jl
@@ -53,14 +53,12 @@ function create_grids(ngrid,nelement_vpa,nelement_vperp;
     #println("made inputs")
     #println("vpa: ngrid: ",ngrid," nelement: ",nelement_local_vpa, " Lvpa: ",Lvpa)
     #println("vperp: ngrid: ",ngrid," nelement: ",nelement_local_vperp, " Lvperp: ",Lvperp)
-    #vpa, vpa_spectral = define_coordinate(vpa_input,ignore_MPI=true)
-    #vperp, vperp_spectral = define_coordinate(vperp_input,ignore_MPI=true)
 
     # Set up MPI
     initialize_comms!()
     setup_distributed_memory_MPI(1,1,1,1)
-    vpa, vpa_spectral = define_coordinate(vpa_input,ignore_MPI=false)
-    vperp, vperp_spectral = define_coordinate(vperp_input,ignore_MPI=false)
+    vpa, vpa_spectral = define_coordinate(vpa_input)
+    vperp, vperp_spectral = define_coordinate(vperp_input)
     looping.setup_loop_ranges!(block_rank[], block_size[];
                                s=1, sn=1, r=1, z=1,
                                vperp=vperp.n, vpa=vpa.n,
@@ -281,7 +279,8 @@ function runtests()
                                   calculate_GG=true, calculate_dGdvperp=true)
         # extract C[Fs,Fs'] result
         # and Rosenbluth potentials for testing
-        begin_vperp_vpa_region()
+        begin_s_r_z_anyv_region()
+        begin_anyv_vperp_vpa_region()
         @loop_vperp_vpa ivperp ivpa begin
             G_M_num[ivpa,ivperp] = fkpl_arrays.GG[ivpa,ivperp]
             H_M_num[ivpa,ivperp] = fkpl_arrays.HH[ivpa,ivperp]
@@ -421,7 +420,8 @@ function runtests()
            conserving_corrections!(fkpl_arrays.CC,Fs_M,vpa,vperp,dummy_array)
         end
         # extract C[Fs,Fs'] result
-        begin_vperp_vpa_region()
+        begin_s_r_z_anyv_region()
+        begin_anyv_vperp_vpa_region()
         @loop_vperp_vpa ivperp ivpa begin
             C_M_num[ivpa,ivperp] = fkpl_arrays.CC[ivpa,ivperp]
         end
@@ -581,7 +581,8 @@ function runtests()
            density_conserving_correction!(fkpl_arrays.CC,Fs_M,vpa,vperp,dummy_array)
         end
         # extract C[Fs,Fs'] result
-        begin_vperp_vpa_region()
+        begin_s_r_z_anyv_region()
+        begin_anyv_vperp_vpa_region()
         @loop_vperp_vpa ivperp ivpa begin
             C_M_num[ivpa,ivperp] = fkpl_arrays.CC[ivpa,ivperp]
         end
diff --git a/test_scripts/2D_FEM_assembly_test.jl b/test_scripts/2D_FEM_assembly_test.jl
index d4a3b8a14..6dcfec726 100644
--- a/test_scripts/2D_FEM_assembly_test.jl
+++ b/test_scripts/2D_FEM_assembly_test.jl
@@ -113,16 +113,14 @@ end
     println("made inputs")
     println("vpa: ngrid: ",ngrid," nelement: ",nelement_local_vpa, " Lvpa: ",Lvpa)
    println("vperp: ngrid: ",ngrid," nelement: ",nelement_local_vperp, " Lvperp: ",Lvperp)
-    #vpa, vpa_spectral = define_coordinate(vpa_input,ignore_MPI=false)
-    #vperp, vperp_spectral = define_coordinate(vperp_input,ignore_MPI=false)
 
     # Set up MPI
     if standalone
         initialize_comms!()
     end
     setup_distributed_memory_MPI(1,1,1,1)
-    vpa, vpa_spectral = define_coordinate(vpa_input,ignore_MPI=false)
-    vperp, vperp_spectral = define_coordinate(vperp_input,ignore_MPI=false)
+    vpa, vpa_spectral = define_coordinate(vpa_input)
+    vperp, vperp_spectral = define_coordinate(vperp_input)
     looping.setup_loop_ranges!(block_rank[], block_size[];
                                s=1, sn=1, r=1, z=1,
                                vperp=vperp.n, vpa=vpa.n,
@@ -231,7 +229,6 @@ end
     ms = 1.0
     msp = 1.0
     nussp = 1.0
-    begin_serial_region()
     for ivperp in 1:vperp.n
         for ivpa in 1:vpa.n
             Fs_M[ivpa,ivperp] = F_Maxwellian(denss,upars,vths,vpa,vperp,ivpa,ivperp)
@@ -283,7 +280,8 @@ end
                             algebraic_solve_for_d2Gdvperp2=false,calculate_GG=true,calculate_dGdvperp=true)
     # extract C[Fs,Fs'] result
     # and Rosenbluth potentials for testing
-    begin_vperp_vpa_region()
+    begin_s_r_z_anyv_region()
+    begin_anyv_vperp_vpa_region()
     @loop_vperp_vpa ivperp ivpa begin
         C_M_num[ivpa,ivperp] = fkpl_arrays.CC[ivpa,ivperp]
         G_M_num[ivpa,ivperp] = fkpl_arrays.GG[ivpa,ivperp]
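
For reference, the pattern that recurs throughout this patch (in looping.jl, fokker_planck.jl, array_allocation.jl and _anyv_subblock_synchronize()) is: ranks that fall outside every 'anyv' sub-block pass `nothing` as the colour to MPI.Comm_split, receive MPI.COMM_NULL back, and must then skip every collective call on that communicator. The standalone sketch below exercises only that pattern; it is illustrative, not part of the patch, and the names subblock_size, isubblock, rank_within and subblock_comm are made up for the example.

using MPI

MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nproc = MPI.Comm_size(comm)

# Group ranks into sub-blocks of 2; any leftover rank gets colour `nothing`,
# which MPI.Comm_split treats as MPI_UNDEFINED, so that rank is handed
# MPI.COMM_NULL instead of a real sub-block communicator.
subblock_size = 2
nsubblocks = nproc ÷ subblock_size
isubblock = rank ÷ subblock_size
if isubblock ≥ nsubblocks
    isubblock = nothing
    rank_within = -1
else
    rank_within = rank % subblock_size
end
subblock_comm = MPI.Comm_split(comm, isubblock, rank_within)

# Collective operations are guarded so that processes holding a null
# communicator neither error nor wait for a broadcast/barrier they can
# never join.
param_vec = [Float64(rank)]
if subblock_comm != MPI.COMM_NULL
    MPI.Bcast!(param_vec, 0, subblock_comm)
    MPI.Barrier(subblock_comm)
end

MPI.Finalize()

Run with, say, `mpiexec -n 3 julia <script>.jl`: ranks 0 and 1 broadcast and synchronize within their two-rank sub-block, while rank 2 receives MPI.COMM_NULL and skips both collectives instead of hanging.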
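The allocate_shared change returns a zero-size dummy of the correct dimensionality when `comm == MPI.COMM_NULL`, so callers that only inspect the shape of the returned array keep working even on ranks that never touch the data. A minimal sketch of that trick, using a hypothetical helper name make_dummy that does not appear in the patch:

# Hypothetical helper: build a zero-length array that still has one axis per
# requested dimension; ndims/size-based code keeps working, but no shared
# memory is ever allocated or used on this rank.
make_dummy(T, dims...) = Array{T}(undef, (0 for _ in dims)...)

a = make_dummy(Float64, 4, 5, 6)
@assert ndims(a) == 3   # dimensionality is preserved
@assert length(a) == 0  # but the array holds no data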