From e79ef488887d6d77e49408c62093e6c71e152c16 Mon Sep 17 00:00:00 2001 From: Adrien Le Guillou Date: Wed, 1 Nov 2023 02:48:59 -0700 Subject: [PATCH] Refactor (#3) * refactor in progress * vignette start + pkgdown --- .Rbuildignore | 8 + .github/.gitignore | 1 + .github/workflows/pkgdown.yaml | 48 ++++ .gitignore | 12 + DESCRIPTION | 7 + NAMESPACE | 6 + R/assessment_plots.R | 89 ++++++ R/assessment_rmd.R | 73 +++++ R/assessments.R | 85 ++++++ R/calib_object.R | 52 ++++ R/calibration_steps.R | 26 +- R/determ_end.R | 86 ++++++ R/ending.R | 30 ++ R/job.R | 23 ++ R/proposal.R | 131 +++++++++ R/proposers.R | 73 +++++ R/result.R | 122 ++++++++ R/sideload.R | 36 +++ R/state.R | 79 +++++ R/utils.R | 488 ------------------------------- R/wave.R | 45 +++ R/zzz.R | 2 +- README.Rmd | 76 ++++- README.md | 85 +++++- _pkgdown.yml | 4 + inst/rmd/assessment.Rmd | 19 ++ man/calibration_step2.Rd | 20 ++ man/calibration_step3.Rd | 15 + man/load_sideload.Rd | 2 +- man/make_shrink_proposer.Rd | 11 + man/render_assessment.Rd | 25 ++ man/save_sideload.Rd | 2 +- vignettes/.gitignore | 2 + vignettes/swfcalib.Rmd | 512 +++++++++++++++++++++++++++++++++ 34 files changed, 1794 insertions(+), 501 deletions(-) create mode 100644 .Rbuildignore create mode 100644 .github/.gitignore create mode 100644 .github/workflows/pkgdown.yaml create mode 100644 .gitignore create mode 100644 R/assessment_plots.R create mode 100644 R/assessment_rmd.R create mode 100644 R/assessments.R create mode 100644 R/calib_object.R create mode 100644 R/determ_end.R create mode 100644 R/ending.R create mode 100644 R/job.R create mode 100644 R/proposal.R create mode 100644 R/proposers.R create mode 100644 R/result.R create mode 100644 R/sideload.R create mode 100644 R/state.R create mode 100644 R/wave.R create mode 100644 _pkgdown.yml create mode 100644 inst/rmd/assessment.Rmd create mode 100644 man/calibration_step2.Rd create mode 100644 man/calibration_step3.Rd create mode 100644 man/make_shrink_proposer.Rd create mode 100644 man/render_assessment.Rd create mode 100644 vignettes/.gitignore create mode 100644 vignettes/swfcalib.Rmd diff --git a/.Rbuildignore b/.Rbuildignore new file mode 100644 index 0000000..38c0a28 --- /dev/null +++ b/.Rbuildignore @@ -0,0 +1,8 @@ +^renv$ +^renv\.lock$ +^doc$ +^Meta$ +^_pkgdown\.yml$ +^docs$ +^pkgdown$ +^\.github$ diff --git a/.github/.gitignore b/.github/.gitignore new file mode 100644 index 0000000..2d19fc7 --- /dev/null +++ b/.github/.gitignore @@ -0,0 +1 @@ +*.html diff --git a/.github/workflows/pkgdown.yaml b/.github/workflows/pkgdown.yaml new file mode 100644 index 0000000..ed7650c --- /dev/null +++ b/.github/workflows/pkgdown.yaml @@ -0,0 +1,48 @@ +# Workflow derived from https://github.com/r-lib/actions/tree/v2/examples +# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help +on: + push: + branches: [main, master] + pull_request: + branches: [main, master] + release: + types: [published] + workflow_dispatch: + +name: pkgdown + +jobs: + pkgdown: + runs-on: ubuntu-latest + # Only restrict concurrency for non-PR jobs + concurrency: + group: pkgdown-${{ github.event_name != 'pull_request' || github.run_id }} + env: + GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }} + permissions: + contents: write + steps: + - uses: actions/checkout@v3 + + - uses: r-lib/actions/setup-pandoc@v2 + + - uses: r-lib/actions/setup-r@v2 + with: + use-public-rspm: true + + - uses: r-lib/actions/setup-r-dependencies@v2 + with: + extra-packages: any::pkgdown, local::. + needs: website + + - name: Build site + run: pkgdown::build_site_github_pages(new_process = FALSE, install = FALSE) + shell: Rscript {0} + + - name: Deploy to GitHub pages 🚀 + if: github.event_name != 'pull_request' + uses: JamesIves/github-pages-deploy-action@v4.4.1 + with: + clean: false + branch: gh-pages + folder: docs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..35855c3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +inst/doc +/doc/ +/Meta/ +/renv/ +.Rproj.user +.Rhistory +.Rprofile +.RData +.Ruserdata +.DS_Store +renv/ +docs diff --git a/DESCRIPTION b/DESCRIPTION index bc9fefe..e22126f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -17,9 +17,16 @@ Imports: future.apply, slurmworkflow, fs, + lhs, dplyr Remotes: github::EpiModel/slurmworkflow Encoding: UTF-8 Roxygen: list(markdown = TRUE) RoxygenNote: 7.2.3 +Suggests: + ggplot2, + knitr, + rmarkdown +VignetteBuilder: knitr +URL: https://epimodel.github.io/swfcalib/ diff --git a/NAMESPACE b/NAMESPACE index ef78f5e..3ae68e8 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -3,5 +3,11 @@ export(calibration_step1) export(calibration_step2) export(calibration_step3) +export(determ_end_thresh) +export(determ_poly_end) export(load_sideload) +export(make_proposer_se_range) +export(make_shrink_proposer) +export(render_assessment) export(save_sideload) +importFrom(dplyr,.data) diff --git a/R/assessment_plots.R b/R/assessment_plots.R new file mode 100644 index 0000000..bf2f3d9 --- /dev/null +++ b/R/assessment_plots.R @@ -0,0 +1,89 @@ +make_rmse_plot <- function(job_assess) { + d <- job_assess$measures + ggplot2::ggplot(d, ggplot2::aes(x = .data$iteration, y = .data$rmse_mean)) + + ggplot2::geom_line() + + ggplot2::geom_ribbon( + ggplot2::aes(ymin = rmse_mean - rmse_sd, ymax = rmse_mean + rmse_sd), + alpha = 0.3 + ) + + ggplot2::scale_y_log10() + + ggplot2::theme_light() + + ggplot2::labs( + title = paste0("RMSE Evolution for: ", job_assess$infos$job_id), + x = "Iteration", + y = "RMSE \n(log10 scale)" + ) +} + +make_param_volume_plot <- function(job_assess) { + d <- job_assess$measures + ggplot2::ggplot(d, ggplot2::aes(x = .data$iteration, + y = .data$param_volume)) + + ggplot2::geom_line() + + ggplot2::scale_y_log10() + + ggplot2::theme_light() + + ggplot2::labs( + title = + paste0("Parameter Space Volume Evolution for: ", + job_assess$infos$job_id), + x = "Iteration", + y = "Parameter Space Volume \n(log10 scale)" + ) +} + +make_param_spread_plot <- function(job_assess, param) { + d <- job_assess$measures + d[["y"]] <- d[[paste0("spread__", param)]] + ggplot2::ggplot(d, ggplot2::aes(x = .data$iteration, y = .data$y)) + + ggplot2::geom_line() + + ggplot2::scale_y_log10() + + ggplot2::theme_light() + + ggplot2::labs( + title = paste0("Spread of Parameter: ", param), + x = "Iteration", + y = "Spread \n(log10 scale)" + ) +} + +make_target_err_plot <- function(job_assess, target) { + d <- job_assess$measures + d[["y"]] <- d[[paste0("mean_err__", target)]] + d[["ys"]] <- d[[paste0("sd_err__", target)]] + ggplot2::ggplot(d, ggplot2::aes(x = .data$iteration, y = .data$y)) + + ggplot2::geom_line() + + ggplot2::geom_ribbon( + ggplot2::aes(ymin = .data$y - .data$ys, ymax = .data$y + .data$ys), + alpha = 0.3 + ) + + ggplot2::geom_hline(yintercept = 0, linetype = "dashed") + + ggplot2::theme_light() + + ggplot2::labs( + title = paste0("Mean Error on: ", target), + x = "Iteration", + y = "Error \n(mean + sd)" + ) +} + +make_job_plots <- function(job_assess) { + out <- list() + infos <- job_assess$infos + out$rmse <- make_rmse_plot(job_assess) + out$volume <- make_param_volume_plot(job_assess) + out$params <- lapply(infos$params, make_param_spread_plot, job = job_assess) + names(out$params) <- infos$params + out$targets <- lapply(infos$targets, make_target_err_plot, job = job_assess) + names(out$targets) <- infos$targets + out +} + +make_wave_plots <- function(wave_assess) { + out <- lapply(wave_assess, make_job_plots) + names(out) <- vapply(wave_assess, function(x) x$infos$job_id, character(1)) + out +} + +make_assessments_plots <- function(assessments) { + out <- lapply(assessments, make_wave_plots) + names(out) <- names(assessments) + out +} diff --git a/R/assessment_rmd.R b/R/assessment_rmd.R new file mode 100644 index 0000000..d808966 --- /dev/null +++ b/R/assessment_rmd.R @@ -0,0 +1,73 @@ +make_wave_rmd <- function(assessments, wave_num) { + wave <- assessments[[paste0("wave", wave_num)]] + cat("# Wave", wave_num, "\n\n") + for (i in seq_along(wave)) { + make_job_rmd(wave[[i]]) + } +} + +make_job_rmd <- function(job_assess) { + cat("##", job_assess$infos$job_id, "\n\n") + + cat("### Targets and Parameters", "\n\n") + dplyr::tibble( + target_name = job_assess$infos$targets, + target_value = job_assess$infos$targets_val + ) |> knitr::kable(align = "ll") |> print() + + dplyr::tibble( + parameter = job_assess$infos$params, + initial_range = vapply( + job_assess$infos$params_ranges, + \(x) paste0(x[1], " - ", x[2]), + "" + ) + ) |> knitr::kable(align = "ll") |> print() + + cat("\n\n") + + cat("### Parameter Space and RMSE Evolution", "\n\n") + + make_param_volume_plot(job_assess) |> print() + make_rmse_plot(job_assess) |> print() + cat("\n\n") + + cat("### Parameter Spreads", "\n\n") + for (p in job_assess$infos$params) { + make_param_spread_plot(job_assess, p) |> print() + cat("\n\n") + } + + cat("### Target Errors", "\n\n") + for (t in job_assess$infos$targets) { + make_target_err_plot(job_assess, t) |> print() + cat("\n\n") + } + cat("\n\n") + +} + +#' Generate an html report of the auto-calibration +#' +#' The report contains descriptions of the parameters spaces and residual errors +#' over the duration of the calibration. +#' +#' @param path_to_assessments Path to an `assessments.rds` file generated by an +#' `swfcalib` process. +#' @param output_filename Name of the html report (default = "assessment.html") +#' @param output_dir Directory where to store the report (default = current +#' working directory) +#' +#' @export +render_assessment <- function(path_to_assessments, + output_filename = "assessment.html", + output_dir = NULL) { + if (is.null(output_dir)) output_dir <- getwd() + rmarkdown::render( + system.file("rmd/assessment.Rmd", package = "swfcalib"), + output_file = output_filename, + output_dir = output_dir, + knit_root_dir = getwd(), + params = list(path_to_assessments = path_to_assessments) + ) +} diff --git a/R/assessments.R b/R/assessments.R new file mode 100644 index 0000000..edb0abb --- /dev/null +++ b/R/assessments.R @@ -0,0 +1,85 @@ +update_assessments <- function(calib_object, results) { + out <- load_assessments(calib_object) + if (nrow(results) == 0) { + save_assessments(calib_object, out) + return(invisible(calib_object)) + } + + cur_wave <- paste0("wave", get_current_wave(calib_object)) + + assessments <- future.apply::future_lapply( + get_current_jobs(calib_object), + make_job_assessment, + calib_object = calib_object, + results = results + ) + + out[[cur_wave]] <- merge_wave_assements(assessments, out[[cur_wave]]) + save_assessments(calib_object, out) + invisible(calib_object) +} + +merge_wave_assements <- function(new, old) { + old <- if (is.null(old)) list() else old + for (nme in names(new)) { + new[[nme]] <- merge_job_assessment(new[[nme]], old[[nme]]) + } + new +} + +merge_job_assessment <- function(new, old) { + list( + infos = new$infos, + measures = dplyr::bind_rows(old$measures, new$measures) + ) +} + +save_assessments <- function(calib_object, assessments) { + saveRDS(assessments, get_assessments_path(calib_object)) +} + +get_assessments_path <- function(calib_object) { + fs::path(get_root_dir(calib_object), "assessments.rds") +} + +load_assessments <- function(calib_object) { + f_path <- get_assessments_path(calib_object) + if (fs::file_exists(f_path)) readRDS(f_path) else list() +} + +make_job_assessment <- function(calib_object, job, results) { + out <- list() + + out$infos <- job[c("targets", "targets_val", "params")] + out$infos$job_id <- get_job_id(job) + out$infos$params_ranges <- lapply(job$initial_proposals, range) + + current_iteration <- get_current_iteration(calib_object) + current_wave <- get_current_wave(calib_object) + d <- dplyr::filter( + results, + .data$.iteration == current_iteration, + .data$.wave == current_wave + ) + + make_rmse <- function(x, target) sqrt(mean((target - x)^2)) + iter_rmse <- apply(d[job$targets], 1, make_rmse, target = job$targets_val) + + get_spread <- function(x) diff(range(x)) + spreads <- vapply(d[job$params], get_spread, numeric(1)) + + out$measures <- dplyr::tibble( + iteration = current_iteration, + rmse_mean = mean(iter_rmse), + rmse_sd = sd(iter_rmse), + param_volume = prod(spreads) + ) + + errors <- Map(function(x, target) target - x, d[job$targets], job$targets_val) + + out$measures[paste0("spread__", names(spreads))] <- as.list(spreads) + out$measures[paste0("mean_err__", job$targets)] <- lapply(errors, mean) + out$measures[paste0("sd_err__", job$targets)] <- lapply(errors, sd) + + out +} diff --git a/R/calib_object.R b/R/calib_object.R new file mode 100644 index 0000000..162866f --- /dev/null +++ b/R/calib_object.R @@ -0,0 +1,52 @@ +get_root_dir <- function(calib_object) { + calib_object$config$root_directory +} + +make_directories <- function(calib_object) { + dirs <- c( + get_sim_result_save_dir(calib_object), + get_sideload_dir(calib_object) + ) + for (d in dirs) { + if (!fs::dir_exists(d)) fs::dir_create(d) + } + invisible(TRUE) +} + +# calib_save_path must be a variable in all SWF steps +load_calib_object <- function(calib_object) { + save_path <- get_calib_object_path(calib_object) + if (fs::file_exists(save_path)) { + calib_object <- readRDS(save_path) + } else { + message("No calib_object file found. initializing the state and returning") + calib_object <- initialize_state(calib_object) + make_directories(calib_object) + } + calib_object +} + + +get_calib_object_path <- function(calib_object) { + root_directory <- get_root_dir(calib_object) + fs::path(root_directory, "calib_object.rds") +} + +save_calib_object <- function(calib_object) { + saveRDS(calib_object, get_calib_object_path(calib_object)) +} + +initialize_state <- function(calib_object) { + calib_object$state <- list( + done = FALSE, + wave = 1, + iteration = 0, + default_proposal = calib_object$config$default_proposal + ) + calib_object <- initialize_job_ids(calib_object) + calib_object <- mutate_jobs_done_status( + calib_object, + rep(FALSE, length(get_current_jobs(calib_object))) + ) + calib_object +} diff --git a/R/calibration_steps.R b/R/calibration_steps.R index 506083b..5b636ba 100644 --- a/R/calibration_steps.R +++ b/R/calibration_steps.R @@ -12,10 +12,15 @@ calibration_step1 <- function(calib_object, n_cores) { calib_object <- process_sim_results(calib_object) results <- load_results(calib_object) + update_assessments(calib_object, results) + calib_object <- update_calibration_state(calib_object, results) if (is_calibration_complete(calib_object)) { - wrap_up_calibration(calib_object) + # When the calibration is done, skip the next step + next_step <- slurmworkflow::get_current_workflow_step() + 2 + slurmworkflow::change_next_workflow_step(next_step) + message("Calibration complete") } else { proposals <- make_proposals(calib_object, results) save_proposals(calib_object, proposals) @@ -25,6 +30,13 @@ calibration_step1 <- function(calib_object, n_cores) { print_log(calib_object) } +#' Second calibration step: run the model for each proposal +#' +#' @param batch_num the batch number for the current proposal +#' @param n_batches the total number of batches for this step +#' +#' @inheritParams calibration_step1 +#' #' @export calibration_step2 <- function(calib_object, n_cores, batch_num, n_batches) { oplan <- future::plan("multicore", workers = n_cores) @@ -54,10 +66,14 @@ calibration_step2 <- function(calib_object, n_cores, batch_num, n_batches) { } } +#' Third calibration step: Wrap up the calibration system and store the results +#' +#' Having this as a separate step allows a mail to be send at the end of the +#' calibration +#' +#' @inheritParams calibration_step1 +#' #' @export calibration_step3 <- function(calib_object) { - calib_object <- load_calib_object(calib_object) - full_results <- load_full_results(calib_object) - save_full_results(calib_object, full_results) - print_log(calib_object) + wrap_up_calibration(load_calib_object(calib_object)) } diff --git a/R/determ_end.R b/R/determ_end.R new file mode 100644 index 0000000..cd3f09a --- /dev/null +++ b/R/determ_end.R @@ -0,0 +1,86 @@ +#' @export +determ_end_thresh <- function(thresholds, n_enough) { + force(thresholds) + force(n_enough) + function(calib_object, job, results) { + # Enough close enough estimates? + p_ok <- results[, c(job$params, job$targets)] + for (j in seq_along(job$targets)) { + values <- p_ok[[ job$targets[j] ]] + target <- job$targets_val[j] + thresh <- thresholds[j] + p_ok <- p_ok[abs(values - target) < thresh, ] + } + + if (nrow(p_ok) > n_enough) { + res <- p_ok[, job$params] + # get the n_tuple where all values are the closest to the median + best <- dplyr::summarise( + res, + dplyr::across(dplyr::everything(), ~ abs(.x - median(.x))) + ) + best <- which.min(rowSums(best)) + return(res[best, ]) + } else { + return(NULL) + } + } +} + +# can be used for 1 or many params I think +# save centers in sideload using `list(centers = newp)` +#' @export +determ_poly_end <- function(threshold, poly_n = 3) { + force(threshold) + force(poly_n) + function(calib_object, job, results) { + mscale <- function(x, val) (x - mean(val)) / sd(val) + munscale <- function(x, val) x * sd(val) + mean(val) + + values <- c() + params <- c() + targets <- job$targets_val + + for (i in seq_along(job$targets)) { + values <- c(values, results[[ job$targets[i] ]]) + params <- c(params, results[[ job$params[i] ]]) + } + + complete_rows <- vctrs::vec_detect_complete(values) + values <- values[complete_rows] + params <- params[complete_rows] + + s_v <- mscale(values, values) + s_t <- mscale(targets, values) + s_p <- mscale(params, params) + + mod <- lm(s_v ~ poly(s_p, poly_n)) + loss_fun <- function(par, t) abs(predict(mod, data.frame(s_p = par)) - t) + s_newp <- vapply( + s_t, + function(t) optimize(interval = range(s_p), f = loss_fun, t = t)$minimum, + numeric(1) + ) + s_newv <- predict(mod, data.frame(s_p = s_newp)) + newp <- munscale(s_newp, params) + + oldp <- swfcalib::load_sideload(calib_object, job)$centers + swfcalib::save_sideload(calib_object, job, list(centers = newp)) + + if (is.null(oldp)) return(NULL) + + s_oldp <- mscale(oldp, params) + s_oldv <- predict(mod, data.frame(s_p = s_oldp)) + + newv <- munscale(s_newv, values) + oldv <- munscale(s_oldv, values) + + if (all(abs(oldv - newv) < threshold & abs(newv - targets) < threshold)) { + result <- as.list(newp) + names(result) <- job$params + return(dplyr::as_tibble(result)) + } else { + return(NULL) + } + } +} diff --git a/R/ending.R b/R/ending.R new file mode 100644 index 0000000..10af331 --- /dev/null +++ b/R/ending.R @@ -0,0 +1,30 @@ +get_calibrated_csv_path <- function(calib_object) { + root_directory <- get_root_dir(calib_object) + fs::path(root_directory, "calibrated.csv") +} + +get_param_type <- function(x) { + dplyr::case_when( + is.logical(x) ~ "logical", + is.numeric(x) ~ "numeric", + .default = "character" + ) +} + +save_calibrated_csv <- function(calib_object) { + long_calib <- get_long_calibrated_proposal(calib_object) + readr::write_csv(long_calib, get_calibrated_csv_path(calib_object)) +} + +wrap_up_calibration <- function(calib_object) { + calib_object <- store_calibrated_proposal(calib_object) + save_calibrated_csv(calib_object) + + full_results <- make_full_results(calib_object) + save_full_results(calib_object, full_results) + + save_calib_object(calib_object) + message("Calibration complete") + print_log(calib_object) +} + diff --git a/R/job.R b/R/job.R new file mode 100644 index 0000000..070f5ed --- /dev/null +++ b/R/job.R @@ -0,0 +1,23 @@ +initialize_job_ids <- function(calib_object) { + for (wave_i in seq_along(calib_object$waves)) { + for (job_i in seq_along(calib_object$waves[[wave_i]])) { + calib_object$waves[[wave_i]][[job_i]]$id <- + paste0("wave", wave_i, "-job", job_i) + } + } + calib_object +} + +get_current_jobs <- function(calib_object, not_done_only = FALSE) { + current_wave <- get_current_wave(calib_object) + current_jobs <- calib_object$waves[[current_wave]] + if (not_done_only) { + current_jobs <- current_jobs[!get_jobs_done_status(calib_object)] + } + current_jobs +} + +get_job_id <- function(job) { + job$id +} + diff --git a/R/proposal.R b/R/proposal.R new file mode 100644 index 0000000..98b16a8 --- /dev/null +++ b/R/proposal.R @@ -0,0 +1,131 @@ +get_proposals_path <- function(calib_object) { + current_wave_dir <- get_current_wave_dir(calib_object) + fs::path(current_wave_dir, "proposals.rds") +} + +get_default_proposal <- function(calib_object) { + calib_object$state$default_proposal +} + +get_proposal_n <- function(proposals, n) { + dplyr::filter(proposals, .data$`.proposal_index` == n) +} + +load_proposals <- function(calib_object) { + proposals_path <- get_proposals_path(calib_object) + if (fs::file_exists(proposals_path)) { + readRDS(get_proposals_path(calib_object)) + } else { + dplyr::tibble() + } +} + +save_proposals <- function(calib_object, proposals) { + saveRDS(proposals, get_proposals_path(calib_object)) + invisible(TRUE) +} + +mutate_default_proposal <- function(calib_object, default_proposal) { + mutate_calib_state(calib_object, "default_proposal", default_proposal) +} + +make_proposals <- function(calib_object, results) { + current_jobs <- get_current_jobs(calib_object, not_done_only = TRUE) + if (get_current_iteration(calib_object) == 1) { + proposals <- lapply(current_jobs, function(job) job$initial_proposals) + } else { + proposals <- future.apply::future_lapply( + current_jobs, + function(co, job, res) job$make_next_proposals(co, job, res), + res = results, + co = calib_object, + future.seed = TRUE + ) + } + proposals <- merge_proposals(proposals) + proposals <- fill_proposals(proposals, calib_object) + proposals[[".proposal_index"]] <- seq_len(nrow(proposals)) + proposals[[".wave"]] <- get_current_wave(calib_object) + proposals[[".iteration"]] <- get_current_iteration(calib_object) + dplyr::select( + proposals, + dplyr::everything(), ".proposal_index", ".wave", ".iteration" + ) +} + +merge_proposals <- function(proposals) { + max_rows <- max(vapply(proposals, nrow, numeric(1))) + proposals <- future.apply::future_lapply( + proposals, + function(d) { + missing_rows <- max_rows - nrow(d) + if (missing_rows > 0) { + d <- dplyr::bind_rows( + d, + dplyr::slice_sample(d, n = missing_rows, replace = TRUE) + ) + } + d + }, + future.seed = TRUE + ) + dplyr::bind_cols(proposals) +} + +fill_proposals <- function(proposals, calib_object) { + default_proposal <- get_default_proposal(calib_object) + missing_cols <- setdiff(names(default_proposal), names(proposals)) + proposals <- merge_proposals(list( + proposals, + default_proposal[, missing_cols] + )) + proposals <- adjust_proposal_number(proposals, calib_object) + + proposals +} + +adjust_proposal_number <- function(proposals, calib_object) { + n_sims <- get_n_sims(calib_object) + if (nrow(proposals) > n_sims) { + proposals <- dplyr::slice_sample(proposals, n = n_sims, replace = TRUE) + } else if (nrow(proposals) < n_sims) { + missing_proposals <- n_sims - nrow(proposals) + proposals <- dplyr::bind_rows( + proposals, + dplyr::slice_sample(proposals, n = missing_proposals, replace = TRUE) + ) + } + proposals +} + +get_calibrated_proposal <- function(calib_object) { + calib_object$calibrated_proposal +} + +store_calibrated_proposal <- function(calib_object) { + calib_object$calibrated_proposal <- get_default_proposal(calib_object) + calib_object +} + +get_long_calibrated_proposal <- function(calib_object) { + calibrated_proposal <- get_calibrated_proposal(calib_object) + long_calib <- tidyr::pivot_longer( + calibrated_proposal, cols = dplyr::everything(), + names_to = "param", values_to = "value" + ) + long_calib$type <- vapply(calibrated_proposal, get_param_type, "") + long_calib +} + +update_default_proposal <- function(calib_object, job_results) { + default_proposal <- get_default_proposal(calib_object) + for (i in seq_along(job_results)) { + result <- job_results[[i]] + if (!is.null(result)) { + shared_params <- intersect(names(result), names(default_proposal)) + default_proposal[, shared_params] <- result[, shared_params] + } + } + mutate_default_proposal(calib_object, default_proposal) +} + diff --git a/R/proposers.R b/R/proposers.R new file mode 100644 index 0000000..1eac226 --- /dev/null +++ b/R/proposers.R @@ -0,0 +1,73 @@ +#' @export +make_proposer_se_range <- function(n_new, retain_prop = 0.2) { + force(n_new) + force(retain_prop) + function(calib_object, job, results) { + p_ranges <- list() + values <- results[job$targets] + params <- results[job$params] + + params[[".SE_score"]] <- 0 + for (i in seq_along(job$targets)) { + t <- job$targets_val[i] + vs <- values[[i]] + params[[".SE_score"]] <- params[[".SE_score"]] + (vs - t)^2 + } + + params <- dplyr::arrange(params, .data$.SE_score) + params <- dplyr::select(params, - .data$.SE_score) + params <- head(params, ceiling(nrow(params) * retain_prop)) + + for (i in seq_along(job$targets)) { + p_ranges[[i]] <- range(params[[i]]) + } + + proposals <- lhs::randomLHS(n_new, length(job$params)) + for (i in seq_along(job$params)) { + spread <- p_ranges[[i]][2] - p_ranges[[i]][1] + rmin <- p_ranges[[i]][1] + proposals[, i] <- proposals[, i] * spread + rmin + } + colnames(proposals) <- job$params + dplyr::as_tibble(proposals) + } +} + +#' Shrink the range by a shrink factor (default 2) +#' +#' If centers are provided in a sideload, use that +#' +#' @export +make_shrink_proposer <- function(n_new, shrink = 2) { + force(n_new) + force(shrink) + function(calib_object, job, results) { + centers <- swfcalib::load_sideload(calib_object, job)$centers + if (is.null(centers)) { + stop("No centers were provided for shrinkage, abort!") + } + + outs <- list() + for (i in seq_along(job$params)) { + tar_range <- range( + results[[job$params[i]]][ + results[[".iteration"]] == max(results[[".iteration"]]) + ] + ) + spread <- (tar_range[2] - tar_range[1]) / shrink / 2 + + proposals <- seq( + max(centers[i] - spread, tar_range[1]), + min(centers[i] + spread, tar_range[2]), + length.out = n_new + ) + + proposals <- sample(proposals) + + out <- list(proposals) + names(out) <- job$params[i] + outs[[i]] <- dplyr::as_tibble(out) + } + dplyr::bind_cols(outs) + } +} diff --git a/R/result.R b/R/result.R new file mode 100644 index 0000000..33e6f5b --- /dev/null +++ b/R/result.R @@ -0,0 +1,122 @@ +# Process the simulations, save the results and clear the individual simulation +# results. +# Discard the iteration if some results are missing +process_sim_results <- function(calib_object) { + # Early return if no simulations has been run for this wave yet + if (get_current_iteration(calib_object) == 0) return(calib_object) + + new_results <- merge_sim_results(calib_object) + clear_sim_results(calib_object) + # If less results than expected, discard this iteration + # (for instance if the cluster stopped during the step 2) + if (nrow(new_results) < get_n_sims(calib_object)) { + calib_object <- decrement_iteration(calib_object) + } else { + save_results(calib_object, new_results) + } + calib_object +} + +merge_sim_results <- function(calib_object) { + proposals <- load_proposals(calib_object) + sim_results <- load_sim_results(calib_object) + # if multiple sim per proposal -> keep them all + dplyr::right_join( + proposals, sim_results, + by = c(".proposal_index", ".wave", ".iteration") + ) +} + +save_results <- function(calib_object, new_results) { + old_result <- load_results(calib_object) + results <- dplyr::bind_rows(old_result, new_results) + saveRDS(results, get_results_path(calib_object)) + invisible(TRUE) +} + +load_results <- function(calib_object) { + results_path <- get_results_path(calib_object) + if (fs::file_exists(results_path)) { + readRDS(get_results_path(calib_object)) + } else { + dplyr::tibble() + } +} + +get_results_path <- function(calib_object) { + fs::path(get_current_wave_dir(calib_object), "results.rds") +} + +save_sim_result <- function(calib_object, sim_results, i, proposal) { + save_path <- get_sim_result_save_path(calib_object, i) + sim_results[[".wave"]] <- get_current_wave(calib_object) + sim_results[[".iteration"]] <- get_current_iteration(calib_object) + sim_results[[".proposal_index"]] <- proposal[[".proposal_index"]] + saveRDS(sim_results, save_path) +} + +load_sim_results <- function(calib_object) { + sim_result_save_dir <- get_sim_result_save_dir(calib_object) + sim_result_files <- fs::dir_ls(sim_result_save_dir) + if (length(sim_result_files) == 0) { + dplyr::tibble( + .proposal_index = numeric(0), + .iteration = numeric(0), + .wave = numeric(0) + ) + } else { + dplyr::bind_rows(future.apply::future_lapply(sim_result_files, readRDS)) + } +} + +clear_sim_results <- function(calib_object) { + sim_result_save_dir <- get_sim_result_save_dir(calib_object) + sim_result_files <- fs::dir_ls(sim_result_save_dir) + fs::file_delete(sim_result_files) + invisible(TRUE) +} + +get_sim_result_save_dir <- function(calib_object) { + fs::path(get_current_wave_dir(calib_object), "sim_results") +} + +get_sim_result_save_path <- function(calib_object, i) { + fs::path(get_sim_result_save_dir(calib_object), paste0(i, ".rds")) +} + +# `job$job_results` is a function that return NULL if the calibration job is not +# finished and a 1-row `data.frame` of calibrated parameters otherwise. +get_jobs_results <- function(calib_object, results) { + future.apply::future_lapply( + get_current_jobs(calib_object), + function(co, job, res) job$get_result(co, job, res), + res = results, + co = calib_object, + future.seed = TRUE + ) +} + +# Full results: concatenation of all waves results +make_full_results <- function(calib_object) { + current_wave <- get_current_wave(calib_object) - 1 + wave_results <- lapply( + seq_len(current_wave), + function(wave) { + calib_object <- mutate_calib_state(calib_object, "wave", wave) + load_results(calib_object) + } + ) + dplyr::bind_rows(wave_results) +} + +save_full_results <- function(calib_object, full_results) { + full_results_path <- get_full_results_path(calib_object) + saveRDS(full_results, full_results_path) +} + +get_full_results_path <- function(calib_object) { + root_directory <- get_root_dir(calib_object) + fs::path(root_directory, "full_results.rds") +} + + diff --git a/R/sideload.R b/R/sideload.R new file mode 100644 index 0000000..cad73ed --- /dev/null +++ b/R/sideload.R @@ -0,0 +1,36 @@ +# A sideload is data stored by a job. It can be used to keep track of the +# previous calibration iteration or to communicate between the proposer and the +# completion checker. + +#' Save some data to be reused by the calibration process +#' @export +save_sideload <- function(calib_object, job, x) { + sl_path <- get_sideload_path(calib_object, job) + saveRDS(x, sl_path) +} + +#' Read some data saved to be reused by the calibration process +#' @export +load_sideload <- function(calib_object, job) { + sl_path <- get_sideload_path(calib_object, job) + if (!fs::file_exists(sl_path)) { + return(NULL) + } else { + readRDS(sl_path) + } +} + +get_sideload_path <- function(calib_object, job) { + sideload_directory <- get_sideload_dir(calib_object) + fs::path(sideload_directory, paste0(get_job_id(job), ".rds")) +} + +get_sideload_dir <- function(calib_object) { + root_directory <- get_root_dir(calib_object) + fs::path(root_directory, "sideloads") +} + +clear_sideloads <- function(calib_object) { + sideload_dir <- get_sideload_dir(calib_object) + if (fs::dir_exists(sideload_dir)) fs::dir_delete(sideload_dir) +} diff --git a/R/state.R b/R/state.R new file mode 100644 index 0000000..aa839e1 --- /dev/null +++ b/R/state.R @@ -0,0 +1,79 @@ +get_current_iteration <- function(calib_object) { + calib_object$state$iteration +} + +get_max_iteration <- function(calib_object) { + calib_object$config$max_iteration +} + +get_n_sims <- function(calib_object) { + calib_object$config$n_sims +} + +get_seq_sim <- function(calib_object, batch_num, batch_size) { + n_sims <- get_n_sims(calib_object) + lower_bound <- (batch_num - 1) * batch_size + 1 + upper_bound <- min((batch_num * batch_size), n_sims) + seq(lower_bound, upper_bound) +} + +get_batch_numbers <- function(calib_object, batch_size) { + n_batch <- ceiling(get_n_sims(calib_object) / batch_size) + seq_len(n_batch) +} + +increment_iteration <- function(calib_object) { + current_iteration <- get_current_iteration(calib_object) + mutate_calib_state(calib_object, "iteration", current_iteration + 1) +} + +decrement_iteration <- function(calib_object) { + current_iteration <- get_current_iteration(calib_object) + mutate_calib_state(calib_object, "iteration", current_iteration - 1) +} + +mutate_done_status <- function(calib_object, done) { + mutate_calib_state(calib_object, "done", done) +} + +mutate_calib_state <- function(calib_object, item, value) { + calib_object$state[[item]] <- value + calib_object +} + +update_done_status <- function(calib_object, job_results) { + done <- !any(vapply(job_results, is.null, logical(1))) + mutate_done_status(calib_object, done) +} + +mutate_jobs_done_status <- function(calib_object, done) { + mutate_calib_state(calib_object, "jobs_done", done) +} + +update_jobs_done_status <- function(calib_object, job_results) { + done <- !vapply(job_results, is.null, logical(1)) + mutate_jobs_done_status(calib_object, done) +} + +get_jobs_done_status <- function(calib_object) { + calib_object$state$jobs_done +} + +update_calibration_state <- function(calib_object, results) { + # Do not check the results on the first iteration + if (get_current_iteration(calib_object) > 0) { + jobs_results <- get_jobs_results(calib_object, results) + calib_object <- update_done_status(calib_object, jobs_results) + calib_object <- update_default_proposal(calib_object, jobs_results) + } + calib_object <- update_wave_iteration(calib_object) + calib_object +} + +is_valid_iteration <- function(calib_object) { + get_current_iteration(calib_object) <= get_max_iteration(calib_object) +} + +is_calibration_complete <- function(calib_object) { + !is_valid_wave(calib_object) +} diff --git a/R/utils.R b/R/utils.R index f9424f7..5ea790b 100644 --- a/R/utils.R +++ b/R/utils.R @@ -1,458 +1,7 @@ -# Getters ---------------------------------------------------------------------- -get_root_dir <- function(calib_object) { - calib_object$config$root_directory -} - -get_current_wave <- function(calib_object) { - calib_object$state$wave -} - -get_current_jobs <- function(calib_object) { - current_wave <- get_current_wave(calib_object) - calib_object$waves[[current_wave]] -} - -get_current_iteration <- function(calib_object) { - calib_object$state$iteration -} - -get_max_iteration <- function(calib_object) { - calib_object$config$max_iteration -} - -get_results_path <- function(calib_object) { - fs::path(get_current_wave_dir(calib_object), "results.rds") -} - -get_current_wave_dir <- function(calib_object) { - root_directory <- get_root_dir(calib_object) - current_wave <- get_current_wave(calib_object) - fs::path(root_directory, "waves", current_wave) -} - -get_proposals_path <- function(calib_object) { - current_wave_dir <- get_current_wave_dir(calib_object) - fs::path(current_wave_dir, "proposals.rds") -} - -get_default_proposal <- function(calib_object) { - calib_object$state$default_proposal -} - -get_proposal_n <- function(proposals, n) { - dplyr::filter(proposals, .data$`.proposal_index` == n) -} - -get_n_sims <- function(calib_object) { - calib_object$config$n_sims -} - -get_save_path <- function(calib_object) { - root_directory <- get_root_dir(calib_object) - fs::path(root_directory, "calib_object.rds") -} - -get_sim_result_save_dir <- function(calib_object) { - fs::path(get_current_wave_dir(calib_object), "sim_results") -} - -get_sim_result_save_path <- function(calib_object, i) { - fs::path(get_sim_result_save_dir(calib_object), paste0(i, ".rds")) -} - -get_seq_sim <- function(calib_object, batch_num, batch_size) { - n_sims <- get_n_sims(calib_object) - lower_bound <- (batch_num - 1) * batch_size + 1 - upper_bound <- min((batch_num * batch_size), n_sims) - seq(lower_bound, upper_bound) -} - -get_batch_numbers <- function(calib_object, batch_size) { - n_batch <- ceiling(get_n_sims(calib_object) / batch_size) - seq_len(n_batch) -} - -get_full_results_path <- function(calib_object) { - root_directory <- get_root_dir(calib_object) - fs::path(root_directory, "full_results.rds") -} - -get_sideload_dir <- function(calib_object) { - root_directory <- get_root_dir(calib_object) - fs::path(root_directory, "sideloads") -} - -get_sideload_path <- function(calib_object, job) { - sideload_directory <- get_sideload_dir(calib_object) - fs::path(sideload_directory, paste0(get_job_id(job), ".rds")) -} - -get_job_id <- function(job) { - job$id -} - -# # Checkers ------------------------------------------------------------------- -is_valid_iteration <- function(calib_object) { - get_current_iteration(calib_object) <= get_max_iteration(calib_object) -} - -is_valid_wave <- function(calib_object) { - get_current_wave(calib_object) <= length(calib_object$waves) -} - -is_calibration_complete <- function(calib_object) { - !is_valid_wave(calib_object) -} - -is_wave_done <- function(calib_object) { - calib_object$state$done -} - -# # Input-Output --------------------------------------------------------------- -make_folders <- function(calib_object) { - dirs <- c( - get_sim_result_save_dir(calib_object), - get_sideload_dir(calib_object) - ) - for (d in dirs) { - if (!fs::dir_exists(d)) fs::dir_create(d) - } - invisible(TRUE) -} - -# calib_save_path must be a variable in all SWF steps -load_calib_object <- function(calib_object) { - save_path <- get_save_path(calib_object) - if (fs::file_exists(save_path)) { - calib_object <- readRDS(save_path) - } else { - message("No calib_object file found. initializing the state and returning") - calib_object <- initialize_state(calib_object) - make_folders(calib_object) - } - calib_object -} - -load_results <- function(calib_object) { - results_path <- get_results_path(calib_object) - if (fs::file_exists(results_path)) { - readRDS(get_results_path(calib_object)) - } else { - dplyr::tibble() - } -} - -save_results <- function(calib_object, new_results) { - old_result <- load_results(calib_object) - results <- dplyr::bind_rows(old_result, new_results) - saveRDS(results, get_results_path(calib_object)) - invisible(TRUE) -} - -load_proposals <- function(calib_object) { - proposals_path <- get_proposals_path(calib_object) - if (fs::file_exists(proposals_path)) { - readRDS(get_proposals_path(calib_object)) - } else { - dplyr::tibble() - } -} - -save_proposals <- function(calib_object, proposals) { - saveRDS(proposals, get_proposals_path(calib_object)) - invisible(TRUE) -} - -save_calib_object <- function(calib_object) { - saveRDS(calib_object, get_save_path(calib_object)) -} - -save_sim_result <- function(calib_object, sim_results, i, proposal) { - save_path <- get_sim_result_save_path(calib_object, i) - sim_results[[".wave"]] <- get_current_wave(calib_object) - sim_results[[".iteration"]] <- get_current_iteration(calib_object) - sim_results[[".proposal_index"]] <- proposal[[".proposal_index"]] - saveRDS(sim_results, save_path) -} - -load_sim_results <- function(calib_object) { - sim_result_save_dir <- get_sim_result_save_dir(calib_object) - sim_result_files <- fs::dir_ls(sim_result_save_dir) - if (length(sim_result_files) == 0) { - dplyr::tibble( - .proposal_index = numeric(0), - .iteration = numeric(0), - .wave = numeric(0) - ) - } else { - dplyr::bind_rows(future.apply::future_lapply(sim_result_files, readRDS)) - } -} - -clear_sim_results <- function(calib_object) { - sim_result_save_dir <- get_sim_result_save_dir(calib_object) - sim_result_files <- fs::dir_ls(sim_result_save_dir) - fs::file_delete(sim_result_files) - invisible(TRUE) -} - -# function called after the calibration is finished -# so wave = max_wave + 1 -load_full_results <- function(calib_object) { - current_wave <- get_current_wave(calib_object) - 1 - wave_results <- lapply( - seq_len(current_wave), - function(wave) { - calib_object <- mutate_calib_state(calib_object, "wave", wave) - load_results(calib_object) - } - ) - dplyr::bind_rows(wave_results) -} - -save_full_results <- function(calib_object, full_results) { - full_results_path <- get_full_results_path(calib_object) - saveRDS(full_results, full_results_path) -} - -#' Save some data to be reused by the calibration process -#' @export -save_sideload <- function(calib_object, job, x) { - sl_path <- get_sideload_path(calib_object, job) - saveRDS(x, sl_path) -} - -#' Read some data saved to be reused by the calibration process -#' @export -load_sideload <- function(calib_object, job) { - sl_path <- get_sideload_path(calib_object, job) - if (!fs::file_exists(sl_path)) { - return(NULL) - } else { - readRDS(sl_path) - } -} - -# # Updaters ------------------------------------------------------------------- - -increment_iteration <- function(calib_object) { - current_iteration <- get_current_iteration(calib_object) - mutate_calib_state(calib_object, "iteration", current_iteration + 1) -} - -decrement_iteration <- function(calib_object) { - current_iteration <- get_current_iteration(calib_object) - mutate_calib_state(calib_object, "iteration", current_iteration - 1) -} - -increment_wave <- function(calib_object) { - current_wave <- get_current_wave(calib_object) - calib_object <- mutate_done_status(calib_object, FALSE) - calib_object <- mutate_calib_state(calib_object, "iteration", 1) - calib_object <- mutate_calib_state(calib_object, "wave", current_wave + 1) - make_folders(calib_object) - calib_object -} - -initialize_state <- function(calib_object) { - calib_object$state <- list() - calib_object$state$done <- FALSE - calib_object$state$wave <- 1 - calib_object$state$iteration <- 0 - calib_object$state$default_proposal <- calib_object$config$default_proposal - - calib_object <- initialize_job_ids(calib_object) - - calib_object -} - -initialize_job_ids <- function(calib_object) { - for (wave_i in seq_along(calib_object$waves)) { - for (job_i in seq_along(calib_object$waves[[wave_i]])) { - calib_object$waves[[wave_i]][[job_i]]$id <- - paste0("waves", wave_i, "-job", job_i) - } - } - calib_object -} - -mutate_done_status <- function(calib_object, done) { - mutate_calib_state(calib_object, "done", done) -} - -mutate_default_proposal <- function(calib_object, default_proposal) { - mutate_calib_state(calib_object, "default_proposal", default_proposal) -} - -mutate_calib_state <- function(calib_object, item, value) { - calib_object$state[[item]] <- value - calib_object -} - - -# Proposals -------------------------------------------------------------------- -make_proposals <- function(calib_object, results) { - current_jobs <- get_current_jobs(calib_object) - if (get_current_iteration(calib_object) == 1) { - proposals <- lapply(current_jobs, function(job) job$initial_proposals) - } else { - proposals <- future.apply::future_lapply( - current_jobs, - function(co, job, res) job$make_next_proposals(co, job, res), - res = results, - co = calib_object, - future.seed = TRUE - ) - } - proposals <- merge_proposals(proposals) - proposals <- fill_proposals(proposals, calib_object) - proposals[[".proposal_index"]] <- seq_len(nrow(proposals)) - proposals[[".wave"]] <- get_current_wave(calib_object) - proposals[[".iteration"]] <- get_current_iteration(calib_object) - dplyr::select( - proposals, - dplyr::everything(), ".proposal_index", ".wave", ".iteration" - ) -} - -merge_proposals <- function(proposals) { - max_rows <- max(vapply(proposals, nrow, numeric(1))) - proposals <- future.apply::future_lapply( - proposals, - function(d) { - missing_rows <- max_rows - nrow(d) - if (missing_rows > 0) { - d <- dplyr::bind_rows( - d, - dplyr::slice_sample(d, n = missing_rows, replace = TRUE) - ) - } - d - }, - future.seed = TRUE - ) - dplyr::bind_cols(proposals) -} - -fill_proposals <- function(proposals, calib_object) { - default_proposal <- get_default_proposal(calib_object) - missing_cols <- setdiff(names(default_proposal), names(proposals)) - proposals <- merge_proposals(list( - proposals, - default_proposal[, missing_cols] - )) - proposals <- adjust_proposal_number(proposals, calib_object) - - proposals -} - -adjust_proposal_number <- function(proposals, calib_object) { - n_sims <- get_n_sims(calib_object) - if (nrow(proposals) > n_sims) { - proposals <- dplyr::slice_sample(proposals, n = n_sims, replace = TRUE) - } else if (nrow(proposals) < n_sims) { - missing_proposals <- n_sims - nrow(proposals) - proposals <- dplyr::bind_rows( - proposals, - dplyr::slice_sample(proposals, n = missing_proposals, replace = TRUE) - ) - } - proposals -} - -# # Simulator ------------------------------------------------------------------ run_simulation <- function(calib_object, proposal) { calib_object$config$simulator(proposal) } -# # Results -------------------------------------------------------------------- -merge_results <- function(calib_object) { - proposals <- load_proposals(calib_object) - sim_results <- load_sim_results(calib_object) - # if multiple sim per proposal -> keep them all - dplyr::right_join( - proposals, sim_results, - by = c(".proposal_index", ".wave", ".iteration") - ) -} - -# # High level functions ------------------------------------------------------- -process_sim_results <- function(calib_object) { - if (get_current_iteration(calib_object) != 0) { - new_results <- merge_results(calib_object) - clear_sim_results(calib_object) - - # If less results than expected, discard this iteration - # (for instance if the cluster stopped during the step 2) - if (nrow(new_results) < get_n_sims(calib_object)) { - calib_object <- decrement_iteration(calib_object) - } else { - save_results(calib_object, new_results) - } - } - calib_object -} - -get_jobs_results <- function(calib_object, results) { - future.apply::future_lapply( - get_current_jobs(calib_object), - function(co, job, res) job$get_result(co, job, res), - res = results, - co = calib_object, - future.seed = TRUE - ) -} - -update_done_status <- function(calib_object, job_results) { - done <- !any(vapply(job_results, is.null, logical(1))) - mutate_done_status(calib_object, done) -} - -update_default_proposal <- function(calib_object, job_results) { - default_proposal <- get_default_proposal(calib_object) - for (i in seq_along(job_results)) { - result <- job_results[[i]] - if (!is.null(result)) { - shared_params <- intersect(names(result), names(default_proposal)) - default_proposal[, shared_params] <- result[, shared_params] - } - } - mutate_default_proposal(calib_object, default_proposal) -} - -update_wave_iteration <- function(calib_object) { - if (is_wave_done(calib_object)) { - calib_object <- increment_wave(calib_object) - } else { - calib_object <- increment_iteration(calib_object) - if (!is_valid_iteration(calib_object)) { - save_calib_object(calib_object) - stop("Max iteration exceded, stopping.") - } - } - calib_object -} - -update_calibration_state <- function(calib_object, results) { - # Do not check the results on the first iteration - if (get_current_iteration(calib_object) > 0) { - jobs_results <- get_jobs_results(calib_object, results) - calib_object <- update_done_status(calib_object, jobs_results) - calib_object <- update_default_proposal(calib_object, jobs_results) - } - calib_object <- update_wave_iteration(calib_object) - calib_object -} - -wrap_up_calibration <- function(calib_object) { - calib_object <- store_calibrated_proposal(calib_object) - save_calibrated_csv(calib_object) - save_calib_object(calib_object) - message("Calibration complete") - next_step <- slurmworkflow::get_current_workflow_step() + 2 - slurmworkflow::change_next_workflow_step(next_step) -} - print_log <- function(calib_object) { cat("Current state:\n") cat("\tWave: ", get_current_wave(calib_object), "\n") @@ -463,40 +12,3 @@ print_log <- function(calib_object) { cat("\t", nm, ": ", default_proposal[[nm]][1], "\n") } } - -get_calibrated_csv_path <- function(calib_object) { - root_directory <- get_root_dir(calib_object) - fs::path(root_directory, "calibrated.csv") -} - -get_param_type <- function(x) { - dplyr::case_when( - is.logical(x) ~ "logical", - is.numeric(x) ~ "numeric", - .default = "character" - ) -} - -get_calibrated_proposal <- function(calib_object) { - calib_object$calibrated_proposal -} - -store_calibrated_proposal <- function(calib_object) { - calib_object$calibrated_proposal <- get_default_proposal(calib_object) - calib_object -} - -get_long_calibrated_proposal <- function(calib_object) { - calibrated_proposal <- get_calibrated_proposal(calib_object) - long_calib <- tidyr::pivot_longer( - calibrated_proposal, cols = dplyr::everything(), - names_to = "param", values_to = "value" - ) - long_calib$type <- vapply(calibrated_proposal, get_param_type, "") - long_calib -} - -save_calibrated_csv <- function(calib_object) { - long_calib <- get_long_calibrated_proposal(calib_object) - readr::write_csv(long_calib, get_calibrated_csv_path(calib_object)) -} diff --git a/R/wave.R b/R/wave.R new file mode 100644 index 0000000..f311914 --- /dev/null +++ b/R/wave.R @@ -0,0 +1,45 @@ +get_current_wave <- function(calib_object) { + calib_object$state$wave +} + +get_current_wave_dir <- function(calib_object) { + root_directory <- get_root_dir(calib_object) + current_wave <- get_current_wave(calib_object) + fs::path(root_directory, "waves", current_wave) +} + +is_valid_wave <- function(calib_object) { + get_current_wave(calib_object) <= length(calib_object$waves) +} + +is_wave_done <- function(calib_object) { + calib_object$state$done +} + +update_wave_iteration <- function(calib_object) { + if (is_wave_done(calib_object)) { + calib_object <- increment_wave(calib_object) + } else { + calib_object <- increment_iteration(calib_object) + if (!is_valid_iteration(calib_object)) { + save_calib_object(calib_object) + stop("Max iteration exceded, stopping.") + } + } + calib_object +} + +increment_wave <- function(calib_object) { + current_wave <- get_current_wave(calib_object) + calib_object <- mutate_done_status(calib_object, FALSE) + calib_object <- mutate_calib_state(calib_object, "iteration", 1) + calib_object <- mutate_calib_state(calib_object, "wave", current_wave + 1) + calib_object <- mutate_jobs_done_status( + calib_object, + rep(FALSE, length(get_current_jobs(calib_object))) + ) + clear_sideloads(calib_object) + make_directories(calib_object) + calib_object +} + diff --git a/R/zzz.R b/R/zzz.R index 2357244..5c9044e 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -1,2 +1,2 @@ -#' @ImportFrom dplyr .data +#' @importFrom dplyr .data NULL diff --git a/README.Rmd b/README.Rmd index 52c306c..d9f4522 100644 --- a/README.Rmd +++ b/README.Rmd @@ -22,8 +22,80 @@ knitr::opts_chunk$set( [![codecov](https://codecov.io/gh/EpiModel/swfcalib/branch/main/graph/badge.svg?token=eo2r0HeP8Z)](https://codecov.io/gh/EpiModel/swfcalib) -**swfcalib** builds upon [slurmworkflow](https://github.com/EpiModel/slurmworkflow) -to provide an automatic calibration framework on a Slurm equiped HPC. +`swfcalib` is designed to automate the calibration of complex multi-parameters +multi-outputs models ran on [Slurm](https://slurm.schedmd.com/) equipped HPC. + +## Should you use `swfcalib` + +`swfcalib` is not the simplest calibration system to set up. It was designed to +solve a specific set of problems listed below. If you already have a system that +works well you should probably not consider `swfcalib`. + +However, if you have the following issues, `swfcalib` may be for you: + +- your model have many parameters to calibrate and produces many outputs. +- you have an idea of how some parameters influence only some outputs (see + example below) +- your outputs are very noisy +- you cannot or don't want to have a [Slurm](https://slurm.schedmd.com/) job + that runs continuously for the whole duration of the calibration process + (a few days to several weeks) + +By using [`slurmworkflow`](https://github.com/EpiModel/slurmworkflow), +`swfcalib` can implement a loop like behavior in [Slurm](https://slurm.schedmd.com/) +without the need of a pilot job staying alive for the whole duration of the +calibration process (sometimes several weeks). + +`swfcalib` was created to calibrate epidemic models like +[this one](https://github.com/EpiModel/DoxyPEP). +These models have around 20 free parameters to calibrate and 20 outcomes to be +matched to observed targets. However, knowledge of the model allow us to define +many conditional one to one relationship between parameters and outcomes. +`swfcalib` can use this knowledge to split the calibration into multiple waves +of simpler parallel calibration jobs. + +## Design + +The calibration process follows a proposal, validation loop. Where +the model is run with a set of parameters and new proposals are made according +to the results. This goes on until the model is fully calibrated. + +Terminology: + +- *model*: a function taking a *proposal* and returning some *outcomes*. +- *proposal*: a set of parameters values to be passed to the *model* +- *outcomes*: the output of a *model* run for a given *proposal*. +- *job*: a set of parameters to be calibrated by using a subset of the *outcomes* +- *wave*: a set of independent *jobs* that can be calibrated using the same run + of the model. + +The specificity of `swfcalib` lies in the ability to try many proposal at once +on an HPC, and to set up this proposal validation loop without a long running +orchestrating job (pilot job). + +The calibration is split into *waves*. Each wave can contains multiple *jobs*, +each focusing on a set of parameters and related outcomes. This permits the +parallel calibration of multiple independent parameters. At each +*iteration*, the model is ran once per proposal, and each job gather the +outcomes it needs to make its next proposal. Once all *jobs* in a *wave* +are done, i.e. the parameters they govern are calibrated, the system moves +to the next *wave*. This allows the sequential calibration of parameters +when strong assumption about their independence can be made. + +This design was crafted for the very noisy models where many replications are +needed and where most parameters are independent or conditionally independent. + +## Versatility + +`swfcalib` makes no assumptions on how a set of parameters should be changed +and how the quality of fit is assessed. It is up to the user to provide the +mechanism to: + +- produce the next *proposals* to be tested +- assess the quality of the fit + +`swfcalib` provides some pre-built functions for this. See the getting started +vignette. ## Installation diff --git a/README.md b/README.md index 1daa94a..1bef9ac 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,88 @@ status](https://www.r-pkg.org/badges/version/slurmworkflow)](https://CRAN.R-proj [![codecov](https://codecov.io/gh/EpiModel/swfcalib/branch/main/graph/badge.svg?token=eo2r0HeP8Z)](https://codecov.io/gh/EpiModel/swfcalib) -**swfcalib** builds upon -[slurmworkflow](https://github.com/EpiModel/slurmworkflow) to provide an -automatic calibration framework on a Slurm equiped HPC. +`swfcalib` is designed to automate the calibration of complex +multi-parameters multi-outputs models ran on +[Slurm](https://slurm.schedmd.com/) equipped HPC. + +## Should you use `swfcalib` + +`swfcalib` is not the simplest calibration system to set up. It was +designed to solve a specific set of problems listed below. If you +already have a system that works well you should probably not consider +`swfcalib`. + +However, if you have the following issues, `swfcalib` may be for you: + +- your model have many parameters to calibrate and produces many + outputs. +- you have an idea of how some parameters influence only some outputs + (see example below) +- your outputs are very noisy +- you cannot or don’t want to have a [Slurm](https://slurm.schedmd.com/) + job that runs continuously for the whole duration of the calibration + process (a few days to several weeks) + +By using [`slurmworkflow`](https://github.com/EpiModel/slurmworkflow), +`swfcalib` can implement a loop like behavior in +[Slurm](https://slurm.schedmd.com/) without the need of a pilot job +staying alive for the whole duration of the calibration process +(sometimes several weeks). + +`swfcalib` was created to calibrate epidemic models like [this +one](https://github.com/EpiModel/DoxyPEP). These models have around 20 +free parameters to calibrate and 20 outcomes to be matched to observed +targets. However, knowledge of the model allow us to define many +conditional one to one relationship between parameters and outcomes. +`swfcalib` can use this knowledge to split the calibration into multiple +waves of simpler parallel calibration jobs. + +## Design + +The calibration process follows a proposal, validation loop. Where the +model is run with a set of parameters and new proposals are made +according to the results. This goes on until the model is fully +calibrated. + +Terminology: + +- *model*: a function taking a *proposal* and returning some *outcomes*. +- *proposal*: a set of parameters values to be passed to the *model* +- *outcomes*: the output of a *model* run for a given *proposal*. +- *job*: a set of parameters to be calibrated by using a subset of the + *outcomes* +- *wave*: a set of independent *jobs* that can be calibrated using the + same run of the model. + +The specificity of `swfcalib` lies in the ability to try many proposal +at once on an HPC, and to set up this proposal validation loop without a +long running orchestrating job (pilot job). + +The calibration is split into *waves*. Each wave can contains multiple +*jobs*, each focusing on a set of parameters and related outcomes. This +permits the parallel calibration of multiple independent parameters. At +each *iteration*, the model is ran once per proposal, and each job +gather the outcomes it needs to make its next proposal. Once all *jobs* +in a *wave* are done, i.e. the parameters they govern are calibrated, +the system moves to the next *wave*. This allows the sequential +calibration of parameters when strong assumption about their +independence can be made. + +This design was crafted for the very noisy models where many +replications are needed and where most parameters are independent or +conditionally independent. + +## Versatility + +`swfcalib` makes no assumptions on how a set of parameters should be +changed and how the quality of fit is assessed. It is up to the user to +provide the mechanism to: + +- produce the next *proposals* to be tested +- assess the quality of the fit + +`swfcalib` provides some pre-built functions for this. See the getting +started vignette. ## Installation diff --git a/_pkgdown.yml b/_pkgdown.yml new file mode 100644 index 0000000..4612429 --- /dev/null +++ b/_pkgdown.yml @@ -0,0 +1,4 @@ +url: https://epimodel.github.io/swfcalib/ +template: + bootstrap: 5 + diff --git a/inst/rmd/assessment.Rmd b/inst/rmd/assessment.Rmd new file mode 100644 index 0000000..bb74904 --- /dev/null +++ b/inst/rmd/assessment.Rmd @@ -0,0 +1,19 @@ +--- +title: "Auto Calibration Assessment" +output: + html_document: + toc: true + toc_float: + collapse: true +--- + +```{r setup, include=FALSE} +knitr::opts_chunk$set(echo = TRUE) +``` + +```{r results='asis', warning=FALSE, message=FALSE, echo=FALSE} +assessments <- readRDS(path_to_assessments) +for (i in seq_along(assessments)) { + make_wave_rmd(assessments, i) +} +``` diff --git a/man/calibration_step2.Rd b/man/calibration_step2.Rd new file mode 100644 index 0000000..a4f890e --- /dev/null +++ b/man/calibration_step2.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/calibration_steps.R +\name{calibration_step2} +\alias{calibration_step2} +\title{Second calibration step: run the model for each proposal} +\usage{ +calibration_step2(calib_object, n_cores, batch_num, n_batches) +} +\arguments{ +\item{calib_object}{a formatted calibration object} + +\item{n_cores}{number of cores to run the processing on} + +\item{batch_num}{the batch number for the current proposal} + +\item{n_batches}{the total number of batches for this step} +} +\description{ +Second calibration step: run the model for each proposal +} diff --git a/man/calibration_step3.Rd b/man/calibration_step3.Rd new file mode 100644 index 0000000..c9b7216 --- /dev/null +++ b/man/calibration_step3.Rd @@ -0,0 +1,15 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/calibration_steps.R +\name{calibration_step3} +\alias{calibration_step3} +\title{Third calibration step: Wrap up the calibration system and store the results} +\usage{ +calibration_step3(calib_object) +} +\arguments{ +\item{calib_object}{a formatted calibration object} +} +\description{ +Having this as a separate step allows a mail to be send at the end of the +calibration +} diff --git a/man/load_sideload.Rd b/man/load_sideload.Rd index 65384d1..8b92ace 100644 --- a/man/load_sideload.Rd +++ b/man/load_sideload.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utils.R +% Please edit documentation in R/sideload.R \name{load_sideload} \alias{load_sideload} \title{Read some data saved to be reused by the calibration process} diff --git a/man/make_shrink_proposer.Rd b/man/make_shrink_proposer.Rd new file mode 100644 index 0000000..1ce0b45 --- /dev/null +++ b/man/make_shrink_proposer.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/proposers.R +\name{make_shrink_proposer} +\alias{make_shrink_proposer} +\title{Shrink the range by a shrink factor (default 2)} +\usage{ +make_shrink_proposer(n_new, shrink = 2) +} +\description{ +If centers are provided in a sideload, use that +} diff --git a/man/render_assessment.Rd b/man/render_assessment.Rd new file mode 100644 index 0000000..40eb457 --- /dev/null +++ b/man/render_assessment.Rd @@ -0,0 +1,25 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/assessment_rmd.R +\name{render_assessment} +\alias{render_assessment} +\title{Generate an html report of the auto-calibration} +\usage{ +render_assessment( + path_to_assessments, + output_filename = "assessment.html", + output_dir = NULL +) +} +\arguments{ +\item{path_to_assessments}{Path to an \code{assessments.rds} file generated by an +\code{swfcalib} process.} + +\item{output_filename}{Name of the html report (default = "assessment.html")} + +\item{output_dir}{Directory where to store the report (default = current +working directory)} +} +\description{ +The report contains descriptions of the parameters spaces and residual errors +over the duration of the calibration. +} diff --git a/man/save_sideload.Rd b/man/save_sideload.Rd index 8531673..05e1bdc 100644 --- a/man/save_sideload.Rd +++ b/man/save_sideload.Rd @@ -1,5 +1,5 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utils.R +% Please edit documentation in R/sideload.R \name{save_sideload} \alias{save_sideload} \title{Save some data to be reused by the calibration process} diff --git a/vignettes/.gitignore b/vignettes/.gitignore new file mode 100644 index 0000000..097b241 --- /dev/null +++ b/vignettes/.gitignore @@ -0,0 +1,2 @@ +*.html +*.R diff --git a/vignettes/swfcalib.Rmd b/vignettes/swfcalib.Rmd new file mode 100644 index 0000000..4a1bb29 --- /dev/null +++ b/vignettes/swfcalib.Rmd @@ -0,0 +1,512 @@ +--- +title: "swfcalib" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{swfcalib} + %\VignetteEngine{knitr::rmarkdown} + %\VignetteEncoding{UTF-8} +--- + +`swfcalib` gives a scaffold to make an automated calibration system on a +[Slurm](https://slurm.schedmd.com/) equipped HPC. `swfcalib` relies on +[`slurmworkflow`](https://github.com/EpiModel/slurmworkflow) and a stored +calibration object to chose which step of the calibration to run. This allows +very long calibrations to take place without flooding the `slurm` queue. + +For this vignette we will take as an example a simplified version of one of our +epidemic models to showcase `swfcalib` usage. + +This model is a network model simulating HIV circulation in a population of 100k +Men who have Sex with Men (MSM). + +## The model + +A single run of our model locally looks like this: + +```r +library(EpiModelHIV) + +epistats <- readRDS("data/input/epistats.rds") +netstats <- readRDS("data/input/netstats.rds") +netest <- readRDS("data/input/netest.rds") + +param <- param.net( + data.frame.params = read.csv("data/input/params.csv"), + netstats = netstats, + epistats = epistats +) + +init <- init_msm() + +control <- control_msm( + nsteps = 250, + nsims = 1, + ncores = 1 +) + +# The actual simulation happens here +sim <- netsim(est, param, init, control) +``` + +In this example, we need to load the `EpiModelHIV` package, to read 4 files +located in a "data/input" folder and to pass 4 parameters to the `netsim` +function. + +The result, stored in the `sim` variable is not directly usable by `swfcalib`. + +## Calibration parameters and outputs + +For this example, we will focus on 9 parameters to calibrate and 9 outputs to +fit to there targets. + +There are many more parameters in the model. Some get their values from the +literature, other free parameters to be calibrated are ignored for sake of +simplicity in this example. + +### Outputs + +The first outputs are the proportion of HIV individuals who are diagnosed (i.e. + aware of their status).Then is the proportion of HIV diagnosed who started +treatment in less than a month after diagnosis. Finally, the prevalence of +diagnosed HIV in the population is our final output of interest. As our +population is race stratified between, black, hispanic and white, we get three +time the number of outputs. + +| Output Name | Description | Target Value| +|-|-|-| +| cc.dx.B | Portion of HIV infected who are diagnosed (black) | 0.847 | +| cc.dx.H | Portion of HIV infected who are diagnosed (hispanic) | 0.818 | +| cc.dx.W | Portion of HIV infected who are diagnosed (white) | 0.862 | +| cc.linked1m.B | Linkage to care within 1 month (black) | 0.829 | +| cc.linked1m.H | Linkage to care within 1 month (hispanic) | 0.898 | +| cc.linked1m.W | Linkage to care within 1 month (white) | 0.881 | +| i.prev.dx.B | HIV diagnosed prevalence (black) | 0.33 | +| i.prev.dx.H | HIV diagnosed prevalence (hispanic) | 0.127 | +| i.prev.dx.W | HIV diagnosed prevalence (W) | 0.084 | + +### Parameters + +9 parameters will be calibrated to fit the model to these target values. The +weekly probability of being tested for HIV. The weekly probability of starting +treatment if diagnosed for HIV. And a *transmission scaler* which is a parameter +that encompass all the mechanism affecting transmission that are not explicitly +defined in the model. + +| Parameter Name | Description | +|-|-| +| hiv.test.rate_1 | Weekly test rate (black) | +| hiv.test.rate_2 | Weekly test rate (hispanic) | +| hiv.test.rate_3 | Weekly test rate (white) | +| tx.init.rate_1 | Weekly treatment start rate (black) | +| tx.init.rate_2 | Weekly treatment start rate (hispanic) | +| tx.init.rate_3 | Weekly treatment start rate (white) | +| hiv.trans.scale_1 | Transmission scaler (black) | +| hiv.trans.scale_2 | Transmission scaler (hispanic) | +| hiv.trans.scale_3 | Transmission scaler (white) | + +## Relationship between parameters and outputs + +Simply by looking at them, we can see that all the parameters do not relate +to all the outputs in the same way. + +The `hiv.test.rate` parameters directly relate to the `cc.dx` outputs +(proportion) of diagnosed. Also, no other parameter in our list would affect +these outcomes. Therefore, we have 3 one to one relationship. For each race, the +test rate will govern the diagnosed proportion. + +A similar situation can be described for linkage to care (`cc.linked1m`) and +treatment start rate `tx.init.rate`. (the denominator is the number of diagnosed +individuals, making it independent of `cc.dx`). + +The prevalence (`i.prev.dx`), on the other hand, depends on all 9 parameters. +The proportion of diagnosed influences the number of treated which then +influences the number of individuals able to transmit. The scalers then affect +directly the transmission for one population. But as they modify the prevalence, +they also indirectly influence the transmission in the other sub populations. + +However, once the `hiv.test.rate` and `tx.init.rate` are calibrated and their +values fixed, `i.prev.dx` only depends on the `hiv.trans.scale` parameters. +Reducing a 9 parameters 3 outputs problem into a simpler 3 parameters, 3 outputs +one. + +## Calibration structure + +Knowing these parameters - outputs relationship, we can define a calibration +structure to minimize the number of simulations required. + +A first wave will calibrate simulateanously the 3 `hiv.test.rate` and the +3 `tx.init.rate` parameters. This means that each run of the simulation will +test a value for each of these 6 parameters. This idea is similar [factorial +experiment design](https://en.wikipedia.org/wiki/Factorial_experiment) and is +only possible because of the independance between the parameters and their +respective outcomes. + +After these 6 parameters are calibrated (i.e. given fixed and final values), +the `hiv.trans.scale` parameters will be calibrated to fit the `i.prev.dx` +outcomes to their target values. + +## Waves and jobs + +To express this structure, we need to define calibration *jobs* that are +be run in parallel within several *waves*. + +In our example we will have two *waves*, the first one for the `hiv.test.rate` +and `tx.init.rate` parameters and a second for the `hiv.trans.scale` parameters. + +### Jobs + +Formally, `swfcalib` defines a *job* as a set of *parameters* to be calibrated +by trying to make a set of *outcomes* reach a set of *targets*. + +Each *job* needs a function to make the next set of *parameter* proposals to +test as well as a function for checking if the proposals gave sufficiently good +results. This latter function is in charge of stopping the calibration process +for the current job. + +### Waves + +A *wave* is a set of multiple jobs that can be run in parallel (i.e. +independent from one another). + +In practice, `swfcalib` takes the proposals from all the jobs in a wave, combine +them and run one simulation per proposal. If you have a 3 *job wave*, each +making 10 proposal, only 10 simulations will be run. On the evaluation step +each job will only assess the quality of it's own outcomes. + +Once all the *jobs* in a *wave* are finished, the system moves to the next one +if any, using the parameters calibrated on the previous ones. + +## Implementing the `model` function + +Now that we have a good conceptual idea how the calibration will go, we need to +make our code compatible with `swfcalib`. + +For the model function it is pretty straightforward. We need a function with +this signature: + +```r +model <- function(proposal) { + # simulation code + return(results) +} +``` + +Where `proposal` is an one row `tibble` with each column being a parameter to +calibrate. In our case, `proposal` is a 1 row 9 columns `tibble`. + +And `results` is a one row `tibble` where each column is an output for the +calibration process. In this case, `results` must also be a 1 row 9 columns +`tibble`. + +Below is the code for our example. + +```r +model <- function(proposal) { + # Load all required elements + library(EpiModelHIV) + library(dplyr) + + epistats <- readRDS("data/input/epistats.rds") + netstats <- readRDS("data/input/netstats.rds") + netest <- readRDS("data/input/netest.rds") + + param <- param.net( + data.frame.params = read.csv("data/input/params.csv"), + netstats = netstats, + epistats = epistats + ) + + init <- init_msm() + + control <- control_msm( + nsteps = 250, + nsims = 1, + ncores = 1 + ) + + # Proposal to scenario ------------------------------------------------------- + scenario <- EpiModelHPC::swfcalib_proposal_to_scenario(proposal) + param_sc <- EpiModel::use_scenario(param, scenario) + + # Run the simulation --------------------------------------------------------- + sim <- netsim(est, param_sc, init, control) + + # Process the results ------------------------------------------------------- + results <- as_tibble(sim) |> + mutate_calibration_targets() |> + filter(time >= max(time) - 52) |> + select( + cc.dx.B, cc.dx.H, cc.dx.W, + cc.linked1m.B, cc.linked1m.H, cc.linked1m.W, + i.prev.dx.B, i.prev.dx.H, i.prev.dx.W, + ) |> + summarise(across( everything(), ~ mean(.x, na.rm = TRUE))) + + # Return the one row `tibble` + return(results) +} +``` + +Several things are of importance here: + +1. We call `library` within the function. It is not usual, but we must remember +that this will be run on an `sbatch` job from a clean environment. Therefore, +the function must load all the required libraries and files. +2. We must adapt the `proposal` to be used by the simulation. In this case, a +helper function `swfcalib_proposal_to_scenario` does it. But simply copying the +values from the `proposal` `tibble` works as well. +3. After the simulation, the outputs must be processed to produce a correctly +formatted `tibble`. In this case, we take the mean over the last 52 weeks (1 +year) for each of the desired outputs. + +As the users of `swfcalib`, we are responsible for the correct output of +`model`. + +Also, `model` must be a function that run a single simulation and produce a +single set of `results`. This function will be run in parallel to test several +proposals at once. + +## Configuring an `swfcalib` system + +`swfcalib` maintain a `calib_object` that store the state of the calibration as +well as all it needs to operate. This object is first defined locally and then +updated on the HPC as the calibration progresses. + +This object is an `R` `list` with 3 elements: `state`, `config` and `waves`. + +We won't cover `state` now as it is created by `swfcalib` and only edited by it. + +### Configuration + +`config` is a `list` with the following elements: + +- `simulator`: the function to calibration `model` in our case +- `root_directory`: where the system will store itself on the HPC +- `n_sims`: the number of simulations to run in parallel at each iteration +- `max_iteration`: the maximum number of iteration before stopping the + calibration if a satisfactory state is not found. This is a fail-safe + mechanism to avoid consuming HPC resources when the calibration does not work. +- `default_proposal`: default values for the calibrated parameters. This fixes + the values for the parameters that are to be calibrated in later waves. Once + a job is finished, the calibrated value is stored here and used for the next + runs + +```r +config = list( + simulator = model, + root_directory = "data/calib", + n_sims = n_sims, + max_iteration = 100, + default_proposal = dplyr::tibble( + hiv.test.rate_1 = 0.004123238, + hiv.test.rate_2 = 0.003771226, + hiv.test.rate_3 = 0.005956663, + tx.init.rate_1 = 0.2981623, + tx.init.rate_2 = 0.3680919, + tx.init.rate_3 = 0.358254, + hiv.trans.scale_1 = 2.470962, + hiv.trans.scale_2 = 0.4247816, + hiv.trans.scale_3 = 0.3342994 + ) +) +``` + +### Waves + +`waves` is a list of *waves*, with each wave being a list of jobs + +```r +waves = list( + wave1 = list( + job1 = list(), + job2 = list(), + job3 = list() + ), + wave2 = list( + job1 = list(), + job2 = list() + ) + ) +``` + +### Calibration jobs + +A calibration *job* is a `list` with 6 elements: + +1. `targets`: the name of the outputs to fit (>= 1) +2. `targets_val`: the target values +3. `params`: the names of the parameters to calibrate (>=1) +4. `initial_proposals`: a `tibble` with the values to be tested for the first + run of the simulation. +5. `make_next_proposals`: a function that will define which proposals to make + next +6. `get_result`: a function to define is the calibration is done for this job + +We will get into the details of these elements later on. + +Below is an example of the *job* for calibrating the `hiv.trans.scale` +parameters using the `i.prev.dx` outputs. + +```r +job1 = list( + targets = paste0("i.prev.dx.", c("B", "H", "W")), + targets_val = c(0.33, 0.127, 0.09), + params = paste0("hiv.trans.scale_", 1:3), + initial_proposals = dplyr::tibble( + hiv.trans.scale_1 = sample(seq(1, 4, length.out = n_sims)), + hiv.trans.scale_2 = sample(seq(0.2, 0.6, length.out = n_sims)), + hiv.trans.scale_3 = sample(seq(0.2, 0.6, length.out = n_sims)) + ), + make_next_proposals = + swfcalib::make_proposer_se_range(n_sims, retain_prop = 0.3), + get_result = swfcalib::determ_end_thresh( + thresholds = rep(0.02, 3), + n_enough = 100 + ) +) +``` + +### Complete configuration + +Below is the complete `calib_object` defined locally. Note that we define an +`n_sims` variable at the beginning and reuse it all over the configuration +to ensure that we have the correct number of proposals at each step. + +```r +n_sims <- 400 + +calib_object <- list( + config = list( + simulator = model_fun, + default_proposal = dplyr::tibble( + hiv.test.rate_1 = 0.004123238, + hiv.test.rate_2 = 0.003771226, + hiv.test.rate_3 = 0.005956663, + tx.init.rate_1 = 0.2981623, + tx.init.rate_2 = 0.3680919, + tx.init.rate_3 = 0.358254, + hiv.trans.scale_1 = 2.470962, + hiv.trans.scale_2 = 0.4247816, + hiv.trans.scale_3 = 0.3342994 + ), + root_directory = "data/calib", + max_iteration = 100, + n_sims = n_sims + ), + waves = list( + wave1 = list( + job1 = list( + targets = "cc.dx.B", + targets_val = 0.847, + params = c("hiv.test.rate_1"), # target: 0.00385 + initial_proposals = dplyr::tibble( + hiv.test.rate_1 = seq(0.002, 0.006, length.out = n_sims), + ), + make_next_proposals = swfcalib::make_shrink_proposer(n_sims, shrink = 2), + get_result = swfcalib::determ_poly_end(0.001, poly_n = 5) + ), + job2 = list( + targets = "cc.dx.H", + targets_val = 0.818, + params = c("hiv.test.rate_2"), # target: 0.0038 + initial_proposals = dplyr::tibble( + hiv.test.rate_2 = seq(0.002, 0.006, length.out = n_sims), + ), + make_next_proposals = swfcalib::make_shrink_proposer(n_sims, shrink = 2), + get_result = swfcalib::determ_poly_end(0.001, poly_n = 5) + ), + job3 = list( + targets = "cc.dx.W", + targets_val = 0.862, + params = c("hiv.test.rate_3"), # target: 0.0069 + initial_proposals = dplyr::tibble( + hiv.test.rate_3 = seq(0.004, 0.008, length.out = n_sims), + ), + make_next_proposals = swfcalib::make_shrink_proposer(n_sims, shrink = 2), + get_result = swfcalib::determ_poly_end(0.001, poly_n = 5) + ), + job4 = list( + targets = paste0("cc.linked1m.", c("B", "H", "W")), + targets_val = c(0.829, 0.898, 0.881), + params = paste0("tx.init.rate_", 1:3), + initial_proposals = dplyr::tibble( + tx.init.rate_1 = sample(seq(0.1, 0.5, length.out = n_sims)), + tx.init.rate_2 = sample(tx.init.rate_1), + tx.init.rate_3 = sample(tx.init.rate_1), + ), + make_next_proposals = swfcalib::make_shrink_proposer(n_sims, shrink = 2), + get_result = swfcalib::determ_poly_end(0.001, poly_n = 3) + ) + ), + wave2 = list( + job1 = list( + targets = paste0("i.prev.dx.", c("B", "H", "W")), + targets_val = c(0.33, 0.127, 0.09), + params = paste0("hiv.trans.scale_", 1:3), + initial_proposals = dplyr::tibble( + hiv.trans.scale_1 = sample(seq(1, 4, length.out = n_sims)), + hiv.trans.scale_2 = sample(seq(0.2, 0.6, length.out = n_sims)), + hiv.trans.scale_3 = sample(seq(0.2, 0.6, length.out = n_sims)) + ), + make_next_proposals = + swfcalib::make_proposer_se_range(n_sims, retain_prop = 0.3), + get_result = swfcalib::determ_end_thresh( + thresholds = rep(0.02, 3), + n_enough = 100 + ) + ) + ) + ) +) +``` + +## Proposers and calibration check + +As mentioned earlier, each calibration *job* needs a function to define which +proposal to make at the next iteration (`make_next_proposals`) and a function +to assess if the calibration is finished (`get_result`). + +In this section we will explore these functions and what they do in each job. + +### HIV test rate + +The 3 *jobs* related to `hiv.test.rate` and proportion of diagnosed +among infected (`cc.dx`) use the same approach: + +The proposer function is generated by a [function factory](https://adv-r.hadley.nz/function-factories.html): +`make_shrink_proposer(n_sims, shrink = 2)`. This proposer will shrink the range +of proposals by a factor 2 around the best guess so far. + +The calibration assessor is made by `determ_poly_end(0.001, poly_n = 5)`. This +function fits a linear model with a degree 5 polynomial between the parameter +and the output. It then predict the best value for the parameter. This value +is later shared to the proposer to shrink the range around it. The calibration +is considered finished when the predicted value is less than `threshold` away +from the target (here `0.001`) AND when the prediction is not improving after +the last iteration. + +### Linkage to care + +Linkage to care uses the same functions but all 3 parameters and targets are +fitted at once. This here implies that a single model is fitted for all the +data. It works very well in this specific case as the relationship between +linkage to care and treatment uptake rate is consistent over the three groups. + +### HIV prevalence + +HIV prevalence and transmission scale is harder to calibrate as the 3 +parameters and outputs are linked together. + +Therefore a more basic approach is used, for the proposals, at each iteration +the squared error over all 3 targets is calculated for each proposal, then +the 30% best are kept. The ranges for the next round of proposals are the +ranges of these best previous rounds. The function factory here is +`make_proposer_se_range`. It takes a `retain_prop` argument that governs which +proportion of the simulations are used to define the new ranges. + +The calibration is considered when 100 simulations where the 3 outputs are less +than `thresholds` away from there respective targets. The `determ_end_thresh` +function factory allow us to define the number of *good* simulations required +and the thresholds for each output. + +## Running a calibration system