Implemented Parallel Graph Loading from efile only. (#171)
luoxiaojian authored Sep 5, 2024
1 parent 35d0893 commit 27ea3d2
Showing 16 changed files with 819 additions and 104 deletions.
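
The new load_concurrency knob flows from the command-line flag into LoadGraphSpec and is consumed by BasicEFileFragmentLoader below. A minimal usage sketch (not part of this commit; MakeSpec and the thread count are illustrative):

#include "grape/fragment/basic_fragment_loader_base.h"  // LoadGraphSpec, DefaultLoadGraphSpec

// Build a LoadGraphSpec that opts in to the parallel efile loading path.
grape::LoadGraphSpec MakeSpec(int load_concurrency /* e.g. FLAGS_load_concurrency */) {
  grape::LoadGraphSpec spec = grape::DefaultLoadGraphSpec();  // load_concurrency defaults to 1
  spec.set_directed(true);
  spec.load_concurrency = load_concurrency;  // values > 1 enable the parallel path
  return spec;
}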
1 change: 1 addition & 0 deletions examples/analytical_apps/flags.cc
@@ -59,6 +59,7 @@ DEFINE_string(serialization_prefix, "",
"where to load/store the serialization files");

DEFINE_int32(app_concurrency, -1, "concurrency of application");
DEFINE_int32(load_concurrency, 1, "concurrency of loading graph");

DEFINE_string(lb, "cta",
"Load balancing policy, these options can be used: "
1 change: 1 addition & 0 deletions examples/analytical_apps/flags.h
@@ -51,6 +51,7 @@ DECLARE_bool(deserialize);
DECLARE_string(serialization_prefix);

DECLARE_int32(app_concurrency);
DECLARE_int32(load_concurrency);

DECLARE_string(lb);
#endif // EXAMPLES_ANALYTICAL_APPS_FLAGS_H_
2 changes: 2 additions & 0 deletions examples/analytical_apps/run_app.h
@@ -164,6 +164,7 @@ void CreateAndQuery(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
} else if (FLAGS_serialize) {
@@ -194,6 +195,7 @@ void CreateAndQueryStagedApp(const CommSpec& comm_spec,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
} else if (FLAGS_serialize) {
5 changes: 5 additions & 0 deletions examples/analytical_apps/run_app_opt.h
@@ -64,6 +64,7 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
@@ -171,6 +172,7 @@ void RunDirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
@@ -208,6 +210,7 @@ void RunUndirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
@@ -263,6 +266,7 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
}
@@ -293,6 +297,7 @@ void CreateAndQueryStagedAppOpt(const CommSpec& comm_spec,
LoadGraphSpec graph_spec = DefaultLoadGraphSpec();
graph_spec.set_directed(FLAGS_directed);
graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor);
graph_spec.load_concurrency = FLAGS_load_concurrency;
if (FLAGS_deserialize) {
graph_spec.set_deserialize(true, FLAGS_serialization_prefix);
} else if (FLAGS_serialize) {
22 changes: 22 additions & 0 deletions grape/communication/shuffle.h
@@ -306,6 +306,14 @@ void foreach_helper(const Tuple& t, const Func& func,
}
}

template <typename Tuple, typename Func, std::size_t... index>
void range_foreach_helper(const Tuple& t, size_t begin, size_t end,
const Func& func, index_sequence<index...>) {
for (size_t i = begin; i < end; ++i) {
func(get_const_buffer<index>(t)[i]...);
}
}

template <typename Tuple, typename Func, std::size_t... index>
void foreach_rval_helper(Tuple& t, const Func& func, index_sequence<index...>) {
size_t size = t.size();
@@ -314,11 +322,25 @@ void foreach_rval_helper(Tuple& t, const Func& func, index_sequence<index...>) {
}
}

template <typename Tuple, typename Func, std::size_t... index>
void range_foreach_rval_helper(Tuple& t, size_t begin, size_t end,
const Func& func, index_sequence<index...>) {
for (size_t i = begin; i < end; ++i) {
func(std::move(get_buffer<index>(t)[i])...);
}
}

template <typename Tuple, typename Func>
void foreach(Tuple& t, const Func& func) {
foreach_helper(t, func, make_index_sequence<Tuple::tuple_size>{});
}

template <typename Tuple, typename Func>
void range_foreach_rval(Tuple& t, size_t begin, size_t end, const Func& func) {
range_foreach_rval_helper(t, begin, end, func,
make_index_sequence<Tuple::tuple_size>{});
}

template <typename Tuple, typename Func>
void foreach_rval(Tuple& t, const Func& func) {
foreach_rval_helper(t, func, make_index_sequence<Tuple::tuple_size>{});
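
range_foreach_helper / range_foreach_rval_helper apply a callback to rows [begin, end) of a buffer tuple by expanding an index_sequence over its columns; this is what lets each loader thread walk only its own chunk of a ShuffleBufferTuple. A self-contained sketch of the same pack-expansion idea, using std::tuple and std::get in place of grape's buffer accessors:

#include <cstddef>
#include <iostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

// A "buffer tuple": one vector per column; row r of column I is std::get<I>(t)[r].
template <typename Tuple, typename Func, std::size_t... I>
void range_foreach_impl(const Tuple& t, std::size_t begin, std::size_t end,
                        const Func& func, std::index_sequence<I...>) {
  for (std::size_t i = begin; i < end; ++i) {
    func(std::get<I>(t)[i]...);  // pass one row (one element per column) to func
  }
}

template <typename Tuple, typename Func>
void range_foreach(const Tuple& t, std::size_t begin, std::size_t end,
                   const Func& func) {
  range_foreach_impl(t, begin, end, func,
                     std::make_index_sequence<std::tuple_size<Tuple>::value>{});
}

int main() {
  std::tuple<std::vector<std::string>, std::vector<std::string>> edges{
      {"a", "b", "c", "d"}, {"b", "c", "d", "a"}};
  range_foreach(edges, 1, 3,  // rows [1, 3) only, like one thread's chunk
                [](const std::string& src, const std::string& dst) {
                  std::cout << src << " -> " << dst << "\n";
                });
  return 0;
}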
211 changes: 190 additions & 21 deletions grape/fragment/basic_efile_fragment_loader.h
@@ -16,6 +16,8 @@ limitations under the License.
#ifndef GRAPE_FRAGMENT_BASIC_EFILE_FRAGMENT_LOADER_H_
#define GRAPE_FRAGMENT_BASIC_EFILE_FRAGMENT_LOADER_H_

#include <atomic>

#include "grape/communication/shuffle.h"
#include "grape/fragment/basic_fragment_loader_base.h"
#include "grape/fragment/rebalancer.h"
@@ -25,6 +27,15 @@ limitations under the License.

namespace grape {

inline size_t rehash_oid(size_t val) {
val = (val ^ 61) ^ (val >> 16);
val = val + (val << 3);
val = val ^ (val >> 4);
val = val * 0x27d4eb2d;
val = val ^ (val >> 15);
return val;
}
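
rehash_oid is a bit-mixing finalizer (it resembles Robert Jenkins' 32-bit integer mix) layered over std::hash so vertex ids spread evenly across per-thread buckets. A standalone sketch of how the parallel path further down uses it, with illustrative names and assuming rehash_oid above is visible; thread tid writes only row tid of a concurrency x concurrency bucket matrix, so no locking is needed:

#include <functional>
#include <string>
#include <vector>

// Thread `tid` appends `oid` into row `tid` of the bucket matrix; the column
// comes from the rehashed oid, mirroring vertices_mat[tid * concurrency_ + hash]
// below. Each thread touches only its own row, so the writes need no locks.
void add_to_bucket(std::vector<std::vector<std::string>>& bucket_matrix,
                   int concurrency, int tid, const std::string& oid) {
  size_t mixed = rehash_oid(std::hash<std::string>()(oid));
  int col = static_cast<int>(mixed % static_cast<size_t>(concurrency));
  bucket_matrix[static_cast<size_t>(tid) * concurrency + col].push_back(oid);
}

An aggregation pass then concatenates column col across all rows and deduplicates it, exactly as the vertices_mat / vertices code below does with DistinctSort.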

template <typename FRAG_T>
class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {
using fragment_t = FRAG_T;
@@ -62,6 +73,8 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {
edge_recv_thread_ =
std::thread(&BasicEFileFragmentLoader::edgeRecvRoutine, this);
recv_thread_running_ = true;

concurrency_ = spec.load_concurrency;
}

~BasicEFileFragmentLoader() {
@@ -106,43 +119,198 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {
std::move(edges_to_frag_[comm_spec_.fid()].buffers()));
edges_to_frag_[comm_spec_.fid()].Clear();

double t0 = -grape::GetCurrentTime();
std::unique_ptr<VertexMap<oid_t, vid_t>> vm_ptr(
new VertexMap<oid_t, vid_t>());
{
VertexMapBuilder<oid_t, vid_t> builder(
comm_spec_.fid(), comm_spec_.fnum(), std::move(partitioner_),
spec_.idxer_type);
for (auto& buffers : got_edges_) {
foreach_helper(
buffers,
[&builder](const internal_oid_t& src, const internal_oid_t& dst) {
builder.add_vertex(src);
builder.add_vertex(dst);
},
make_index_sequence<2>{});
if (concurrency_ == 1) {
for (auto& buffers : got_edges_) {
foreach_helper(
buffers,
[&builder](const internal_oid_t& src, const internal_oid_t& dst) {
builder.add_vertex(src);
builder.add_vertex(dst);
},
make_index_sequence<2>{});
}
} else {
std::atomic<size_t> idx(0);
std::vector<std::vector<internal_oid_t>> vertices(concurrency_);
std::vector<std::vector<internal_oid_t>> vertices_mat(concurrency_ *
concurrency_);
std::vector<std::thread> threads;
for (int i = 0; i < concurrency_; ++i) {
threads.emplace_back(
[&, this](int tid) {
fid_t fid = comm_spec_.fid();
for (auto& buffer : got_edges_) {
size_t size = buffer.size();
size_t chunk = (size + concurrency_ - 1) / concurrency_;
size_t start = std::min(size, chunk * tid);
size_t end = std::min(size, start + chunk);
if (spec_.idxer_type == IdxerType::kLocalIdxer) {
range_foreach_helper(
buffer, start, end,
[&](const internal_oid_t& src,
const internal_oid_t& dst) {
int src_hash =
rehash_oid(std::hash<internal_oid_t>()(src)) %
concurrency_;
vertices_mat[tid * concurrency_ + src_hash]
.emplace_back(src);
int dst_hash =
rehash_oid(std::hash<internal_oid_t>()(dst)) %
concurrency_;
vertices_mat[tid * concurrency_ + dst_hash]
.emplace_back(dst);
},
make_index_sequence<2>{});
} else {
range_foreach_helper(
buffer, start, end,
[&](const internal_oid_t& src,
const internal_oid_t& dst) {
if (builder.get_fragment_id(src) == fid) {
int src_hash =
rehash_oid(std::hash<internal_oid_t>()(src)) %
concurrency_;
vertices_mat[tid * concurrency_ + src_hash]
.emplace_back(src);
}
if (builder.get_fragment_id(dst) == fid) {
int dst_hash =
rehash_oid(std::hash<internal_oid_t>()(dst)) %
concurrency_;
vertices_mat[tid * concurrency_ + dst_hash]
.emplace_back(dst);
}
},
make_index_sequence<2>{});
}
}
},
i);
}
for (auto& thrd : threads) {
thrd.join();
}
std::vector<std::thread> aggregate_threads;
for (int i = 0; i < concurrency_; ++i) {
aggregate_threads.emplace_back(
[&, this](int tid) {
auto& vec = vertices[tid];
for (int j = 0; j < concurrency_; ++j) {
vec.insert(vec.end(),
vertices_mat[j * concurrency_ + tid].begin(),
vertices_mat[j * concurrency_ + tid].end());
}
DistinctSort(vec);
},
i);
}
for (auto& thrd : aggregate_threads) {
thrd.join();
}
// TODO(luoxiaojian): parallelize this part
for (auto& vec : vertices) {
for (auto& v : vec) {
builder.add_vertex(v);
}
}
}
builder.finish(comm_spec_, *vm_ptr);
}
MPI_Barrier(comm_spec_.comm());
t0 += grape::GetCurrentTime();
if (comm_spec_.worker_id() == 0) {
VLOG(1) << "finished constructing vertex_map, time: " << t0 << " s";
}

double t1 = -grape::GetCurrentTime();
std::vector<Edge<vid_t, edata_t>> processed_edges;
for (auto& buffers : got_edges_) {
foreach_rval(buffers, [&processed_edges, &vm_ptr](internal_oid_t&& src,
internal_oid_t&& dst,
edata_t&& data) {
vid_t src_gid, dst_gid;
if (vm_ptr->GetGid(oid_t(src), src_gid) &&
vm_ptr->GetGid(oid_t(dst), dst_gid)) {
processed_edges.emplace_back(src_gid, dst_gid, std::move(data));
}
});
if (concurrency_ == 1) {
for (auto& buffers : got_edges_) {
foreach_rval(buffers, [&processed_edges, &vm_ptr](internal_oid_t&& src,
internal_oid_t&& dst,
edata_t&& data) {
vid_t src_gid, dst_gid;
if (vm_ptr->GetGid(oid_t(src), src_gid) &&
vm_ptr->GetGid(oid_t(dst), dst_gid)) {
processed_edges.emplace_back(src_gid, dst_gid, std::move(data));
}
});
}
} else {
std::vector<size_t> offsets;
size_t total = 0;
for (auto& buffers : got_edges_) {
offsets.emplace_back(total);
total += buffers.size();
}
processed_edges.resize(total);
std::vector<std::thread> threads;
for (int i = 0; i < concurrency_; ++i) {
threads.emplace_back(
[&, this](int tid) {
size_t global_offset = 0;
for (auto& buffer : got_edges_) {
size_t size = buffer.size();
size_t chunk = (size + concurrency_ - 1) / concurrency_;
size_t start = std::min(size, chunk * tid);
size_t end = std::min(size, start + chunk);
size_t local_offset = global_offset + start;
global_offset += size;
range_foreach_rval(
buffer, start, end,
[&](internal_oid_t&& src, internal_oid_t&& dst,
edata_t&& data) {
vid_t src_gid, dst_gid;
if (vm_ptr->GetGidFromInternalOid(src, src_gid) &&
vm_ptr->GetGidFromInternalOid(dst, dst_gid)) {
processed_edges[local_offset++] = Edge<vid_t, edata_t>(
src_gid, dst_gid, std::move(data));
} else {
processed_edges[local_offset++] = Edge<vid_t, edata_t>(
std::numeric_limits<vid_t>::max(),
std::numeric_limits<vid_t>::max(), std::move(data));
}
});
}
},
i);
}
for (auto& thrd : threads) {
thrd.join();
}
}
MPI_Barrier(comm_spec_.comm());
t1 += grape::GetCurrentTime();
if (comm_spec_.worker_id() == 0) {
VLOG(1) << "finished parsing edges, time: " << t1 << " s";
}

double t2 = -grape::GetCurrentTime();
fragment = std::make_shared<fragment_t>();
std::vector<internal::Vertex<vid_t, vdata_t>> fake_vertices;
fragment->Init(comm_spec_, spec_.directed, std::move(vm_ptr), fake_vertices,
processed_edges);
if (concurrency_ == 1) {
fragment->Init(comm_spec_, spec_.directed, std::move(vm_ptr),
fake_vertices, processed_edges);
} else {
fragment->ParallelInit(comm_spec_, spec_.directed, std::move(vm_ptr),
fake_vertices, processed_edges, concurrency_);
}
MPI_Barrier(comm_spec_.comm());
t2 += grape::GetCurrentTime();
if (comm_spec_.worker_id() == 0) {
VLOG(1) << "finished initializing fragment, time: " << t2 << " s";
}

this->InitOuterVertexData(fragment);
if (!std::is_same<EmptyType, vdata_t>::value) {
this->InitOuterVertexData(fragment);
}
}

private:
@@ -172,6 +340,7 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase<FRAG_T> {

std::vector<ShuffleBufferTuple<internal_oid_t, internal_oid_t, edata_t>>
got_edges_;
int concurrency_;

using BasicFragmentLoaderBase<FRAG_T>::comm_spec_;
using BasicFragmentLoaderBase<FRAG_T>::spec_;
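
The parallel edge-parsing path above avoids locks by pre-sizing processed_edges and handing each thread a disjoint [start, end) slice of every buffer; edges whose endpoints are not found keep their slot but receive numeric_limits<vid_t>::max() as a sentinel, so the per-thread write offsets stay valid. A simplified, self-contained sketch of that chunk-and-write pattern (plain vectors and an even/odd filter stand in for the shuffle buffers and the gid lookup):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <thread>
#include <vector>

std::vector<std::uint64_t> parallel_transform(const std::vector<std::uint64_t>& input,
                                              int concurrency) {
  std::vector<std::uint64_t> output(input.size());  // pre-sized: threads write in place
  std::vector<std::thread> threads;
  for (int tid = 0; tid < concurrency; ++tid) {
    threads.emplace_back([&, tid]() {
      size_t size = input.size();
      size_t chunk = (size + concurrency - 1) / concurrency;  // ceiling division
      size_t start = std::min(size, chunk * static_cast<size_t>(tid));
      size_t end = std::min(size, start + chunk);
      for (size_t i = start; i < end; ++i) {
        // Keep the slot but store a sentinel for "filtered" rows, as the loader
        // does with numeric_limits<vid_t>::max() when a gid lookup fails.
        output[i] = (input[i] % 2 == 0)
                        ? input[i] * 10
                        : std::numeric_limits<std::uint64_t>::max();
      }
    });
  }
  for (auto& thrd : threads) {
    thrd.join();
  }
  return output;
}

The loader's version additionally shifts each thread's start/end by the buffer's global offset, so all threads and all shuffle buffers share a single output vector.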
3 changes: 3 additions & 0 deletions grape/fragment/basic_fragment_loader_base.h
@@ -41,6 +41,8 @@ struct LoadGraphSpec {
PartitionerType partitioner_type;
IdxerType idxer_type;

int load_concurrency;

void set_directed(bool val = true) { directed = val; }
void set_rebalance(bool flag, int weight) {
rebalance = flag;
@@ -100,6 +102,7 @@ inline LoadGraphSpec DefaultLoadGraphSpec() {
spec.deserialize = false;
spec.partitioner_type = PartitionerType::kHashPartitioner;
spec.idxer_type = IdxerType::kHashMapIdxer;
spec.load_concurrency = 1;
return spec;
}
