Help implementing 'parallel' writing to a RealizationSink with BiocParallel #20

Open
PeteHaitch opened this issue May 21, 2018 · 4 comments


@PeteHaitch
Collaborator

I'm trying to implement writing to an arbitrary RealizationSink backend via BiocParallel::bplapply() with an arbitrary BiocParallelParam backend. That is, I want to be able to construct blocks of output, possibly in parallel, and then safely write each block to an HDF5Array, RleArray, etc. (controllable by the user) as it is generated.

I've managed to get this set up for an HDF5RealizationSink (my main use case) by using the inter-process locks available in BiocParallel to ensure the writes are safe (@mtmorgan am I using these correctly?). But I've not had any luck using the same code for an arrayRealizationSink. Nor can I get it to work for the RleArray backend, although here the problem seems to be slightly different.

Below is a toy example that tries to construct a DelayedMatrix column-by-column using this strategy, in conjunction with different DelayedArray backends:

suppressPackageStartupMessages(library(DelayedArray))
suppressPackageStartupMessages(library(BiocParallel))

FUN_with_IPC_lock <- function(b, nrow, sink, grid, ipcid) {
    # Ensure DelayedArray package is loaded on worker (needed for backends
    # other than MulticoreParam).
    suppressPackageStartupMessages(library("DelayedArray"))
    if (is(sink, "HDF5RealizationSink")) {
        # Ensure HDF5Array package is loaded on worker, if required.
        # NOTE: Feels a little clunky that this is necessary; it would be
        #       good if this were loaded automatically.
        suppressPackageStartupMessages(library("HDF5Array"))
    }
    message("Processing block = ", b, " on PID = ", Sys.getpid())
    tmp <- matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1)
    ipclock(ipcid)
    write_block_to_sink(tmp, sink, grid[[b]])
    ipcunlock(ipcid)
}

g <- function(FUN, BPPARAM, nrow = 100L, ncol = 10L) {
    # Construct RealizationSink
    backend <- getRealizationBackend()
    message("The backend is ", if (is.null(backend)) "NULL" else backend)
    sink <- DelayedArray:::RealizationSink(dim = c(nrow, ncol), type = "integer")
    # Construct an ArrayGrid over the columns of the RealizationSink
    grid <- RegularArrayGrid(dim(sink), c(nrow(sink), 1L))
    # Fill the RealizationSink
    n_block <- length(grid)
    # Construct an IPC mutex ID
    ipcid <- ipcid()
    # Apply function with BiocParallel
    bplapply(seq_len(n_block), FUN, BPPARAM = BPPARAM, nrow = nrow, sink = sink,
             grid = grid, ipcid = ipcid)
    # Return the RealizationSink as a DelayedArray
    as(sink, "DelayedArray")
}

# Everything works with the HDF5Array backend
setRealizationBackend("HDF5Array")
#> Loading required package: rhdf5
# Works
g(FUN_with_IPC_lock, SerialParam())
#> The backend is HDF5Array
#> Processing block = 1 on PID = 5740
#> Processing block = 2 on PID = 5740
#> Processing block = 3 on PID = 5740
#> Processing block = 4 on PID = 5740
#> Processing block = 5 on PID = 5740
#> Processing block = 6 on PID = 5740
#> Processing block = 7 on PID = 5740
#> Processing block = 8 on PID = 5740
#> Processing block = 9 on PID = 5740
#> Processing block = 10 on PID = 5740
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Works: the IPC lock does its job!
g(FUN_with_IPC_lock, MulticoreParam(2L))
#> The backend is HDF5Array
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Works: the IPC lock does its job!
g(FUN_with_IPC_lock, SnowParam(2L))
#> The backend is HDF5Array
#> Loading required package: HDF5Array
#> Loading required package: rhdf5
#> Processing block = 1 on PID = 5755
#> Processing block = 2 on PID = 5755
#> Processing block = 3 on PID = 5755
#> Processing block = 4 on PID = 5755
#> Processing block = 5 on PID = 5755
#> Loading required package: HDF5Array
#> Loading required package: rhdf5
#> Processing block = 6 on PID = 5761
#> Processing block = 7 on PID = 5761
#> Processing block = 8 on PID = 5761
#> Processing block = 9 on PID = 5761
#> Processing block = 10 on PID = 5761
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000

# Only SerialParam() works for the in-memory backend. The in-memory backend,
# an arrayRealizationSink, is implemented as an array inside an environment.
setRealizationBackend(NULL)
# Works
g(FUN_with_IPC_lock, SerialParam())
#> The backend is NULL
#> Processing block = 1 on PID = 5740
#> Processing block = 2 on PID = 5740
#> Processing block = 3 on PID = 5740
#> Processing block = 4 on PID = 5740
#> Processing block = 5 on PID = 5740
#> Processing block = 6 on PID = 5740
#> Processing block = 7 on PID = 5740
#> Processing block = 8 on PID = 5740
#> Processing block = 9 on PID = 5740
#> Processing block = 10 on PID = 5740
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Doesn't work: sink isn't filled. The IPC mutex isn't doing its job?
g(FUN_with_IPC_lock, MulticoreParam(2L))
#> The backend is NULL
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [2,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [3,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [4,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [5,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [97,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [98,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [99,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#> [100,]    NA    NA    NA    NA   .    NA    NA    NA    NA
# Doesn't work: sink isn't filled. The IPC mutex isn't doing its job?
g(FUN_with_IPC_lock, SnowParam(2L))
#> The backend is NULL
#> Processing block = 6 on PID = 5777
#> Processing block = 7 on PID = 5777
#> Processing block = 8 on PID = 5777
#> Processing block = 9 on PID = 5777
#> Processing block = 10 on PID = 5777
#> Processing block = 1 on PID = 5771
#> Processing block = 2 on PID = 5771
#> Processing block = 3 on PID = 5771
#> Processing block = 4 on PID = 5771
#> Processing block = 5 on PID = 5771
#> <100 x 10> DelayedMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [2,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [3,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [4,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>   [5,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [97,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [98,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#>  [99,]    NA    NA    NA    NA   .    NA    NA    NA    NA
#> [100,]    NA    NA    NA    NA   .    NA    NA    NA    NA

# Only SerialParam() works for the RleArray backend.
setRealizationBackend("RleArray")
# Works
g(FUN_with_IPC_lock, SerialParam())
#> The backend is RleArray
#> Processing block = 1 on PID = 5740
#> Processing block = 2 on PID = 5740
#> Processing block = 3 on PID = 5740
#> Processing block = 4 on PID = 5740
#> Processing block = 5 on PID = 5740
#> Processing block = 6 on PID = 5740
#> Processing block = 7 on PID = 5740
#> Processing block = 8 on PID = 5740
#> Processing block = 9 on PID = 5740
#> Processing block = 10 on PID = 5740
#> <100 x 10> RleMatrix object of type "integer":
#>         [,1]  [,2]  [,3]  [,4] ...  [,7]  [,8]  [,9] [,10]
#>   [1,]     1   101   201   301   .   601   701   801   901
#>   [2,]     2   102   202   302   .   602   702   802   902
#>   [3,]     3   103   203   303   .   603   703   803   903
#>   [4,]     4   104   204   304   .   604   704   804   904
#>   [5,]     5   105   205   305   .   605   705   805   905
#>    ...     .     .     .     .   .     .     .     .     .
#>  [96,]    96   196   296   396   .   696   796   896   996
#>  [97,]    97   197   297   397   .   697   797   897   997
#>  [98,]    98   198   298   398   .   698   798   898   998
#>  [99,]    99   199   299   399   .   699   799   899   999
#> [100,]   100   200   300   400   .   700   800   900  1000
# Doesn't work: I don't understand why.
g(FUN_with_IPC_lock, MulticoreParam(2L))
#> The backend is RleArray
#> Error in validObject(.Object): invalid class "ChunkedRleArraySeed" object: 
#>     length of object data [0] does not match object dimensions
#>     [product 1000]
# Doesn't work: I don't understand why I get this error.
g(FUN_with_IPC_lock, SnowParam(2L))
#> The backend is RleArray
#> Processing block = 6 on PID = 5793
#> Processing block = 7 on PID = 5793
#> Processing block = 8 on PID = 5793
#> Processing block = 9 on PID = 5793
#> Processing block = 10 on PID = 5793
#> Processing block = 1 on PID = 5787
#> Processing block = 2 on PID = 5787
#> Processing block = 3 on PID = 5787
#> Processing block = 4 on PID = 5787
#> Processing block = 5 on PID = 5787
#> Error in validObject(.Object): invalid class "ChunkedRleArraySeed" object: 
#>     length of object data [0] does not match object dimensions
#>     [product 1000]

Created on 2018-05-21 by the reprex package (v0.2.0).

Session info
devtools::session_info()
#> Session info -------------------------------------------------------------
#>  setting  value                       
#>  version  R version 3.5.0 (2018-04-23)
#>  system   x86_64, darwin15.6.0        
#>  ui       X11                         
#>  language (EN)                        
#>  collate  en_AU.UTF-8                 
#>  tz       America/New_York            
#>  date     2018-05-21
#> Packages -----------------------------------------------------------------
#>  package      * version date       source        
#>  backports      1.1.2   2017-12-13 CRAN (R 3.5.0)
#>  base         * 3.5.0   2018-04-24 local         
#>  BiocGenerics * 0.27.0  2018-05-01 Bioconductor  
#>  BiocParallel * 1.15.3  2018-05-11 Bioconductor  
#>  compiler       3.5.0   2018-04-24 local         
#>  datasets     * 3.5.0   2018-04-24 local         
#>  DelayedArray * 0.7.0   2018-05-01 Bioconductor  
#>  devtools       1.13.5  2018-02-18 CRAN (R 3.5.0)
#>  digest         0.6.15  2018-01-28 CRAN (R 3.5.0)
#>  evaluate       0.10.1  2017-06-24 CRAN (R 3.5.0)
#>  graphics     * 3.5.0   2018-04-24 local         
#>  grDevices    * 3.5.0   2018-04-24 local         
#>  HDF5Array    * 1.9.0   2018-05-01 Bioconductor  
#>  htmltools      0.3.6   2017-04-28 CRAN (R 3.5.0)
#>  IRanges      * 2.15.13 2018-05-20 Bioconductor  
#>  knitr          1.20    2018-02-20 CRAN (R 3.5.0)
#>  magrittr       1.5     2014-11-22 CRAN (R 3.5.0)
#>  matrixStats  * 0.53.1  2018-02-11 CRAN (R 3.5.0)
#>  memoise        1.1.0   2017-04-21 CRAN (R 3.5.0)
#>  methods      * 3.5.0   2018-04-24 local         
#>  parallel     * 3.5.0   2018-04-24 local         
#>  Rcpp           0.12.17 2018-05-18 CRAN (R 3.5.0)
#>  rhdf5        * 2.25.0  2018-05-01 Bioconductor  
#>  Rhdf5lib       1.3.1   2018-05-17 Bioconductor  
#>  rmarkdown      1.9     2018-03-01 CRAN (R 3.5.0)
#>  rprojroot      1.3-2   2018-01-03 CRAN (R 3.5.0)
#>  S4Vectors    * 0.19.5  2018-05-20 Bioconductor  
#>  snow           0.4-2   2016-10-14 CRAN (R 3.5.0)
#>  stats        * 3.5.0   2018-04-24 local         
#>  stats4       * 3.5.0   2018-04-24 local         
#>  stringi        1.2.2   2018-05-02 CRAN (R 3.5.0)
#>  stringr        1.3.1   2018-05-10 CRAN (R 3.5.0)
#>  tools          3.5.0   2018-04-24 local         
#>  utils        * 3.5.0   2018-04-24 local         
#>  withr          2.1.2   2018-03-15 CRAN (R 3.5.0)
#>  yaml           2.1.19  2018-05-01 CRAN (R 3.5.0)
@mtmorgan
Contributor

The use of ipcmutex() is correct.

The challenge with the in-memory representations is that the memory is modified in the memory space of each thread/process; that memory is NOT shared with the manager process. I don't think there's an easy way around that. A 'feature' for BiocParallel might be shared memory management, but that would be a big undertaking, and would probably require a custom 'realization back end' to work with it.
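
To make that concrete, here is a minimal illustration using only base R's parallel package (fork-based, so not available on Windows): an environment modified in a forked child is a copy-on-write duplicate, so the assignment is never seen by the parent. This is exactly why an arrayRealizationSink, an array inside an environment, comes back empty under MulticoreParam().

library(parallel)

env <- new.env()
env$x <- NA_integer_
job <- mcparallel({
    env$x <- 42L  # assigns into the child's copy-on-write duplicate
    env$x
})
mccollect(job)    # the child returns 42L...
env$x             # ...but the parent still sees NA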

Also, the ipc stuff is inter-process (i.e., on the same computer); some back-ends (notably BatchtoolsParam()) can work on clusters or clouds where the ipc lock doesn't help.

@PeteHaitch
Collaborator Author

Thanks, Martin, that's very helpful for my current use case in bsseq.

I was naïvely hoping that bplapply() with MulticoreParam() combined with a mutex might be safe for in-memory representations. But I now realise that fork's copy-on-write semantics mean each worker writes to its own copy of the sink, even if a mutex ensures the writes never happen in parallel. Bummer.

I think what I'll do in the short term is limit the 'realization back end' to 'on-disk' backends and the 'parallelisation back end' to 'same computer' backends. If the user requests an in-memory representation of the data then I'll assume they've got enough RAM to pass around lists of vectors that I'll eventually combine into a matrix in the main process with cbind()/rbind() operations.
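
For reference, a minimal sketch of that fallback (the helper name is hypothetical, and it assumes one column per block): the workers return plain matrices, and only the manager combines them, so no sink and no lock are involved.

suppressPackageStartupMessages(library(BiocParallel))

build_in_memory <- function(nrow = 100L, ncol = 10L, BPPARAM = SerialParam()) {
    # Workers construct the column blocks and simply return them
    blocks <- bplapply(seq_len(ncol), function(b, nrow) {
        matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1)
    }, BPPARAM = BPPARAM, nrow = nrow)
    # Only the manager combines them, into an ordinary in-memory matrix
    do.call(cbind, blocks)
}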

@hpages
Contributor

hpages commented Nov 2, 2020

Hi Pete,

Where do we stand on this? Is there something that we should implement in DelayedArray to facilitate parallel writing to a RealizationSink?

My feeling is that supporting parallel writing to an arbitrary RealizationSink is a hard problem. For on-disk backends that don't natively support concurrent writing, a simple workaround is to split the array to be written into column- or row-oriented blocks, write the blocks in parallel using one RealizationSink per block, and finally combine the on-disk blocks with a delayed cbind() or rbind(). This is a very generic strategy that works regardless of the on-disk backend. The drawback is that the final object is not monolithic, i.e., it's made of several on-disk datasets combined together. If a monolithic object is desired, an extra call to realize() is needed; since that call cannot do parallel writing, it can be expensive.
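
For concreteness, a sketch of this strategy for the HDF5 backend, written against the same API as the toy example above (write_block_to_sink() etc.); the per-block filepath=/name= arguments are assumptions, used here to keep workers from colliding on a shared dump file. Each worker owns its own sink, so no lock is needed.

suppressPackageStartupMessages(library(HDF5Array))
suppressPackageStartupMessages(library(BiocParallel))

realize_split <- function(nrow = 100L, ncol = 10L, BPPARAM = SerialParam()) {
    pieces <- bplapply(seq_len(ncol), function(b, nrow) {
        suppressPackageStartupMessages(library(HDF5Array))
        block <- matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1)
        # One single-column sink per block, each in its own file, so
        # workers never write to the same place
        sink <- HDF5RealizationSink(dim = c(nrow, 1L), type = "integer",
                                    filepath = tempfile(fileext = ".h5"),
                                    name = "block")
        write_block_to_sink(block, sink, RegularArrayGrid(dim(sink))[[1L]])
        as(sink, "DelayedArray")
    }, BPPARAM = BPPARAM, nrow = nrow)
    # Delayed cbind(): cheap, but the result is not monolithic;
    # realize() it if a single on-disk dataset is required
    do.call(cbind, pieces)
}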

We could introduce a new class (e.g. SplitRealizationSink or ParallelRealizationSink) to abstract the above strategy.
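
A hypothetical skeleton of such a class (names and slots are illustrative only, and column-oriented blocks are assumed): it would hold one ordinary RealizationSink per block, and coercion to DelayedArray would perform the delayed cbind().

suppressPackageStartupMessages(library(DelayedArray))

setClass("SplitRealizationSink",
    slots = c(
        sinks = "list",      # one ordinary RealizationSink per block
        grid  = "ArrayGrid"  # maps block index to viewport in the full array
    )
)

setAs("SplitRealizationSink", "DelayedArray", function(from)
    do.call(cbind, lapply(from@sinks, as, "DelayedArray"))
)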

H.

@PeteHaitch
Collaborator Author

Hi Hervé,

It's been a while since I thought about this. The model in my head involves workers that read input files (e.g., BAM files, CSV files, etc.) and process them (e.g., filtering, selection, aggregation, etc.) and then either:

  1. Write directly to the RealizationSink. This would need to be done serially (like my above attempt using mutexes) unless the backend supported concurrent writing. I envisioned this being a single, shared RealizationSink, but from the discussion here it sounds difficult/impossible for an in-memory RealizationSink, and from BiocParallel#126 ("Where the workers can write files that are guaranteed to be accessible from the head node?") it sounds like there are challenges even for an on-disk RealizationSink.
  2. Pass the processed data back to a 'manager' that writes to the single RealizationSink (see the sketch below). This incurs the cost of data transfer between worker and manager (and potential memory blowups if multiple workers return data concurrently), but ensures that writing to the RealizationSink is serial.

I think it's fair to say (1) would be preferable.
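
For completeness, though, here's a minimal sketch of (2), written against the same toy API as above (the function name is just for illustration). Only the manager touches the single shared sink, so any backend works, including the in-memory ones, and no lock is needed; the cost is shipping every block back to the manager.

h <- function(nrow = 100L, ncol = 10L, BPPARAM = SerialParam()) {
    sink <- DelayedArray:::RealizationSink(dim = c(nrow, ncol), type = "integer")
    grid <- RegularArrayGrid(dim(sink), c(nrow, 1L))
    # Workers only *construct* the blocks...
    blocks <- bplapply(seq_along(grid), function(b, nrow) {
        matrix(seq_len(nrow) + (b - 1L) * nrow, ncol = 1)
    }, BPPARAM = BPPARAM, nrow = nrow)
    # ...and the manager alone writes them, serially
    for (b in seq_along(blocks))
        write_block_to_sink(blocks[[b]], sink, grid[[b]])
    as(sink, "DelayedArray")
}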

The 'one RealizationSink per block' approach is obviously viable, but I had hoped/envisioned a single, shared RealizationSink, mostly because I'd like to avoid the extra (and, in my case, expensive) realize() on the data before it's 'saveable' (e.g., with saveHDF5SummarizedExperiment()).

With some data types (like the whole-genome bisulfite-sequencing data handled by bsseq), this initial ingestion/processing can be quite expensive (as is writing these large datasets to disk), so you really only want to do it once and then load the saved object for downstream analyses.
