diff --git a/src/components/tl/ucp/allgather/allgather_knomial.c b/src/components/tl/ucp/allgather/allgather_knomial.c index 1fbcf773cc..1959fc6a3e 100644 --- a/src/components/tl/ucp/allgather/allgather_knomial.c +++ b/src/components/tl/ucp/allgather/allgather_knomial.c @@ -77,9 +77,15 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task) size_t peer_seg_count, local_seg_count; ucc_status_t status; size_t extra_count; + int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback; - EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", - task->allgather_kn.etask); + if (use_loopback) { + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)){ + return; + } + } + + if (!use_loopback) EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask); task->allgather_kn.etask = NULL; UCC_KN_GOTO_PHASE(task->allgather_kn.phase); if (KN_NODE_EXTRA == node_type) { @@ -213,6 +219,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) ptrdiff_t offset; ucc_ee_executor_t *exec; void *rbuf; + int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback; UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); @@ -225,21 +232,29 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) ucc_dt_size(args->dst.info.datatype); rbuf = args->dst.info.buffer; if (!UCC_IS_INPLACE(*args)) { - status = ucc_coll_task_get_executor(&task->super, &exec); - if (ucc_unlikely(status != UCC_OK)) { - task->super.status = status; - return status; - } - eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY; - eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset); - eargs.copy.src = args->src.info.buffer; - eargs.copy.len = args->src.info.count * - ucc_dt_size(args->src.info.datatype); - status = ucc_ee_executor_task_post(exec, &eargs, - &task->allgather_kn.etask); - if (ucc_unlikely(status != UCC_OK)) { - task->super.status = status; - return status; + if (!use_loopback) { + status = ucc_coll_task_get_executor(&task->super, &exec); + if (ucc_unlikely(status != UCC_OK)) { + task->super.status = status; + return status; + } + eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY; + eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset); + eargs.copy.src = args->src.info.buffer; + eargs.copy.len = args->src.info.count * + ucc_dt_size(args->src.info.datatype); + status = ucc_ee_executor_task_post(exec, &eargs, + &task->allgather_kn.etask); + if (ucc_unlikely(status != UCC_OK)) { + task->super.status = status; + return status; + } + } else { + /*Loopback*/ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->src.info.buffer, args->src.info.count * ucc_dt_size(args->src.info.datatype), + args->src.info.mem_type, rank, team, task),task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.count * ucc_dt_size(args->src.info.datatype), + args->dst.info.mem_type, rank, team, task),task, out); } } } else if (ct == UCC_COLL_TYPE_ALLGATHERV) { @@ -284,6 +299,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task) task->allgather_kn.sbuf = PTR_OFFSET(rbuf, offset); return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +out: + return task->super.status; } ucc_status_t ucc_tl_ucp_allgather_knomial_init_r( diff --git a/src/components/tl/ucp/allgather/allgather_neighbor.c b/src/components/tl/ucp/allgather/allgather_neighbor.c index 534c197e4e..347ae1a96e 100644 --- a/src/components/tl/ucp/allgather/allgather_neighbor.c +++ b/src/components/tl/ucp/allgather/allgather_neighbor.c @@ -144,16 +144,23 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task) ucc_status_t status; ucc_rank_t neighbor; void *tmprecv, *tmpsend; - + int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback; + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_neighbor_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); if (!UCC_IS_INPLACE(TASK_ARGS(task))) { - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, + if (!use_loopback) { + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem); - if (ucc_unlikely(UCC_OK != status)) { - return status; + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else { + /* Loopback */ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, err); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, err); } } @@ -175,4 +182,6 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task) task, out); out: return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +err: + return task->super.status; } diff --git a/src/components/tl/ucp/allgather/allgather_ring.c b/src/components/tl/ucp/allgather/allgather_ring.c index 07178aea25..463687ec64 100644 --- a/src/components/tl/ucp/allgather/allgather_ring.c +++ b/src/components/tl/ucp/allgather/allgather_ring.c @@ -43,6 +43,7 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task) ucc_rank_t sendto, recvfrom, sblock, rblock; int step; void *buf; + int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback; if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { return; @@ -50,8 +51,10 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task) sendto = ucc_ep_map_eval(task->subset.map, (trank + 1) % tsize); recvfrom = ucc_ep_map_eval(task->subset.map, (trank - 1 + tsize) % tsize); - while (task->tagged.send_posted < tsize - 1) { - step = task->tagged.send_posted; + int iter = use_loopback ? tsize : tsize - 1; //when using loopback tagged.send_posted has 1 more which will cause non-complete ring algorithm + while (task->tagged.send_posted < iter) { + step = use_loopback ? task->tagged.send_posted - 1 : task->tagged.send_posted; //when using loopback tagged.send_posted has 1 more which will cause wrong calculation of send/recv + sblock = task->allgather_ring.get_send_block(&task->subset, trank, tsize, step); rblock = task->allgather_ring.get_recv_block(&task->subset, trank, @@ -89,6 +92,7 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task) size_t data_size = (count / tsize) * ucc_dt_size(dt); ucc_status_t status; ucc_rank_t block; + int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback; UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_ring_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); @@ -96,14 +100,22 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task) if (!UCC_IS_INPLACE(TASK_ARGS(task))) { block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, 0); - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block), + if (!use_loopback) { + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block), sbuf, data_size, rmem, smem); - if (ucc_unlikely(UCC_OK != status)) { - return status; + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else { + /* Loopback */ + ucc_rank_t rank = ucc_ep_map_eval(task->subset.map, trank); + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, rank, team, task),task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, rank, team, task),task, out); } } - return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +out: + return task->super.status; } ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task) diff --git a/src/components/tl/ucp/allgather/allgather_sparbit.c b/src/components/tl/ucp/allgather/allgather_sparbit.c index 0edfc4d4a3..ae60702399 100644 --- a/src/components/tl/ucp/allgather/allgather_sparbit.c +++ b/src/components/tl/ucp/allgather/allgather_sparbit.c @@ -123,7 +123,8 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task) ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); size_t data_size = (count / tsize) * ucc_dt_size(dt); ucc_status_t status; - + int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback; + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_sparbit_start", 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); @@ -131,12 +132,20 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task) task->allgather_sparbit.data_expected = 1; if (!UCC_IS_INPLACE(TASK_ARGS(task))) { - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, + if (!use_loopback) { + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem); - if (ucc_unlikely(UCC_OK != status)) { - return status; + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else { + /* Loopback */ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, out); } } return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super); +out: + return task->super.status; } diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 7db99bdaf2..044359808c 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -140,6 +140,10 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_kn_radix), UCC_CONFIG_TYPE_UINT}, + {"ALLGATHER_USE_LOOPBACK", "0", "If set to 1 uses mc cuda copy, otherwise performs loopback for self copy", + ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_use_loopback), + UCC_CONFIG_TYPE_UINT}, + {"BCAST_KN_RADIX", "4", "Radix of the recursive-knomial bcast algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, bcast_kn_radix), UCC_CONFIG_TYPE_UINT}, diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 3c439f4ae5..e228ba7d0f 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -45,6 +45,7 @@ extern ucc_tl_ucp_iface_t ucc_tl_ucp; typedef struct ucc_tl_ucp_lib_config { ucc_tl_lib_config_t super; uint32_t kn_radix; + uint32_t allgather_use_loopback; uint32_t fanin_kn_radix; uint32_t fanout_kn_radix; uint32_t barrier_kn_radix;