Commit
Switch from cwd_byLON() and cwd_annmax_byLON() to generic fct().
fabern committed Aug 26, 2024
1 parent 47fd69d commit 6bfa25b
Showing 6 changed files with 100 additions and 126 deletions.
48 changes: 48 additions & 0 deletions R/apply_fct_to_each_file.R
@@ -0,0 +1,48 @@
apply_fct_to_each_file <- function(
    fct_to_apply_per_location,
    filnam,
    outdir,
    overwrite,
    outfilename_template # e.g. "resultfile_[LONSTRING].rds"; must contain [LONSTRING]
){

  # prepare output
  stopifnot(grepl(pattern = "\\[LONSTRING\\]", outfilename_template))

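  # extract the longitude string (e.g. "LON_+0.000") from the input filename;
  # it replaces [LONSTRING] in the template to build the output path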
  curr_LON_string <- gsub("^.*?(LON_[0-9.+-]*).rds$", "\\1", basename(filnam))
  outpath <- file.path(outdir, gsub("\\[LONSTRING\\]", curr_LON_string, outfilename_template))

  if (file.exists(outpath) && !overwrite){

    return(paste0("File already exists: ", outpath)) # don't do anything

  } else {
    # read from the file that contains tidy data for a single longitudinal band

    # read tidy evapotranspiration file
    df_evap <- readr::read_rds(filnam)

    # apply the custom function on the time series data frame separately for each gridcell
    out <- df_evap |>
      dplyr::mutate(data = purrr::map(data, ~fct_to_apply_per_location(.)))

    # write result to file
    message(paste0("Writing file ", outpath, " ..."))
    readr::write_rds(out, outpath)

    # don't return data - it's written to file
    return(paste0("Written results to: ", outpath))


    # UNUSED: stems originally from the function cwd_byLON()
    # read other required files (precipitation, temperature, ...)
    # # merge all such that monthly data is repeated for each day within month
    # df <- df_prec |> # one of the daily data frames
    #   tidyr::unnest(data) |> # must unnest to join by date
    #   left_join(
    #     df_evap |> # one of the monthly data frames
    #       tidyr::unnest(data),
    #     by = join_by(year, month)
    #   )
  }
}
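For orientation, a minimal sketch of how the new generic helper is meant to be wired up. It expects a tidy per-longitude rds file whose data frame carries a `data` list-column (one time series per gridcell, as produced by map2tidy); the per-location function `toy_summary` and the input path below are hypothetical, everything else follows the signature above:

source(paste0(here::here(), "/R/apply_fct_to_each_file.R"))

# hypothetical per-location function: count the timesteps of one gridcell
toy_summary <- function(df_of_one_coordinate){
  dplyr::summarise(df_of_one_coordinate, n_timesteps = dplyr::n())
}

apply_fct_to_each_file(
  fct_to_apply_per_location = toy_summary,
  filnam = "/path/to/tidy/evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_+0.000.rds", # hypothetical path
  outdir = tempdir(),
  overwrite = TRUE,
  outfilename_template = "toy_result_[LONSTRING].rds" # must contain [LONSTRING]
)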
43 changes: 0 additions & 43 deletions R/cwd_annmax_byilon.R

This file was deleted.

57 changes: 0 additions & 57 deletions R/cwd_byilon.R

This file was deleted.

37 changes: 23 additions & 14 deletions analysis/apply_cwd_global.R
@@ -11,25 +11,28 @@ library(dplyr)
library(map2tidy)
library(multidplyr)

source(paste0(here::here(), "/R/cwd_byilon.R"))
source(paste0(here::here(), "/R/my_cwd.R")) # load function that will be applied to time series
source(paste0(here::here(), "/R/apply_fct_to_each_file.R"))

indir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/evspsbl/"
outdir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd/"

dir.create(outdir, showWarnings = FALSE)

# 1a) Define filenames of files to process: -------------------------------
filnams <- list.files(
indir,
pattern = "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_[0-9.+-]*rds",
full.names = TRUE
)
infile_pattern <- "evspsbl_mon_CESM2_ssp585_r1i1p1f1_native_LON_[0-9.+-]*rds"
outfile_pattern <- "CWD_result_[LONSTRING].rds" # must contain [LONSTRING]

filnams <- list.files(indir, pattern = infile_pattern, full.names = TRUE)
if (length(filnams) <= 1){
stop("Should find multiple files. Only found " ,length(filnams), ".")
}

# 1b) Define function to apply to each location: -------------------------------
# function to apply to each location:
source(paste0(here::here(), "/R/my_cwd.R")) # load function that will be applied to time series
# test and debug:
# df_of_one_coordinate <- read_rds(filnams[1])$data[[1]]
# my_cwd(df_of_one_coordinate)


# 2) Setup parallelization ------------------------------------------------
# parallelize job across cores on a single node
@@ -45,9 +48,10 @@ cl <- multidplyr::new_cluster(ncores) |>
"here",
"magrittr")) |>
multidplyr::cluster_assign(
my_cwd = my_cwd, # make the function known for each core
cwd_byLON = cwd_byLON, # make the function known for each core
outdir = outdir
apply_fct_to_each_file = apply_fct_to_each_file, # make the function known for each core
my_cwd = my_cwd, # make the function known for each core
outdir = outdir,
outfile_pattern = outfile_pattern
)


@@ -56,14 +60,19 @@ out <- tibble(in_fname = filnams) |>
multidplyr::partition(cl) |> # remove this line to deactivate parallelization
dplyr::mutate(out = purrr::map(
in_fname,
~cwd_byLON(
~apply_fct_to_each_file(
fct_to_apply_per_location = my_cwd,
filnam = .,
outdir = outdir,
overwrite = FALSE
overwrite = FALSE,
outfilename_template = outfile_pattern # must contain [LONSTRING]
))
) |>
collect() # collect partitioned data.frame

out |> unnest(out)
out$out[1]




# TO CHECK: readRDS("/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd//evspsbl_cum_LON_+0.000.rds") |> unnest(data)
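As the comment on multidplyr::partition() notes, removing that step yields a serial version of the same pipeline, which is convenient for debugging before spinning up a cluster. A sketch using the objects defined above:

out_serial <- tibble(in_fname = filnams) |>
  dplyr::mutate(out = purrr::map(
    in_fname,
    ~apply_fct_to_each_file(
      fct_to_apply_per_location = my_cwd,
      filnam = .,
      outdir = outdir,
      overwrite = FALSE,
      outfilename_template = outfile_pattern # must contain [LONSTRING]
    ))
  )
out_serial$out[1]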
39 changes: 28 additions & 11 deletions analysis/get_cwd_annmax.R
@@ -26,22 +26,34 @@ library(dplyr)
library(map2tidy)
library(multidplyr)

source(paste0(here::here(), "/R/cwd_annmax_byilon.R"))
source(paste0(here::here(), "/R/apply_fct_to_each_file.R"))

indir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd"
outdir <- "/data_2/scratch/fbernhard/CMIP6ng_CESM2_ssp585/cmip6-ng/tidy/cwd_annmax"
dir.create(outdir, showWarnings = FALSE)

# 1) Define filenames of files to process: -------------------------------
filnams <- list.files(
indir,
pattern = "CWD_result_LON_[0-9.+-]*.rds", # make sure not to include _ANNMAX.rds
full.names = TRUE
)
# 1a) Define filenames of files to process: -------------------------------
infile_pattern <- "CWD_result_LON_[0-9.+-]*.rds"
outfile_pattern <- "CWD_result_[LONSTRING]_ANNMAX.rds" # must contain [LONSTRING]

filnams <- list.files(indir, pattern = infile_pattern, full.names = TRUE)
if (length(filnams) <= 1){
stop("Should find multiple files. Only found " ,length(filnams), ".")
}

# 1b) Define function to apply to each location: -------------------------------
# function to apply to get annual maximum:
get_annmax <- function(df_of_one_coordinate){
df_of_one_coordinate |>
mutate(year = lubridate::year(datetime)) |>
group_by(year) |>
summarise(evspsbl_cum = max(evspsbl_cum))
}
# test and debug:
# df_of_one_coordinate <- read_rds(filnams[1])$data[[1]]
# get_annmax(df_of_one_coordinate)


# 2) Setup parallelization ------------------------------------------------
# 2a) Split job onto multiple nodes
# i.e. only consider a subset of the files (others might be treated by another compute node)
@@ -64,8 +76,10 @@ cl <- multidplyr::new_cluster(ncores) |>
"here",
"magrittr")) |>
multidplyr::cluster_assign(
cwd_annmax_byLON = cwd_annmax_byLON, # make the function known for each core
outdir = outdir
apply_fct_to_each_file = apply_fct_to_each_file, # make the function known for each core
get_annmax = get_annmax, # make the function known for each core
outdir = outdir,
outfile_pattern = outfile_pattern
)


@@ -74,15 +88,18 @@ out <- tibble(in_fname = filnams[vec_index]) |>
multidplyr::partition(cl) |> # remove this line to deactivate parallelization
dplyr::mutate(out = purrr::map(
in_fname,
~cwd_annmax_byLON(
~apply_fct_to_each_file(
fct_to_apply_per_location = get_annmax,
filnam = .,
outdir = outdir,
overwrite = FALSE
overwrite = FALSE,
outfilename_template = outfile_pattern # must contain [LONSTRING]
))
) |>
collect() # collect partitioned data.frame

out |> unnest(out)
out$out[1]



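To sanity-check get_annmax in isolation, one can feed it a small made-up series (values purely illustrative): two years of monotonically increasing monthly values should collapse to one row per year carrying that year's maximum.

toy <- dplyr::tibble(
  datetime    = seq(as.Date("2001-01-15"), by = "month", length.out = 24),
  evspsbl_cum = 1:24
)
get_annmax(toy)
# expected: 2 rows - year 2001 with evspsbl_cum 12, year 2002 with evspsbl_cum 24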
2 changes: 1 addition & 1 deletion analysis/test_cwd_global_cmip6.R
@@ -1,7 +1,7 @@
# Testing whether CMIP6 data is correctly read from files and handled.
# For Patricia Gribi

library(FluxDataKit)
library(FluxDataKit) # remotes::install_github("geco-bern/FluxDataKit")
library(tidyverse)
library(terra)
library(lubridate)
