Skip to content

Commit

Permalink
Merge pull request #1 from geco-bern/update-for-map2tidy-v2
Browse files Browse the repository at this point in the history
Update map2tidy v2.0.0
  • Loading branch information
stineb authored Aug 22, 2024
2 parents b946934 + d5f26e9 commit 47fd69d
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 351 deletions.
16 changes: 0 additions & 16 deletions R/collect_cwd_annmax_byilon.R

This file was deleted.

73 changes: 36 additions & 37 deletions R/cwd_annmax_byilon.R
Original file line number Diff line number Diff line change
@@ -1,44 +1,43 @@
cwd_annmax_byilon <- function(
ilon,
indir,
cwd_annmax_byLON <- function(
filnam,
outdir,
fileprefix
overwrite
){

# function to get annual maximum
get_annmax <- function(df){
df |>
mutate(year = lubridate::year(time)) |>
group_by(year) |>
summarise(evspsbl_cum = max(evspsbl_cum))
}
# # write (complemented) data to file. Give it some meaningful name and the index counter
outpath <- file.path(outdir, basename(filnam) %>% gsub(".rds","_ANNMAX.rds", .))
if (file.exists(outpath) && !overwrite){

# read evapotranspiration file tidy
filnam <- list.files(
indir,
pattern = paste0(fileprefix, "_", ilon, ".rds"),
full.names = TRUE
)
df <- readr::read_rds(filnam)

# apply annual maximum function
out <- df |>
mutate(data = purrr::map(
data,
~get_annmax(.)
))

# write (complemented) data to file. Give it some meaningful name and the index counter
path <- paste0(outdir, "/", fileprefix, "_", ilon, "_ANNMAX.rds")
message(
paste0(
"Writing file ", path, " ..."
)
)
readr::write_rds(
out,
path
# don't do anything
return(paste0("File exists already: ", outpath))

} else {
# read from file that contains tidy data for a single longitudinal band

# read evapotranspiration file tidy
df_evap <- readr::read_rds(filnam)

# function to apply to get annual maximum:
get_annmax <- function(df_of_one_coordinate){
df_of_one_coordinate |>
mutate(year = lubridate::year(datetime)) |>
group_by(year) |>
summarise(evspsbl_cum = max(evspsbl_cum))
}

# apply annual maximum function
out <- df_evap |>
# apply the custom function on the time series data frame separately for each gridcell.
dplyr::mutate(data = purrr::map(data, ~get_annmax(.)))

message(
paste0(
"Writing file ", outpath, " ..."
)
)
readr::write_rds(out, outpath)

# don't return data - it's written to file
# don't return data - it's written to file
return(paste0("Written results to: ", outpath))
}
}
106 changes: 51 additions & 55 deletions R/cwd_byilon.R
Original file line number Diff line number Diff line change
@@ -1,61 +1,57 @@
cwd_byilon <- function(
ilon,
indir,
cwd_byLON <- function(
filnam,
outdir,
fileprefix
){

# load function that will be applied to time series
source(paste0(here::here(), "/R/my_cwd.R"))

# read from file that contains tidy data for a single longitudinal band

# read evapotranspiration file tidy
filnam <- list.files(
indir,
pattern = paste0("evspsbl_mon_CESM2_historical_r1i1p1f1_native_ilon_", ilon, ".rds"),
full.names = TRUE
)
df_evap <- readr::read_rds(filnam)

# read other required files (precipitation, temperature, ...

# # merge all such that monthly data is repeated for each day within month
# df <- df_prec |> # one of the daily data frames
# tidyr::unnest(data) |> # must unnest to join by date
# left_join(
# df_evap |> # one of the monthly data frames
# tidyr::unnest(data),
# by = join_by(year, month)
# )

# for demo only
df <- df_evap |>
tidyr::unnest(data)

out <- df |>

# Uncomment code below to nest data by gridcell, if not already nested.
# group data by gridcells and wrap time series for each gridcell into a new
# column, by default called 'data'.
dplyr::group_by(lon, lat) |>
tidyr::nest() |>

# apply the custom function on the time series data frame separately for
# each gridcell.
dplyr::mutate(data = purrr::map(data, ~my_cwd(.)))
overwrite
){

# prepare output
# write (complemented) data to file. Give it some meaningful name and the index counter
path <- paste0(outdir, "/", fileprefix, "_", ilon, ".rds")
message(
paste0(
"Writing file ", path, " ..."
)
)
readr::write_rds(
out,
path
fileprefix <- "CWD_result"
LON_string <- gsub("^.*?(LON_[0-9.+-]*).rds$", "\\1", basename(filnam))
outpath <- file.path(outdir, paste0(fileprefix, "_", LON_string, ".rds"))
if (file.exists(outpath) && !overwrite){

# don't do anything
return(paste0("File exists already: ", outpath))

} else {
# read from file that contains tidy data for a single longitudinal band

# read evapotranspiration file tidy
df_evap <- readr::read_rds(filnam)

# read other required files (precipitation, temperature, ...

# # merge all such that monthly data is repeated for each day within month
# df <- df_prec |> # one of the daily data frames
# tidyr::unnest(data) |> # must unnest to join by date
# left_join(
# df_evap |> # one of the monthly data frames
# tidyr::unnest(data),
# by = join_by(year, month)
# )

# function to apply:
## my_cwd: NOTE this is defined in outer scope and accessible by closure

out <- df_evap |>
# # Uncomment code below to nest data by gridcell, if not already nested.
# # group data by gridcells and wrap time series for each gridcell into a new
# # column, by default called 'data'.
# dplyr::group_by(lon, lat) |> tidyr::nest() |>

# apply the custom function on the time series data frame separately for each gridcell.
dplyr::mutate(data = purrr::map(data, ~my_cwd(.))) # NOTE: this uses the closure

# write (complemented) data to file.
message(
paste0(
"Writing file ", outpath, " ..."
)
)
readr::write_rds(out, outpath)

# don't return data - it's written to file
# don't return data - it's written to file
return(paste0("Written results to: ", outpath))
}
}
2 changes: 1 addition & 1 deletion R/my_cwd.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ my_cwd <- function(df){
mutate(evspsbl_cum = evspsbl) |>

# reduce size - important
select(time, evspsbl_cum)
select(datetime, evspsbl_cum)

# return a data frame
return(out)
Expand Down
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,25 @@
## Workflow

1. `analysis/make_tidy_cmip6.R`: Make original global files tidy. Save as nested data frames, where each row is one grid cell and time series is nested in the column `data`. Separate files are written for each longitudinal band. Writes files:

`~/data/cmip6-ng/tidy/evspsbl_mon_CESM2_historical_r1i1p1f1_native_ilon_<ilon>.rds`
`evspsbl/evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_[+-XXX.XXX].rds`
into folder defined as `outdir`.

2. `analysis/apply_cwd_global.R`: Script for parallel applying the CWD algorithm (or anything else that operates on full time series) separately for on gridcell. Distributes by longitudinal band. Reading files written in previous step and writing files (in the example):

`~/data/cmip6-ng/tidy/cwd/<fileprefix>_<ilon>.rds`
`cwd/CWD_result_LON_[+-XXX.XXX].rds`
into folder defined as `outdir`.

`src/apply_cwd_global.sh`: Bash script that calls `apply_cwd_global.R` , to be used as an alternative and containing submission statement for HPC.

Note: This step creates data at the original temporal resolution. Data is not collected at this stage to avoid memory limitation.

3. `analysis/get_cwd_annmax.R`: Script for parallel applying function for determining annual maximum. Reading files written in previous step and writing files (in the example):

`~/data/cmip6-ng/tidy/cwd/<fileprefix>_<ilon>_ANNMAX.rds`

4. `collect_cwd_annmax.R`: Script for collecting annual time series of each gridcell - is much smaller data and can now be handled by reading all into memory. Writes file containing global data with annual resolution:
`cwd_annmax/CWD_result_LON_[+-XXX.XXX]_ANNMAX.rds`

`~/data/cmip6-ng/tidy/cwd/<fileprefix>_cum_ANNMAX.rds`
4. `collect_cwd_annmax.R`: Script for collecting annual time series of each gridcell and writing the global annual data into a NetCDF file. Since annual maximum is much smaller data and can now be handled by reading all into memory. This uses the function `write_nc2()` from the package {rgeco}. Install it from [here](https://github.com/geco-bern/rgeco). Writes single file containing global data with annual resolution as NetCDF:

5. `analysis/create_nc_annmax.R`: Script for writing the global annual data into a NetCDF file. This uses the function `write_nc2()` from the package {rgeco}. Install it from [here](https://github.com/geco-bern/rgeco). Writes file containing global data with annual resolution as NetCDF:

`~/data/cmip6-ng/tidy/cwd/evspsbl_cum_ANNMAX.nc`
`/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd_annmax_global.nc`

Writes file containing global data with annual resolution:
**Note**: Adjust paths and file name prefixes for your own case in scripts (located in subdirectory `analysis/`)

2 changes: 1 addition & 1 deletion analysis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

1. `make_tidy_cmip6.R`: Make original global files tidy. Save as nested data frames, where each row is one grid cell and time series is nested in the column `data`. Separate files are written for each longitudinal band. Writes files:

`~/data/cmip6-ng/tidy/evspsbl_mon_CESM2_historical_r1i1p1f1_native_ilon_<ilon>.rds`
`~/data/cmip6-ng/tidy/evspsbl_mon_CESM2_historical_r1i1p1f1_native_LON_<+-XXX.XXX>.rds`

2. `apply_cwd_global.R`: Script for parallel applying the CWD algorithm (or anything else that operates on full time series) separately for on gridcell. Distributes by longitudinal band. Reading files written in previous step and writing files (in the example):

Expand Down
90 changes: 43 additions & 47 deletions analysis/apply_cwd_global.R
Original file line number Diff line number Diff line change
@@ -1,39 +1,42 @@
#!/usr/bin/env Rscript

# script is called with three arguments:
# 1. counter for chunks
# 2. total number of chunks
# 3. total number of longitude indices
# script is called without any arguments
# (By specifying overwrite you can choose within the script to avoid calculating
# or recalculate previous results)

# Example:
# >./apply_cwd_global.R 1 3 360

# to receive arguments to script from the shell
args = commandArgs(trailingOnly=TRUE)

# # When using this script directly from RStudio, not from the shell, specify
# nlon <- 288 # set this by hand. corresponds to length of the longitude dimension in original NetCDF files
# args <- c(1, 1, nlon)
# >./apply_cwd_global.R

library(dplyr)
library(map2tidy)
library(multidplyr)

source(paste0(here::here(), "/R/cwd_byilon.R"))
source(paste0(here::here(), "/R/my_cwd.R")) # load function that will be applied to time series

print("getting data for longitude indices:")
vec_index <- map2tidy::get_index_by_chunk(
as.integer(args[1]), # counter for chunks
as.integer(args[2]), # total number of chunks
as.integer(args[3]) # total number of longitude indices
)
indir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/evspsbl/"
outdir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd/"

dir.create(outdir, showWarnings = FALSE)

# 1) Define filenames of files to process: -------------------------------
filnams <- list.files(
indir,
pattern = "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_[0-9.+-]*rds",
full.names = TRUE
)

# number of cores of parallel threads
ncores <- 2 # parallel::detectCores()
if (length(filnams) <= 1){
stop("Should find multiple files. Only found " ,length(filnams), ".")
}


# 2) Setup parallelization ------------------------------------------------
# parallelize job across cores on a single node
ncores <- 6 # parallel::detectCores() # number of cores of parallel threads

# parallelize job
# set up the cluster, sending required objects to each core
cl <- multidplyr::new_cluster(ncores) |>
# set up the cluster, sending required objects to each core
multidplyr::cluster_library(c("map2tidy",
"dplyr",
"purrr",
Expand All @@ -42,32 +45,25 @@ cl <- multidplyr::new_cluster(ncores) |>
"here",
"magrittr")) |>
multidplyr::cluster_assign(
cwd_byilon = cwd_byilon # make the function known for each core
)
my_cwd = my_cwd, # make the function known for each core
cwd_byLON = cwd_byLON, # make the function known for each core
outdir = outdir
)


# distribute computation across the cores, calculating for all longitudinal
# indices of this chunk
out <- tibble(ilon = vec_index) |>
multidplyr::partition(cl) |>
# 3) Process files --------------------------------------------------------
out <- tibble(in_fname = filnams) |>
multidplyr::partition(cl) |> # remove this line to deactivate parallelization
dplyr::mutate(out = purrr::map(
ilon,
~cwd_byilon(
.,
indir = "~/data/cmip6-ng/tidy/evspsbl/",
outdir = "~/data/cmip6-ng/tidy/cwd/",
fileprefix = "evspsbl_cum"
))
)
in_fname,
~cwd_byLON(
filnam = .,
outdir = outdir,
overwrite = FALSE
))
) |>
collect() # collect partitioned data.frame

out |> unnest(out)

# # un-parallel alternative
# out <- tibble(ilon = vec_index) |>
# dplyr::mutate(out = purrr::map(
# ilon,
# ~cwd_byilon(
# .,
# indir = "~/data/cmip6-ng/tidy/evspsbl/",
# outdir = "~/data/cmip6-ng/tidy/cwd/",
# fileprefix = "evspsbl_cum"
# ))
# )
# TO CHECK: readRDS("/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd//evspsbl_cum_LON_+0.000.rds") |> unnest(data)
Loading

0 comments on commit 47fd69d

Please sign in to comment.