Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Ibcast! #882

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/reference/collective.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ MPI.Ibarrier
MPI.Bcast!
MPI.Bcast
MPI.bcast
MPI.Ibcast!
```

## Gather/Scatter
Expand Down Expand Up @@ -57,6 +58,7 @@ MPI.Reduce!
MPI.Reduce
MPI.Allreduce!
MPI.Allreduce
MPI.Iallreduce!
MPI.Scan!
MPI.Scan
MPI.Exscan!
Expand Down
63 changes: 63 additions & 0 deletions src/collective.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,32 @@ function bcast(obj, root::Integer, comm::Comm)
return obj
end


"""
    Ibcast!(buf, comm::Comm; root::Integer=0[, req::AbstractRequest = Request()])

Broadcast the buffer `buf` from `root` to all processes in `comm`.

The operation is non-blocking, and the request object `req` can be used to wait
for the operation to complete.

# External links
$(_doc_external("MPI_Ibcast"))
"""
# Keyword-argument convenience form. `req` is accepted here so the documented
# signature actually works; previously the keyword method dropped it.
Ibcast!(buf, comm::Comm; root::Integer=0, req::AbstractRequest=Request()) =
    Ibcast!(buf, root, comm, req)

function Ibcast!(buf::Buffer, root::Integer, comm::Comm, req::AbstractRequest = Request())
    # int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, int root,
    #                MPI_Comm comm, MPI_Request *request)
    API.MPI_Ibcast(buf.data, buf.count, buf.datatype, root, comm, req)
    return req
end
# Generic-data form: wrap `data` in a `Buffer` and forward, including the
# request object so callers can supply their own.
function Ibcast!(data, root::Integer, comm::Comm, req::AbstractRequest = Request())
    return Ibcast!(Buffer(data), root, comm, req)
end


"""
Scatter!(sendbuf::Union{UBuffer,Nothing}, recvbuf, comm::Comm;
root::Integer=0)
Expand Down Expand Up @@ -775,6 +801,43 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
Allreduce(obj::T, op, comm::Comm) where {T} =
Allreduce!(Ref(obj), Ref{T}(), op, comm)[]

## Iallreduce

# mutating
"""
    Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request())
    Iallreduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request())

Performs elementwise reduction using the operator `op` on the buffer `sendbuf`,
storing the result in the `recvbuf` of all processes in the group.

If only one `sendrecvbuf` buffer is provided, then the operation is performed
in-place.

The operation is non-blocking, and the request object `req` can be used to wait
for the operation to complete.

# See also
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Iallreduce"))
"""
function Iallreduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, comm::Comm, req::AbstractRequest=Request())
    # int MPI_Iallreduce(const void *sendbuf, void *recvbuf, int count,
    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
    #                    MPI_Request *request)
    API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
    return req
end
# Wrap a user-supplied reduction function into an `Op` before dispatching.
function Iallreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request())
    return Iallreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
end
# Pair separate send/receive buffers into a single `RBuffer`.
function Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request())
    return Iallreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)
end

# inplace: reduce into `buf`, using MPI_IN_PLACE as the send buffer
function Iallreduce!(buf, op, comm::Comm, req::AbstractRequest=Request())
    return Iallreduce!(IN_PLACE, buf, op, comm, req)
end

## Scan

# mutating
Expand Down
51 changes: 51 additions & 0 deletions test/test_iallreduce.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Test script for MPI.Iallreduce! (non-blocking all-reduce).
# Relies on helpers from common.jl (ArrayType, synchronize, MPITestTypes);
# must be run under mpiexec with more than one rank for a meaningful check.
include("common.jl")

MPI.Init()

comm_size = MPI.Comm_size(MPI.COMM_WORLD)

# Custom (non-builtin) operators are skipped on platforms where user-defined
# MPI ops are unsupported or unreliable: non-Array backends, 32-bit MSMPI,
# and ARM/PowerPC architectures.
if ArrayType != Array ||
    MPI.MPI_LIBRARY == "MicrosoftMPI" && Sys.WORD_SIZE == 32 ||
    Sys.ARCH === :powerpc64le || Sys.ARCH === :ppc64le ||
    Sys.ARCH === :aarch64 || startswith(string(Sys.ARCH), "arm")
    operators = [MPI.SUM, +]
else
    # (x,y) -> 2x+y-x is algebraically equivalent to +, exercising the
    # user-defined-operator code path while keeping the expected result.
    operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
end

for T = [Int]
    for dims = [1, 2, 3]
        # Cubic array of side 3 in `dims` dimensions, filled 1..n identically
        # on every rank, so the all-reduce sum is comm_size .* send_arr.
        send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
        send_arr[:] .= 1:length(send_arr)
        synchronize()

        for op in operators

            # Non allocating version
            recv_arr = ArrayType{T}(undef, size(send_arr))
            req = MPI.Iallreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
            # Random delay so ranks reach Wait at different times, exercising
            # genuine asynchrony before completing the request.
            sleep(rand())
            MPI.Wait(req)
            @test recv_arr == comm_size .* send_arr

            # Assertions when output buffer too small
            recv_arr = ArrayType{T}(undef, size(send_arr).-1)
            @test_throws AssertionError MPI.Iallreduce!(send_arr, recv_arr,
                                                        op, MPI.COMM_WORLD)
            # IN_PLACE
            recv_arr = copy(send_arr)
            synchronize()
            req = MPI.Iallreduce!(recv_arr, op, MPI.COMM_WORLD)
            sleep(rand())
            MPI.Wait(req)
            @test recv_arr == comm_size .* send_arr
        end
    end
end


MPI.Barrier( MPI.COMM_WORLD )

# Collect garbage before finalizing so buffered MPI objects are freed
# while MPI is still initialized.
GC.gc()
MPI.Finalize()
@test MPI.Finalized()
35 changes: 35 additions & 0 deletions test/test_ibcast.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Test script for MPI.Ibcast! (non-blocking broadcast).
# Relies on helpers from common.jl (ArrayType, MPITestTypes); must be run
# under mpiexec so root and non-root ranks exercise both sides of the bcast.
include("common.jl")
using Random

MPI.Init()

comm = MPI.COMM_WORLD
root = 0
matsize = (17,17)

for T in MPITestTypes
    # This test depends on the stability of the rng and we have observed with
    # CUDA.jl that it is not guaranteed that the same number of rand calls will
    # occur on each rank. (This is a hypothesis). To be sure we shall seed the rng
    # just before we call rand.
    Random.seed!(17)
    A = ArrayType(rand(T, matsize))
    # Root broadcasts its own buffer; other ranks receive into a fresh one.
    B = MPI.Comm_rank(comm) == root ? A : similar(A)
    req = MPI.Ibcast!(B, comm; root=root)
    # Random delay so ranks reach Wait at different times, exercising
    # genuine asynchrony before completing the request.
    sleep(rand())
    MPI.Wait(req)
    @test B == A
end

# Char
A = ['s', 't', 'a', 'r', ' ', 'w', 'a', 'r', 's']
B = MPI.Comm_rank(comm) == root ? A : similar(A)
req = MPI.Ibcast!(B, comm; root=root)
sleep(rand())
MPI.Wait(req)
@test B == A



MPI.Finalize()
@test MPI.Finalized()
Loading