From 760df60808a5ecc24082164f4a1b1ca38c498fe6 Mon Sep 17 00:00:00 2001 From: Lucas Jeub Date: Tue, 14 Jan 2025 11:29:50 +0100 Subject: [PATCH] remove temporary vecs when creating Index --- Cargo.lock | 1 + Cargo.toml | 2 +- .../algorithms/components/in_components.rs | 4 +- .../algorithms/components/out_components.rs | 4 +- raphtory/src/db/api/state/group_by.rs | 6 +- raphtory/src/db/api/state/lazy_node_state.rs | 3 +- raphtory/src/db/api/state/node_state.rs | 18 ++-- raphtory/src/db/api/state/node_state_ops.rs | 87 +++++++++---------- raphtory/src/db/graph/views/node_subgraph.rs | 4 +- raphtory/src/db/graph/views/window_graph.rs | 2 +- 10 files changed, 71 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1d606598..d38ce8b3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2906,6 +2906,7 @@ checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", "hashbrown 0.15.2", + "rayon", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 7bd6d2221..7efbdd47f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,4 +149,4 @@ arrow-buffer = { version = "53.2.0" } arrow-schema = { version = "53.2.0" } arrow-array = { version = "53.2.0" } moka = { version = "0.12.7", features = ["sync"] } -indexmap = "2.7.0" +indexmap = { version = "2.7.0", features = ["rayon"] } diff --git a/raphtory/src/algorithms/components/in_components.rs b/raphtory/src/algorithms/components/in_components.rs index d26e2c493..6841b22ba 100644 --- a/raphtory/src/algorithms/components/in_components.rs +++ b/raphtory/src/algorithms/components/in_components.rs @@ -19,6 +19,7 @@ use crate::{ }, prelude::GraphViewOps, }; +use indexmap::IndexSet; use itertools::Itertools; use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; @@ -130,7 +131,8 @@ pub fn in_component<'graph, G: GraphViewOps<'graph>>( } } - let (nodes, distances): (Vec<_>, Vec<_>) = in_components.into_iter().sorted().unzip(); + let (nodes, distances): (IndexSet<_, ahash::RandomState>, Vec<_>) = + in_components.into_iter().sorted().unzip(); NodeState::new( node.graph.clone(), node.graph.clone(), diff --git a/raphtory/src/algorithms/components/out_components.rs b/raphtory/src/algorithms/components/out_components.rs index 519d46a7c..855e213a7 100644 --- a/raphtory/src/algorithms/components/out_components.rs +++ b/raphtory/src/algorithms/components/out_components.rs @@ -16,6 +16,7 @@ use crate::{ }, prelude::GraphViewOps, }; +use indexmap::IndexSet; use itertools::Itertools; use raphtory_api::core::entities::GID; use rayon::prelude::*; @@ -133,7 +134,8 @@ pub fn out_component<'graph, G: GraphViewOps<'graph>>( } } - let (nodes, distances): (Vec<_>, Vec<_>) = out_components.into_iter().sorted().unzip(); + let (nodes, distances): (IndexSet<_, ahash::RandomState>, Vec<_>) = + out_components.into_iter().sorted().unzip(); NodeState::new( node.graph.clone(), node.graph.clone(), diff --git a/raphtory/src/db/api/state/group_by.rs b/raphtory/src/db/api/state/group_by.rs index f654b318d..c4f200e4f 100644 --- a/raphtory/src/db/api/state/group_by.rs +++ b/raphtory/src/db/api/state/group_by.rs @@ -6,6 +6,7 @@ use crate::{ prelude::{GraphViewOps, NodeStateOps}, }; use dashmap::DashMap; +use indexmap::IndexSet; use raphtory_api::core::entities::VID; use rayon::prelude::*; use std::{hash::Hash, sync::Arc}; @@ -18,9 +19,10 @@ pub struct NodeGroups { impl<'graph, V: Hash + Eq + Send + Sync + Clone, G: GraphViewOps<'graph>> NodeGroups { pub(crate) fn new(values: impl ParallelIterator, graph: G) -> Self { - let groups: DashMap, ahash::RandomState> = DashMap::default(); + let groups: DashMap, ahash::RandomState> = + DashMap::default(); values.for_each(|(node, v)| { - groups.entry(v).or_insert_with(Vec::new).push(node); + groups.entry(v).or_default().insert(node); }); let groups = groups diff --git a/raphtory/src/db/api/state/lazy_node_state.rs b/raphtory/src/db/api/state/lazy_node_state.rs index fab4e21ce..7d2d085d1 100644 --- a/raphtory/src/db/api/state/lazy_node_state.rs +++ b/raphtory/src/db/api/state/lazy_node_state.rs @@ -15,6 +15,7 @@ use crate::{ }, prelude::*, }; +use indexmap::IndexSet; use rayon::prelude::*; use std::fmt::{Debug, Formatter}; @@ -93,7 +94,7 @@ impl<'graph, Op: NodeOp + 'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'gra pub fn compute(&self) -> NodeState<'graph, Op::Output, G, GH> { if self.nodes.is_filtered() { - let (keys, values): (Vec<_>, Vec<_>) = self + let (keys, values): (IndexSet<_, ahash::RandomState>, Vec<_>) = self .par_iter() .map(|(node, value)| (node.node, value)) .unzip(); diff --git a/raphtory/src/db/api/state/node_state.rs b/raphtory/src/db/api/state/node_state.rs index 4b2357770..398490bb0 100644 --- a/raphtory/src/db/api/state/node_state.rs +++ b/raphtory/src/db/api/state/node_state.rs @@ -13,11 +13,19 @@ use indexmap::IndexSet; use rayon::{iter::Either, prelude::*}; use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct Index { index: Arc>, } +impl + From + Send + Sync> FromIterator for Index { + fn from_iter>(iter: T) -> Self { + Self { + index: Arc::new(IndexSet::from_iter(iter)), + } + } +} + impl Index { pub fn for_graph<'graph>(graph: impl GraphViewOps<'graph>) -> Option { if graph.nodes_filtered() { @@ -27,7 +35,7 @@ impl Index { NodeList::List { nodes } => Some(nodes), } } else { - Some(Self::new(graph.nodes().iter().map(|node| node.node))) + Some(Self::from_iter(graph.nodes().iter().map(|node| node.node))) } } else { None @@ -36,10 +44,8 @@ impl Index { } impl + From + Send + Sync> Index { - pub fn new(keys: impl IntoIterator) -> Self { - Self { - index: Arc::new(IndexSet::from_iter(keys)), - } + pub fn new(keys: impl Into>>) -> Self { + Self { index: keys.into() } } #[inline] diff --git a/raphtory/src/db/api/state/node_state_ops.rs b/raphtory/src/db/api/state/node_state_ops.rs index 3e57aede7..10b653390 100644 --- a/raphtory/src/db/api/state/node_state_ops.rs +++ b/raphtory/src/db/api/state/node_state_ops.rs @@ -1,16 +1,16 @@ use crate::{ core::entities::nodes::node_ref::AsNodeRef, db::{ - api::state::{group_by::NodeGroups, node_state::NodeState, node_state_ord_ops, Index}, + api::{ + state::{group_by::NodeGroups, node_state::NodeState, node_state_ord_ops, Index}, + }, graph::{node::NodeView, nodes::Nodes}, }, prelude::{GraphViewOps, NodeViewOps}, }; +use indexmap::IndexSet; use num_traits::AsPrimitive; -use rayon::{ - iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, - prelude::ParallelSliceMut, -}; +use rayon::prelude::*; use std::{borrow::Borrow, hash::Hash, iter::Sum}; pub trait NodeStateOps<'graph>: @@ -73,6 +73,40 @@ pub trait NodeStateOps<'graph>: fn len(&self) -> usize; + fn sort_by< + F: Fn( + (NodeView<&Self::BaseGraph, &Self::Graph>, &Self::OwnedValue), + (NodeView<&Self::BaseGraph, &Self::Graph>, &Self::OwnedValue), + ) -> std::cmp::Ordering + + Sync, + >( + &self, + cmp: F, + ) -> NodeState<'graph, Self::OwnedValue, Self::BaseGraph, Self::Graph> { + let mut state: Vec<_> = self + .par_iter() + .map(|(n, v)| (n.node, v.borrow().clone())) + .collect(); + let graph = self.graph(); + let base_graph = self.base_graph(); + state.par_sort_by(|(n1, v1), (n2, v2)| { + cmp( + (NodeView::new_one_hop_filtered(base_graph, graph, *n1), v1), + (NodeView::new_one_hop_filtered(base_graph, graph, *n2), v2), + ) + }); + + let (keys, values): (IndexSet<_, ahash::RandomState>, Vec<_>) = + state.into_par_iter().unzip(); + + NodeState::new( + self.base_graph().clone(), + self.graph().clone(), + values.into(), + Some(Index::new(keys)), + ) + } + /// Sorts the by its values in ascending or descending order. /// /// Arguments: @@ -88,49 +122,12 @@ pub trait NodeStateOps<'graph>: &self, cmp: F, ) -> NodeState<'graph, Self::OwnedValue, Self::BaseGraph, Self::Graph> { - { - let mut state: Vec<_> = self - .par_iter() - .map(|(n, v)| (n.node, v.borrow().clone())) - .collect(); - state.par_sort_by(|(_, v1), (_, v2)| cmp(v1, v2)); - - let mut keys = Vec::with_capacity(state.len()); - let mut values = Vec::with_capacity(state.len()); - state - .into_par_iter() - .unzip_into_vecs(&mut keys, &mut values); - - NodeState::new( - self.base_graph().clone(), - self.graph().clone(), - values.into(), - Some(Index::new(keys)), - ) - } + self.sort_by(|(_, v1), (_, v2)| cmp(v1, v2)) } /// Sort the results by global node id fn sort_by_id(&self) -> NodeState<'graph, Self::OwnedValue, Self::BaseGraph, Self::Graph> { - let mut state: Vec<_> = self - .par_iter() - .map(|(n, v)| (n.id(), n.node, v.borrow().clone())) - .collect(); - state.par_sort_by(|(l_id, l_n, _), (r_id, r_n, _)| (l_id, l_n).cmp(&(r_id, r_n))); - - let mut keys = Vec::with_capacity(state.len()); - let mut values = Vec::with_capacity(state.len()); - state - .into_par_iter() - .map(|(_, n, v)| (n, v)) - .unzip_into_vecs(&mut keys, &mut values); - - NodeState::new( - self.base_graph().clone(), - self.graph().clone(), - values.into(), - Some(Index::new(keys)), - ) + self.sort_by(|(n1, _), (n2, _)| n1.id().cmp(&n2.id())) } /// Retrieves the top-k elements from the `AlgorithmResult` based on its values. @@ -157,7 +154,7 @@ pub trait NodeStateOps<'graph>: |(_, v1), (_, v2)| cmp(v1.borrow(), v2.borrow()), k, ); - let (keys, values): (Vec<_>, Vec<_>) = values + let (keys, values): (IndexSet<_, ahash::RandomState>, Vec<_>) = values .into_iter() .map(|(n, v)| (n.node, v.borrow().clone())) .unzip(); diff --git a/raphtory/src/db/graph/views/node_subgraph.rs b/raphtory/src/db/graph/views/node_subgraph.rs index ed2f88f3e..04722f5c6 100644 --- a/raphtory/src/db/graph/views/node_subgraph.rs +++ b/raphtory/src/db/graph/views/node_subgraph.rs @@ -55,9 +55,9 @@ impl<'graph, G: GraphViewOps<'graph>> NodeSubgraph { .into_iter() .flat_map(|v| graph.internalise_node(v.as_node_ref())); let nodes = if graph.nodes_filtered() { - Index::new(nodes.filter(|n| graph.has_node(*n))) + Index::from_iter(nodes.filter(|n| graph.has_node(*n))) } else { - Index::new(nodes) + Index::from_iter(nodes) }; Self { graph, nodes } } diff --git a/raphtory/src/db/graph/views/window_graph.rs b/raphtory/src/db/graph/views/window_graph.rs index eda93c3df..f7f330d4d 100644 --- a/raphtory/src/db/graph/views/window_graph.rs +++ b/raphtory/src/db/graph/views/window_graph.rs @@ -140,7 +140,7 @@ impl<'graph, G: GraphViewOps<'graph>> ListOps for WindowedGraph { fn node_list(&self) -> NodeList { if self.window_is_empty() { NodeList::List { - nodes: Index::new(vec![]), + nodes: Index::default(), } } else { self.graph.node_list()