From e6108e6e8f48594ba0bc614fb5f2407a7ad2a8d7 Mon Sep 17 00:00:00 2001
From: Petr Krysl
Date: Sun, 29 Sep 2024 08:13:54 -0700
Subject: [PATCH 1/4] add Ibcast!

---
 src/collective.jl | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/src/collective.jl b/src/collective.jl
index 9366fbf26..408eea506 100644
--- a/src/collective.jl
+++ b/src/collective.jl
@@ -103,6 +103,29 @@ function bcast(obj, root::Integer, comm::Comm)
     return obj
 end
 
+
+"""
+    Ibcast!(buf, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
+
+Broadcast the buffer `buf` from `root` to all processes in `comm`.
+
+# External links
+$(_doc_external("MPI_Ibcast"))
+"""
+Ibcast!(buf, comm::Comm, req::AbstractRequest = Request(); root::Integer=Cint(0)) =
+    Ibcast!(buf, root, comm, req)
+
+function Ibcast!(buf::Buffer, root::Integer, comm::Comm, req::AbstractRequest = Request())
+    # int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, int root,
+    #                MPI_Comm comm, MPI_Request *request)
+    API.MPI_Ibcast(buf.data, buf.count, buf.datatype, root, comm, req)
+    return req
+end
+function Ibcast!(data, root::Integer, comm::Comm, req::AbstractRequest = Request())
+    Ibcast!(Buffer(data), root, comm, req)
+end
+
+
 """
     Scatter!(sendbuf::Union{UBuffer,Nothing}, recvbuf, comm::Comm;
         root::Integer=0)

From 338c40da4f4c74ca39426ab86dd37c9071844176 Mon Sep 17 00:00:00 2001
From: Petr Krysl
Date: Sun, 17 Nov 2024 10:19:57 -0800
Subject: [PATCH 2/4] add ibcast! tests

---
 test/test_ibcast.jl | 35 +++++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)
 create mode 100644 test/test_ibcast.jl

diff --git a/test/test_ibcast.jl b/test/test_ibcast.jl
new file mode 100644
index 000000000..f5bb378d4
--- /dev/null
+++ b/test/test_ibcast.jl
@@ -0,0 +1,35 @@
+include("common.jl")
+using Random
+
+MPI.Init()
+
+comm = MPI.COMM_WORLD
+root = 0
+matsize = (17,17)
+
+for T in MPITestTypes
+    # This test depends on the stability of the rng, and we have observed with
+    # CUDA.jl that it is not guaranteed that the same number of rand calls will
+    # occur on each rank (this is a hypothesis). To be safe, we seed the rng
+    # just before we call rand.
+    Random.seed!(17)
+    A = ArrayType(rand(T, matsize))
+    B = MPI.Comm_rank(comm) == root ? A : similar(A)
+    req = MPI.Ibcast!(B, comm; root=root)
+    sleep(rand())
+    MPI.Wait(req)
+    @test B == A
+end
+
+# Char
+A = ['s', 't', 'a', 'r', ' ', 'w', 'a', 'r', 's']
+B = MPI.Comm_rank(comm) == root ? A : similar(A)
+req = MPI.Ibcast!(B, comm; root=root)
+sleep(rand())
+MPI.Wait(req)
+@test B == A
+
+
+
+MPI.Finalize()
+@test MPI.Finalized()

From d7967bf5f97d431e0d76b66dc3f547482bc753de Mon Sep 17 00:00:00 2001
From: Petr Krysl
Date: Sun, 17 Nov 2024 10:20:26 -0800
Subject: [PATCH 3/4] add Iallreduce!

---
 src/collective.jl       | 40 ++++++++++++++++++++++++++++++++++++++++
 test/test_iallreduce.jl | 51 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+)
 create mode 100644 test/test_iallreduce.jl

diff --git a/src/collective.jl b/src/collective.jl
index 408eea506..5d1b89c26 100644
--- a/src/collective.jl
+++ b/src/collective.jl
@@ -109,6 +109,9 @@
 
 Broadcast the buffer `buf` from `root` to all processes in `comm`.
 
+The operation is non-blocking, and the request object `req` can be used to wait
+for the operation to complete.
+
 # External links
 $(_doc_external("MPI_Ibcast"))
 """
@@ -798,6 +801,43 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
 Allreduce(obj::T, op, comm::Comm) where {T} =
     Allreduce!(Ref(obj), Ref{T}(), op, comm)[]
 
+## Iallreduce
+
+# mutating
+"""
+    Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request())
+    Iallreduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request())
+
+Performs elementwise reduction using the operator `op` on the buffer `sendbuf`,
+storing the result in the `recvbuf` of all processes in the group.
+
+If only one `sendrecvbuf` buffer is provided, then the operation is performed
+in-place.
+
+The operation is non-blocking, and the request object `req` can be used to wait
+for the operation to complete.
+
+# See also
+- [`Op`](@ref) for details on reduction operators.
+
+# External links
+$(_doc_external("MPI_Iallreduce"))
+"""
+function Iallreduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, comm::Comm, req::AbstractRequest=Request())
+    # int MPI_Iallreduce(const void *sendbuf, void *recvbuf, int count,
+    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
+    #                    MPI_Request *request)
+    API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
+    return req
+end
+Iallreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
+Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request()) =
+    Iallreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)
+
+# inplace
+Iallreduce!(buf, op, comm::Comm, req::AbstractRequest=Request()) = Iallreduce!(IN_PLACE, buf, op, comm, req)
+
 ## Scan
 
 # mutating
diff --git a/test/test_iallreduce.jl b/test/test_iallreduce.jl
new file mode 100644
index 000000000..25b897242
--- /dev/null
+++ b/test/test_iallreduce.jl
@@ -0,0 +1,51 @@
+include("common.jl")
+
+MPI.Init()
+
+comm_size = MPI.Comm_size(MPI.COMM_WORLD)
+
+if ArrayType != Array ||
+    MPI.MPI_LIBRARY == "MicrosoftMPI" && Sys.WORD_SIZE == 32 ||
+    Sys.ARCH === :powerpc64le || Sys.ARCH === :ppc64le ||
+    Sys.ARCH === :aarch64 || startswith(string(Sys.ARCH), "arm")
+    operators = [MPI.SUM, +]
+else
+    operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
+end
+
+for T = [Int]
+    for dims = [1, 2, 3]
+        send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
+        send_arr[:] .= 1:length(send_arr)
+        synchronize()
+
+        for op in operators
+
+            # Non allocating version
+            recv_arr = ArrayType{T}(undef, size(send_arr))
+            req = MPI.Iallreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
+            sleep(rand())
+            MPI.Wait(req)
+            @test recv_arr == comm_size .* send_arr
+
+            # Assertions when output buffer too small
+            recv_arr = ArrayType{T}(undef, size(send_arr).-1)
+            @test_throws AssertionError MPI.Iallreduce!(send_arr, recv_arr,
+                                                        op, MPI.COMM_WORLD)
+            # IN_PLACE
+            recv_arr = copy(send_arr)
+            synchronize()
+            req = MPI.Iallreduce!(recv_arr, op, MPI.COMM_WORLD)
+            sleep(rand())
+            MPI.Wait(req)
+            @test recv_arr == comm_size .* send_arr
+        end
+    end
+end
+
+
+MPI.Barrier( MPI.COMM_WORLD )
+
+GC.gc()
+MPI.Finalize()
+@test MPI.Finalized()

From acd35bc77a06589c3f397fb53d0cf96a7fd60913 Mon Sep 17 00:00:00 2001
From: Petr Krysl
Date: Sun, 17 Nov 2024 13:30:39 -0800
Subject: [PATCH 4/4] add Ibcast! and Iallreduce!
 to the docs

---
 docs/src/reference/collective.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/src/reference/collective.md b/docs/src/reference/collective.md
index c89e6f880..96c4b6cb1 100644
--- a/docs/src/reference/collective.md
+++ b/docs/src/reference/collective.md
@@ -13,6 +13,7 @@ MPI.Ibarrier
 MPI.Bcast!
 MPI.Bcast
 MPI.bcast
+MPI.Ibcast!
 ```
 
 ## Gather/Scatter
@@ -57,6 +58,7 @@ MPI.Reduce!
 MPI.Reduce
 MPI.Allreduce!
 MPI.Allreduce
+MPI.Iallreduce!
 MPI.Scan!
 MPI.Scan
 MPI.Exscan!
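
For reviewers, a minimal usage sketch of the two nonblocking collectives added above. It is not part of the patch series; the buffer sizes and the point at which local work is overlapped are illustrative only, and it assumes the methods land as `MPI.Ibcast!` and `MPI.Iallreduce!` with the signatures shown in patches 1 and 3.

```julia
# Illustrative sketch only, not part of the patches above.
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
root = 0

# Nonblocking broadcast: start the transfer, optionally overlap it with
# unrelated local work, then wait on the returned request before reading `buf`.
buf = MPI.Comm_rank(comm) == root ? collect(1.0:8.0) : zeros(8)
req = MPI.Ibcast!(buf, comm; root=root)
# ... unrelated local computation could run here ...
MPI.Wait(req)

# Nonblocking allreduce: every rank contributes `buf` and, once the request
# completes, `result` holds the elementwise sum over all ranks.
result = similar(buf)
req = MPI.Iallreduce!(buf, result, +, comm)
MPI.Wait(req)

MPI.Finalize()
```

As with the blocking `Allreduce!`, patch 3 also provides the in-place form `MPI.Iallreduce!(buf, op, comm)`, which reduces into the send buffer itself via `IN_PLACE`.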