Suggestion for improving parallel data loading speed #1249

Open
sebffischer opened this issue Jan 22, 2025 · 6 comments

sebffischer (Collaborator) commented Jan 22, 2025

Following up on a discussion we had previously. The main bottleneck when using a parallel dataloader is currently the serialization-deserialization roundtrip that happens every time a batch is sent from one of the workers to the main process.

My suggestion for solving this problem would be to rely on the SharedObject library, which allows matrices to be shared between processes.
I think we could do the following (assuming for simplicity that the dataloader returns only a single 2d tensor, but this can easily be generalized); a rough sketch follows after the list:

  1. Upon starting the num_workers workers, we create num_workers shared matrices, i.e. matrix M1 for worker W1, matrix M2 for worker W2, ... .
  2. What changes on the worker: when a worker has successfully loaded a batch and converted it to a tensor, instead of serializing it, sending it to the main process via a connection and deserializing it there, it simply writes the tensor into its shared matrix.
  3. What changes in the main process: when the main process loads a batch from worker W1, it creates a tensor T1 from the matrix M1. To be on the safe side, I think we still need to copy the buffer from the shared matrix M1 when creating the new tensor T1; this copy should still be faster than the deserialization.
    I think we could even do without the copying, but things can go wrong here, so it should be made opt-in:
    to avoid copying the buffer from M1 when creating tensor T1, we could use torch_tensor_from_buffer so that M1 and T1 also share memory.
    What is tricky then is determining when worker W1 is allowed to write into matrix M1 again.
    One reasonable heuristic would be that it may write into M1 again once another batch has been loaded from some worker Wi with i != 1.
    This can still lead to issues when batches are stored somewhere and not only used for a forward pass.
    I still think it would be useful to offer this option, as it might be fine in many use cases.
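
Rough sketch of the handoff (the worker/main-process split is only simulated in a single session here, and the exact SharedObject arguments would need checking):

```r
library(SharedObject)
library(torch)

batch_size <- 32L
n_features <- 10L

# (1) Main process: pre-allocate one shared matrix per worker.
# copyOnWrite = FALSE so that in-place writes by the worker are visible here.
# In the real setup M1 would be exported to worker W1; only the shared-memory
# handle travels over the connection, not the data.
M1 <- share(matrix(0, nrow = batch_size, ncol = n_features), copyOnWrite = FALSE)

# (2) Worker W1: load a batch and write it into its shared matrix instead of
# serializing it and sending it over the connection.
batch <- matrix(rnorm(batch_size * n_features), nrow = batch_size)
M1[] <- batch

# (3) Main process: build tensor T1 from the shared matrix.
# Safe default: copy the buffer, which should still be cheaper than a full
# serialize/deserialize roundtrip.
T1 <- torch_tensor(M1)

# Opt-in zero-copy variant: create T1 via torch_tensor_from_buffer() so that
# T1 aliases M1's memory, and only allow W1 to overwrite M1 once the main
# process signals that it is done with T1.
```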

sebffischer (Collaborator Author) commented:

@dfalbel let me know what you think. This should not be too hard to implement I guess.

dfalbel (Member) commented Jan 23, 2025

The approach sounds reasonable, my questions are:

  • Can a CRAN package take a dependency on a Bioc package?
  • Can we pre-determine the size of the shared object, so we never need to reallocate it?
  • Can we pass torch tensors, i.e., preserve torch data types, dimensions, strides and whatnot?
  • Determining whether the shared object is rewritable is tricky, but we could have an atomic shared object that is just a flag indicating whether the main thread has already copied (or used) the tensor.

I would argue that serialization is not that expensive in torch because with safetensors, it's essentially a memcpy + some headers. The problem is that reading from the background process does not happen asynchronously together with the backward() step, which should take most of the time when training large models.
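
For reference, the roundtrip we are talking about is essentially the following (using torch_serialize()/torch_load() as a stand-in for what the worker connection does; the timing is only indicative and depends on batch size and machine):

```r
library(torch)

x <- torch_randn(64, 3, 224, 224)  # an image-sized batch

system.time({
  raw_batch <- torch_serialize(x)  # what the worker currently sends over the connection
  y <- torch_load(raw_batch)       # what the main process does when it receives it
})
```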

sebffischer (Collaborator Author) commented:

> The approach sounds reasonable, my questions are:

> * Can a CRAN package take a dependency on a Bioc package?

Yes, I am pretty sure.

> * Can we pre-determine the size of the shared object, so we never need to reallocate it?

I made the assumption that only the batch dimension varies (and that we know its maximum value). But I now realize that transformers or RNNs can also have a variable sequence length. This would mean we have to require the user to specify the maximum dimensions.

> * Can we pass torch tensors, i.e., preserve torch data types, dimensions, strides and whatnot?

I thought we could still communicate those from the worker processes through a connection. Because these objects are much smaller, I thought this would no longer be an issue, but this might be wrong? (A rough sketch of both ideas is below.)

But yeah, maybe this is not as simple as I first thought...
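
Something like this is what I had in mind for the two points above (max_batch, max_seq_len and the meta list are made-up names for illustration, not an existing torch API):

```r
library(SharedObject)
library(torch)

# Main process: allocate the shared buffer at the user-supplied maximum size.
max_batch   <- 64L
max_seq_len <- 512L
M1 <- share(matrix(0, nrow = max_batch, ncol = max_seq_len), copyOnWrite = FALSE)

# Worker: write a (possibly smaller) batch into the top-left corner of the buffer ...
actual <- matrix(rnorm(32L * 100L), nrow = 32L)
M1[seq_len(nrow(actual)), seq_len(ncol(actual))] <- actual

# ... and send only the small metadata over the regular connection.
meta <- list(shape = dim(actual), dtype = "float")

# Main process: rebuild the tensor from the relevant slice of the buffer.
T1 <- torch_tensor(M1[seq_len(meta$shape[1]), seq_len(meta$shape[2])],
                   dtype = torch_float())
```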

> I would argue that serialization is not that expensive in torch because with safetensors, it's essentially a memcpy + some headers. The problem is that reading from the background process does not happen asynchronously together with the backward() step, which should take most of the time when training large models.

But we can't fully rely on the $backward() step either, because during validation backward is never called.

sebffischer (Collaborator Author) commented:

Also, can you maybe explain again (if you know) why this is not a problem for the PyTorch dataloader?

dfalbel (Member) commented Jan 24, 2025

AFAICT PyTorch uses similar ideas, e.g. https://pytorch.org/docs/stable/generated/torch.Tensor.share_memory_.html
and is able to fully avoid memory copies.

dfalbel (Member) commented Jan 30, 2025

I was looking at the PyTorch source code to see how to make use of share_memory_(). Once you call at::share_memory_ on a Tensor, you should be able to obtain a handle and size for its storage with:

https://github.com/pytorch/pytorch/blob/894ef8c1e3b4745ffe1c50b6cd019af5fe2e9489/torch/csrc/StorageSharing.cpp#L249-L284

You'll also need a storage_offset(). When re-creating the tensor, they first create a storage object with: https://github.com/pytorch/pytorch/blob/6bd19e65b126947d459ce9d7dabfbc419cc80329/torch/csrc/StorageSharing.cpp#L249-L284

Then use rebuild_tensor():

https://github.com/pytorch/pytorch/blob/894ef8c1e3b4745ffe1c50b6cd019af5fe2e9489/torch/multiprocessing/reductions.py#L110-L120

to create a tensor pointing to the shared memory location.

Unfortunately, most of this is Python-specific C++ code, so we would need to re-implement it on the Lantern side.
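
Purely as a sketch, an R-facing version of those steps could hypothetically look like the following (none of these lantern_* functions exist today; the names only mirror the PyTorch internals linked above):

```r
# Hypothetical only -- nothing below exists in torch or Lantern yet.

# Worker side: move the tensor's storage into shared memory and export a handle.
# x <- torch_randn(32, 10)
# x$share_memory_()                                # would need a Lantern binding
# handle <- lantern_storage_share_handle(x)        # hypothetical: shm handle + size
# meta   <- list(shape = dim(x), offset = 0L, dtype = "float")  # plus the storage_offset

# Main process: reattach the shared storage and rebuild a tensor pointing at it.
# storage <- lantern_storage_from_handle(handle)   # hypothetical
# y <- lantern_rebuild_tensor(storage, meta$offset, meta$shape)  # hypothetical
```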
