Skip to content

Commit

Permalink
cut peak memory usage in edge list compression (and switch from the p…
Browse files Browse the repository at this point in the history
…revious segmented sort based neighbor list sorting to global sorting as I don't see significant performance difference anymore)
  • Loading branch information
seunghwak committed Oct 31, 2023
1 parent 46e8256 commit 2235e5e
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 412 deletions.
2 changes: 1 addition & 1 deletion cpp/src/c_api/capi_helper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ shuffle_vertex_ids_and_offsets(raft::handle_t const& handle,
thrust::make_zip_iterator(ids.end(), vertices.end()));

auto return_offsets = cugraph::detail::compute_sparse_offsets<size_t>(
ids.begin(), ids.end(), size_t{0}, size_t{offsets.size() - 1}, handle.get_stream());
ids.begin(), ids.end(), size_t{0}, size_t{offsets.size() - 1}, true, handle.get_stream());

return std::make_tuple(std::move(vertices), std::move(return_offsets));
}
Expand Down
425 changes: 180 additions & 245 deletions cpp/src/structure/create_graph_from_edgelist_impl.cuh

Large diffs are not rendered by default.

286 changes: 151 additions & 135 deletions cpp/src/structure/detail/structure_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -47,57 +47,38 @@ namespace cugraph {

namespace detail {

// Device functor that scatters one COO edge into pre-sized CSR/CSC arrays.
//
// For every edge tuple `e` (source, destination[, value]) it selects the major/minor vertex
// according to `store_transposed`, looks up the major vertex's slot range in `offsets`, claims
// one slot in that range with an atomic counter, and writes the minor vertex (and, when
// EdgeValueIterator is not `void*`, the edge value) into the claimed slot.
//
// Template parameters:
//   store_transposed   - if true the destination is the major vertex, otherwise the source.
//   vertex_t / edge_t  - element types of `indices` and `offsets` respectively.
//   EdgeIterator       - iterator whose value_type is the edge tuple passed to operator().
//   EdgeValueIterator  - output iterator for edge values, or `void*` when there are none.
//
// NOTE(review): the per-vertex counter lives inside `indices`, so this presumably requires
// `indices` to be zero-filled before the functor runs — confirm against the caller.
// NOTE(review): the counter has type vertex_t, so a single vertex's degree presumably has to
// fit in vertex_t — confirm.
template <bool store_transposed,
typename vertex_t,
typename edge_t,
typename EdgeIterator,
typename EdgeValueIterator>
struct update_edge_t {
// offsets[i] .. offsets[i + 1] delimits the slots of major vertex (major_range_first + i)
raft::device_span<edge_t> offsets{};
// output minor-vertex array; its last slot per major vertex doubles as the insertion counter
raft::device_span<vertex_t> indices{};
// first element of the edge value output (unused when EdgeValueIterator is void*)
EdgeValueIterator edge_value_first{};
// major vertex ID that maps to offsets[0]
vertex_t major_range_first{};

// Insert edge `e` into the slot range of its major vertex.
__device__ void operator()(typename thrust::iterator_traits<EdgeIterator>::value_type e) const
{
auto s = thrust::get<0>(e);
auto d = thrust::get<1>(e);
auto major = store_transposed ? d : s;
auto minor = store_transposed ? s : d;
// [start, start + degree) is the slot range owned by this major vertex
auto start = offsets[major - major_range_first];
auto degree = offsets[(major - major_range_first) + 1] - start;
// atomically claim the next free slot; atomicAdd returns the counter value before the increment
auto idx =
atomicAdd(&indices[start + degree - 1], vertex_t{1}); // use the last element as a counter
// FIXME: we can actually store minor - minor_range_first instead of minor to save memory if
// minor can be larger than 32 bit but minor - minor_range_first fits within 32 bit
indices[start + idx] = minor; // overwrite the counter only if idx == degree - 1 (no race)
// edge values are written to the same slot position as the minor vertex
if constexpr (!std::is_same_v<EdgeValueIterator, void*>) {
auto value = thrust::get<2>(e);
*(edge_value_first + (start + idx)) = value;
}
}
};

// Compute CSR/CSC-style offsets for the major vertices of an edge list.
//
// The returned vector has (major_range_last - major_range_first) + 1 elements; element i holds
// the number of edges whose major vertex is smaller than (major_range_first + i), so the last
// element equals the total number of edges.
//
// Parameters:
//   edgelist_major_first/last - range of edge major vertex IDs.
//   major_range_first/last    - half-open range of major vertex IDs covered by the offsets.
//   edgelist_major_sorted     - true if [edgelist_major_first, edgelist_major_last) is sorted in
//                               ascending order; enables the binary-search path below.
//   stream_view               - CUDA stream used for the allocation and all kernels.
//
// NOTE(review): both paths presumably assume every major vertex ID lies in
// [major_range_first, major_range_last); the unsorted path indexes the offset array with
// (v - major_range_first) without a bounds check — confirm against callers.
template <typename edge_t, typename VertexIterator>
rmm::device_uvector<edge_t> compute_sparse_offsets(
  VertexIterator edgelist_major_first,
  VertexIterator edgelist_major_last,
  typename thrust::iterator_traits<VertexIterator>::value_type major_range_first,
  typename thrust::iterator_traits<VertexIterator>::value_type major_range_last,
  bool edgelist_major_sorted,
  rmm::cuda_stream_view stream_view)
{
  rmm::device_uvector<edge_t> offsets((major_range_last - major_range_first) + 1, stream_view);

  if (edgelist_major_sorted) {
    // Sorted input: offsets[i + 1] is the upper bound of (major_range_first + i) in the edge
    // list, so one vectorized binary search fills offsets[1..]; offsets[0] is 0 by definition.
    // No zero-fill or atomics are needed on this path.
    offsets.set_element_to_zero_async(0, stream_view);
    thrust::upper_bound(rmm::exec_policy(stream_view),
                        edgelist_major_first,
                        edgelist_major_last,
                        thrust::make_counting_iterator(major_range_first),
                        thrust::make_counting_iterator(major_range_last),
                        offsets.begin() + 1);
  } else {
    // Unsorted input: build a per-vertex degree histogram with atomics, then turn the counts
    // into offsets with an in-place exclusive scan.
    thrust::fill(rmm::exec_policy(stream_view), offsets.begin(), offsets.end(), edge_t{0});

    auto offset_view = raft::device_span<edge_t>(offsets.data(), offsets.size());
    thrust::for_each(rmm::exec_policy(stream_view),
                     edgelist_major_first,
                     edgelist_major_last,
                     [offset_view, major_range_first] __device__(auto v) {
                       atomicAdd(&offset_view[v - major_range_first], edge_t{1});
                     });

    thrust::exclusive_scan(
      rmm::exec_policy(stream_view), offsets.begin(), offsets.end(), offsets.begin());
  }

  return offsets;
}
Expand Down Expand Up @@ -156,61 +137,80 @@ std::tuple<rmm::device_uvector<edge_t>, rmm::device_uvector<vertex_t>> compress_
}

// compress edge list (COO) to CSR (or CSC) or CSR + DCSR (CSC + DCSC) hybrid
template <typename edge_t,
bool store_transposed,
typename VertexIterator,
typename EdgeValueIterator>
std::tuple<
rmm::device_uvector<edge_t>,
rmm::device_uvector<typename thrust::iterator_traits<VertexIterator>::value_type>,
decltype(allocate_dataframe_buffer<typename thrust::iterator_traits<
EdgeValueIterator>::value_type>(size_t{0}, rmm::cuda_stream_view{})),
std::optional<rmm::device_uvector<typename thrust::iterator_traits<VertexIterator>::value_type>>>
compress_edgelist(
VertexIterator edgelist_src_first,
VertexIterator edgelist_src_last,
VertexIterator edgelist_dst_first,
EdgeValueIterator edge_value_first,
typename thrust::iterator_traits<VertexIterator>::value_type major_range_first,
std::optional<typename thrust::iterator_traits<VertexIterator>::value_type>
major_hypersparse_first,
typename thrust::iterator_traits<VertexIterator>::value_type major_range_last,
typename thrust::iterator_traits<VertexIterator>::value_type /* minor_range_first */,
typename thrust::iterator_traits<VertexIterator>::value_type /* minor_range_last */,
template <typename vertex_t, typename edge_t, typename edge_value_t, bool store_transposed>
std::tuple<rmm::device_uvector<edge_t>,
rmm::device_uvector<vertex_t>,
decltype(allocate_dataframe_buffer<edge_value_t>(size_t{0}, rmm::cuda_stream_view{})),
std::optional<rmm::device_uvector<vertex_t>>>
sort_and_compress_edgelist(
rmm::device_uvector<vertex_t>&& edgelist_srcs,
rmm::device_uvector<vertex_t>&& edgelist_dsts,
decltype(allocate_dataframe_buffer<edge_value_t>(0, rmm::cuda_stream_view{}))&& edgelist_values,
vertex_t major_range_first,
std::optional<vertex_t> major_hypersparse_first,
vertex_t major_range_last,
vertex_t /* minor_range_first */,
vertex_t /* minor_range_last */,
size_t mem_frugal_threshold,
rmm::cuda_stream_view stream_view)
{
using vertex_t = std::remove_cv_t<typename thrust::iterator_traits<VertexIterator>::value_type>;
using edge_value_t =
std::remove_cv_t<typename thrust::iterator_traits<EdgeValueIterator>::value_type>;

auto number_of_edges =
static_cast<edge_t>(thrust::distance(edgelist_src_first, edgelist_src_last));

auto offsets = compute_sparse_offsets<edge_t>(
store_transposed ? edgelist_dst_first : edgelist_src_first,
store_transposed ? edgelist_dst_first + number_of_edges : edgelist_src_last,
major_range_first,
major_range_last,
stream_view);

rmm::device_uvector<vertex_t> indices(number_of_edges, stream_view);
thrust::fill(rmm::exec_policy(stream_view), indices.begin(), indices.end(), vertex_t{0});
auto values = allocate_dataframe_buffer<edge_value_t>(number_of_edges, stream_view);

auto offset_view = raft::device_span<edge_t>(offsets.data(), offsets.size());
auto index_view = raft::device_span<vertex_t>(indices.data(), indices.size());
auto edge_first = thrust::make_zip_iterator(
thrust::make_tuple(edgelist_src_first, edgelist_dst_first, edge_value_first));
thrust::for_each(
rmm::exec_policy(stream_view),
edge_first,
edge_first + number_of_edges,
update_edge_t<store_transposed,
vertex_t,
edge_t,
decltype(edge_first),
decltype(get_dataframe_buffer_begin(values))>{
offset_view, index_view, get_dataframe_buffer_begin(values), major_range_first});
std::cout << "with values" << std::endl;
auto edgelist_majors = std::move(store_transposed ? edgelist_dsts : edgelist_srcs);
auto edgelist_minors = std::move(store_transposed ? edgelist_srcs : edgelist_dsts);

rmm::device_uvector<edge_t> offsets(0, stream_view);
rmm::device_uvector<vertex_t> indices(0, stream_view);
auto values = allocate_dataframe_buffer<edge_value_t>(0, stream_view);
auto pair_first = thrust::make_zip_iterator(edgelist_majors.begin(), edgelist_minors.begin());
if (edgelist_minors.size() > mem_frugal_threshold) {
std::cout << "frugal" << std::endl;
offsets = compute_sparse_offsets<edge_t>(edgelist_majors.begin(),
edgelist_majors.end(),
major_range_first,
major_range_last,
false,
stream_view);

auto pivot = major_range_first + static_cast<vertex_t>(thrust::distance(
offsets.begin(),
thrust::lower_bound(rmm::exec_policy(stream_view),
offsets.begin(),
offsets.end(),
edgelist_minors.size() / 2)));
auto second_first =
detail::mem_frugal_partition(pair_first,
pair_first + edgelist_minors.size(),
get_dataframe_buffer_begin(edgelist_values),
thrust_tuple_get<thrust::tuple<vertex_t, vertex_t>, 0>{},
pivot,
stream_view);
thrust::sort_by_key(rmm::exec_policy(stream_view),
pair_first,
std::get<0>(second_first),
get_dataframe_buffer_begin(edgelist_values));
thrust::sort_by_key(rmm::exec_policy(stream_view),
std::get<0>(second_first),
pair_first + edgelist_minors.size(),
std::get<1>(second_first));
} else {
std::cout << "nonfrugal" << std::endl;
thrust::sort_by_key(rmm::exec_policy(stream_view),
pair_first,
pair_first + edgelist_minors.size(),
get_dataframe_buffer_begin(edgelist_values));

offsets = compute_sparse_offsets<edge_t>(edgelist_majors.begin(),
edgelist_majors.end(),
major_range_first,
major_range_last,
true,
stream_view);
}
indices = std::move(edgelist_minors);
values = std::move(edgelist_values);

edgelist_majors.resize(0, stream_view);
edgelist_majors.shrink_to_fit(stream_view);

std::optional<rmm::device_uvector<vertex_t>> dcs_nzd_vertices{std::nullopt};
if (major_hypersparse_first) {
Expand All @@ -226,47 +226,63 @@ compress_edgelist(
}

// compress edge list (COO) to CSR (or CSC) or CSR + DCSR (CSC + DCSC) hybrid
template <typename edge_t, bool store_transposed, typename VertexIterator>
std::tuple<
rmm::device_uvector<edge_t>,
rmm::device_uvector<typename thrust::iterator_traits<VertexIterator>::value_type>,
std::optional<rmm::device_uvector<typename thrust::iterator_traits<VertexIterator>::value_type>>>
compress_edgelist(
VertexIterator edgelist_src_first,
VertexIterator edgelist_src_last,
VertexIterator edgelist_dst_first,
typename thrust::iterator_traits<VertexIterator>::value_type major_range_first,
std::optional<typename thrust::iterator_traits<VertexIterator>::value_type>
major_hypersparse_first,
typename thrust::iterator_traits<VertexIterator>::value_type major_range_last,
typename thrust::iterator_traits<VertexIterator>::value_type /* minor_range_first */,
typename thrust::iterator_traits<VertexIterator>::value_type /* minor_range_last */,
rmm::cuda_stream_view stream_view)
template <typename vertex_t, typename edge_t, bool store_transposed>
std::tuple<rmm::device_uvector<edge_t>,
rmm::device_uvector<vertex_t>,
std::optional<rmm::device_uvector<vertex_t>>>
sort_and_compress_edgelist(rmm::device_uvector<vertex_t>&& edgelist_srcs,
rmm::device_uvector<vertex_t>&& edgelist_dsts,
vertex_t major_range_first,
std::optional<vertex_t> major_hypersparse_first,
vertex_t major_range_last,
vertex_t /* minor_range_first */,
vertex_t /* minor_range_last */,
size_t mem_frugal_threshold,
rmm::cuda_stream_view stream_view)
{
using vertex_t = std::remove_cv_t<typename thrust::iterator_traits<VertexIterator>::value_type>;

auto number_of_edges =
static_cast<edge_t>(thrust::distance(edgelist_src_first, edgelist_src_last));

auto offsets = compute_sparse_offsets<edge_t>(
store_transposed ? edgelist_dst_first : edgelist_src_first,
store_transposed ? edgelist_dst_first + number_of_edges : edgelist_src_last,
major_range_first,
major_range_last,
stream_view);

rmm::device_uvector<vertex_t> indices(number_of_edges, stream_view);
thrust::fill(rmm::exec_policy(stream_view), indices.begin(), indices.end(), vertex_t{0});

auto offset_view = raft::device_span<edge_t>(offsets.data(), offsets.size());
auto index_view = raft::device_span<vertex_t>(indices.data(), indices.size());
auto edge_first =
thrust::make_zip_iterator(thrust::make_tuple(edgelist_src_first, edgelist_dst_first));
thrust::for_each(rmm::exec_policy(stream_view),
edge_first,
edge_first + number_of_edges,
update_edge_t<store_transposed, vertex_t, edge_t, decltype(edge_first), void*>{
offset_view, index_view, static_cast<void*>(nullptr), major_range_first});
auto edgelist_majors = std::move(store_transposed ? edgelist_dsts : edgelist_srcs);
auto edgelist_minors = std::move(store_transposed ? edgelist_srcs : edgelist_dsts);

rmm::device_uvector<edge_t> offsets(0, stream_view);
rmm::device_uvector<vertex_t> indices(0, stream_view);
auto edge_first = thrust::make_zip_iterator(edgelist_majors.begin(), edgelist_minors.begin());
if (edgelist_minors.size() > mem_frugal_threshold) {
std::cout << "frugal" << std::endl;
offsets = compute_sparse_offsets<edge_t>(edgelist_majors.begin(),
edgelist_majors.end(),
major_range_first,
major_range_last,
false,
stream_view);

auto pivot = major_range_first + static_cast<vertex_t>(thrust::distance(
offsets.begin(),
thrust::lower_bound(rmm::exec_policy(stream_view),
offsets.begin(),
offsets.end(),
edgelist_minors.size() / 2)));
auto second_first =
detail::mem_frugal_partition(edge_first,
edge_first + edgelist_minors.size(),
thrust_tuple_get<thrust::tuple<vertex_t, vertex_t>, 0>{},
pivot,
stream_view);
thrust::sort(rmm::exec_policy(stream_view), edge_first, second_first);
thrust::sort(rmm::exec_policy(stream_view), second_first, edge_first + edgelist_minors.size());
} else {
std::cout << "nonfrugal" << std::endl;
thrust::sort(rmm::exec_policy(stream_view), edge_first, edge_first + edgelist_minors.size());
offsets = compute_sparse_offsets<edge_t>(edgelist_majors.begin(),
edgelist_majors.end(),
major_range_first,
major_range_last,
true,
stream_view);
}
indices = std::move(edgelist_minors);

edgelist_majors.resize(0, stream_view);
edgelist_majors.shrink_to_fit(stream_view);

std::optional<rmm::device_uvector<vertex_t>> dcs_nzd_vertices{std::nullopt};
if (major_hypersparse_first) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/structure/induced_subgraph_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ extract_induced_subgraphs(
graph_ids_v.end(),
size_t{0},
size_t{subgraph_offsets.size() - 1},
true,
handle.get_stream());

dst_subgraph_offsets =
Expand Down Expand Up @@ -290,6 +291,7 @@ extract_induced_subgraphs(
subgraph_edge_graph_ids.end(),
size_t{0},
size_t{subgraph_offsets.size() - 1},
true,
handle.get_stream());

#ifdef TIMING
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/community/mg_egonet_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ class Tests_MGEgonet
graph_ids_v.end(),
size_t{0},
d_mg_edgelist_offsets.size() - 1,
true,
handle_->get_stream());

auto [d_reference_src, d_reference_dst, d_reference_wgt, d_reference_offsets] =
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/structure/mg_induced_subgraph_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class Tests_MGInducedSubgraph
graph_ids_v.end(),
size_t{0},
size_t{d_subgraph_offsets.size() - 1},
true,
handle_->get_stream());

auto [sg_graph, sg_edge_weights, sg_number_map] = cugraph::test::mg_graph_to_sg_graph(
Expand Down
Loading

0 comments on commit 2235e5e

Please sign in to comment.