Skip to content

Commit

Permalink
TL/UCP: Allow self copy in allgather using network loopback
Browse files Browse the repository at this point in the history
  • Loading branch information
yaeliyac committed Dec 16, 2024
1 parent 73651ea commit 5940100
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 31 deletions.
51 changes: 34 additions & 17 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,15 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
size_t peer_seg_count, local_seg_count;
ucc_status_t status;
size_t extra_count;
int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback;

EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test",
task->allgather_kn.etask);
if (use_loopback) {
if (UCC_INPROGRESS == ucc_tl_ucp_test(task)){
return;
}
}

if (!use_loopback) EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask);
task->allgather_kn.etask = NULL;
UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
if (KN_NODE_EXTRA == node_type) {
Expand Down Expand Up @@ -213,6 +219,7 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ptrdiff_t offset;
ucc_ee_executor_t *exec;
void *rbuf;
int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
Expand All @@ -225,21 +232,29 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_dt_size(args->dst.info.datatype);
rbuf = args->dst.info.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
if (!use_loopback) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
} else {
/*Loopback*/
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->src.info.buffer, args->src.info.count * ucc_dt_size(args->src.info.datatype),
args->src.info.mem_type, rank, team, task),task, out);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.count * ucc_dt_size(args->src.info.datatype),
args->dst.info.mem_type, rank, team, task),task, out);
}
}
} else if (ct == UCC_COLL_TYPE_ALLGATHERV) {
Expand Down Expand Up @@ -284,6 +299,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
task->allgather_kn.sbuf = PTR_OFFSET(rbuf, offset);

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}

ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
Expand Down
17 changes: 13 additions & 4 deletions src/components/tl/ucp/allgather/allgather_neighbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,23 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task)
ucc_status_t status;
ucc_rank_t neighbor;
void *tmprecv, *tmpsend;

int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_neighbor_start",
0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if (!use_loopback) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, err);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, err);
}
}

Expand All @@ -175,4 +182,6 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task)
task, out);
out:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
err:
return task->super.status;
}
24 changes: 18 additions & 6 deletions src/components/tl/ucp/allgather/allgather_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task)
ucc_rank_t sendto, recvfrom, sblock, rblock;
int step;
void *buf;
int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback;

if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
return;
}
sendto = ucc_ep_map_eval(task->subset.map, (trank + 1) % tsize);
recvfrom = ucc_ep_map_eval(task->subset.map, (trank - 1 + tsize) % tsize);

while (task->tagged.send_posted < tsize - 1) {
step = task->tagged.send_posted;
int iter = use_loopback ? tsize : tsize - 1; //when using loopback tagged.send_posted has 1 more which will cause non-complete ring algorithm
while (task->tagged.send_posted < iter) {
step = use_loopback ? task->tagged.send_posted - 1 : task->tagged.send_posted; //when using loopback tagged.send_posted has 1 more which will cause wrong calculation of send/recv

sblock = task->allgather_ring.get_send_block(&task->subset, trank,
tsize, step);
rblock = task->allgather_ring.get_recv_block(&task->subset, trank,
Expand Down Expand Up @@ -89,21 +92,30 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task)
size_t data_size = (count / tsize) * ucc_dt_size(dt);
ucc_status_t status;
ucc_rank_t block;
int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_ring_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize,
0);
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
if (!use_loopback) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
ucc_rank_t rank = ucc_ep_map_eval(task->subset.map, trank);
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, rank, team, task),task, out);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, rank, team, task),task, out);
}
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}

ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task)
Expand Down
17 changes: 13 additions & 4 deletions src/components/tl/ucp/allgather/allgather_sparbit.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,29 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task)
ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team);
size_t data_size = (count / tsize) * ucc_dt_size(dt);
ucc_status_t status;

int use_loopback = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_loopback;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_sparbit_start",
0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
task->allgather_sparbit.i = 0; // setup iteration
task->allgather_sparbit.data_expected = 1;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if (!use_loopback) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, out);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, out);
}
}

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
out:
return task->super.status;
}
4 changes: 4 additions & 0 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_kn_radix),
UCC_CONFIG_TYPE_UINT},

{"ALLGATHER_USE_LOOPBACK", "0", "If set to 1 uses mc cuda copy, otherwise performs loopback for self copy",
ucc_offsetof(ucc_tl_ucp_lib_config_t, allgather_use_loopback),
UCC_CONFIG_TYPE_UINT},

{"BCAST_KN_RADIX", "4", "Radix of the recursive-knomial bcast algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, bcast_kn_radix),
UCC_CONFIG_TYPE_UINT},
Expand Down
1 change: 1 addition & 0 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ extern ucc_tl_ucp_iface_t ucc_tl_ucp;
typedef struct ucc_tl_ucp_lib_config {
ucc_tl_lib_config_t super;
uint32_t kn_radix;
uint32_t allgather_use_loopback;
uint32_t fanin_kn_radix;
uint32_t fanout_kn_radix;
uint32_t barrier_kn_radix;
Expand Down

0 comments on commit 5940100

Please sign in to comment.