From 27ea3d23e995db058a5aa1ea6420b5f6725ab4ee Mon Sep 17 00:00:00 2001 From: luoxiaojian Date: Thu, 5 Sep 2024 17:39:38 +0800 Subject: [PATCH] Implemented Parallel Graph Loading from efile only. (#171) --- examples/analytical_apps/flags.cc | 1 + examples/analytical_apps/flags.h | 1 + examples/analytical_apps/run_app.h | 2 + examples/analytical_apps/run_app_opt.h | 5 + grape/communication/shuffle.h | 22 ++ grape/fragment/basic_efile_fragment_loader.h | 211 ++++++++++++++++-- grape/fragment/basic_fragment_loader_base.h | 3 + grape/fragment/csr_edgecut_fragment_base.h | 219 ++++++++++++++----- grape/fragment/ev_fragment_loader.h | 38 +++- grape/fragment/fragment_base.h | 16 ++ grape/fragment/immutable_edgecut_fragment.h | 158 ++++++++++++- grape/graph/de_mutable_csr.h | 6 +- grape/graph/immutable_csr.h | 128 ++++++++++- grape/graph/mutable_csr.h | 2 +- grape/vertex_map/idxers/sorted_array_idxer.h | 75 ++++++- grape/vertex_map/vertex_map.h | 36 ++- 16 files changed, 819 insertions(+), 104 deletions(-) diff --git a/examples/analytical_apps/flags.cc b/examples/analytical_apps/flags.cc index 08acc594..e2872d08 100644 --- a/examples/analytical_apps/flags.cc +++ b/examples/analytical_apps/flags.cc @@ -59,6 +59,7 @@ DEFINE_string(serialization_prefix, "", "where to load/store the serialization files"); DEFINE_int32(app_concurrency, -1, "concurrency of application"); +DEFINE_int32(load_concurrency, 1, "concurrency of loading graph"); DEFINE_string(lb, "cta", "Load balancing policy, these options can be used: " diff --git a/examples/analytical_apps/flags.h b/examples/analytical_apps/flags.h index 03c51dd7..59a55e6e 100644 --- a/examples/analytical_apps/flags.h +++ b/examples/analytical_apps/flags.h @@ -51,6 +51,7 @@ DECLARE_bool(deserialize); DECLARE_string(serialization_prefix); DECLARE_int32(app_concurrency); +DECLARE_int32(load_concurrency); DECLARE_string(lb); #endif // EXAMPLES_ANALYTICAL_APPS_FLAGS_H_ diff --git a/examples/analytical_apps/run_app.h b/examples/analytical_apps/run_app.h index aa586138..94e17f78 100644 --- a/examples/analytical_apps/run_app.h +++ b/examples/analytical_apps/run_app.h @@ -164,6 +164,7 @@ void CreateAndQuery(const CommSpec& comm_spec, const std::string& out_prefix, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } else if (FLAGS_serialize) { @@ -194,6 +195,7 @@ void CreateAndQueryStagedApp(const CommSpec& comm_spec, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } else if (FLAGS_serialize) { diff --git a/examples/analytical_apps/run_app_opt.h b/examples/analytical_apps/run_app_opt.h index 24ce04cd..23c358da 100644 --- a/examples/analytical_apps/run_app_opt.h +++ b/examples/analytical_apps/run_app_opt.h @@ -64,6 +64,7 @@ void RunUndirectedPageRankOpt(const CommSpec& comm_spec, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } @@ -171,6 +172,7 @@ void RunDirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } @@ -208,6 +210,7 @@ void RunUndirectedCDLP(const CommSpec& comm_spec, const std::string& out_prefix, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } @@ -263,6 +266,7 @@ void CreateAndQueryOpt(const CommSpec& comm_spec, const std::string& out_prefix, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } @@ -293,6 +297,7 @@ void CreateAndQueryStagedAppOpt(const CommSpec& comm_spec, LoadGraphSpec graph_spec = DefaultLoadGraphSpec(); graph_spec.set_directed(FLAGS_directed); graph_spec.set_rebalance(FLAGS_rebalance, FLAGS_rebalance_vertex_factor); + graph_spec.load_concurrency = FLAGS_load_concurrency; if (FLAGS_deserialize) { graph_spec.set_deserialize(true, FLAGS_serialization_prefix); } else if (FLAGS_serialize) { diff --git a/grape/communication/shuffle.h b/grape/communication/shuffle.h index ab3ee77b..5559795c 100644 --- a/grape/communication/shuffle.h +++ b/grape/communication/shuffle.h @@ -306,6 +306,14 @@ void foreach_helper(const Tuple& t, const Func& func, } } +template +void range_foreach_helper(const Tuple& t, size_t begin, size_t end, + const Func& func, index_sequence) { + for (size_t i = begin; i < end; ++i) { + func(get_const_buffer(t)[i]...); + } +} + template void foreach_rval_helper(Tuple& t, const Func& func, index_sequence) { size_t size = t.size(); @@ -314,11 +322,25 @@ void foreach_rval_helper(Tuple& t, const Func& func, index_sequence) { } } +template +void range_foreach_rval_helper(Tuple& t, size_t begin, size_t end, + const Func& func, index_sequence) { + for (size_t i = begin; i < end; ++i) { + func(std::move(get_buffer(t)[i])...); + } +} + template void foreach(Tuple& t, const Func& func) { foreach_helper(t, func, make_index_sequence{}); } +template +void range_foreach_rval(Tuple& t, size_t begin, size_t end, const Func& func) { + range_foreach_rval_helper(t, begin, end, func, + make_index_sequence{}); +} + template void foreach_rval(Tuple& t, const Func& func) { foreach_rval_helper(t, func, make_index_sequence{}); diff --git a/grape/fragment/basic_efile_fragment_loader.h b/grape/fragment/basic_efile_fragment_loader.h index 8f26088f..c7ab6c6a 100644 --- a/grape/fragment/basic_efile_fragment_loader.h +++ b/grape/fragment/basic_efile_fragment_loader.h @@ -16,6 +16,8 @@ limitations under the License. #ifndef GRAPE_FRAGMENT_BASIC_EFILE_FRAGMENT_LOADER_H_ #define GRAPE_FRAGMENT_BASIC_EFILE_FRAGMENT_LOADER_H_ +#include + #include "grape/communication/shuffle.h" #include "grape/fragment/basic_fragment_loader_base.h" #include "grape/fragment/rebalancer.h" @@ -25,6 +27,15 @@ limitations under the License. namespace grape { +inline size_t rehash_oid(size_t val) { + val = (val ^ 61) ^ (val >> 16); + val = val + (val << 3); + val = val ^ (val >> 4); + val = val * 0x27d4eb2d; + val = val ^ (val >> 15); + return val; +} + template class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { using fragment_t = FRAG_T; @@ -62,6 +73,8 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { edge_recv_thread_ = std::thread(&BasicEFileFragmentLoader::edgeRecvRoutine, this); recv_thread_running_ = true; + + concurrency_ = spec.load_concurrency; } ~BasicEFileFragmentLoader() { @@ -106,43 +119,198 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { std::move(edges_to_frag_[comm_spec_.fid()].buffers())); edges_to_frag_[comm_spec_.fid()].Clear(); + double t0 = -grape::GetCurrentTime(); std::unique_ptr> vm_ptr( new VertexMap()); { VertexMapBuilder builder( comm_spec_.fid(), comm_spec_.fnum(), std::move(partitioner_), spec_.idxer_type); - for (auto& buffers : got_edges_) { - foreach_helper( - buffers, - [&builder](const internal_oid_t& src, const internal_oid_t& dst) { - builder.add_vertex(src); - builder.add_vertex(dst); - }, - make_index_sequence<2>{}); + if (concurrency_ == 1) { + for (auto& buffers : got_edges_) { + foreach_helper( + buffers, + [&builder](const internal_oid_t& src, const internal_oid_t& dst) { + builder.add_vertex(src); + builder.add_vertex(dst); + }, + make_index_sequence<2>{}); + } + } else { + std::atomic idx(0); + std::vector> vertices(concurrency_); + std::vector> vertices_mat(concurrency_ * + concurrency_); + std::vector threads; + for (int i = 0; i < concurrency_; ++i) { + threads.emplace_back( + [&, this](int tid) { + fid_t fid = comm_spec_.fid(); + for (auto& buffer : got_edges_) { + size_t size = buffer.size(); + size_t chunk = (size + concurrency_ - 1) / concurrency_; + size_t start = std::min(size, chunk * tid); + size_t end = std::min(size, start + chunk); + if (spec_.idxer_type == IdxerType::kLocalIdxer) { + range_foreach_helper( + buffer, start, end, + [&](const internal_oid_t& src, + const internal_oid_t& dst) { + int src_hash = + rehash_oid(std::hash()(src)) % + concurrency_; + vertices_mat[tid * concurrency_ + src_hash] + .emplace_back(src); + int dst_hash = + rehash_oid(std::hash()(dst)) % + concurrency_; + vertices_mat[tid * concurrency_ + dst_hash] + .emplace_back(dst); + }, + make_index_sequence<2>{}); + } else { + range_foreach_helper( + buffer, start, end, + [&](const internal_oid_t& src, + const internal_oid_t& dst) { + if (builder.get_fragment_id(src) == fid) { + int src_hash = + rehash_oid(std::hash()(src)) % + concurrency_; + vertices_mat[tid * concurrency_ + src_hash] + .emplace_back(src); + } + if (builder.get_fragment_id(dst) == fid) { + int dst_hash = + rehash_oid(std::hash()(dst)) % + concurrency_; + vertices_mat[tid * concurrency_ + dst_hash] + .emplace_back(dst); + } + }, + make_index_sequence<2>{}); + } + } + }, + i); + } + for (auto& thrd : threads) { + thrd.join(); + } + std::vector aggregate_threads; + for (int i = 0; i < concurrency_; ++i) { + aggregate_threads.emplace_back( + [&, this](int tid) { + auto& vec = vertices[tid]; + for (int j = 0; j < concurrency_; ++j) { + vec.insert(vec.end(), + vertices_mat[j * concurrency_ + tid].begin(), + vertices_mat[j * concurrency_ + tid].end()); + } + DistinctSort(vec); + }, + i); + } + for (auto& thrd : aggregate_threads) { + thrd.join(); + } + // TODO(luoxiaojian): parallelize this part + for (auto& vec : vertices) { + for (auto& v : vec) { + builder.add_vertex(v); + } + } } builder.finish(comm_spec_, *vm_ptr); } + MPI_Barrier(comm_spec_.comm()); + t0 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished constructing vertex_map, time: " << t0 << " s"; + } + double t1 = -grape::GetCurrentTime(); std::vector> processed_edges; - for (auto& buffers : got_edges_) { - foreach_rval(buffers, [&processed_edges, &vm_ptr](internal_oid_t&& src, - internal_oid_t&& dst, - edata_t&& data) { - vid_t src_gid, dst_gid; - if (vm_ptr->GetGid(oid_t(src), src_gid) && - vm_ptr->GetGid(oid_t(dst), dst_gid)) { - processed_edges.emplace_back(src_gid, dst_gid, std::move(data)); - } - }); + if (concurrency_ == 1) { + for (auto& buffers : got_edges_) { + foreach_rval(buffers, [&processed_edges, &vm_ptr](internal_oid_t&& src, + internal_oid_t&& dst, + edata_t&& data) { + vid_t src_gid, dst_gid; + if (vm_ptr->GetGid(oid_t(src), src_gid) && + vm_ptr->GetGid(oid_t(dst), dst_gid)) { + processed_edges.emplace_back(src_gid, dst_gid, std::move(data)); + } + }); + } + } else { + std::vector offsets; + size_t total = 0; + for (auto& buffers : got_edges_) { + offsets.emplace_back(total); + total += buffers.size(); + } + processed_edges.resize(total); + std::vector threads; + for (int i = 0; i < concurrency_; ++i) { + threads.emplace_back( + [&, this](int tid) { + size_t global_offset = 0; + for (auto& buffer : got_edges_) { + size_t size = buffer.size(); + size_t chunk = (size + concurrency_ - 1) / concurrency_; + size_t start = std::min(size, chunk * tid); + size_t end = std::min(size, start + chunk); + size_t local_offset = global_offset + start; + global_offset += size; + range_foreach_rval( + buffer, start, end, + [&](internal_oid_t&& src, internal_oid_t&& dst, + edata_t&& data) { + vid_t src_gid, dst_gid; + if (vm_ptr->GetGidFromInternalOid(src, src_gid) && + vm_ptr->GetGidFromInternalOid(dst, dst_gid)) { + processed_edges[local_offset++] = Edge( + src_gid, dst_gid, std::move(data)); + } else { + processed_edges[local_offset++] = Edge( + std::numeric_limits::max(), + std::numeric_limits::max(), std::move(data)); + } + }); + } + }, + i); + } + for (auto& thrd : threads) { + thrd.join(); + } + } + MPI_Barrier(comm_spec_.comm()); + t1 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished parsing edges, time: " << t1 << " s"; } + double t2 = -grape::GetCurrentTime(); fragment = std::make_shared(); std::vector> fake_vertices; - fragment->Init(comm_spec_, spec_.directed, std::move(vm_ptr), fake_vertices, - processed_edges); + if (concurrency_ == 1) { + fragment->Init(comm_spec_, spec_.directed, std::move(vm_ptr), + fake_vertices, processed_edges); + } else { + fragment->ParallelInit(comm_spec_, spec_.directed, std::move(vm_ptr), + fake_vertices, processed_edges, concurrency_); + } + MPI_Barrier(comm_spec_.comm()); + t2 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished initializing fragment, time: " << t2 << " s"; + } - this->InitOuterVertexData(fragment); + if (!std::is_same::value) { + this->InitOuterVertexData(fragment); + } } private: @@ -172,6 +340,7 @@ class BasicEFileFragmentLoader : public BasicFragmentLoaderBase { std::vector> got_edges_; + int concurrency_; using BasicFragmentLoaderBase::comm_spec_; using BasicFragmentLoaderBase::spec_; diff --git a/grape/fragment/basic_fragment_loader_base.h b/grape/fragment/basic_fragment_loader_base.h index 44c1bdbb..bf689f7e 100644 --- a/grape/fragment/basic_fragment_loader_base.h +++ b/grape/fragment/basic_fragment_loader_base.h @@ -41,6 +41,8 @@ struct LoadGraphSpec { PartitionerType partitioner_type; IdxerType idxer_type; + int load_concurrency; + void set_directed(bool val = true) { directed = val; } void set_rebalance(bool flag, int weight) { rebalance = flag; @@ -100,6 +102,7 @@ inline LoadGraphSpec DefaultLoadGraphSpec() { spec.deserialize = false; spec.partitioner_type = PartitionerType::kHashPartitioner; spec.idxer_type = IdxerType::kHashMapIdxer; + spec.load_concurrency = 1; return spec; } diff --git a/grape/fragment/csr_edgecut_fragment_base.h b/grape/fragment/csr_edgecut_fragment_base.h index a84ff96d..1b51e91e 100644 --- a/grape/fragment/csr_edgecut_fragment_base.h +++ b/grape/fragment/csr_edgecut_fragment_base.h @@ -411,9 +411,11 @@ class CSREdgecutFragmentBase using base_t::IsInnerVertexGid; using base_t::IsInnerVertexLid; using base_t::OuterVertexGid2Lid; - void buildCSR(const typename csr_builder_t::vertex_range_t& vertex_range, + using vertices_t = typename TRAITS_T::vertices_t; + + void buildCSR(const vertices_t& vertex_range, std::vector>& edges, - LoadStrategy load_strategy) { + LoadStrategy load_strategy, int concurrency = 1) { csr_builder_t ie_builder, oe_builder; ie_builder.init(vertex_range); oe_builder.init(vertex_range); @@ -497,43 +499,90 @@ class CSREdgecutFragmentBase ie_builder.inc_degree(e.dst); } }; - if (load_strategy == LoadStrategy::kOnlyIn) { - if (this->directed_) { - for (auto& e : edges) { - parse_iter_in(e); - } - } else { - for (auto& e : edges) { - parse_iter_in_undirected(e); - } - } - } else if (load_strategy == LoadStrategy::kOnlyOut) { - if (this->directed_) { - for (auto& e : edges) { - parse_iter_out(e); + + if (concurrency == 1) { + if (load_strategy == LoadStrategy::kOnlyIn) { + if (this->directed_) { + for (auto& e : edges) { + parse_iter_in(e); + } + } else { + for (auto& e : edges) { + parse_iter_in_undirected(e); + } } - } else { - for (auto& e : edges) { - parse_iter_out_undirected(e); + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (this->directed_) { + for (auto& e : edges) { + parse_iter_out(e); + } + } else { + for (auto& e : edges) { + parse_iter_out_undirected(e); + } } - } - } else if (load_strategy == LoadStrategy::kBothOutIn) { - if (this->directed_) { - for (auto& e : edges) { - parse_iter_out_in(e); + } else if (load_strategy == LoadStrategy::kBothOutIn) { + if (this->directed_) { + for (auto& e : edges) { + parse_iter_out_in(e); + } + } else { + for (auto& e : edges) { + parse_iter_out_in_undirected(e); + } } } else { - for (auto& e : edges) { - parse_iter_out_in_undirected(e); - } + LOG(FATAL) << "Invalid load strategy"; } } else { - LOG(FATAL) << "Invalid load strategy"; + std::vector threads; + for (int i = 0; i < concurrency; ++i) { + threads.emplace_back( + [&, this](int tid) { + size_t batch = (edges.size() + concurrency - 1) / concurrency; + size_t begin = std::min(batch * tid, edges.size()); + size_t end = std::min(begin + batch, edges.size()); + if (load_strategy == LoadStrategy::kOnlyIn) { + if (this->directed_) { + for (size_t i = begin; i < end; ++i) { + parse_iter_in(edges[i]); + } + } else { + for (size_t i = begin; i < end; ++i) { + parse_iter_in_undirected(edges[i]); + } + } + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (this->directed_) { + for (size_t i = begin; i < end; ++i) { + parse_iter_out(edges[i]); + } + } else { + for (size_t i = begin; i < end; ++i) { + parse_iter_out_undirected(edges[i]); + } + } + } else if (load_strategy == LoadStrategy::kBothOutIn) { + if (this->directed_) { + for (size_t i = begin; i < end; ++i) { + parse_iter_out_in(edges[i]); + } + } else { + for (size_t i = begin; i < end; ++i) { + parse_iter_out_in_undirected(edges[i]); + } + } + } else { + LOG(FATAL) << "Invalid load strategy"; + } + }, + i); + } + for (auto& thrd : threads) { + thrd.join(); + } } - ie_builder.build_offsets(); - oe_builder.build_offsets(); - auto insert_iter_in = [&](const Edge& e) { if (e.src != invalid_vid) { ie_builder.add_edge(e.dst, nbr_t(e.src, e.edata)); @@ -593,42 +642,94 @@ class CSREdgecutFragmentBase } }; - if (load_strategy == LoadStrategy::kOnlyIn) { - if (this->directed_) { - for (auto& e : edges) { - insert_iter_in(e); - } - } else { - for (auto& e : edges) { - insert_iter_in_undirected(e); - } - } - } else if (load_strategy == LoadStrategy::kOnlyOut) { - if (this->directed_) { - for (auto& e : edges) { - insert_iter_out(e); + ie_builder.build_offsets(); + oe_builder.build_offsets(); + + if (concurrency == 1) { + if (load_strategy == LoadStrategy::kOnlyIn) { + if (this->directed_) { + for (auto& e : edges) { + insert_iter_in(e); + } + } else { + for (auto& e : edges) { + insert_iter_in_undirected(e); + } } - } else { - for (auto& e : edges) { - insert_iter_out_undirected(e); + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (this->directed_) { + for (auto& e : edges) { + insert_iter_out(e); + } + } else { + for (auto& e : edges) { + insert_iter_out_undirected(e); + } } - } - } else if (load_strategy == LoadStrategy::kBothOutIn) { - if (this->directed_) { - for (auto& e : edges) { - insert_iter_out_in(e); + } else if (load_strategy == LoadStrategy::kBothOutIn) { + if (this->directed_) { + for (auto& e : edges) { + insert_iter_out_in(e); + } + } else { + for (auto& e : edges) { + insert_iter_out_in_undirected(e); + } } } else { - for (auto& e : edges) { - insert_iter_out_in_undirected(e); - } + LOG(FATAL) << "Invalid load strategy"; } } else { - LOG(FATAL) << "Invalid load strategy"; + std::vector threads; + for (int i = 0; i < concurrency; ++i) { + threads.emplace_back( + [&, this](int tid) { + size_t batch = (edges.size() + concurrency - 1) / concurrency; + size_t begin = std::min(batch * tid, edges.size()); + size_t end = std::min(begin + batch, edges.size()); + if (load_strategy == LoadStrategy::kOnlyIn) { + if (this->directed_) { + for (size_t i = begin; i < end; ++i) { + insert_iter_in(edges[i]); + } + } else { + for (size_t i = begin; i < end; ++i) { + insert_iter_in_undirected(edges[i]); + } + } + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (this->directed_) { + for (size_t i = begin; i < end; ++i) { + insert_iter_out(edges[i]); + } + } else { + for (size_t i = begin; i < end; ++i) { + insert_iter_out_undirected(edges[i]); + } + } + } else if (load_strategy == LoadStrategy::kBothOutIn) { + if (this->directed_) { + for (size_t i = begin; i < end; ++i) { + insert_iter_out_in(edges[i]); + } + } else { + for (size_t i = begin; i < end; ++i) { + insert_iter_out_in_undirected(edges[i]); + } + } + } else { + LOG(FATAL) << "Invalid load strategy"; + } + }, + i); + } + for (auto& thrd : threads) { + thrd.join(); + } } - ie_builder.finish(ie_); - oe_builder.finish(oe_); + ie_builder.finish(ie_, concurrency); + oe_builder.finish(oe_, concurrency); } template diff --git a/grape/fragment/ev_fragment_loader.h b/grape/fragment/ev_fragment_loader.h index 89bc1d53..be5c6be6 100644 --- a/grape/fragment/ev_fragment_loader.h +++ b/grape/fragment/ev_fragment_loader.h @@ -115,6 +115,9 @@ class EVFragmentLoader { } if (!vfile.empty()) { + MPI_Barrier(comm_spec_.comm()); + double t0 = -grape::GetCurrentTime(); + auto io_adaptor = std::unique_ptr(new IOADAPTOR_T(vfile)); io_adaptor->SetPartialRead(comm_spec_.worker_id(), comm_spec_.worker_num()); @@ -140,10 +143,26 @@ class EVFragmentLoader { basic_fragment_loader_->AddVertex(vertex_id, v_data); } io_adaptor->Close(); - } - basic_fragment_loader_->ConstructVertices(); + MPI_Barrier(comm_spec_.comm()); + t0 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished reading vertices inputs, time: " << t0 << " s"; + } + + double t1 = -grape::GetCurrentTime(); + basic_fragment_loader_->ConstructVertices(); + + MPI_Barrier(comm_spec_.comm()); + t1 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished constructing vertices, time: " << t1 << " s"; + } + } else { + basic_fragment_loader_->ConstructVertices(); + } + double t2 = -grape::GetCurrentTime(); { auto io_adaptor = std::unique_ptr(new IOADAPTOR_T(std::string(efile))); @@ -175,11 +194,20 @@ class EVFragmentLoader { } io_adaptor->Close(); } + MPI_Barrier(comm_spec_.comm()); + t2 += grape::GetCurrentTime(); + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished reading edges inputs, time: " << t2 << " s"; + } - VLOG(1) << "[worker-" << comm_spec_.worker_id() - << "] finished add vertices and edges"; - + double t3 = -grape::GetCurrentTime(); basic_fragment_loader_->ConstructFragment(fragment); + MPI_Barrier(comm_spec_.comm()); + t3 += grape::GetCurrentTime(); + + if (comm_spec_.worker_id() == 0) { + VLOG(1) << "finished constructing fragment, time: " << t3 << " s"; + } if (spec.serialize) { bool serialized = SerializeFragment( diff --git a/grape/fragment/fragment_base.h b/grape/fragment/fragment_base.h index f2fbd3dc..11d88e4e 100644 --- a/grape/fragment/fragment_base.h +++ b/grape/fragment/fragment_base.h @@ -86,6 +86,14 @@ class FragmentBase { std::vector>& vertices, std::vector>& edges) = 0; + virtual void ParallelInit( + const CommSpec& comm_spec, bool directed, + std::unique_ptr>&& vm_ptr, + std::vector>& vertices, + std::vector>& edges, int concurrency) { + return Init(comm_spec, directed, std::move(vm_ptr), vertices, edges); + } + /** * @brief For some kind of applications, specific data structures will be * generated. @@ -175,6 +183,14 @@ class FragmentBase { return oid; } + using internal_id_t = typename InternalOID::type; + + internal_id_t GetInternalId(const Vertex& v) const { + internal_id_t oid{}; + vm_ptr_->GetInternalOid(Vertex2Gid(v), oid); + return oid; + } + OID_T Gid2Oid(VID_T gid) const { OID_T oid; vm_ptr_->GetOid(gid, oid); diff --git a/grape/fragment/immutable_edgecut_fragment.h b/grape/fragment/immutable_edgecut_fragment.h index aecbc9d5..5482440c 100644 --- a/grape/fragment/immutable_edgecut_fragment.h +++ b/grape/fragment/immutable_edgecut_fragment.h @@ -59,7 +59,7 @@ struct ImmutableEdgecutFragmentTraits { using fragment_const_adj_list_t = ConstAdjList; using csr_t = ImmutableCSR>; - using csr_builder_t = ImmutableCSRBuild>; + using csr_builder_t = ImmutableCSRParallelBuilder>; using mirror_vertices_t = std::vector>; }; @@ -350,6 +350,162 @@ class ImmutableEdgecutFragment } } + void ParallelInit(const CommSpec& comm_spec, bool directed, + std::unique_ptr>&& vm_ptr, + std::vector& vertices, + std::vector& edges, int concurrency) override { + init(comm_spec.fid(), directed, std::move(vm_ptr)); + + static constexpr VID_T invalid_vid = std::numeric_limits::max(); + { + auto iter_in = [&](Edge& e, + std::vector& outer_vertices) { + if (IsInnerVertexGid(e.dst)) { + if (!IsInnerVertexGid(e.src)) { + outer_vertices.push_back(e.src); + } + } else { + e.src = invalid_vid; + } + }; + auto iter_out = [&](Edge& e, + std::vector& outer_vertices) { + if (IsInnerVertexGid(e.src)) { + if (!IsInnerVertexGid(e.dst)) { + outer_vertices.push_back(e.dst); + } + } else { + e.src = invalid_vid; + } + }; + auto iter_out_in = [&](Edge& e, + std::vector& outer_vertices) { + if (IsInnerVertexGid(e.src)) { + if (!IsInnerVertexGid(e.dst)) { + outer_vertices.push_back(e.dst); + } + } else if (IsInnerVertexGid(e.dst)) { + outer_vertices.push_back(e.src); + } else { + e.src = invalid_vid; + } + }; + + auto iter_in_undirected = [&](Edge& e, + std::vector& outer_vertices) { + if (IsInnerVertexGid(e.dst)) { + if (!IsInnerVertexGid(e.src)) { + outer_vertices.push_back(e.src); + } + } else { + if (IsInnerVertexGid(e.src)) { + outer_vertices.push_back(e.dst); + } else { + e.src = invalid_vid; + } + } + }; + auto iter_out_undirected = [&](Edge& e, + std::vector& outer_vertices) { + if (IsInnerVertexGid(e.src)) { + if (!IsInnerVertexGid(e.dst)) { + outer_vertices.push_back(e.dst); + } + } else { + if (IsInnerVertexGid(e.dst)) { + outer_vertices.push_back(e.src); + } else { + e.src = invalid_vid; + } + } + }; + + std::vector> outer_vertices_vec(concurrency); + std::vector threads; + for (int i = 0; i < concurrency; ++i) { + threads.emplace_back( + [&, this](int tid) { + size_t batch = (edges.size() + concurrency - 1) / concurrency; + size_t begin = std::min(batch * tid, edges.size()); + size_t end = std::min(begin + batch, edges.size()); + auto& vec = outer_vertices_vec[tid]; + if (load_strategy == LoadStrategy::kOnlyIn) { + if (directed) { + for (size_t j = begin; j < end; ++j) { + iter_in(edges[j], vec); + } + } else { + for (size_t j = begin; j < end; ++j) { + iter_in_undirected(edges[j], vec); + } + } + } else if (load_strategy == LoadStrategy::kOnlyOut) { + if (directed) { + for (size_t j = begin; j < end; ++j) { + iter_out(edges[j], vec); + } + } else { + for (size_t j = begin; j < end; ++j) { + iter_out_undirected(edges[j], vec); + } + } + } else if (load_strategy == LoadStrategy::kBothOutIn) { + for (size_t j = begin; j < end; ++j) { + iter_out_in(edges[j], vec); + } + } else { + LOG(FATAL) << "Invalid load strategy"; + } + DistinctSort(vec); + }, + i); + } + for (auto& thrd : threads) { + thrd.join(); + } + std::vector outer_vertices; + for (auto& vec : outer_vertices_vec) { + outer_vertices.insert(outer_vertices.end(), vec.begin(), vec.end()); + } + + DistinctSort(outer_vertices); + + ovgid_.resize(outer_vertices.size()); + memcpy(&ovgid_[0], &outer_vertices[0], + outer_vertices.size() * sizeof(VID_T)); + } + + vid_t ovid = ivnum_; + for (auto gid : ovgid_) { + ovg2l_.emplace(gid, ovid); + ++ovid; + } + ovnum_ = ovid - ivnum_; + this->inner_vertices_.SetRange(0, ivnum_); + this->outer_vertices_.SetRange(ivnum_, ivnum_ + ovnum_); + this->vertices_.SetRange(0, ivnum_ + ovnum_); + + buildCSR(this->Vertices(), edges, load_strategy, concurrency); + + initOuterVerticesOfFragment(); + + vdata_.clear(); + vdata_.resize(ivnum_ + ovnum_); + if (sizeof(internal_vertex_t) > sizeof(VID_T)) { + for (auto& v : vertices) { + VID_T gid = v.vid; + if (id_parser_.get_fragment_id(gid) == fid_) { + vdata_[id_parser_.get_local_id(gid)] = v.vdata; + } else { + auto iter = ovg2l_.find(gid); + if (iter != ovg2l_.end()) { + vdata_[iter->second] = v.vdata; + } + } + } + } + } + template void Serialize(const std::string& prefix) { char fbuf[1024]; diff --git a/grape/graph/de_mutable_csr.h b/grape/graph/de_mutable_csr.h index cea96b61..011b7f90 100644 --- a/grape/graph/de_mutable_csr.h +++ b/grape/graph/de_mutable_csr.h @@ -84,15 +84,15 @@ class DeMutableCSRBuilder> { } } - void finish(DeMutableCSR>& ret) { + void finish(DeMutableCSR>& ret, int concurrency) { ret.min_id_ = min_id_; ret.max_id_ = max_id_; ret.max_head_id_ = max_head_id_; ret.min_tail_id_ = min_tail_id_; ret.dedup_ = dedup_; - head_builder_.finish(ret.head_); - tail_builder_.finish(ret.tail_); + head_builder_.finish(ret.head_, concurrency); + tail_builder_.finish(ret.tail_, concurrency); if (dedup_) { VID_T head_num = ret.head_.vertex_num(); diff --git a/grape/graph/immutable_csr.h b/grape/graph/immutable_csr.h index 3a18a202..a3f81be6 100644 --- a/grape/graph/immutable_csr.h +++ b/grape/graph/immutable_csr.h @@ -33,15 +33,127 @@ template class ImmutableCSR; template -class ImmutableCSRBuild { +class ImmutableCSRParallelBuilder { using vid_t = VID_T; using nbr_t = NBR_T; public: using vertex_range_t = VertexRange; - ImmutableCSRBuild() {} - ~ImmutableCSRBuild() {} + ImmutableCSRParallelBuilder() {} + ~ImmutableCSRParallelBuilder() {} + + void init(VID_T vnum) { + vnum_ = vnum; + degree_ = std::vector>(vnum); + for (VID_T i = 0; i < vnum; ++i) { + degree_[i].store(0); + } + } + + void init(const VertexRange& range) { + assert(range.begin_value() == 0); + init(range.size()); + } + + void inc_degree(VID_T i) { + if (i < vnum_) { + ++degree_[i]; + } + } + + void build_offsets() { + edge_num_ = 0; + for (VID_T i = 0; i < vnum_; ++i) { + edge_num_ += degree_[i].load(); + } + edges_.clear(); + edges_.resize(edge_num_); + offsets_.clear(); + offsets_.resize(vnum_ + 1); + offsets_[0] = edges_.data(); + for (VID_T i = 0; i < vnum_; ++i) { + offsets_[i + 1] = offsets_[i] + degree_[i]; + } + CHECK_EQ(offsets_[vnum_], edges_.data() + edge_num_); + { + std::vector> tmp; + tmp.swap(degree_); + } + iter_ = std::vector>(vnum_); + for (VID_T i = 0; i < vnum_; ++i) { + iter_[i].store(0); + } + } + + void add_edge(VID_T src, const nbr_t& nbr) { + if (src < vnum_) { + int offset = iter_[src].fetch_add(1); + nbr_t* ptr = offsets_[src] + offset; + *ptr = nbr; + } + } + + template + void sort(const FUNC_T& func) { + for (VID_T i = 0; i < vnum_; ++i) { + std::sort(offsets_[i], offsets_[i + 1], func); + } + } + + void finish(ImmutableCSR& ret, int concurrency) { + if (concurrency == 1) { + for (VID_T i = 0; i < vnum_; ++i) { + std::sort(offsets_[i], offsets_[i + 1]); + } + } else { + std::vector threads; + std::atomic offset(0); + static constexpr VID_T chunk = 4096; + for (int i = 0; i < concurrency; ++i) { + threads.emplace_back([this, &offset]() { + while (true) { + VID_T begin = std::min(offset.fetch_add(chunk), vnum_); + VID_T end = std::min(begin + chunk, vnum_); + if (begin == end) { + break; + } + while (begin < end) { + std::sort(offsets_[begin], offsets_[begin + 1]); + ++begin; + } + } + }); + } + for (auto& thrd : threads) { + thrd.join(); + } + } + + ret.edges_.swap(edges_); + ret.offsets_.swap(offsets_); + } + + private: + VID_T vnum_; + size_t edge_num_; + + Array> edges_; + Array> offsets_; + std::vector> degree_; + std::vector> iter_; +}; + +template +class ImmutableCSRBuilder { + using vid_t = VID_T; + using nbr_t = NBR_T; + + public: + using vertex_range_t = VertexRange; + + ImmutableCSRBuilder() {} + ~ImmutableCSRBuilder() {} void init(VID_T vnum) { vnum_ = vnum; @@ -97,7 +209,7 @@ class ImmutableCSRBuild { } } - void finish(ImmutableCSR& ret) { + void finish(ImmutableCSR& ret, int concurrency) { for (VID_T i = 0; i < vnum_; ++i) { std::sort(offsets_[i], offsets_[i + 1]); } @@ -131,7 +243,7 @@ class ImmutableCSRStreamBuilder { degree_.push_back(degree); } - void finish(ImmutableCSR& ret) { + void finish(ImmutableCSR& ret, int concurrency) { ret.edges_.clear(); ret.edges_.resize(edges_.size()); std::copy(edges_.begin(), edges_.end(), ret.edges_.begin()); @@ -154,6 +266,7 @@ class ImmutableCSR { public: using vid_t = VID_T; using nbr_t = NBR_T; + using vertex_range_t = VertexRange; ImmutableCSR() { offsets_.resize(1); @@ -254,7 +367,10 @@ class ImmutableCSR { Array> offsets_; template - friend class ImmutableCSRBuild; + friend class ImmutableCSRBuilder; + + template + friend class ImmutableCSRParallelBuilder; template friend class ImmutableCSRStreamBuilder; diff --git a/grape/graph/mutable_csr.h b/grape/graph/mutable_csr.h index ba952c8c..faa2ea23 100644 --- a/grape/graph/mutable_csr.h +++ b/grape/graph/mutable_csr.h @@ -103,7 +103,7 @@ class MutableCSRBuilder> { } } - void finish(MutableCSR>& ret) { + void finish(MutableCSR>& ret, int concurrency) { if (vnum_ == 0) { ret.capacity_.clear(); ret.prev_.clear(); diff --git a/grape/vertex_map/idxers/sorted_array_idxer.h b/grape/vertex_map/idxers/sorted_array_idxer.h index 0eaf48da..270ea216 100644 --- a/grape/vertex_map/idxers/sorted_array_idxer.h +++ b/grape/vertex_map/idxers/sorted_array_idxer.h @@ -80,12 +80,8 @@ class SortedArrayIdxer public: SortedArrayIdxer() {} - explicit SortedArrayIdxer( - Array>&& id_list) { - for (auto& id : id_list) { - id_list_.emplace_back(id); - } - } + explicit SortedArrayIdxer(StringViewVector&& id_list) + : id_list_(std::move(id_list)) {} ~SortedArrayIdxer() {} bool get_key(VID_T vid, internal_oid_t& oid) const override { @@ -159,6 +155,31 @@ class SortedArrayIdxerDummyBuilder : public IdxerBuilderBase { Array> id_list_; }; +template +class SortedArrayIdxerDummyBuilder + : public IdxerBuilderBase { + public: + using internal_oid_t = typename InternalOID::type; + void add(const internal_oid_t& oid) override {} + + std::unique_ptr> finish() override { + return std::unique_ptr>( + new SortedArrayIdxer(std::move(id_list_))); + } + + void sync_request(const CommSpec& comm_spec, int target, int tag) override { + sync_comm::Recv(id_list_, target, tag, comm_spec.comm()); + } + + void sync_response(const CommSpec& comm_spec, int source, int tag) override { + LOG(ERROR) << "SortedArrayIdxerDummyBuilder should not be used to sync " + "response"; + } + + private: + StringViewVector id_list_; +}; + template class SortedArrayIdxerBuilder : public IdxerBuilderBase { public: @@ -193,6 +214,48 @@ class SortedArrayIdxerBuilder : public IdxerBuilderBase { bool sorted_ = false; }; +template +class SortedArrayIdxerBuilder + : public IdxerBuilderBase { + public: + using internal_oid_t = typename InternalOID::type; + void add(const internal_oid_t& oid) override { + keys_.push_back(std::string(oid)); + } + + std::unique_ptr> finish() override { + if (!sorted_) { + DistinctSort(keys_); + for (auto& key : keys_) { + id_list_.emplace_back(key); + } + sorted_ = true; + } + return std::unique_ptr>( + new SortedArrayIdxer(std::move(id_list_))); + } + + void sync_request(const CommSpec& comm_spec, int target, int tag) override { + LOG(ERROR) << "HashMapIdxerBuilder should not be used to sync request"; + } + + void sync_response(const CommSpec& comm_spec, int source, int tag) override { + if (!sorted_) { + DistinctSort(keys_); + for (auto& key : keys_) { + id_list_.emplace_back(key); + } + sorted_ = true; + } + sync_comm::Send(id_list_, source, tag, comm_spec.comm()); + } + + private: + std::vector keys_; + StringViewVector id_list_; + bool sorted_ = false; +}; + } // namespace grape #endif // GRAPE_VERTEX_MAP_IDXERS_SORTED_ARRAY_IDXER_H_ diff --git a/grape/vertex_map/vertex_map.h b/grape/vertex_map/vertex_map.h index a1c385bd..e970567d 100644 --- a/grape/vertex_map/vertex_map.h +++ b/grape/vertex_map/vertex_map.h @@ -77,13 +77,25 @@ class VertexMap { return GetOid(fid, GetLidFromGid(gid), oid); } + bool GetInternalOid(const VID_T& gid, internal_oid_t& oid) const { + fid_t fid = GetFidFromGid(gid); + return GetInternalOid(fid, GetLidFromGid(gid), oid); + } + bool GetOid(fid_t fid, const VID_T& lid, OID_T& oid) const { internal_oid_t internal_oid; + if (GetInternalOid(fid, lid, internal_oid)) { + oid = InternalOID::FromInternal(internal_oid); + return true; + } + return false; + } + + bool GetInternalOid(fid_t fid, const VID_T& lid, internal_oid_t& oid) const { if (fid >= fnum_) { return false; } - if (idxers_[fid]->get_key(lid, internal_oid)) { - oid = InternalOID::FromInternal(internal_oid); + if (idxers_[fid]->get_key(lid, oid)) { return true; } return false; @@ -109,6 +121,26 @@ class VertexMap { return GetGid(fid, oid, gid); } + bool GetGidFromInternalOid(fid_t fid, const internal_oid_t& oid, + VID_T& gid) const { + if (fid >= fnum_) { + return false; + } + if (idxers_[fid]->get_index(oid, gid)) { + gid = Lid2Gid(fid, gid); + return true; + } + return false; + } + + bool GetGidFromInternalOid(const internal_oid_t& oid, VID_T& gid) const { + fid_t fid = partitioner_->GetPartitionId(oid); + if (fid == fnum_) { + return false; + } + return GetGidFromInternalOid(fid, oid, gid); + } + void reset() { idxers_.clear(); } void ExtendVertices(const CommSpec& comm_spec,