From 2ba87d0d86e655d34af2da91fb129fb0fc478b44 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Fri, 26 Jul 2024 09:01:56 +0200
Subject: [PATCH 1/9] Update make_tidy_cmip6 to map2tidy v2.0.0

---
 analysis/README.md         |  2 +-
 analysis/make_tidy_cmip6.R | 22 +++++++++++++---------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/analysis/README.md b/analysis/README.md
index 2b1c9bb..5b4425b 100644
--- a/analysis/README.md
+++ b/analysis/README.md
@@ -6,7 +6,7 @@
 1. `make_tidy_cmip6.R`: Make original global files tidy. Save as nested data frames, where each row is one grid cell and the time series is nested in the column `data`. Separate files are written for each longitudinal band. Writes files:
 
-   `~/data/cmip6-ng/tidy/evspsbl_mon_CESM2_historical_r1i1p1f1_native_ilon_.rds`
+   `~/data/cmip6-ng/tidy/evspsbl_mon_CESM2_historical_r1i1p1f1_native_LON_<+-XXX.XXX>.rds`
 
 2. `apply_cwd_global.R`: Script for applying the CWD algorithm (or anything else that operates on full time series) in parallel, separately for each gridcell. Distributes by longitudinal band. Reading files written in previous step and writing files (in the example):

diff --git a/analysis/make_tidy_cmip6.R b/analysis/make_tidy_cmip6.R
index 4f0efe9..fe46d1e 100644
--- a/analysis/make_tidy_cmip6.R
+++ b/analysis/make_tidy_cmip6.R
@@ -19,20 +19,22 @@ if (length(filnam) != 1){
 }
 
 # load and convert
-df <- map2tidy(
+res_evspsbl <- map2tidy(
   nclist = filnam,
   varnam = "evspsbl",
   lonnam = "lon",
   latnam = "lat",
   timenam = "time",
-  # timedimnam = "time",
   do_chunks = TRUE,
   outdir = "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/",
-  fileprefix = str_remove(basename(filnam), ".nc")
-  # single_basedate = TRUE
-  # ncores = 2 # parallel::detectCores()
+  fileprefix = str_remove(basename(filnam), ".nc"),
+  ncores = 12 # parallel::detectCores()
 )
 
+# Check if any unsuccessful:
+tidyr::unnest(res_evspsbl, data) |>
+  filter(!grepl("Written", data))
+
 ## Precipitation ---------------
 varnam <- "pr"
 res <- "day"
@@ -47,17 +49,19 @@ if (length(filnam) != 1){
 }
 
 # load and convert
-df <- map2tidy(
+res_pr <- map2tidy(
   nclist = filnam,
   varnam = "pr",
   lonnam = "lon",
   latnam = "lat",
   timenam = "time",
-  # timedimnam = "time",
   do_chunks = TRUE,
   outdir = "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/",
+  #outdir = "/data_2/scratch/fbernhard/cmip6-ng/tidy/pr/",
   fileprefix = str_remove(basename(filnam), ".nc"),
-  # single_basedate = TRUE
-  ncores = 48 # parallel::detectCores()
+  ncores = 12 # parallel::detectCores()
 )
 
+# Check if any unsuccessful:
+tidyr::unnest(res_pr, data) |>
+  filter(!grepl("Written", data))
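Note on the check added above: map2tidy v2.0.0 appears to return a nested status tibble (one row per chunk, message strings in `data`), which is why unsuccessful chunks can be found by filtering on "Written". A minimal, hedged sketch for spot-checking one of the written chunk files — the file name below is illustrative, not guaranteed to exist:

    library(dplyr)
    library(tidyr)
    # illustrative path; pick any existing *_LON_*.rds chunk from outdir
    chunk <- readr::read_rds("/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_+0.000.rds")
    chunk |> slice(1) |> unnest(data) |> head()  # time series of the first gridcell
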
From c35e44a91388e8bd73cba4ff08a3688764eebc0c Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Fri, 26 Jul 2024 10:03:20 +0200
Subject: [PATCH 2/9] Update apply_cwd_global to map2tidy v2.0.0

---
 R/cwd_byilon.R              | 101 ++++++++++++++++++++----------------
 R/my_cwd.R                  |   2 +-
 analysis/apply_cwd_global.R |  73 +++++++++++++-------------
 3 files changed, 91 insertions(+), 85 deletions(-)

diff --git a/R/cwd_byilon.R b/R/cwd_byilon.R
index 28c1bd4..69fa20c 100644
--- a/R/cwd_byilon.R
+++ b/R/cwd_byilon.R
@@ -1,61 +1,70 @@
-cwd_byilon <- function(
-    ilon,
+cwd_byLON <- function(
+    LON_string,
     indir,
     outdir,
-    fileprefix
+    fileprefix,
+    overwrite
 ){
 
-  # load function that will be applied to time series
-  source(paste0(here::here(), "/R/my_cwd.R"))
+  # prepare output
+  # write (complemented) data to file. Give it some meaningful name and the index counter
+  outpath <- file.path(outdir, paste0(fileprefix, "_", LON_string, ".rds"))
+  if (file.exists(outpath) && !overwrite){
 
-  # read from file that contains tidy data for a single longitudinal band
+    # don't do anything
+    return(paste0("File exists already: ", outpath))
 
-  # read evapotranspiration file tidy
-  filnam <- list.files(
-    indir,
-    pattern = paste0("evspsbl_mon_CESM2_historical_r1i1p1f1_native_ilon_", ilon, ".rds"),
-    full.names = TRUE
-  )
-  df_evap <- readr::read_rds(filnam)
+  } else {
+    # read from file that contains tidy data for a single longitudinal band
 
-  # read other required files (precipitation, temperature, ...
+    # read evapotranspiration file tidy
+    # filnam <- list.files(
+    #   indir,
+    #   pattern = paste0("evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_", LON_string, ".rds"), # NOTE REGEX not working since LON_string can contain a +
+    #   full.names = TRUE,
+    # )
+    filnam <- file.path(indir, paste0("evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_", LON_string, ".rds"))
+    df_evap <- readr::read_rds(filnam)
 
-  # # merge all such that monthly data is repeated for each day within month
-  # df <- df_prec |>  # one of the daily data frames
-  #   tidyr::unnest(data) |>  # must unnest to join by date
-  #   left_join(
-  #     df_evap |>  # one of the monthly data frames
-  #       tidyr::unnest(data),
-  #     by = join_by(year, month)
-  #   )
+    # read other required files (precipitation, temperature, ...)
 
-  # for demo only
-  df <- df_evap |>
-    tidyr::unnest(data)
+    # # merge all such that monthly data is repeated for each day within month
+    # df <- df_prec |>  # one of the daily data frames
+    #   tidyr::unnest(data) |>  # must unnest to join by date
+    #   left_join(
+    #     df_evap |>  # one of the monthly data frames
+    #       tidyr::unnest(data),
+    #     by = join_by(year, month)
+    #   )
 
-  out <- df |>
+    # for demo only
+    # df_evap <- df_evap |>
+    #   tidyr::unnest(data)
 
-    # Uncomment code below to nest data by gridcell, if not already nested.
-    # group data by gridcells and wrap time series for each gridcell into a new
-    # column, by default called 'data'.
-    dplyr::group_by(lon, lat) |>
-    tidyr::nest() |>
+    out <- df_evap |>
+      # # Uncomment code below to nest data by gridcell, if not already nested.
+      # # group data by gridcells and wrap time series for each gridcell into a new
+      # # column, by default called 'data'.
+      # dplyr::group_by(lon, lat) |>
+      # tidyr::nest() |>
 
-    # apply the custom function on the time series data frame separately for
-    # each gridcell.
-    dplyr::mutate(data = purrr::map(data, ~my_cwd(.)))
+      # slice(1:2) |> # for demo only
 
-  # write (complemented) data to file. Give it some meaningful name and the index counter
-  path <- paste0(outdir, "/", fileprefix, "_", ilon, ".rds")
-  message(
-    paste0(
-      "Writing file ", path, " ..."
-    )
-  )
-  readr::write_rds(
-    out,
-    path
-  )
+      # apply the custom function on the time series data frame separately for
+      # each gridcell.
+      dplyr::mutate(data = purrr::map(data, ~my_cwd(.)))
 
-  # don't return data - it's written to file
+    # write (complemented) data to file.
+    message(
+      paste0(
+        "Writing file ", outpath, " ..."
+      )
+    )
+    readr::write_rds(
+      out,
+      outpath
+    )
+    # don't return data - it's written to file
+    return(paste0("Written results to: ", outpath))
+  }
 }

diff --git a/R/my_cwd.R b/R/my_cwd.R
index 23331ee..9c8e8ca 100644
--- a/R/my_cwd.R
+++ b/R/my_cwd.R
@@ -8,7 +8,7 @@ my_cwd <- function(df){
     mutate(evspsbl_cum = evspsbl) |>
 
     # reduce size - important
-    select(time, evspsbl_cum)
+    select(datetime, evspsbl_cum)
 
   # return a data frame
   return(out)

diff --git a/analysis/apply_cwd_global.R b/analysis/apply_cwd_global.R
index 134fcb3..bdb1402 100644
--- a/analysis/apply_cwd_global.R
+++ b/analysis/apply_cwd_global.R
@@ -1,37 +1,37 @@
 #!/usr/bin/env Rscript
 
-# script is called with three arguments:
-# 1. counter for chunks
-# 2. total number of chunks
-# 3. total number of longitude indices
+# script is called without any arguments
+# (By specifying overwrite you can choose within the script to skip
+# or recalculate previous results)
 
 # Example:
-# >./apply_cwd_global.R 1 3 360
-
-# to receive arguments to script from the shell
-args = commandArgs(trailingOnly=TRUE)
-
-# # When using this script directly from RStudio, not from the shell, specify
-# nlon <- 288 # set this by hand. corresponds to length of the longitude dimension in original NetCDF files
-# args <- c(1, 1, nlon)
+# >./apply_cwd_global.R
 
 library(dplyr)
 library(map2tidy)
 library(multidplyr)
 
 source(paste0(here::here(), "/R/cwd_byilon.R"))
 
+indir  <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/evspsbl/"
+outdir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+dir.create(outdir, showWarnings = FALSE)
 
-print("getting data for longitude indices:")
-vec_index <- map2tidy::get_index_by_chunk(
-  as.integer(args[1]),  # counter for chunks
-  as.integer(args[2]),  # total number of chunks
-  as.integer(args[3])   # total number of longitude indices
-  )
+filnams <- list.files(
+  indir,
+  pattern = "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_.*rds",
+  full.names = TRUE
+)
+list_LON <- gsub(".*(LON_[//-//.//+0-9]*).rds", "\\1", filnams)
+# as.numeric(gsub("LON_","",list_LON))
 
+print("getting data for longitude indices:")
 # number of cores of parallel threads
-ncores <- 2 # parallel::detectCores()
+ncores <- 6 # parallel::detectCores()
 
 # parallelize job
+# load function that will be applied to time series
+source(paste0(here::here(), "/R/my_cwd.R"))
+
 # set up the cluster, sending required objects to each core
 cl <- multidplyr::new_cluster(ncores) |>
   multidplyr::cluster_library(c("map2tidy",
                                 "dplyr",
                                 "purrr",
                                 "tidyr",
                                 "readr",
@@ -42,32 +42,29 @@ cl <- multidplyr::new_cluster(ncores) |>
                                 "here",
                                 "magrittr")) |>
   multidplyr::cluster_assign(
-    cwd_byilon = cwd_byilon  # make the function known for each core
+    my_cwd    = my_cwd,    # make the function known for each core
+    cwd_byLON = cwd_byLON, # make the function known for each core
+    indir = indir,
+    outdir = outdir
   )
 
 # distribute computation across the cores, calculating for all longitudinal
 # indices of this chunk
-out <- tibble(ilon = vec_index) |>
+out <- tibble(LON_string = list_LON) |>
   multidplyr::partition(cl) |>
   dplyr::mutate(out = purrr::map(
-    ilon,
-    ~cwd_byilon(
+    LON_string,
+    ~cwd_byLON(
       .,
-      indir = "~/data/cmip6-ng/tidy/evspsbl/",
-      outdir = "~/data/cmip6-ng/tidy/cwd/",
-      fileprefix = "evspsbl_cum"
+      indir = indir,
+      outdir = outdir,
+      fileprefix = "evspsbl_cum",
+      overwrite = FALSE
     ))
-  )
+  ) |>
+  collect() # collect partitioned data.frame
 
+out |> unnest(out)
+# out |> unnest(out) |> unnest(data)
 
-# # un-parallel alternative
-# out <- tibble(ilon = vec_index) |>
-#   dplyr::mutate(out = purrr::map(
-#     ilon,
-#     ~cwd_byilon(
-#       .,
-#       indir = "~/data/cmip6-ng/tidy/evspsbl/",
-#       outdir = "~/data/cmip6-ng/tidy/cwd/",
-#       fileprefix = "evspsbl_cum"
-#     ))
-#   )
+# TO CHECK: readRDS("/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd//evspsbl_cum_LON_+0.000.rds") |> unnest(data)
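The multidplyr pattern introduced in this patch (partition, mutate, collect) generalizes beyond this use case. A self-contained sketch of the same idiom on toy data — not the project's code, just the pattern in isolation:

    library(dplyr)
    library(multidplyr)

    cl <- multidplyr::new_cluster(2) |>
      multidplyr::cluster_library("dplyr") |>
      multidplyr::cluster_assign(slow_square = function(x) {Sys.sleep(0.01); x^2})

    tibble(x = 1:100) |>
      multidplyr::partition(cl) |>   # rows are split across the 2 workers
      mutate(y = slow_square(x)) |>  # runs on each worker in parallel
      collect()                      # gather rows back into a single tibble

Note the design choice visible above: everything the workers need (functions, paths) must be sent explicitly with cluster_assign(), since each worker runs in a fresh R session.
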
From 971e2c58ac51ff70c6032e96c899c489038cf6a8 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 13:40:07 +0200
Subject: [PATCH 3/9] Update get_cwd_annmax to map2tidy v2.0.0

---
 R/cwd_annmax_byilon.R     |  77 ++++++++++++++------------
 analysis/get_cwd_annmax.R | 107 ++++++++++++++++++++----------------
 2 files changed, 101 insertions(+), 83 deletions(-)

diff --git a/R/cwd_annmax_byilon.R b/R/cwd_annmax_byilon.R
index 23a430b..9f95b81 100644
--- a/R/cwd_annmax_byilon.R
+++ b/R/cwd_annmax_byilon.R
@@ -1,44 +1,47 @@
-cwd_annmax_byilon <- function(
-    ilon,
-    indir,
+cwd_annmax_byLON <- function(
+    filnam,
     outdir,
-    fileprefix
+    overwrite
 ){
 
-  # function to get annual maximum
-  get_annmax <- function(df){
-    df |>
-      mutate(year = lubridate::year(time)) |>
-      group_by(year) |>
-      summarise(evspsbl_cum = max(evspsbl_cum))
-  }
+  # # write (complemented) data to file. Give it some meaningful name and the index counter
+  outpath <- file.path(outdir, basename(filnam) %>% gsub(".rds", "_ANNMAX.rds", .))
+  if (file.exists(outpath) && !overwrite){
 
-  # read evapotranspiration file tidy
-  filnam <- list.files(
-    indir,
-    pattern = paste0(fileprefix, "_", ilon, ".rds"),
-    full.names = TRUE
-  )
-  df <- readr::read_rds(filnam)
-
-  # apply annual maximum function
-  out <- df |>
-    mutate(data = purrr::map(
-      data,
-      ~get_annmax(.)
-    ))
-
-  # write (complemented) data to file. Give it some meaningful name and the index counter
-  path <- paste0(outdir, "/", fileprefix, "_", ilon, "_ANNMAX.rds")
-  message(
-    paste0(
-      "Writing file ", path, " ..."
-    )
-  )
-  readr::write_rds(
-    out,
-    path
+    # don't do anything
+    return(paste0("File exists already: ", outpath))
+
+  } else {
+
+    # function to get annual maximum
+    get_annmax <- function(df_of_one_coordinate){
+      df_of_one_coordinate |>
+        mutate(year = lubridate::year(datetime)) |>
+        group_by(year) |>
+        summarise(evspsbl_cum = max(evspsbl_cum))
+    }
+
+    # read evapotranspiration file tidy
+    df <- readr::read_rds(filnam)
+
+    # apply annual maximum function
+    out <- df |>
+      mutate(data = purrr::map(
+        data,
+        ~get_annmax(.)
+      ))
+
+    message(
+      paste0(
+        "Writing file ", outpath, " ..."
+      )
     )
+    readr::write_rds(
+      out,
+      outpath
+    )
 
-  # don't return data - it's written to file
+    # don't return data - it's written to file
+    return(paste0("Written results to: ", outpath))
+  }
 }

diff --git a/analysis/get_cwd_annmax.R b/analysis/get_cwd_annmax.R
index b5e8be4..8b92c5e 100644
--- a/analysis/get_cwd_annmax.R
+++ b/analysis/get_cwd_annmax.R
@@ -1,19 +1,26 @@
 #!/usr/bin/env Rscript
 
-# script is called with three arguments:
-# 1. counter for chunks
-# 2. total number of chunks
-# 3. total number of longitude indices
+# script is called with three arguments for parallelization:
+# 1. counter for chunks (e.g. # of each compute node)
+# 2. total number of chunks (e.g. number of total compute nodes)
 
-# Example:
-# >./get_cwd_annmax.R 1 3 360
+# Note that these arguments can be used to distribute over multiple nodes.
+# Distribution over CPU cores of a single node is handled by multidplyr
+# and argument ncores in the script.
 
-# to receive arguments to script from the shell
-args = commandArgs(trailingOnly=TRUE)
+# Example for 4 CPU nodes:
+# >./get_cwd_annmax.R 1 4
+# >./get_cwd_annmax.R 2 4
+# >./get_cwd_annmax.R 3 4
+# >./get_cwd_annmax.R 4 4
+# Example for 1 CPU node:
+# >./get_cwd_annmax.R 1 1
 
 # # When using this script directly from RStudio, not from the shell, specify
-# nlon <- 288 # set this by hand. corresponds to length of the longitude dimension in original NetCDF files
-# args <- c(1, 1, nlon)
+# args <- c(1, 1)
+
+# to receive arguments to script from the shell
+args = commandArgs(trailingOnly=TRUE)
 
 library(dplyr)
 library(map2tidy)
@@ -21,19 +28,33 @@ library(multidplyr)
 
 source(paste0(here::here(), "/R/cwd_annmax_byilon.R"))
 
-print("getting data for longitude indices:")
+indir  <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+outdir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+
+
+# 1) Define filenames of files to process: -------------------------------
+filnams <- list.files(
+  indir,
+  pattern = "evspsbl_cum_LON_[0-9.+-]*.rds", # make sure not to include _ANNMAX.rds
+  full.names = TRUE
+)
+# filnams = filnams[1:5] # for development
+
+
+# 2) Setup parallelization ------------------------------------------------
+# 2a) Split job onto multiple nodes
+# i.e. only consider a subset of the files (others might be treated by another compute node)
 vec_index <- map2tidy::get_index_by_chunk(
-  as.integer(args[1]),  # counter for chunks
-  as.integer(args[2]),  # total number of chunks
-  as.integer(args[3])   # total number of longitude indices
-  )
+  as.integer(args[1]), # counter for compute node
+  as.integer(args[2]), # total number of compute nodes
+  length(filnams)      # total number of files
+)
 
-# number of cores of parallel threads
-ncores <- 2 # parallel::detectCores()
+# 2b) Parallelize job across cores on a single node
+ncores <- 2 # parallel::detectCores() # number of cores of parallel threads
 
-# parallelize job
-# set up the cluster, sending required objects to each core
 cl <- multidplyr::new_cluster(ncores) |>
+  # set up the cluster by sending required objects to each core
   multidplyr::cluster_library(c("map2tidy",
                                 "dplyr",
                                 "purrr",
                                 "tidyr",
                                 "readr",
@@ -42,32 +63,26 @@ cl <- multidplyr::new_cluster(ncores) |>
                                 "here",
                                 "magrittr")) |>
   multidplyr::cluster_assign(
-    cwd_annmax_byilon = cwd_annmax_byilon  # make the function known for each core
-  )
+    cwd_annmax_byLON = cwd_annmax_byLON, # make the function known for each core
+    outdir = outdir
+  )
 
-# distribute computation across the cores, calculating for all longitudinal
-# indices of this chunk
-out <- tibble(ilon = vec_index) |>
-  multidplyr::partition(cl) |>
+
+# 3) Process files --------------------------------------------------------
+out <- tibble(in_fname = filnams[vec_index]) |> # TODO: in_fname is a much cleaner solution (also adapt in apply_cwd_global.R and cwd_byilon.R)
+  multidplyr::partition(cl) |> # remove this line to deactivate parallelization
   dplyr::mutate(out = purrr::map(
-    ilon,
-    ~cwd_annmax_byilon(
-      .,
-      indir = "~/data/cmip6-ng/tidy/cwd/",
-      outdir = "~/data/cmip6-ng/tidy/cwd/",
-      fileprefix = "evspsbl_cum"
-    ))
-  )
-
-
-# # un-parallel alternative
-# out <- tibble(ilon = vec_index) |>
-#   dplyr::mutate(out = purrr::map(
-#     ilon,
-#     ~cwd_annmax_byilon(
-#       .,
-#       indir = "~/data/cmip6-ng/tidy/cwd/",
-#       outdir = "~/data/cmip6-ng/tidy/cwd/",
-#       fileprefix = "evspsbl_cum"
-#     ))
-#   )
+    in_fname,
+    ~cwd_annmax_byLON(
+      filnam = .,
+      outdir = outdir,
+      overwrite = FALSE
+    ))
+  ) |>
+  collect() # collect partitioned data.frame
 
+out |>
+  unnest(out)
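For orientation, the node-level splitting above relies on map2tidy::get_index_by_chunk(). Assuming it returns the integer indices belonging to one chunk (this is an assumption about its return value, not documented here), its behaviour can be illustrated as:

    # with 10 files split over 3 nodes, node 2 would get roughly the middle third:
    # map2tidy::get_index_by_chunk(2, 3, 10)  # hypothetically: 4 5 6 7
    # so each node processes only filnams[vec_index], as done in step 3) above
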
From 0be44513e491a717d0f61fdae059811a39786953 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 14:07:13 +0200
Subject: [PATCH 4/9] Simplify cwd_byLON() to use filenames

---
 R/cwd_annmax_byilon.R       | 22 +++++++----------
 R/cwd_byilon.R              | 33 +++++++------------------
 analysis/apply_cwd_global.R | 48 ++++++++++++++++---------------------
 analysis/get_cwd_annmax.R   |  4 +---
 analysis/make_tidy_cmip6.R  |  6 +++--
 5 files changed, 44 insertions(+), 69 deletions(-)

diff --git a/R/cwd_annmax_byilon.R b/R/cwd_annmax_byilon.R
index 9f95b81..4d6b013 100644
--- a/R/cwd_annmax_byilon.R
+++ b/R/cwd_annmax_byilon.R
@@ -12,8 +12,12 @@ cwd_annmax_byLON <- function(
     return(paste0("File exists already: ", outpath))
 
   } else {
+    # read from file that contains tidy data for a single longitudinal band
 
-    # function to get annual maximum
+    # read evapotranspiration file tidy
+    df_evap <- readr::read_rds(filnam)
+
+    # function to apply to get annual maximum:
     get_annmax <- function(df_of_one_coordinate){
       df_of_one_coordinate |>
         mutate(year = lubridate::year(datetime)) |>
@@ -21,25 +25,17 @@ cwd_annmax_byLON <- function(
         summarise(evspsbl_cum = max(evspsbl_cum))
     }
 
-    # read evapotranspiration file tidy
-    df <- readr::read_rds(filnam)
-
     # apply annual maximum function
-    out <- df |>
-      mutate(data = purrr::map(
-        data,
-        ~get_annmax(.)
-      ))
+    out <- df_evap |>
+      # apply the custom function on the time series data frame separately for each gridcell.
+      dplyr::mutate(data = purrr::map(data, ~get_annmax(.)))
 
     message(
       paste0(
         "Writing file ", outpath, " ..."
       )
     )
-    readr::write_rds(
-      out,
-      outpath
-    )
+    readr::write_rds(out, outpath)
 
     # don't return data - it's written to file
     return(paste0("Written results to: ", outpath))

diff --git a/R/cwd_byilon.R b/R/cwd_byilon.R
index 69fa20c..1583072 100644
--- a/R/cwd_byilon.R
+++ b/R/cwd_byilon.R
@@ -1,10 +1,8 @@
 cwd_byLON <- function(
-    LON_string,
-    indir,
+    filnam,
     outdir,
-    fileprefix,
     overwrite
-  ){
+){
 
   # prepare output
   # write (complemented) data to file. Give it some meaningful name and the index counter
@@ -18,12 +16,6 @@ cwd_byLON <- function(
     # read from file that contains tidy data for a single longitudinal band
 
     # read evapotranspiration file tidy
-    # filnam <- list.files(
-    #   indir,
-    #   pattern = paste0("evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_", LON_string, ".rds"), # NOTE REGEX not working since LON_string can contain a +
-    #   full.names = TRUE,
-    # )
-    filnam <- file.path(indir, paste0("evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_", LON_string, ".rds"))
     df_evap <- readr::read_rds(filnam)
 
     # read other required files (precipitation, temperature, ...)
@@ -37,22 +29,17 @@ cwd_byLON <- function(
     #     by = join_by(year, month)
     #   )
 
-    # for demo only
-    # df_evap <- df_evap |>
-    #   tidyr::unnest(data)
+    # function to apply:
+    ## my_cwd: NOTE this is defined in outer scope and accessible by closure
 
     out <- df_evap |>
       # # Uncomment code below to nest data by gridcell, if not already nested.
       # # group data by gridcells and wrap time series for each gridcell into a new
       # # column, by default called 'data'.
-      # dplyr::group_by(lon, lat) |>
-      # tidyr::nest() |>
-
-      # slice(1:2) |> # for demo only
+      # dplyr::group_by(lon, lat) |> tidyr::nest() |>
 
-      # apply the custom function on the time series data frame separately for
-      # each gridcell.
-      dplyr::mutate(data = purrr::map(data, ~my_cwd(.)))
+      # apply the custom function on the time series data frame separately for each gridcell.
+      dplyr::mutate(data = purrr::map(data, ~my_cwd(.))) # NOTE: this uses the closure
 
     # write (complemented) data to file.
     message(
@@ -60,10 +47,8 @@ cwd_byLON <- function(
         "Writing file ", outpath, " ..."
       )
     )
-    readr::write_rds(
-      out,
-      outpath
-    )
+    readr::write_rds(out, outpath)
+
     # don't return data - it's written to file
     return(paste0("Written results to: ", outpath))
   }

diff --git a/analysis/apply_cwd_global.R b/analysis/apply_cwd_global.R
index bdb1402..4e743e7 100644
--- a/analysis/apply_cwd_global.R
+++ b/analysis/apply_cwd_global.R
@@ -12,28 +12,26 @@ library(map2tidy)
 library(multidplyr)
 
 source(paste0(here::here(), "/R/cwd_byilon.R"))
+source(paste0(here::here(), "/R/my_cwd.R")) # load function that will be applied to time series
 
 indir  <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/evspsbl/"
 outdir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+
 dir.create(outdir, showWarnings = FALSE)
 
+# 1) Define filenames of files to process: -------------------------------
 filnams <- list.files(
   indir,
-  pattern = "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_.*rds",
+  pattern = "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_[0-9.+-]*rds",
   full.names = TRUE
 )
-list_LON <- gsub(".*(LON_[//-//.//+0-9]*).rds", "\\1", filnams)
-# as.numeric(gsub("LON_","",list_LON))
 
-print("getting data for longitude indices:")
-# number of cores of parallel threads
-ncores <- 6 # parallel::detectCores()
+# 2) Setup parallelization ------------------------------------------------
+# parallelize job across cores on a single node
+ncores <- 6 # parallel::detectCores() # number of cores of parallel threads
 
-# parallelize job
-# load function that will be applied to time series
-source(paste0(here::here(), "/R/my_cwd.R"))
-
-# set up the cluster, sending required objects to each core
 cl <- multidplyr::new_cluster(ncores) |>
+  # set up the cluster, sending required objects to each core
   multidplyr::cluster_library(c("map2tidy",
                                 "dplyr",
                                 "purrr",
                                 "tidyr",
                                 "readr",
@@ -42,29 +40,25 @@ cl <- multidplyr::new_cluster(ncores) |>
                                 "here",
                                 "magrittr")) |>
   multidplyr::cluster_assign(
-    my_cwd    = my_cwd,    # make the function known for each core
-    cwd_byLON = cwd_byLON, # make the function known for each core
-    indir = indir,
-    outdir = outdir
-  )
+    my_cwd    = my_cwd,    # make the function known for each core
+    cwd_byLON = cwd_byLON, # make the function known for each core
+    outdir = outdir
+  )
 
-# distribute computation across the cores, calculating for all longitudinal
-# indices of this chunk
-out <- tibble(LON_string = list_LON) |>
-  multidplyr::partition(cl) |>
+
+# 3) Process files --------------------------------------------------------
+out <- tibble(in_fname = filnams[vec_index]) |>
+  # multidplyr::partition(cl) |> # remove this line to deactivate parallelization
   dplyr::mutate(out = purrr::map(
-    LON_string,
-    ~cwd_byLON(
-      .,
-      indir = indir,
+    in_fname,
+    ~cwd_byLON(
+      filnam = .,
       outdir = outdir,
-      fileprefix = "evspsbl_cum",
       overwrite = FALSE
     ))
-  ) |>
+  ) |>
   collect() # collect partitioned data.frame
 
 out |> unnest(out)
-# out |> unnest(out) |> unnest(data)
 
 # TO CHECK: readRDS("/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd//evspsbl_cum_LON_+0.000.rds") |> unnest(data)

diff --git a/analysis/get_cwd_annmax.R b/analysis/get_cwd_annmax.R
index 8b92c5e..c317fcb 100644
--- a/analysis/get_cwd_annmax.R
+++ b/analysis/get_cwd_annmax.R
@@ -38,8 +38,6 @@ filnams <- list.files(
   pattern = "evspsbl_cum_LON_[0-9.+-]*.rds", # make sure not to include _ANNMAX.rds
   full.names = TRUE
 )
-# filnams = filnams[1:5] # for development
-
 
 # 2) Setup parallelization ------------------------------------------------
 # 2a) Split job onto multiple nodes
@@ -69,7 +67,7 @@ cl <- multidplyr::new_cluster(ncores) |>
 
 # 3) Process files --------------------------------------------------------
-out <- tibble(in_fname = filnams[vec_index]) |> # TODO: in_fname is a much cleaner solution (also adapt in apply_cwd_global.R and cwd_byilon.R)
+out <- tibble(in_fname = filnams[vec_index]) |>
   multidplyr::partition(cl) |> # remove this line to deactivate parallelization
   dplyr::mutate(out = purrr::map(
     in_fname,

diff --git a/analysis/make_tidy_cmip6.R b/analysis/make_tidy_cmip6.R
index fe46d1e..9db9b69 100644
--- a/analysis/make_tidy_cmip6.R
+++ b/analysis/make_tidy_cmip6.R
@@ -28,7 +28,8 @@ res_evspsbl <- map2tidy(
   do_chunks = TRUE,
   outdir = "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/",
   fileprefix = str_remove(basename(filnam), ".nc"),
-  ncores = 12 # parallel::detectCores()
+  ncores = 12, # parallel::detectCores()
+  overwrite = FALSE
 )
 
 # Check if any unsuccessful:
@@ -59,7 +60,8 @@ res_pr <- map2tidy(
   outdir = "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/",
   #outdir = "/data_2/scratch/fbernhard/cmip6-ng/tidy/pr/",
   fileprefix = str_remove(basename(filnam), ".nc"),
-  ncores = 12 # parallel::detectCores()
+  ncores = 12, # parallel::detectCores()
+  overwrite = FALSE
 )
 # Check if any unsuccessful:
 tidyr::unnest(res_pr, data) |>
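The filename-driven design of this patch derives every output path from its input path. A hedged sketch of that naming logic in isolation (paths are illustrative); note that passing fixed = TRUE to gsub() matches the literal ".rds" instead of treating "." as a regex wildcard:

    filnam  <- "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_+0.000.rds"  # illustrative
    outpath <- file.path("outdir", gsub(".rds", "_ANNMAX.rds", basename(filnam), fixed = TRUE))
    # skip-if-done idiom used by both worker functions:
    # if (file.exists(outpath) && !overwrite) return(paste0("File exists already: ", outpath))
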
From 45c147949b1ff6074e2c3c4c4644a18f4bdf4d9b Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 14:43:48 +0200
Subject: [PATCH 5/9] Update collect_cwd_annmax

---
 R/collect_cwd_annmax_byilon.R |  16 -----
 analysis/collect_cwd_annmax.R | 114 +++++++++++++---------------------
 2 files changed, 44 insertions(+), 86 deletions(-)
 delete mode 100644 R/collect_cwd_annmax_byilon.R

diff --git a/R/collect_cwd_annmax_byilon.R b/R/collect_cwd_annmax_byilon.R
deleted file mode 100644
index 7c055ff..0000000
--- a/R/collect_cwd_annmax_byilon.R
+++ /dev/null
@@ -1,16 +0,0 @@
-collect_cwd_annmax_byilon <- function(
-    ilon,
-    indir,
-    fileprefix
-){
-
-  # read annual time series file
-  filnam <- list.files(
-    indir,
-    pattern = paste0(fileprefix, "_", ilon, "_ANNMAX.rds"),
-    full.names = TRUE
-  )
-  df <- readr::read_rds(filnam) |>
-    unnest(data)
-  return(df)
-}

diff --git a/analysis/collect_cwd_annmax.R b/analysis/collect_cwd_annmax.R
index effded4..28996a8 100644
--- a/analysis/collect_cwd_annmax.R
+++ b/analysis/collect_cwd_annmax.R
@@ -1,85 +1,59 @@
 #!/usr/bin/env Rscript
 
-# script is called with three arguments:
-# 1. counter for chunks
-# 2. total number of chunks
-# 3. total number of longitude indices
+# script is called without any arguments
 
 # Example:
-# >./get_cwd_annmax.R 1 3 360
-
-# to receive arguments to script from the shell
-args = commandArgs(trailingOnly=TRUE)
-
-# # When using this script directly from RStudio, not from the shell, specify
-# nlon <- 288 # set this by hand. corresponds to length of the longitude dimension in original NetCDF files
-# args <- c(1, 1, nlon)
+# >./collect_cwd_annmax.R
 
 library(dplyr)
 library(map2tidy)
 library(multidplyr)
 
-source(paste0(here::here(), "/R/collect_cwd_annmax_byilon.R"))
-
-print("getting data for longitude indices:")
-vec_index <- map2tidy::get_index_by_chunk(
-  as.integer(args[1]),  # counter for chunks
-  as.integer(args[2]),  # total number of chunks
-  as.integer(args[3])   # total number of longitude indices
-  )
+indir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
 
-# number of cores of parallel threads
-ncores <- 2 # parallel::detectCores()
+# 1) Define filenames of files to process: -------------------------------
+filnams <- list.files(
+  indir,
+  pattern = "evspsbl_cum_LON_[0-9.+-]*_ANNMAX.rds", # make sure to include only _ANNMAX.rds
+  full.names = TRUE
+)
 
-# parallelize job
-# set up the cluster, sending required objects to each core
-cl <- multidplyr::new_cluster(ncores) |>
-  multidplyr::cluster_library(c("map2tidy",
-                                "dplyr",
-                                "purrr",
-                                "tidyr",
-                                "readr",
-                                "here",
-                                "magrittr")) |>
-  multidplyr::cluster_assign(
-    collect_cwd_annmax_byilon = collect_cwd_annmax_byilon  # make the function known for each core
-  )
+  # NOTE: this script has to be run non-parallel, since it collects results
+  # from previous scripts that were run in parallel.
+  # # 2) Setup parallelization ------------------------------------------------
+  # # 2a) Split job onto multiple nodes
+  # # i.e. only consider a subset of the files (others might be treated by another compute node)
+  # vec_index <- map2tidy::get_index_by_chunk(
+  #   as.integer(args[1]), # counter for compute node
+  #   as.integer(args[2]), # total number of compute nodes
+  #   length(filnams)      # total number of longitude indices
+  # )
+  #
+  # # 2b) Parallelize job across cores on a single node
+  # ncores <- 2 # parallel::detectCores() # number of cores of parallel threads
+  #
+  # cl <- multidplyr::new_cluster(ncores) |>
+  #   # set up the cluster by sending required objects to each core
+  #   multidplyr::cluster_library(c("map2tidy",
+  #                                 "dplyr",
+  #                                 "purrr",
+  #                                 "tidyr",
+  #                                 "readr",
+  #                                 "here",
+  #                                 "magrittr")) |>
+  #   multidplyr::cluster_assign(
+  #     collect_cwd_annmax_byilon = collect_cwd_annmax_byilon # make the function known for each core
+  #   )
 
-# distribute computation across the cores, calculating for all longitudinal
-# indices of this chunk
-df <- tibble(ilon = vec_index) |>
-  multidplyr::partition(cl) |>
-  dplyr::mutate(out = purrr::map(
-    ilon,
-    ~collect_cwd_annmax_byilon(
-      .,
-      indir = "~/data/cmip6-ng/tidy/cwd/",
-      fileprefix = "evspsbl_cum"
-    ))
-  ) |>
-  collect() |>
-  tidyr::unnest(out)
+# 3) Process files --------------------------------------------------------
+global_df <- lapply(filnams,
+                    function(filnam) {readr::read_rds(filnam) |> tidyr::unnest(data)}) |>
+  bind_rows()
 
-readr::write_rds(
-  df,
-  paste0(
-    indir,
-    fileprefix,
-    "_ANNMAX.rds"
-  )
-)
+readr::write_rds(global_df,
+                 file.path(indir, paste0(fileprefix, "_ANNMAX.rds")))
 
-# # un-parallel alternative
-# df <- tibble(ilon = seq(60)) |> # vec_index
-#   dplyr::mutate(out = purrr::map(
-#     ilon,
-#     ~collect_cwd_annmax_byilon(
-#       .,
-#       indir = "~/data/cmip6-ng/tidy/cwd/",
-#       fileprefix = "evspsbl_cum"
-#     ))
-#   ) |>
-#   tidyr::unnest(out)
-#
-# format(object.size(df), units = "MB")
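Before the collected global_df is reshaped into a lon x lat x year array (as done in the next patch), a completeness check is advisable: array() fills column-major and silently recycles its data if the lengths do not match. A minimal sketch:

    stopifnot(nrow(global_df) ==
                length(unique(global_df$lon)) *
                length(unique(global_df$lat)) *
                length(unique(global_df$year)))
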
From 071d228b66e7f67c19cbff0e1e338e72b5b18d34 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 14:50:29 +0200
Subject: [PATCH 6/9] Combine collect and NetCDF generation in single R-script

---
 analysis/collect_cwd_annmax.R | 50 +++++++++++++++++++++++++++++---
 analysis/create_nc_annmax.R   | 51 -----------------------------------
 analysis/get_cwd_annmax.R     |  2 +-
 3 files changed, 48 insertions(+), 55 deletions(-)
 delete mode 100644 analysis/create_nc_annmax.R

diff --git a/analysis/collect_cwd_annmax.R b/analysis/collect_cwd_annmax.R
index 28996a8..d6c3dc0 100644
--- a/analysis/collect_cwd_annmax.R
+++ b/analysis/collect_cwd_annmax.R
@@ -9,7 +9,8 @@ library(dplyr)
 library(map2tidy)
 library(multidplyr)
 
-indir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+indir   <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+outfile <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/evspsbl_cum_global.nc"
 
 # 1) Define filenames of files to process: -------------------------------
 filnams <- list.files(
@@ -51,9 +52,52 @@ global_df <- lapply(filnams,
                     function(filnam) {readr::read_rds(filnam) |> tidyr::unnest(data)}) |>
   bind_rows()
 
-readr::write_rds(global_df,
-                 file.path(indir, paste0(fileprefix, "_ANNMAX.rds")))
+# readr::write_rds(global_df,
+#                  file.path(indir, paste0(fileprefix, "_ANNMAX.rds")))
+# Instead of writing rds file, directly save as NetCDF:
 
+# 4) Output to single, global NetCDF file ---------------------------------
+library(rgeco) # get it from https://github.com/geco-bern/rgeco
+
+# create object that can be used with write_nc2()
+global_df <- global_df |>
+  select(lon, lat, year, evspsbl_cum) |>
+  arrange(year, lat, lon)
+
+arr <- array(
+  unlist(global_df$evspsbl_cum),
+  dim = c(
+    length(unique(global_df$lon)),
+    length(unique(global_df$lat)),
+    length(unique(global_df$year))
+  )
+)
+
+# image(arr[,,1])
+
+# create object for use in rgeco::write_nc2()
+obj <- list(
+  lon = sort(unique(global_df$lon)),
+  lat = sort(unique(global_df$lat)),
+  time = lubridate::ymd(
+    paste0(
+      sort(unique(global_df$year)),
+      "-01-01" # taking first of January as a mid-point for each year
+    )
+  ),
+  vars = list(evspsbl_cum = arr)
+)
+
+rgeco::write_nc2(
+  obj,
+  varnams = "evspsbl_cum",
+  make_tdim = TRUE,
+  path = outfile,
+  units_time = "days since 2001-01-01",
+  att_title = "Global Cumulative Water Deficit",
+  att_history = sprintf("Created on: %s, with R scripts from https://github.com/geco-bern/cwd_global processing data from: %s", Sys.Date(), indir)
+)

diff --git a/analysis/create_nc_annmax.R b/analysis/create_nc_annmax.R
deleted file mode 100644
index 995ff0a..0000000
--- a/analysis/create_nc_annmax.R
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/usr/bin/env Rscript
-
-library(rgeco) # get it from https://github.com/geco-bern/rgeco
-
-indir <- "~/data/cmip6-ng/tidy/cwd/"
-fileprefix <- "evspsbl_cum"
-
-df <- readr::read_rds(
-  paste0(
-    indir,
-    fileprefix,
-    "_ANNMAX.rds"
-  )
-)
-
-# create object that can be used with write_nc2()
-df <- df |>
-  select(lon, lat, year, evspsbl_cum) |>
-  arrange(year, lat, lon)
-
-arr <- array(
-  unlist(df$evspsbl_cum),
-  dim = c(
-    length(unique(df$lon)),
-    length(unique(df$lat)),
-    length(unique(df$year))
-  )
-)
-
-# image(arr[,,1])
-
-# create object for use in rgeco::write_nc2()
-obj <- list(
-  lon = sort(unique(df$lon)),
-  lat = sort(unique(df$lat)),
-  time = lubridate::ymd(
-    paste0(
-      sort(unique(df$year)),
-      "-01-01" # taking first of January as a mid-point for each year
-    )
-  ),
-  vars = list(evspsbl_cum = arr)
-)
-
-rgeco::write_nc2(
-  obj,
-  varnams = "evspsbl_cum",
-  make_tdim = TRUE,
-  path = "~/data/cmip6-ng/tidy/cwd/evspsbl_cum_ANNMAX.nc",
-  units_time = "days since 2001-01-01"
-)

diff --git a/analysis/get_cwd_annmax.R b/analysis/get_cwd_annmax.R
index c317fcb..a28bb50 100644
--- a/analysis/get_cwd_annmax.R
+++ b/analysis/get_cwd_annmax.R
@@ -1,6 +1,6 @@
 #!/usr/bin/env Rscript
 
-# script is called with three arguments for parallelization:
+# script is called with two arguments for parallelization:
 # 1. counter for chunks (e.g. # of each compute node)
 # 2. total number of chunks (e.g. number of total compute nodes)

From 964a61d1cae93d44568fd0d037cd7d23bc37cde2 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 15:12:55 +0200
Subject: [PATCH 7/9] Add meta info to NetCDF

---
 analysis/collect_cwd_annmax.R | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/analysis/collect_cwd_annmax.R b/analysis/collect_cwd_annmax.R
index d6c3dc0..3e93de2 100644
--- a/analysis/collect_cwd_annmax.R
+++ b/analysis/collect_cwd_annmax.R
@@ -89,6 +89,19 @@ obj <- list(
   vars = list(evspsbl_cum = arr)
 )
 
+
+# Get meta information on code executed:
+# gitrepo_hash = system("git rev-parse HEAD", intern=TRUE)
+gitrepo_hash = system("git rev-parse --short HEAD", intern=TRUE)
+gitrepo_status <-
+  ifelse(system("git status --porcelain | wc -l", intern = TRUE) == "0",
+         "", #-clean-repository
+         "-dirty-repository")
+gitrepo_id <- paste0(
+  "https://github.com/geco-bern/cwd_global@",
+  gitrepo_hash, gitrepo_status)
+
+# Write NetCDF file:
 rgeco::write_nc2(
   obj,
   varnams = "evspsbl_cum",
@@ -96,8 +109,11 @@ rgeco::write_nc2(
   path = outfile,
   units_time = "days since 2001-01-01",
   att_title = "Global Cumulative Water Deficit",
-  att_history = sprintf("Created on: %s, with R scripts from https://github.com/geco-bern/cwd_global processing data from: %s", Sys.Date(), indir)
+  att_history = sprintf(
+    "Created on: %s, with R scripts from (%s) processing input data from: %s",
+    Sys.Date(), gitrepo_id, indir)
 )

From b6d888f1b696880eeac6c89a05064b02442a08c1 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 15:46:36 +0200
Subject: [PATCH 8/9] Fix out filename in apply_cwd_global.R

---
 R/cwd_byilon.R | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/R/cwd_byilon.R b/R/cwd_byilon.R
index 1583072..39294d9 100644
--- a/R/cwd_byilon.R
+++ b/R/cwd_byilon.R
@@ -6,6 +6,8 @@ cwd_byLON <- function(
 
   # prepare output
   # write (complemented) data to file. Give it some meaningful name and the index counter
+  fileprefix <- "CWD_result"
+  LON_string <- gsub("^.*?(LON_[0-9.+-]*).rds$", "\\1", basename(filnam))
   outpath <- file.path(outdir, paste0(fileprefix, "_", LON_string, ".rds"))
   if (file.exists(outpath) && !overwrite){

From d5f26e91532269c06d56008de744b7edac5e1de5 Mon Sep 17 00:00:00 2001
From: Fabian Bernhard <10245680+fabern@users.noreply.github.com>
Date: Tue, 20 Aug 2024 16:08:12 +0200
Subject: [PATCH 9/9] Update paths and README.md

---
 README.md                     | 20 +++++++++----------
 analysis/apply_cwd_global.R   | 15 ++++++++++-----
 analysis/collect_cwd_annmax.R | 36 ++++++-----------------------------
 analysis/get_cwd_annmax.R     | 15 +++++++++------
 analysis/make_tidy_cmip6.R    | 12 +++++++-----
 src/apply_cwd_global.sh       | 10 +++++-----
 6 files changed, 46 insertions(+), 62 deletions(-)

diff --git a/README.md b/README.md
index 74c3161..03e9fc6 100644
--- a/README.md
+++ b/README.md
@@ -5,12 +5,12 @@
 ## Workflow
 
 1. `analysis/make_tidy_cmip6.R`: Make original global files tidy. Save as nested data frames, where each row is one grid cell and the time series is nested in the column `data`. Separate files are written for each longitudinal band. Writes files:
 
-    `~/data/cmip6-ng/tidy/evspsbl_mon_CESM2_historical_r1i1p1f1_native_ilon_.rds`
+    `evspsbl/evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_[+-XXX.XXX].rds`
+    into folder defined as `outdir`.
 
 2. `analysis/apply_cwd_global.R`: Script for applying the CWD algorithm (or anything else that operates on full time series) in parallel, separately for each gridcell. Distributes by longitudinal band. Reading files written in previous step and writing files (in the example):
 
-    `~/data/cmip6-ng/tidy/cwd/_.rds`
+    `cwd/CWD_result_LON_[+-XXX.XXX].rds`
+    into folder defined as `outdir`.
 
     `src/apply_cwd_global.sh`: Bash script that calls `apply_cwd_global.R`, to be used as an alternative and containing submission statement for HPC.
 
@@ -18,14 +18,12 @@
 3. `analysis/get_cwd_annmax.R`: Script for applying in parallel the function for determining the annual maximum. Reading files written in previous step and writing files (in the example):
 
-    `~/data/cmip6-ng/tidy/cwd/__ANNMAX.rds`
+    `cwd_annmax/CWD_result_LON_[+-XXX.XXX]_ANNMAX.rds`
 
-4. `collect_cwd_annmax.R`: Script for collecting annual time series of each gridcell - is much smaller data and can now be handled by reading all into memory. Writes file containing global data with annual resolution:
+4. `collect_cwd_annmax.R`: Script for collecting the annual time series of each gridcell and writing the global annual data into a NetCDF file. Since the annual maximum data is much smaller, it can now be handled by reading it all into memory. This uses the function `write_nc2()` from the package {rgeco}. Install it from [here](https://github.com/geco-bern/rgeco). Writes a single file containing global data with annual resolution as NetCDF:
 
-    `~/data/cmip6-ng/tidy/cwd/_cum_ANNMAX.rds`
+    `/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd_annmax_global.nc`
 
-5. `analysis/create_nc_annmax.R`: Script for writing the global annual data into a NetCDF file. This uses the function `write_nc2()` from the package {rgeco}. Install it from [here](https://github.com/geco-bern/rgeco). Writes file containing global data with annual resolution as NetCDF:
-
-    `~/data/cmip6-ng/tidy/cwd/evspsbl_cum_ANNMAX.nc`
 
 **Note**: Adjust paths and file name prefixes for your own case in scripts (located in subdirectory `analysis/`)
+

diff --git a/analysis/apply_cwd_global.R b/analysis/apply_cwd_global.R
index 4e743e7..046b425 100644
--- a/analysis/apply_cwd_global.R
+++ b/analysis/apply_cwd_global.R
@@ -14,8 +14,8 @@ library(multidplyr)
 source(paste0(here::here(), "/R/cwd_byilon.R"))
 source(paste0(here::here(), "/R/my_cwd.R")) # load function that will be applied to time series
 
-indir  <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/evspsbl/"
-outdir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
+indir  <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/evspsbl/"
+outdir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd/"
 
 dir.create(outdir, showWarnings = FALSE)
 
@@ -26,6 +26,11 @@ filnams <- list.files(
   full.names = TRUE
 )
 
+if (length(filnams) <= 1){
+  stop("Should find multiple files. Only found ", length(filnams), ".")
+}
+
+
 # 2) Setup parallelization ------------------------------------------------
 # parallelize job across cores on a single node
 ncores <- 6 # parallel::detectCores() # number of cores of parallel threads
@@ -52,8 +57,8 @@ cl <- multidplyr::new_cluster(ncores) |>
 
 # 3) Process files --------------------------------------------------------
-out <- tibble(in_fname = filnams[vec_index]) |>
-  # multidplyr::partition(cl) |> # remove this line to deactivate parallelization
+out <- tibble(in_fname = filnams) |>
+  multidplyr::partition(cl) |> # remove this line to deactivate parallelization
   dplyr::mutate(out = purrr::map(
     in_fname,
     ~cwd_byLON(
@@ -61,4 +66,4 @@ out <- tibble(in_fname = filnams) |>
 
 out |> unnest(out)
 
-# TO CHECK: readRDS("/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd//evspsbl_cum_LON_+0.000.rds") |> unnest(data)
+# TO CHECK: readRDS("/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd//evspsbl_cum_LON_+0.000.rds") |> unnest(data)
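As with make_tidy_cmip6.R, the collected `out` tibble above holds one status string per input file ("Written results to: ..." or "File exists already: ..."). A quick sketch for listing any files that were neither written nor skipped, using the column names as defined above:

    out |>
      tidyr::unnest(out) |>
      dplyr::filter(!grepl("Written|exists already", out))
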
diff --git a/analysis/collect_cwd_annmax.R b/analysis/collect_cwd_annmax.R
index 3e93de2..28551de 100644
--- a/analysis/collect_cwd_annmax.R
+++ b/analysis/collect_cwd_annmax.R
@@ -9,43 +9,19 @@ library(dplyr)
 library(map2tidy)
 library(multidplyr)
 
-indir   <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
-outfile <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/evspsbl_cum_global.nc"
+indir   <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd_annmax"
+outfile <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd_annmax_global.nc"
 
 # 1) Define filenames of files to process: -------------------------------
 filnams <- list.files(
   indir,
-  pattern = "evspsbl_cum_LON_[0-9.+-]*_ANNMAX.rds", # make sure to include only _ANNMAX.rds
+  pattern = "CWD_result_LON_[0-9.+-]*_ANNMAX.rds", # make sure to include only _ANNMAX.rds
   full.names = TRUE
 )
 
-  # NOTE: this script has to be run non-parallel, since it collects results
-  # from previous scripts that were run in parallel.
-  # # 2) Setup parallelization ------------------------------------------------
-  # # 2a) Split job onto multiple nodes
-  # # i.e. only consider a subset of the files (others might be treated by another compute node)
-  # vec_index <- map2tidy::get_index_by_chunk(
-  #   as.integer(args[1]), # counter for compute node
-  #   as.integer(args[2]), # total number of compute nodes
-  #   length(filnams)      # total number of longitude indices
-  # )
-  #
-  # # 2b) Parallelize job across cores on a single node
-  # ncores <- 2 # parallel::detectCores() # number of cores of parallel threads
-  #
-  # cl <- multidplyr::new_cluster(ncores) |>
-  #   # set up the cluster by sending required objects to each core
-  #   multidplyr::cluster_library(c("map2tidy",
-  #                                 "dplyr",
-  #                                 "purrr",
-  #                                 "tidyr",
-  #                                 "readr",
-  #                                 "here",
-  #                                 "magrittr")) |>
-  #   multidplyr::cluster_assign(
-  #     collect_cwd_annmax_byilon = collect_cwd_annmax_byilon # make the function known for each core
-  #   )
+if (length(filnams) <= 1){
+  stop("Should find multiple files. Only found ", length(filnams), ".")
+}
 
 # 3) Process files --------------------------------------------------------
 global_df <- lapply(filnams,

diff --git a/analysis/get_cwd_annmax.R b/analysis/get_cwd_annmax.R
index a28bb50..41babed 100644
--- a/analysis/get_cwd_annmax.R
+++ b/analysis/get_cwd_annmax.R
@@ -17,10 +17,10 @@
 # Example for 1 CPU node:
 # >./get_cwd_annmax.R 1 1
 
 # # When using this script directly from RStudio, not from the shell, specify
-# args <- c(1, 1)
+args <- c(1, 1)
 
 # to receive arguments to script from the shell
-args = commandArgs(trailingOnly=TRUE)
+# args = commandArgs(trailingOnly=TRUE)
 
 library(dplyr)
 library(map2tidy)
@@ -28,16 +28,19 @@ library(multidplyr)
 
 source(paste0(here::here(), "/R/cwd_annmax_byilon.R"))
 
-indir  <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
-outdir <- "/data_2/scratch/fbernhard/cmip6-ng/tidy/cwd/"
-
+indir  <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd"
+outdir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd_annmax"
+dir.create(outdir, showWarnings = FALSE)
 
 # 1) Define filenames of files to process: -------------------------------
 filnams <- list.files(
   indir,
-  pattern = "evspsbl_cum_LON_[0-9.+-]*.rds", # make sure not to include _ANNMAX.rds
+  pattern = "CWD_result_LON_[0-9.+-]*.rds", # make sure not to include _ANNMAX.rds
   full.names = TRUE
 )
+if (length(filnams) <= 1){
+  stop("Should find multiple files. Only found ", length(filnams), ".")
+}
 
 # 2) Setup parallelization ------------------------------------------------
 # 2a) Split job onto multiple nodes

diff --git a/analysis/make_tidy_cmip6.R b/analysis/make_tidy_cmip6.R
index 9db9b69..20cb816 100644
--- a/analysis/make_tidy_cmip6.R
+++ b/analysis/make_tidy_cmip6.R
@@ -3,7 +3,10 @@ library(dplyr)
 library(stringr)
 
 # list demo file path
-path_cmip6 <- "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/"
+path_cmip6 <- "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/"
+
+outdir_evspsbl <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/evspsbl/"
+outdir_pr      <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/pr/"
 
 ## Evapotranspiration -----------------
 varnam <- "evspsbl"
@@ -15,7 +18,7 @@ filnam <- list.files(
 )
 
 if (length(filnam) != 1){
-  stop("Should find only a single file.")
+  stop("Should find exactly one file.")
 }
 
 # load and convert
@@ -26,7 +29,7 @@ res_evspsbl <- map2tidy(
   latnam = "lat",
   timenam = "time",
   do_chunks = TRUE,
-  outdir = "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/",
+  outdir = outdir_evspsbl,
   fileprefix = str_remove(basename(filnam), ".nc"),
   ncores = 12, # parallel::detectCores()
   overwrite = FALSE
@@ -57,8 +60,7 @@ res_pr <- map2tidy(
   latnam = "lat",
   timenam = "time",
   do_chunks = TRUE,
-  outdir = "/data/scratch/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/",
-  #outdir = "/data_2/scratch/fbernhard/cmip6-ng/tidy/pr/",
+  outdir = outdir_pr,
   fileprefix = str_remove(basename(filnam), ".nc"),
   ncores = 12, # parallel::detectCores()
   overwrite = FALSE
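A note on the hard-coded ncores values used throughout these scripts (2, 6, 12): a common, portable alternative is to derive the worker count at runtime while leaving one core free, e.g.

    ncores <- max(1, parallel::detectCores() - 1)
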
diff --git a/src/apply_cwd_global.sh b/src/apply_cwd_global.sh
index 81a950e..2ee6a2d 100755
--- a/src/apply_cwd_global.sh
+++ b/src/apply_cwd_global.sh
@@ -1,12 +1,12 @@
 #!/bin/bash
 
-nlon=288 # set this by hand: is the number of longitude indices in your files. All files must have the same spatial grid.
-njobs=4 # number of top-level jobs, sent to separate nodes. On each node, number fo cores is automatically determined and used.
-
-# this is for sumitting the job on a single machine (node) with multiple cores
-Rscript analysis/apply_cwd_global.R 1 1 $nlon
+Rscript analysis/apply_cwd_global.R 1 1
 
 # # this is for submitting jobs on HPC with a queueing system
+# TODO: Note by Fabian: only get_cwd_annmax.R is set up for multi-node parallelization, but not apply_cwd_global.R
+# TODO: Note by Fabian: Furthermore, no HPC with bsub is available.
+njobs=4 # number of top-level jobs, sent to separate nodes. On each node, number of cores is automatically determined and used.
 # for ((n=1;n<=${njobs};n++)); do
 #   echo "Submitting chunk number $n ..."
-#   bsub -W 72:00 -u bestocke -J "job_name $n" -R "rusage[mem=10000]" "Rscript vanilla analysis/apply_cwd_global.R $n $njobs $nlon"
+#   bsub -W 72:00 -u bestocke -J "job_name $n" -R "rusage[mem=10000]" "Rscript vanilla analysis/apply_cwd_global.R $n $njobs"
 # done
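Closing remark on the provenance stamp from PATCH 7: the same information can be captured without the `wc -l` shell pipe (which assumes a Unix userland). A hedged sketch in R, relying only on git being on the PATH:

    gitrepo_hash <- system("git rev-parse --short HEAD", intern = TRUE)
    is_dirty     <- length(system("git status --porcelain", intern = TRUE)) > 0
    gitrepo_id   <- paste0("https://github.com/geco-bern/cwd_global@",
                           gitrepo_hash, if (is_dirty) "-dirty-repository" else "")

This sidesteps `wc` by counting the lines returned by system(..., intern = TRUE) directly in R.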