diff --git a/.gitmodules b/.gitmodules index dfb26665d9..e031e682e1 100644 --- a/.gitmodules +++ b/.gitmodules @@ -43,3 +43,6 @@ [submodule "extern/netcdf-cxx4"] path = extern/netcdf-cxx4/netcdf-cxx4 url = https://github.com/Unidata/netcdf-cxx4/ +[submodule "extern/lstm/lstm"] + path = extern/lstm/lstm + url = https://github.com/CIROH-UA/lstm diff --git a/extern/lstm/README.md b/extern/lstm/README.md new file mode 100644 index 0000000000..7f66624af2 --- /dev/null +++ b/extern/lstm/README.md @@ -0,0 +1,27 @@ +# LSTM Submodule + +## About +This submodule is linked in from: https://github.com/CIROH-UA/lstm, which is a fork of https://github.com/NOAA-OWP/lstm. This directory follows the template for linking submodules from https://github.com/NOAA-OWP/ngen/edit/master/extern/cfe/. + +#### Extra Outer Directory + +Currently there are two directory layers beneath the top-level *extern/* directory. This was done so that certain things used by NGen (i.e., a *CMakeLists.txt* file for building shared library files) can be placed alongside, but not within, the submodule. + +## Working with the Submodule + +Some simple explanations of several command actions are included below. To better understand what these things are doing, consult the [Git Submodule documentation](https://git-scm.com/book/en/v2/Git-Tools-Submodules). + +### Getting the Latest Changes + +There are two steps to getting upstream submodule changes fully + 1. fetching and locally checking out the changes from the remote + 2. 
committing the new checkout revision for the submodule + +To fetch and check out the latest revision (for the [currently used branch](#viewing-the-current-branch)): + + git submodule update --init -- extern/lstm/lstm + +To commit the current submodule checkout revision to the CIROH UA NGen repo: + + git add extern/lstm/lstm + git commit diff --git a/extern/lstm/lstm b/extern/lstm/lstm new file mode 160000 index 0000000000..44a16c18c1 --- /dev/null +++ b/extern/lstm/lstm @@ -0,0 +1 @@ +Subproject commit 44a16c18c1b47bb6b5f35f6365878ae9a0a6cee5 diff --git a/extern/noah-owp-modular/noah-owp-modular b/extern/noah-owp-modular/noah-owp-modular index 30d0f53e8c..0abb891b48 160000 --- a/extern/noah-owp-modular/noah-owp-modular +++ b/extern/noah-owp-modular/noah-owp-modular @@ -1 +1 @@ -Subproject commit 30d0f53e8c14acc4ce74018e06ff7c9410ecc13c +Subproject commit 0abb891b48b043cc626c4e4bbd0efe54ad357fe1 diff --git a/include/forcing/AorcForcing.hpp b/include/forcing/AorcForcing.hpp index 58ea21753d..136ef10792 100644 --- a/include/forcing/AorcForcing.hpp +++ b/include/forcing/AorcForcing.hpp @@ -40,11 +40,12 @@ struct forcing_params std::string provider; time_t simulation_start_t; time_t simulation_end_t; + bool enable_cache = true; /* Constructor for forcing_params */ - forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time): - path(path), provider(provider), start_time(start_time), end_time(end_time) + forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time, bool enable_cache) : + path(path), provider(provider), start_time(start_time), end_time(end_time), enable_cache(enable_cache) { /// \todo converting to UTC can be tricky, especially if thread safety is a concern /* https://stackoverflow.com/questions/530519/stdmktime-and-timezone-info */ diff --git a/include/forcing/NetCDFPerFeatureDataProvider.hpp b/include/forcing/NetCDFPerFeatureDataProvider.hpp index 92ef771fff..85373ecb53 
100644 --- a/include/forcing/NetCDFPerFeatureDataProvider.hpp +++ b/include/forcing/NetCDFPerFeatureDataProvider.hpp @@ -53,14 +53,14 @@ namespace data_access * @param log_s An output log stream for messages from the underlying library. If a provider object for * the given path already exists, this argument will be ignored. */ - static std::shared_ptr get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s); + static std::shared_ptr get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache); /** * @brief Cleanup the shared providers cache, ensuring that the files get closed. */ static void cleanup_shared_providers(); - NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s); + NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache); // Default implementation defined in the .cpp file so that // client code doesn't need to have the full definition of @@ -135,6 +135,7 @@ namespace data_access std::map ncvar_cache; std::map units_cache; boost::compute::detail::lru_cache>> value_cache; + bool enable_cache; size_t cache_slice_t_size = 1; size_t cache_slice_c_size = 1; diff --git a/include/realizations/catchment/Formulation_Constructors.hpp b/include/realizations/catchment/Formulation_Constructors.hpp index 8f1fa5aeea..24b0c2b38c 100644 --- a/include/realizations/catchment/Formulation_Constructors.hpp +++ b/include/realizations/catchment/Formulation_Constructors.hpp @@ -49,7 +49,7 @@ namespace realization { } #if NGEN_WITH_NETCDF else if (forcing_config.provider == "NetCDF"){ - fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream); + fp = 
data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream, forcing_config.enable_cache); } #endif else if (forcing_config.provider == "NullForcingProvider"){ diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index 4140a97309..340e0df8bf 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -318,6 +318,59 @@ namespace realization { //for case where there is no output_root in the realization file return "./"; + + } + + /** + * @brief Check if the formulation has catchment output writing enabled + * + * @code{.cpp} + * // Example config: + * // ... + * // "write_catchment_output": false + * // ... + * const auto manager = Formulation_Manager(CONFIG); + * manager.write_catchment_output(); + * //> false + * @endcode + * + * @return bool + */ + bool write_catchment_output() const { + const auto write_catchment_output = this->tree.get_optional("write_catchment_output"); + if (write_catchment_output != boost::none && *write_catchment_output != "") { + // if any variation of "false" or "no" or 0 is found, return false + if (write_catchment_output->compare("false") == 0 || write_catchment_output->compare("no") == 0 || write_catchment_output->compare("0") == 0) { + return false; + } + } + return true; + } + + /** + * @brief Check if the formulation uses remote partitioning for MPI partitions + * + * @code{.cpp} + * // Example config: + * // ... + * // "remotes_enabled": false + * // ...
+ * const auto manager = Formulation_Manager(CONFIG); + * manager.remotes_enabled(); + * //> false + * @endcode + * + * @return bool + */ + bool remotes_enabled() const { + const auto remotes_enabled = this->tree.get_optional("remotes_enabled"); + if (remotes_enabled != boost::none && *remotes_enabled != "") { + // if any variation of "false" or "no" or 0 is found, return false + if (remotes_enabled->compare("false") == 0 || remotes_enabled->compare("no") == 0 || remotes_enabled->compare("0") == 0) { + return false; + } + } + return true; } /** @@ -395,9 +448,27 @@ } forcing_params get_forcing_params(const geojson::PropertyMap &forcing_prop_map, std::string identifier, simulation_time_params &simulation_time_config) { + int rank = 0; + bool enable_cache = true; + #if NGEN_WITH_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + #endif + + if (forcing_prop_map.count("enable_cache") != 0) { + enable_cache = forcing_prop_map.at("enable_cache").as_boolean(); + } + std::string path = ""; if(forcing_prop_map.count("path") != 0){ path = forcing_prop_map.at("path").as_string(); + int id_index = path.find("{{id}}"); + int partition_id_index = path.find("{{partition_id}}"); + if (id_index != std::string::npos) { + path = path.replace(id_index, sizeof("{{id}}") - 1, identifier); + } + if (partition_id_index != std::string::npos) { + path = path.replace(partition_id_index, sizeof("{{partition_id}}") - 1, std::to_string(rank)); + } } std::string provider; if(forcing_prop_map.count("provider") != 0){ @@ -408,7 +479,8 @@ path, provider, simulation_time_config.start_time, - simulation_time_config.end_time + simulation_time_config.end_time, + enable_cache ); } @@ -497,7 +569,8 @@ path + entry->d_name, provider, simulation_time_config.start_time, - simulation_time_config.end_time + simulation_time_config.end_time, + enable_cache ); } else if ( entry->d_type == DT_UNKNOWN ) @@ -516,7 +589,8 @@
path + entry->d_name, provider, simulation_time_config.start_time, - simulation_time_config.end_time + simulation_time_config.end_time, + enable_cache ); } throw std::runtime_error("Forcing data is path "+path+entry->d_name+" is not a file"); diff --git a/src/NGen.cpp b/src/NGen.cpp index 1c0cf2bf28..95bf9628aa 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -439,10 +439,16 @@ int main(int argc, char *argv[]) { for(const auto& id : features.nexuses()) { #if NGEN_WITH_MPI if (mpi_num_procs > 1) { + if (manager->remotes_enabled() == true) { if (!features.is_remote_sender_nexus(id)) { nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc); } - } else { + } + else { + nexus_outfiles[id].open(manager->get_output_root() + id + "_rank_" + std::to_string(mpi_rank) + "_output.csv", std::ios::trunc); + } + } + else { nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc); } #else @@ -558,57 +564,76 @@ int main(int argc, char *argv[]) { } //done time #if NGEN_WITH_MPI - MPI_Barrier(MPI_COMM_WORLD); -#endif - - if (mpi_rank == 0) - { - std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl; - } + MPI_Request barrier_request; + MPI_Ibarrier(MPI_COMM_WORLD, &barrier_request); - auto time_done_simulation = std::chrono::steady_clock::now(); - std::chrono::duration time_elapsed_simulation = time_done_simulation - time_done_init; + int flag = 0; + const int sleep_microseconds = 100000; // 100 millisecond sleep -#if NGEN_WITH_MPI - MPI_Barrier(MPI_COMM_WORLD); -#endif - -#if NGEN_WITH_ROUTING - if (mpi_rank == 0) - { // Run t-route from single process - if(manager->get_using_routing()) { - //Note: Currently, delta_time is set in the t-route yaml configuration file, and the - //number_of_timesteps is determined from the total number of nexus outputs in t-route. 
- //It is recommended to still pass these values to the routing_py_adapter object in - //case a future implmentation needs these two values from the ngen framework. - int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times(); - - int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds(); - - router->route(number_of_timesteps, delta_time); + // Wait for all ranks to reach the barrier + while (!flag) { + MPI_Test(&barrier_request, &flag, MPI_STATUS_IGNORE); + if (!flag) { + usleep(sleep_microseconds); } } #endif + if (mpi_rank == 0) { + std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl; + + auto time_done_simulation = std::chrono::steady_clock::now(); + std::chrono::duration time_elapsed_simulation = time_done_simulation - time_done_init; + - auto time_done_routing = std::chrono::steady_clock::now(); - std::chrono::duration time_elapsed_routing = time_done_routing - time_done_simulation; - if (mpi_rank == 0) - { - std::cout << "NGen top-level timings:" - << "\n\tNGen::init: " << time_elapsed_init.count() - << "\n\tNGen::simulation: " << time_elapsed_simulation.count() #if NGEN_WITH_ROUTING - << "\n\tNGen::routing: " << time_elapsed_routing.count() + if(manager->get_using_routing()) { + //Note: Currently, delta_time is set in the t-route yaml configuration file, and the + //number_of_timesteps is determined from the total number of nexus outputs in t-route. + //It is recommended to still pass these values to the routing_py_adapter object in + //case a future implmentation needs these two values from the ngen framework. 
+ int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times(); + + int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds(); + + router->route(number_of_timesteps, delta_time); + } #endif - << std::endl; - } - manager->finalize(); - -#if NGEN_WITH_MPI + auto time_done_routing = std::chrono::steady_clock::now(); + std::chrono::duration time_elapsed_routing = time_done_routing - time_done_simulation; + + std::cout << "NGen top-level timings:" + << "\n\tNGen::init: " << time_elapsed_init.count() + << "\n\tNGen::simulation: " << time_elapsed_simulation.count() + #if NGEN_WITH_ROUTING + << "\n\tNGen::routing: " << time_elapsed_routing.count() + #endif + << std::endl; + #if NGEN_WITH_MPI + for (int i = 1; i < mpi_num_procs; ++i) { + MPI_Send(NULL, 0, MPI_INT, i, 0, MPI_COMM_WORLD); + } + } + else { + // Non-root processes + MPI_Request recv_request; + MPI_Irecv(NULL, 0, MPI_INT, 0, 0, MPI_COMM_WORLD, &recv_request); + + int recv_flag = 0; + while (!recv_flag) { + MPI_Test(&recv_request, &recv_flag, MPI_STATUS_IGNORE); + if (!recv_flag) { + usleep(sleep_microseconds); + } + } + #endif + } + + manager->finalize(); + #if NGEN_WITH_MPI MPI_Finalize(); -#endif + #endif return 0; } diff --git a/src/core/HY_Features.cpp b/src/core/HY_Features.cpp index 91e59cb487..8ef2fdb24c 100644 --- a/src/core/HY_Features.cpp +++ b/src/core/HY_Features.cpp @@ -32,7 +32,14 @@ HY_Features::HY_Features(network::Network network, std::shared_ptrget_formulation(feat_id); - formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + if (formulations->write_catchment_output() == true) + { + formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + } + else + { + formulation->set_output_stream("/dev/null"); + } // TODO: add command line or config option to have this be omitted //FIXME why isn't default param working here??? get_output_header_line() fails. 
formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n"); diff --git a/src/core/HY_Features_MPI.cpp b/src/core/HY_Features_MPI.cpp index 4eec4e39a3..60fcec075e 100644 --- a/src/core/HY_Features_MPI.cpp +++ b/src/core/HY_Features_MPI.cpp @@ -38,7 +38,14 @@ HY_Features_MPI::HY_Features_MPI( PartitionData partition_data, geojson::GeoJSON { //Find and prepare formulation auto formulation = formulations->get_formulation(feat_id); - formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + if (formulations->write_catchment_output() == true) + { + formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + } + else + { + formulation->set_output_stream("/dev/null"); + }; // TODO: add command line or config option to have this be omitted //FIXME why isn't default param working here??? get_output_header_line() fails. formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n"); diff --git a/src/forcing/NetCDFPerFeatureDataProvider.cpp b/src/forcing/NetCDFPerFeatureDataProvider.cpp index 2ddb9ab578..a6ba03b840 100644 --- a/src/forcing/NetCDFPerFeatureDataProvider.cpp +++ b/src/forcing/NetCDFPerFeatureDataProvider.cpp @@ -10,14 +10,14 @@ std::map namespace data_access { -std::shared_ptr NetCDFPerFeatureDataProvider::get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s) +std::shared_ptr NetCDFPerFeatureDataProvider::get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache) { const std::lock_guard lock(shared_providers_mutex); std::shared_ptr p; if(shared_providers.count(input_path) > 0){ p = shared_providers[input_path]; } else { - p = std::make_shared(input_path, sim_start, sim_end, log_s); + p = std::make_shared(input_path, sim_start, sim_end, log_s, enable_cache); shared_providers[input_path] = p; } return p; @@ -30,9 +30,10 @@ void 
NetCDFPerFeatureDataProvider::cleanup_shared_providers() shared_providers.clear(); } -NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s) : log_stream(log_s), value_cache(20), +NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache) : log_stream(log_s), value_cache(20), sim_start_date_time_epoch(sim_start), - sim_end_date_time_epoch(sim_end) + sim_end_date_time_epoch(sim_end), + enable_cache(enable_cache) { //size_t sizep = 1073741824, nelemsp = 202481; //float preemptionp = 0.75; @@ -322,105 +323,112 @@ size_t NetCDFPerFeatureDataProvider::get_ts_index_for_time(const time_t &epoch_t double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& selector, ReSampleMethod m) { auto init_time = selector.get_init_time(); - auto stop_time = init_time + selector.get_duration_secs(); // scope hiding! BAD JUJU! - size_t idx1 = get_ts_index_for_time(init_time); - size_t idx2; - try { - idx2 = get_ts_index_for_time(stop_time-1); // Don't include next timestep when duration % timestep = 0 - } - catch(const std::out_of_range &e){ - idx2 = get_ts_index_for_time(this->stop_time-1); //to the edge - } - - auto stride = idx2 - idx1; - - std::vector start, count; - auto cat_pos = id_pos[selector.get_id()]; - - - - double t1 = time_vals[idx1]; - double t2 = time_vals[idx2]; - + std::vector start, count; double rvalue = 0.0; - - auto ncvar = get_ncvar(selector.get_variable_name()); - std::string native_units = get_ncvar_units(selector.get_variable_name()); + auto ncvar = get_ncvar(selector.get_variable_name()); - auto read_len = idx2 - idx1 + 1; - - std::vector raw_values; - raw_values.resize(read_len); - - //TODO: Currently assuming a whole variable cache slice across all catchments for a single timestep...but some stuff here to support otherwise. 
- size_t cache_slices_t_n = read_len / cache_slice_t_size; // Integer division! - // For reference: https://stackoverflow.com/a/72030286 - for( size_t i = 0; i < cache_slices_t_n; i++ ) { - std::shared_ptr> cached; - int cache_t_idx = (idx1 - (idx1 % cache_slice_t_size) + i); - std::string key = ncvar.getName() + "|" + std::to_string(cache_t_idx); - if(value_cache.contains(key)){ - cached = value_cache.get(key).get(); - } else { - cached = std::make_shared>(cache_slice_c_size * cache_slice_t_size); - start.clear(); - start.push_back(0); // only always 0 when cache_slice_c_size = numids! - start.push_back(cache_t_idx * cache_slice_t_size); - count.clear(); - count.push_back(cache_slice_c_size); - count.push_back(cache_slice_t_size); // Must be 1 for now!...probably... - ncvar.getVar(start,count,&(*cached)[0]); - value_cache.insert(key, cached); + if (enable_cache == false) + { + // vector start is the catchment index and the time index + // vector count is how much to read in both directions + start.clear(); + start.push_back(cat_pos); + start.push_back(idx1); + count.clear(); + count.push_back(1); + count.push_back(1); + ncvar.getVar(start, count, &rvalue); + } + else + { + auto stop_time = init_time + selector.get_duration_secs(); // scope hiding! BAD JUJU! 
+ + size_t idx2; + try { + idx2 = get_ts_index_for_time(stop_time-1); // Don't include next timestep when duration % timestep = 0 } - for( size_t j = 0; j < cache_slice_t_size; j++){ - raw_values[i+j] = cached->at((j*cache_slice_t_size) + cat_pos); + catch(const std::out_of_range &e){ + idx2 = get_ts_index_for_time(this->stop_time-1); //to the edge } - } - - rvalue = 0.0; - - double a , b = 0.0; - - a = 1.0 - ( (t1 - init_time) / time_stride ); - rvalue += (a * raw_values[0]); + auto stride = idx2 - idx1; + + double t1 = time_vals[idx1]; + double t2 = time_vals[idx2]; + + auto read_len = idx2 - idx1 + 1; + + std::vector raw_values; + raw_values.resize(read_len); + + + + //TODO: Currently assuming a whole variable cache slice across all catchments for a single timestep...but some stuff here to support otherwise. + size_t cache_slices_t_n = read_len / cache_slice_t_size; // Integer division! + // For reference: https://stackoverflow.com/a/72030286 + for( size_t i = 0; i < cache_slices_t_n; i++ ) { + std::shared_ptr> cached; + int cache_t_idx = (idx1 - (idx1 % cache_slice_t_size) + i); + std::string key = ncvar.getName() + "|" + std::to_string(cache_t_idx); + if(value_cache.contains(key)){ + cached = value_cache.get(key).get(); + } else { + cached = std::make_shared>(cache_slice_c_size * cache_slice_t_size); + start.clear(); + start.push_back(0); // only always 0 when cache_slice_c_size = numids! + start.push_back(cache_t_idx * cache_slice_t_size); + count.clear(); + count.push_back(cache_slice_c_size); + count.push_back(cache_slice_t_size); // Must be 1 for now!...probably... 
+ ncvar.getVar(start,count,&(*cached)[0]); + value_cache.insert(key, cached); + } + for( size_t j = 0; j < cache_slice_t_size; j++){ + raw_values[i+j] = cached->at((j*cache_slice_t_size) + cat_pos); + } + } + rvalue = 0.0; - for( size_t i = 1; i < raw_values.size() -1; ++i ) - { - rvalue += raw_values[i]; - } + double a , b = 0.0; + + a = 1.0 - ( (t1 - init_time) / time_stride ); + rvalue += (a * raw_values[0]); - if ( raw_values.size() > 1) // likewise the last data value may not be fully in the window - { - b = (stop_time - t2) / time_stride; - rvalue += (b * raw_values.back() ); - } + for( size_t i = 1; i < raw_values.size() -1; ++i ) + { + rvalue += raw_values[i]; + } - // account for the resampling methods - switch(m) - { - case SUM: // we allready have the sum so do nothing - ; - break; - - case MEAN: - { - // This is getting a length weighted mean - // the data values where allready scaled for where there was only partial use of a data value - // so we just need to do a final scale to account for the differnce between time_stride and duration_s - - double scale_factor = (selector.get_duration_secs() > time_stride ) ? (time_stride / selector.get_duration_secs()) : (1.0 / (a + b)); - rvalue *= scale_factor; + if ( raw_values.size() > 1) // likewise the last data value may not be fully in the window + { + b = (stop_time - t2) / time_stride; + rvalue += (b * raw_values.back() ); } - break; + // account for the resampling methods + switch(m) + { + case SUM: // we allready have the sum so do nothing + ; + break; + + case MEAN: + { + // This is getting a length weighted mean + // the data values where allready scaled for where there was only partial use of a data value + // so we just need to do a final scale to account for the differnce between time_stride and duration_s + + double scale_factor = (selector.get_duration_secs() > time_stride ) ? 
(time_stride / selector.get_duration_secs()) : (1.0 / (a + b)); + rvalue *= scale_factor; + } + break; - default: - ; + default: + ; + } } - try { return UnitsHelper::get_converted_value(native_units, rvalue, selector.get_output_units()); @@ -433,7 +441,6 @@ double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& return rvalue; } - return rvalue; } std::vector NetCDFPerFeatureDataProvider::get_values(const CatchmentAggrDataSelector& selector, data_access::ReSampleMethod m) diff --git a/utilities/data_conversion/no_remote_merging.py b/utilities/data_conversion/no_remote_merging.py new file mode 100644 index 0000000000..4422ef9b90 --- /dev/null +++ b/utilities/data_conversion/no_remote_merging.py @@ -0,0 +1,65 @@ +from pathlib import Path +import argparse +import os +import polars as pl +from collections import defaultdict +from functools import partial +import multiprocessing as mp + +def sum_csv_files(file_pattern: str): + # Scan CSV files and assign column names + df = pl.scan_csv(file_pattern, has_header=False, new_columns=["index", "timestamp", "value"]) + # Group by timestamp and sum the values + df = df.group_by("index").agg( + pl.col("timestamp").first().str.strip_chars().alias("timestamp"), + pl.col("value").str.strip_chars().cast(pl.Float64).sum().alias("total_value"), + ) + + return df.sort("index") + +def process_pair(output_path:Path, tup): + nexus, count = tup + if count > 1: + df = sum_csv_files(output_path / f"{nexus}_rank_*.csv") + df.collect().write_csv(output_path / f"{nexus}_output.csv", include_header=False) + for file in output_path.glob(f"{nexus}_rank_*.csv"): + file.unlink() + # use sed to add the spaces in the csv files + output_file = output_path / f"{nexus}_output.csv" + os.system(f"sed -i 's/,/, /g' {output_file.absolute()}") + + +def merge_outputs(output_path: Path) -> None: + # get all the file names in the folder + output_files = list(output_path.glob("*_rank_*.csv")) + # print(output_files) + # parse out nexus 
id from the file names + total_files = len(output_files) + # sort the files + + nexus_counts = defaultdict(int) + + for file in output_files: + nexus_id = file.stem.split("_")[0] + nexus_counts[nexus_id] += 1 + + for file in output_files: + nexus_id = file.stem.split("_")[0] + if nexus_counts[nexus_id] == 1: + os.rename(file, output_path / f"{nexus_id}_output.csv") + + partial_process = partial(process_pair, output_path) + with mp.Pool() as p: + p.map(partial_process,nexus_counts.items()) + + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Merge the per rank outputs created by running ngen with round robin partitioning." + ) + parser.add_argument("output_path", type=Path, help="ngen output folder") + + args = parser.parse_args() + + merge_outputs(args.output_path) diff --git a/utilities/partitioning/round_robin_partioning.py b/utilities/partitioning/round_robin_partioning.py new file mode 100644 index 0000000000..a785b96847 --- /dev/null +++ b/utilities/partitioning/round_robin_partioning.py @@ -0,0 +1,62 @@ +import json +from pathlib import Path +import sqlite3 +import argparse +import multiprocessing + + +def get_cat_to_nex_flowpairs(hydrofabric: Path) -> list: + with sqlite3.connect(f"{hydrofabric}") as conn: + cursor = conn.cursor() + cursor.execute("SELECT divide_id, toid FROM divides") + cat_to_nex_pairs = cursor.fetchall() + return cat_to_nex_pairs + + +def create_partitions(hydrofabric: Path, output_path: Path, num_partitions: int = None) -> None: + if num_partitions is None: + num_partitions = multiprocessing.cpu_count() + + cat_to_nex_pairs = get_cat_to_nex_flowpairs(hydrofabric) + + # sort the cat nex tuples by the nex id + cat_to_nex_pairs = sorted(cat_to_nex_pairs, key=lambda x: x[1]) + + num_partitions = min(num_partitions, len(cat_to_nex_pairs)) + + cats = set([cat for cat, _ in cat_to_nex_pairs]) + nexs = set([nex for _, nex in cat_to_nex_pairs]) + print(f"Number of partitions: {num_partitions}") + print(f"Number of 
cats: {len(cats)}") + print(f"Number of nexus: {len(nexs)}") + + partitions = [] + for i in range(num_partitions): + part = {} + part["id"] = i + part["cat-ids"] = [] + part["nex-ids"] = [] + part["remote-connections"] = [] + partitions.append(part) + + for i, (cat_id, nex_id) in enumerate(cat_to_nex_pairs): + print(i) + part_id = i % num_partitions + partitions[part_id]["cat-ids"].append(cat_id) + partitions[part_id]["nex-ids"].append(nex_id) + + with open(output_path, "w") as f: + f.write(json.dumps({"partitions": partitions}, indent=4)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Create partitions from hydrofabric data.") + parser.add_argument("hydrofabric", type=Path, help="Path to the hydrofabric SQLite file.") + parser.add_argument("output_path", type=Path, help="Path to the output JSON file.") + parser.add_argument( + "-n", "--num_partitions", type=int, default=None, help="Number of partitions to create." + ) + + args = parser.parse_args() + + create_partitions(args.hydrofabric, args.output_path, args.num_partitions)