Cut peak memory footprint in graph creation (#3966)
This limits the peak memory footprint (especially on a single GPU, or on multi-GPU runs with a small number of GPUs) to roughly 1.5 × the size of the edge list + alpha, where alpha covers O(V) data (V: # of vertices).
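As a rough worked example (assumed numbers, not from the PR): 1 billion edges stored as 64-bit (src, dst) pairs occupy 2 × 8 B × 10^9 = 16 GB as an edge list, so graph creation under this bound would peak at about 1.5 × 16 GB = 24 GB plus the O(V) term.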

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Naim (https://github.com/naimnv)
  - Chuck Hastings (https://github.com/ChuckHastings)

URL: #3966
seunghwak authored Nov 1, 2023
1 parent eb1e515 commit 0a90563
Showing 8 changed files with 415 additions and 458 deletions.
2 changes: 1 addition & 1 deletion cpp/src/c_api/capi_helper.cu
@@ -44,7 +44,7 @@ shuffle_vertex_ids_and_offsets(raft::handle_t const& handle,
thrust::make_zip_iterator(ids.end(), vertices.end()));

auto return_offsets = cugraph::detail::compute_sparse_offsets<size_t>(
ids.begin(), ids.end(), size_t{0}, size_t{offsets.size() - 1}, handle.get_stream());
ids.begin(), ids.end(), size_t{0}, size_t{offsets.size() - 1}, true, handle.get_stream());

return std::make_tuple(std::move(vertices), std::move(return_offsets));
}
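The only change in this file is a new boolean argument passed to cugraph::detail::compute_sparse_offsets. The helper's signature is not shown in this hunk, so the following is a hedged host-side sketch of what an offsets helper of this shape typically does; the name, the flag's meaning (assumed here to be "the ids are already sorted, so skip the sort"), and the std:: calls standing in for device code are all illustrative, not the actual cuGraph API.

// Hedged sketch with hypothetical names: build CSR-style offsets from ids in [first_id, last_id).
#include <algorithm>
#include <cstddef>
#include <vector>

std::vector<size_t> compute_sparse_offsets_sketch(std::vector<size_t> ids,
                                                  size_t first_id,
                                                  size_t last_id,
                                                  bool ids_are_sorted)
{
  if (!ids_are_sorted) { std::sort(ids.begin(), ids.end()); }  // assumed meaning of the new flag
  std::vector<size_t> offsets(last_id - first_id + 1, 0);
  for (auto id : ids) { ++offsets[id - first_id + 1]; }        // per-bucket counts
  for (size_t i = 1; i < offsets.size(); ++i) { offsets[i] += offsets[i - 1]; }  // prefix sum
  return offsets;
}

In the calls changed by this PR the flag is always true, which suggests the callers already hold sorted ids and the helper can avoid an extra sort (or an extra copy); again, that reading is an assumption.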
425 changes: 180 additions & 245 deletions cpp/src/structure/create_graph_from_edgelist_impl.cuh

Large diffs are not rendered by default.

281 changes: 146 additions & 135 deletions cpp/src/structure/detail/structure_utils.cuh

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions cpp/src/structure/induced_subgraph_impl.cuh
@@ -196,6 +196,7 @@ extract_induced_subgraphs(
graph_ids_v.end(),
size_t{0},
size_t{subgraph_offsets.size() - 1},
true,
handle.get_stream());

dst_subgraph_offsets =
@@ -290,6 +291,7 @@ extract_induced_subgraphs(
subgraph_edge_graph_ids.end(),
size_t{0},
size_t{subgraph_offsets.size() - 1},
true,
handle.get_stream());

#ifdef TIMING
100 changes: 54 additions & 46 deletions cpp/src/structure/renumber_edgelist_impl.cuh
@@ -367,18 +367,19 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
rmm::device_uvector<edge_t> sorted_local_vertex_degrees(0, handle.get_stream());
std::optional<std::vector<size_t>> stream_pool_indices{
std::nullopt}; // FIXME: move this inside the if statement

auto constexpr num_chunks = size_t{
2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more binary
// searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and temporary
// buffer requirement (cut by num_chunks times), currently set to 2 to avoid peak memory
// usage happening in this part (especially when minor_comm_size is small)

if constexpr (multi_gpu) {
auto& comm = handle.get_comms();
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_rank = minor_comm.get_rank();
auto const minor_comm_size = minor_comm.get_size();

auto constexpr num_chunks = size_t{
2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more
// binary searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and
// temporary buffer requirement (cut by num_chunks times), currently set to 2 to avoid
// peak memory usage happening in this part (especially when minor_comm_size is small)

assert(edgelist_majors.size() == minor_comm_size);

auto edge_partition_major_range_sizes =
@@ -433,29 +434,30 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
sorted_major_degrees.end(),
edge_t{0});

rmm::device_uvector<vertex_t> tmp_majors(
rmm::device_uvector<vertex_t> tmp_majors(0, loop_stream);
tmp_majors.reserve(
(static_cast<size_t>(edgelist_edge_counts[i]) + (num_chunks - 1)) / num_chunks,
handle.get_stream());
loop_stream);
size_t offset{0};
for (size_t j = 0; j < num_chunks; ++j) {
size_t this_chunk_size =
std::min(tmp_majors.size(), static_cast<size_t>(edgelist_edge_counts[i]) - offset);
std::min(tmp_majors.capacity(), static_cast<size_t>(edgelist_edge_counts[i]) - offset);
tmp_majors.resize(this_chunk_size, loop_stream);
thrust::copy(rmm::exec_policy(loop_stream),
edgelist_majors[i] + offset,
edgelist_majors[i] + offset + this_chunk_size,
edgelist_majors[i] + offset + tmp_majors.size(),
tmp_majors.begin());
thrust::sort(
rmm::exec_policy(loop_stream), tmp_majors.begin(), tmp_majors.begin() + this_chunk_size);
thrust::sort(rmm::exec_policy(loop_stream), tmp_majors.begin(), tmp_majors.end());
auto num_unique_majors =
thrust::count_if(rmm::exec_policy(loop_stream),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(this_chunk_size),
thrust::make_counting_iterator(tmp_majors.size()),
is_first_in_run_t<vertex_t const*>{tmp_majors.data()});
rmm::device_uvector<vertex_t> tmp_keys(num_unique_majors, loop_stream);
rmm::device_uvector<edge_t> tmp_values(num_unique_majors, loop_stream);
thrust::reduce_by_key(rmm::exec_policy(loop_stream),
tmp_majors.begin(),
tmp_majors.begin() + this_chunk_size,
tmp_majors.end(),
thrust::make_constant_iterator(edge_t{1}),
tmp_keys.begin(),
tmp_values.begin());
@@ -486,44 +488,50 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
} else {
assert(edgelist_majors.size() == 1);

rmm::device_uvector<vertex_t> tmp_majors(edgelist_edge_counts[0], handle.get_stream());
thrust::copy(handle.get_thrust_policy(),
edgelist_majors[0],
edgelist_majors[0] + edgelist_edge_counts[0],
tmp_majors.begin());
thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end());
auto num_unique_majors =
thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(tmp_majors.size()),
is_first_in_run_t<vertex_t const*>{tmp_majors.data()});
rmm::device_uvector<vertex_t> tmp_keys(num_unique_majors, handle.get_stream());
rmm::device_uvector<edge_t> tmp_values(num_unique_majors, handle.get_stream());
thrust::reduce_by_key(handle.get_thrust_policy(),
tmp_majors.begin(),
tmp_majors.end(),
thrust::make_constant_iterator(edge_t{1}),
tmp_keys.begin(),
tmp_values.begin());

tmp_majors.resize(0, handle.get_stream());
tmp_majors.shrink_to_fit(handle.get_stream());

sorted_local_vertex_degrees.resize(sorted_local_vertices.size(), handle.get_stream());
thrust::fill(handle.get_thrust_policy(),
sorted_local_vertex_degrees.begin(),
sorted_local_vertex_degrees.end(),
edge_t{0});

auto kv_pair_first =
thrust::make_zip_iterator(thrust::make_tuple(tmp_keys.begin(), tmp_values.begin()));
thrust::for_each(handle.get_thrust_policy(),
kv_pair_first,
kv_pair_first + tmp_keys.size(),
search_and_increment_degree_t<vertex_t, edge_t>{
sorted_local_vertices.data(),
static_cast<vertex_t>(sorted_local_vertices.size()),
sorted_local_vertex_degrees.data()});
rmm::device_uvector<vertex_t> tmp_majors(0, handle.get_stream());
tmp_majors.reserve(static_cast<size_t>(edgelist_edge_counts[0] + (num_chunks - 1)) / num_chunks,
handle.get_stream());
size_t offset{0};
for (size_t i = 0; i < num_chunks; ++i) {
size_t this_chunk_size =
std::min(tmp_majors.capacity(), static_cast<size_t>(edgelist_edge_counts[0]) - offset);
tmp_majors.resize(this_chunk_size, handle.get_stream());
thrust::copy(handle.get_thrust_policy(),
edgelist_majors[0] + offset,
edgelist_majors[0] + offset + tmp_majors.size(),
tmp_majors.begin());
thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end());
auto num_unique_majors =
thrust::count_if(handle.get_thrust_policy(),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(tmp_majors.size()),
is_first_in_run_t<vertex_t const*>{tmp_majors.data()});
rmm::device_uvector<vertex_t> tmp_keys(num_unique_majors, handle.get_stream());
rmm::device_uvector<edge_t> tmp_values(num_unique_majors, handle.get_stream());
thrust::reduce_by_key(handle.get_thrust_policy(),
tmp_majors.begin(),
tmp_majors.end(),
thrust::make_constant_iterator(edge_t{1}),
tmp_keys.begin(),
tmp_values.begin());

auto kv_pair_first =
thrust::make_zip_iterator(thrust::make_tuple(tmp_keys.begin(), tmp_values.begin()));
thrust::for_each(handle.get_thrust_policy(),
kv_pair_first,
kv_pair_first + tmp_keys.size(),
search_and_increment_degree_t<vertex_t, edge_t>{
sorted_local_vertices.data(),
static_cast<vertex_t>(sorted_local_vertices.size()),
sorted_local_vertex_degrees.data()});
offset += this_chunk_size;
}
}

// 4. sort local vertices by degree (descending)
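The core of the hunks above is that major-vertex degree counting now processes the edge list in num_chunks pieces instead of copying and sorting it all at once, in both the multi-GPU and (newly) the single-GPU path: each chunk is copied, sorted, collapsed to (vertex, count) pairs with reduce_by_key, and the counts are added to the degree slots found by binary search in the sorted vertex list. A hedged host-side sketch of that pattern, with std:: algorithms standing in for the thrust calls and an illustrative helper name:

// Sketch only: assumes every major vertex in edgelist_majors also appears in sorted_vertices.
#include <algorithm>
#include <cstdint>
#include <vector>

void accumulate_degrees_chunked(std::vector<int64_t> const& edgelist_majors,
                                std::vector<int64_t> const& sorted_vertices,
                                std::vector<int64_t>& degrees,  // same size as sorted_vertices, zero-initialized
                                size_t num_chunks = 2)
{
  size_t chunk_size = (edgelist_majors.size() + num_chunks - 1) / num_chunks;
  std::vector<int64_t> tmp;
  tmp.reserve(chunk_size);  // temporary buffer is 1/num_chunks of the edge-list majors
  for (size_t offset = 0; offset < edgelist_majors.size(); offset += chunk_size) {
    size_t this_chunk = std::min(chunk_size, edgelist_majors.size() - offset);
    tmp.assign(edgelist_majors.begin() + offset, edgelist_majors.begin() + offset + this_chunk);
    std::sort(tmp.begin(), tmp.end());
    for (size_t i = 0; i < tmp.size();) {  // run-length encode, like reduce_by_key
      size_t j = i;
      while (j < tmp.size() && tmp[j] == tmp[i]) { ++j; }
      auto it = std::lower_bound(sorted_vertices.begin(), sorted_vertices.end(), tmp[i]);
      degrees[it - sorted_vertices.begin()] += static_cast<int64_t>(j - i);
      i = j;
    }
  }
}

Capping the temporary buffer at ceil(E / num_chunks) entries keeps this step's extra allocation to a fraction of the edge list, at the cost of up to num_chunks times more binary searches when a chunk has far fewer unique majors than edges, which is the trade-off the num_chunks comment in the diff describes.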
1 change: 1 addition & 0 deletions cpp/tests/community/mg_egonet_test.cu
@@ -215,6 +215,7 @@ class Tests_MGEgonet
graph_ids_v.end(),
size_t{0},
d_mg_edgelist_offsets.size() - 1,
true,
handle_->get_stream());

auto [d_reference_src, d_reference_dst, d_reference_wgt, d_reference_offsets] =
1 change: 1 addition & 0 deletions cpp/tests/structure/mg_induced_subgraph_test.cu
@@ -210,6 +210,7 @@ class Tests_MGInducedSubgraph
graph_ids_v.end(),
size_t{0},
size_t{d_subgraph_offsets.size() - 1},
true,
handle_->get_stream());

auto [sg_graph, sg_edge_weights, sg_number_map] = cugraph::test::mg_graph_to_sg_graph(
61 changes: 30 additions & 31 deletions cpp/tests/utilities/test_utilities_impl.cuh
@@ -183,43 +183,42 @@ graph_to_host_csr(
}
}

auto total_global_mem = handle.get_device_properties().totalGlobalMem;
size_t element_size = sizeof(vertex_t) * 2;
if (d_wgt) { element_size += sizeof(weight_t); }
auto constexpr mem_frugal_ratio =
0.25; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the
// total_global_mem, switch to the memory frugal approach
auto mem_frugal_threshold =
static_cast<size_t>(static_cast<double>(total_global_mem / element_size) * mem_frugal_ratio);

rmm::device_uvector<edge_t> d_offsets(0, handle.get_stream());

if (d_wgt) {
std::tie(d_offsets, d_dst, *d_wgt, std::ignore) =
detail::compress_edgelist<edge_t, store_transposed>(d_src.begin(),
d_src.end(),
d_dst.begin(),
d_wgt->begin(),
vertex_t{0},
std::optional<vertex_t>{std::nullopt},
graph_view.number_of_vertices(),
vertex_t{0},
graph_view.number_of_vertices(),
handle.get_stream());

// segmented sort neighbors
detail::sort_adjacency_list(handle,
raft::device_span<edge_t const>(d_offsets.data(), d_offsets.size()),
d_dst.begin(),
d_dst.end(),
d_wgt->begin());
detail::sort_and_compress_edgelist<vertex_t, edge_t, weight_t, store_transposed>(
std::move(d_src),
std::move(d_dst),
std::move(*d_wgt),
vertex_t{0},
std::optional<vertex_t>{std::nullopt},
graph_view.number_of_vertices(),
vertex_t{0},
graph_view.number_of_vertices(),
mem_frugal_threshold,
handle.get_stream());
} else {
std::tie(d_offsets, d_dst, std::ignore) =
detail::compress_edgelist<edge_t, store_transposed>(d_src.begin(),
d_src.end(),
d_dst.begin(),
vertex_t{0},
std::optional<vertex_t>{std::nullopt},
graph_view.number_of_vertices(),
vertex_t{0},
graph_view.number_of_vertices(),
handle.get_stream());
// segmented sort neighbors
detail::sort_adjacency_list(handle,
raft::device_span<edge_t const>(d_offsets.data(), d_offsets.size()),
d_dst.begin(),
d_dst.end());
detail::sort_and_compress_edgelist<vertex_t, edge_t, store_transposed>(
std::move(d_src),
std::move(d_dst),
vertex_t{0},
std::optional<vertex_t>{std::nullopt},
graph_view.number_of_vertices(),
vertex_t{0},
graph_view.number_of_vertices(),
mem_frugal_threshold,
handle.get_stream());
}

return std::make_tuple(
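The test utility now computes a mem_frugal_threshold and hands it to the new combined sort_and_compress_edgelist helper, replacing the separate compress_edgelist + sort_adjacency_list calls. The threshold is expressed in edges, not bytes. A hedged worked example of the formula from the hunk above (the GPU size and the vertex/weight widths are assumed, not taken from the PR):

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main()
{
  // Assumed numbers: 32 GiB of device memory, 32-bit vertex ids, 32-bit weights.
  size_t total_global_mem = 32ull * 1024 * 1024 * 1024;
  size_t element_size     = 2 * sizeof(int32_t) + sizeof(float);  // src + dst + weight per edge
  double mem_frugal_ratio = 0.25;  // same ratio as in the diff above
  auto mem_frugal_threshold =
    static_cast<size_t>(static_cast<double>(total_global_mem / element_size) * mem_frugal_ratio);
  std::printf("memory-frugal path above ~%zu edges\n", mem_frugal_threshold);  // ~716 million here
  return 0;
}

Under these assumed numbers, edge lists larger than roughly 716 million edges would take the memory-frugal path inside sort_and_compress_edgelist; smaller ones keep the faster, more memory-hungry path.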
