Skip to content

Commit

Permalink
Add buffer objects for collective operations (#335)
Browse files Browse the repository at this point in the history
* Add ChunkBuffers for collective operations
* fix deprecations, get tests passing
* remove old deprecations
* update docs
* update tests for deprecations
  • Loading branch information
simonbyrne authored Nov 10, 2020
1 parent 502717a commit 3411efc
Show file tree
Hide file tree
Showing 20 changed files with 783 additions and 513 deletions.
1 change: 1 addition & 0 deletions docs/make.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ makedocs(
"library.md",
"environment.md",
"comm.md",
"buffers.md",
"pointtopoint.md",
"collective.md",
"onesided.md",
Expand Down
8 changes: 0 additions & 8 deletions docs/src/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@
MPI.free
```

## Buffers

```@docs
MPI.Buffer
MPI.Buffer_send
MPI.MPIPtr
```

## Datatype objects

```@docs
Expand Down
13 changes: 13 additions & 0 deletions docs/src/buffers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Buffers

Buffers are used for sending and receiving data. MPI.jl provides the following buffer types:

```@docs
MPI.IN_PLACE
MPI.Buffer
MPI.Buffer_send
MPI.UBuffer
MPI.VBuffer
MPI.RBuffer
MPI.MPIPtr
```
3 changes: 1 addition & 2 deletions docs/src/collective.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ MPI.Barrier

```@docs
MPI.Bcast!
MPI.bcast
```

## Gather/Scatter
Expand All @@ -31,9 +32,7 @@ MPI.Gatherv

```@docs
MPI.Scatter!
MPI.Scatter
MPI.Scatterv!
MPI.Scatterv
```

### All-to-all
Expand Down
2 changes: 1 addition & 1 deletion src/MPI.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ using Libdl, Serialization
using Requires
using DocStringExtensions

export mpiexec
export mpiexec, UBuffer, VBuffer

function serialize(x)
s = IOBuffer()
Expand Down
229 changes: 227 additions & 2 deletions src/buffers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,42 @@ Additionally, certain sentinel values can be used, e.g. `MPI_IN_PLACE` or `MPI_B
"""
MPIPtr

# MPI_IN_PLACE

struct InPlace
end
Base.cconvert(::Type{MPIPtr}, ::InPlace) = MPI_IN_PLACE


"""
MPI.IN_PLACE
A sentinel value that can be passed as a buffer argument for certain collective operations
to use the same buffer for send and receive operations.
- [`Scatter!`](@ref) and [`Scatterv!`](@ref): can be used as the `recvbuf` argument on the
root process.
- [`Gather!`](@ref) and [`Gatherv!`](@ref): can be used as the `sendbuf` argument on the
root process.
- [`Allgather!`](@ref), [`Allgatherv!`](@ref), [`Alltoall!`](@ref) and
[`Alltoallv!`](@ref): can be used as the `sendbuf` argument on all processes.
- [`Reduce!`](@ref) (root only), [`Allreduce!`](@ref), [`Scan!`](@ref) and
[`Exscan!`](@ref): can be used as `sendbuf` argument.
"""
const IN_PLACE = InPlace()

# TODO: MPI_BOTTOM


"""
MPI.Buffer
An MPI buffer for communication operations.
An MPI buffer for communication with a single rank. It is used for point-to-point
communication and some collective operations.
# Fields
$(DocStringExtensions.FIELDS)
Expand All @@ -83,6 +114,10 @@ and `datatype`. Methods are provided for
- `SubArray`s of an `Array` or `CUDA.CuArray` where the layout is contiguous, sequential or
blocked.
# See also
- [`Buffer_send`](@ref)
"""
struct Buffer{A}
"""a Julia object referencing a region of memory to be used for communication. It is
Expand Down Expand Up @@ -125,6 +160,9 @@ function Buffer(sub::SubArray{T,N,P,I,false}) where {T,N,P,I<:Tuple{Vararg{Union
Buffer(parent(sub), Cint(1), datatype)
end

Buffer(::InPlace) = Buffer(IN_PLACE, 0, DATATYPE_NULL)
Buffer(::Nothing) = Buffer(nothing, 0, DATATYPE_NULL)

"""
Buffer_send(data)
Expand All @@ -133,7 +171,194 @@ Construct a [`Buffer`](@ref) object for a send operation from `data`, allowing c
"""
Buffer_send(data) = isbits(data) ? Buffer(Ref(data)) : Buffer(data)
Buffer_send(str::String) = Buffer(str, sizeof(str), MPI.CHAR)
Buffer_send(::InPlace) = Buffer(InPlace())
Buffer_send(::Nothing) = Buffer(nothing)






"""
MPI.UBuffer
An MPI buffer for chunked collective communication, where all chunks are of uniform size.
# Fields
$(DocStringExtensions.FIELDS)
# Usage
UBuffer(data, count::Integer, nchunks::Union{Nothing, Integer}, datatype::Datatype)
Generic constructor.
UBuffer(data, count::Integer)
Construct a `UBuffer` backed by `data`, where `count` is the number of elements in each chunk.
# See also
- [`VBuffer`](@ref): similar, but supports chunks of non-uniform sizes.
"""
struct UBuffer{A}
"""A Julia object referencing a region of memory to be used for communication. It is
required that the object can be `cconvert`ed to an [`MPIPtr`](@ref)."""
data::A

"""The number of elements of `datatype` in each chunk."""
count::Cint

"""The maximum number of chunks stored in the buffer. This is used only for
validation, and can be set to `nothing` to disable checks."""
nchunks::Union{Nothing,Cint}

"""The [`MPI.Datatype`](@ref) stored in the buffer."""
datatype::Datatype
end
UBuffer(data, count::Integer, nchunks::Union{Integer, Nothing}, datatype::Datatype) =
UBuffer(data, Cint(count), nchunks isa Integer ? Cint(nchunks) : nothing, datatype)

function UBuffer(arr::AbstractArray, count::Integer)
@assert stride(arr, 1) == 1
UBuffer(arr, count, div(length(arr), count), Datatype(eltype(arr)))
end
Base.similar(buf::UBuffer) =
UBuffer(similar(buf.data), buf.count, buf.nchunks, buf.datatype)

UBuffer(::Nothing) = UBuffer(nothing, 0, nothing, DATATYPE_NULL)
UBuffer(::InPlace) = UBuffer(IN_PLACE, 0, nothing, DATATYPE_NULL)



"""
MPI.VBuffer
An MPI buffer for chunked collective communication, where chunks can be of different sizes and at different offsets.
# Fields
$(DocStringExtensions.FIELDS)
# Usage
VBuffer(data, counts[, displs[, datatype]])
Construct a `VBuffer` backed by `data`, where `counts[j]` is the number of elements in the
`j`th chunk, and `displs[j]` is the 0-based displacement. In other words, the `j`th chunk
occurs in indices `displs[j]+1:displs[j]+counts[j]`.
The default value for `displs[j] = sum(counts[1:j-1])`.
# See also
- [`UBuffer`](@ref) when chunks are all of the same size.
"""
struct VBuffer{A}
"""A Julia object referencing a region of memory to be used for communication. It is
required that the object can be `cconvert`ed to an [`MPIPtr`](@ref)."""
data::A

"""An array containing the length of each chunk."""
counts::Vector{Cint}

"""An array containing the (0-based) displacements of each chunk."""
displs::Vector{Cint}

"""The [`MPI.Datatype`](@ref) stored in the buffer."""
datatype::Datatype
end
VBuffer(data, counts, displs, datatype::Datatype) =
VBuffer(data, convert(Vector{Cint}, counts),
convert(Vector{Cint}, displs), datatype)
VBuffer(data, counts, displs) =
VBuffer(data, counts, displs, Datatype(eltype(data)))

function VBuffer(arr::AbstractArray, counts)
@assert stride(arr,1) == 1
counts = convert(Vector{Cint}, counts)
displs = similar(counts)
d = zero(Cint)
for i in eachindex(displs)
displs[i] = d
d += counts[i]
end
@assert length(arr) >= d
VBuffer(arr, counts, displs, Datatype(eltype(arr)))
end

VBuffer(::Nothing) = VBuffer(nothing, Cint[], Cint[], DATATYPE_NULL)
VBuffer(::InPlace) = VBuffer(IN_PLACE, Cint[], Cint[], DATATYPE_NULL)


"""
MPI.RBuffer
An MPI buffer for reduction operations ([`MPI.Reduce!`](@ref), [`MPI.Allreduce!`](@ref), [`MPI.Scan!`](@ref), [`MPI.Exscan!`](@ref)).
# Fields
$(DocStringExtensions.FIELDS)
# Usage
RBuffer(senddata, recvdata[, count, datatype])
Generic constructor.
RBuffer(senddata, recvdata)
Construct a `Buffer` backed by `senddata` and `recvdata`, automatically determining the
appropriate `count` and `datatype`.
- `senddata` can be [`MPI.IN_PLACE`](@ref)
- `recvdata` can be `nothing` on a non-root node with [`MPI.Reduce!`](@ref)
"""
struct RBuffer{S,R}
"""A Julia object referencing a region of memory to be used for the send buffer. It is
required that the object can be `cconvert`ed to an [`MPIPtr`](@ref)."""
senddata::S

"""A Julia object referencing a region of memory to be used for the receive buffer. It is
required that the object can be `cconvert`ed to an [`MPIPtr`](@ref)."""
recvdata::R

"""the number of elements of `datatype` in the buffer. Note that this may not
correspond to the number of elements in the array if derived types are used."""
count::Cint

"""the [`MPI.Datatype`](@ref) stored in the buffer."""
datatype::Datatype
end

RBuffer(senddata, recvdata, count::Integer, datatype::Datatype) =
RBuffer(senddata, recvdata, Cint(count), datatype)

function RBuffer(senddata::AbstractArray{T}, recvdata::AbstractArray{T}) where {T}
@assert (count = length(senddata)) == length(recvdata)
@assert stride(senddata,1) == stride(recvdata,1) == 1
RBuffer(senddata, recvdata, count, Datatype(T))
end
function RBuffer(::InPlace, recvdata::AbstractArray{T}) where {T}
count = length(recvdata)
@assert stride(recvdata,1) == 1
RBuffer(IN_PLACE, recvdata, count, Datatype(T))
end
function RBuffer(senddata::AbstractArray{T}, recvdata::Nothing) where {T}
count = length(senddata)
@assert stride(senddata,1) == 1
RBuffer(senddata, nothing, count, Datatype(T))
end

function RBuffer(senddata::Ref{T}, recvdata::Ref{T}) where {T}
RBuffer(senddata, recvdata, 1, Datatype(T))
end
function RBuffer(senddata::InPlace, recvdata::Ref{T}) where {T}
RBuffer(IN_PLACE, recvdata, 1, Datatype(T))
end
function RBuffer(senddata::Ref{T}, recvdata::Nothing) where {T}
RBuffer(senddata, nothing, 1, Datatype(T))
end


const BUFFER_NULL = Buffer(C_NULL, 0, DATATYPE_NULL)
Base.eltype(rbuf::RBuffer) = eltype(rbuf.senddata)
Base.eltype(rbuf::RBuffer{InPlace}) = eltype(rbuf.recvdata)
Loading

0 comments on commit 3411efc

Please sign in to comment.