Skip to content

Commit

Permalink
remove temporary vecs when creating Index
Browse files Browse the repository at this point in the history
  • Loading branch information
ljeub-pometry committed Jan 14, 2025
1 parent 736d2c2 commit 760df60
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,4 @@ arrow-buffer = { version = "53.2.0" }
arrow-schema = { version = "53.2.0" }
arrow-array = { version = "53.2.0" }
moka = { version = "0.12.7", features = ["sync"] }
indexmap = "2.7.0"
indexmap = { version = "2.7.0", features = ["rayon"] }
4 changes: 3 additions & 1 deletion raphtory/src/algorithms/components/in_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
},
prelude::GraphViewOps,
};
use indexmap::IndexSet;
use itertools::Itertools;
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};

Expand Down Expand Up @@ -130,7 +131,8 @@ pub fn in_component<'graph, G: GraphViewOps<'graph>>(
}
}

let (nodes, distances): (Vec<_>, Vec<_>) = in_components.into_iter().sorted().unzip();
let (nodes, distances): (IndexSet<_, ahash::RandomState>, Vec<_>) =
in_components.into_iter().sorted().unzip();
NodeState::new(
node.graph.clone(),
node.graph.clone(),
Expand Down
4 changes: 3 additions & 1 deletion raphtory/src/algorithms/components/out_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
prelude::GraphViewOps,
};
use indexmap::IndexSet;
use itertools::Itertools;
use raphtory_api::core::entities::GID;
use rayon::prelude::*;
Expand Down Expand Up @@ -133,7 +134,8 @@ pub fn out_component<'graph, G: GraphViewOps<'graph>>(
}
}

let (nodes, distances): (Vec<_>, Vec<_>) = out_components.into_iter().sorted().unzip();
let (nodes, distances): (IndexSet<_, ahash::RandomState>, Vec<_>) =
out_components.into_iter().sorted().unzip();
NodeState::new(
node.graph.clone(),
node.graph.clone(),
Expand Down
6 changes: 4 additions & 2 deletions raphtory/src/db/api/state/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
prelude::{GraphViewOps, NodeStateOps},
};
use dashmap::DashMap;
use indexmap::IndexSet;
use raphtory_api::core::entities::VID;
use rayon::prelude::*;
use std::{hash::Hash, sync::Arc};
Expand All @@ -18,9 +19,10 @@ pub struct NodeGroups<V, G> {

impl<'graph, V: Hash + Eq + Send + Sync + Clone, G: GraphViewOps<'graph>> NodeGroups<V, G> {
pub(crate) fn new(values: impl ParallelIterator<Item = (VID, V)>, graph: G) -> Self {
let groups: DashMap<V, Vec<VID>, ahash::RandomState> = DashMap::default();
let groups: DashMap<V, IndexSet<VID, ahash::RandomState>, ahash::RandomState> =
DashMap::default();
values.for_each(|(node, v)| {
groups.entry(v).or_insert_with(Vec::new).push(node);
groups.entry(v).or_default().insert(node);
});

let groups = groups
Expand Down
3 changes: 2 additions & 1 deletion raphtory/src/db/api/state/lazy_node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
},
prelude::*,
};
use indexmap::IndexSet;
use rayon::prelude::*;
use std::fmt::{Debug, Formatter};

Expand Down Expand Up @@ -93,7 +94,7 @@ impl<'graph, Op: NodeOp + 'graph, G: GraphViewOps<'graph>, GH: GraphViewOps<'gra

pub fn compute(&self) -> NodeState<'graph, Op::Output, G, GH> {
if self.nodes.is_filtered() {
let (keys, values): (Vec<_>, Vec<_>) = self
let (keys, values): (IndexSet<_, ahash::RandomState>, Vec<_>) = self
.par_iter()
.map(|(node, value)| (node.node, value))
.unzip();
Expand Down
18 changes: 12 additions & 6 deletions raphtory/src/db/api/state/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,19 @@ use indexmap::IndexSet;
use rayon::{iter::Either, prelude::*};
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};

/// Set of keys of type `K`, stored as an `IndexSet` (hashed with `ahash`) behind an `Arc`.
///
/// Because the set lives behind an `Arc`, cloning an `Index` clones the pointer and
/// shares the underlying set instead of copying the keys.
///
// NOTE(review): the two `derive` lines below are the old and new side of the same diff
// line; presumably only the second (with `Default`) is meant to remain — confirm.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct Index<K> {
// Shared key set; iteration order follows insertion order (see the indexmap docs).
index: Arc<IndexSet<K, ahash::RandomState>>,
}

impl<K: Copy + Eq + Hash + Into<usize> + From<usize> + Send + Sync> FromIterator<K> for Index<K> {
fn from_iter<T: IntoIterator<Item = K>>(iter: T) -> Self {
Self {
index: Arc::new(IndexSet::from_iter(iter)),
}
}
}

impl Index<VID> {
pub fn for_graph<'graph>(graph: impl GraphViewOps<'graph>) -> Option<Self> {
if graph.nodes_filtered() {
Expand All @@ -27,7 +35,7 @@ impl Index<VID> {
NodeList::List { nodes } => Some(nodes),
}
} else {
Some(Self::new(graph.nodes().iter().map(|node| node.node)))
Some(Self::from_iter(graph.nodes().iter().map(|node| node.node)))
}
} else {
None
Expand All @@ -36,10 +44,8 @@ impl Index<VID> {
}

impl<K: Copy + Eq + Hash + Into<usize> + From<usize> + Send + Sync> Index<K> {
pub fn new(keys: impl IntoIterator<Item = K>) -> Self {
Self {
index: Arc::new(IndexSet::from_iter(keys)),
}
pub fn new(keys: impl Into<Arc<IndexSet<K, ahash::RandomState>>>) -> Self {
Self { index: keys.into() }
}

#[inline]
Expand Down
87 changes: 42 additions & 45 deletions raphtory/src/db/api/state/node_state_ops.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use crate::{
core::entities::nodes::node_ref::AsNodeRef,
db::{
api::state::{group_by::NodeGroups, node_state::NodeState, node_state_ord_ops, Index},
api::{
state::{group_by::NodeGroups, node_state::NodeState, node_state_ord_ops, Index},
},
graph::{node::NodeView, nodes::Nodes},
},
prelude::{GraphViewOps, NodeViewOps},
};
use indexmap::IndexSet;
use num_traits::AsPrimitive;
use rayon::{
iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator},
prelude::ParallelSliceMut,
};
use rayon::prelude::*;
use std::{borrow::Borrow, hash::Hash, iter::Sum};

pub trait NodeStateOps<'graph>:
Expand Down Expand Up @@ -73,6 +73,40 @@ pub trait NodeStateOps<'graph>:

fn len(&self) -> usize;

/// Sorts the state with a caller-supplied comparator over `(node view, value)` pairs.
///
/// Arguments:
///
/// * `cmp`: Comparator invoked with two `(NodeView, &value)` pairs; it must be `Sync`
///   because the sort runs in parallel.
///
/// Returns:
///
/// A new `NodeState` holding cloned values in sorted order, indexed by the sorted keys.
fn sort_by<
    F: Fn(
            (NodeView<&Self::BaseGraph, &Self::Graph>, &Self::OwnedValue),
            (NodeView<&Self::BaseGraph, &Self::Graph>, &Self::OwnedValue),
        ) -> std::cmp::Ordering
        + Sync,
>(
    &self,
    cmp: F,
) -> NodeState<'graph, Self::OwnedValue, Self::BaseGraph, Self::Graph> {
    let base_graph = self.base_graph();
    let graph = self.graph();

    // Snapshot every entry as an owned (node id, value) pair so it can be reordered.
    let mut entries: Vec<_> = self
        .par_iter()
        .map(|(node, value)| (node.node, value.borrow().clone()))
        .collect();

    // Node views are rebuilt per comparison from the stored node ids.
    entries.par_sort_by(|(left_node, left_value), (right_node, right_value)| {
        let left = NodeView::new_one_hop_filtered(base_graph, graph, *left_node);
        let right = NodeView::new_one_hop_filtered(base_graph, graph, *right_node);
        cmp((left, left_value), (right, right_value))
    });

    // Split straight into the key set and the value vector.
    let (keys, values): (IndexSet<_, ahash::RandomState>, Vec<_>) =
        entries.into_par_iter().unzip();
    let index = Index::new(keys);

    NodeState::new(
        base_graph.clone(),
        graph.clone(),
        values.into(),
        Some(index),
    )
}

/// Sorts the state by its values in ascending or descending order.
///
/// Arguments:
Expand All @@ -88,49 +122,12 @@ pub trait NodeStateOps<'graph>:
&self,
cmp: F,
) -> NodeState<'graph, Self::OwnedValue, Self::BaseGraph, Self::Graph> {
{
let mut state: Vec<_> = self
.par_iter()
.map(|(n, v)| (n.node, v.borrow().clone()))
.collect();
state.par_sort_by(|(_, v1), (_, v2)| cmp(v1, v2));

let mut keys = Vec::with_capacity(state.len());
let mut values = Vec::with_capacity(state.len());
state
.into_par_iter()
.unzip_into_vecs(&mut keys, &mut values);

NodeState::new(
self.base_graph().clone(),
self.graph().clone(),
values.into(),
Some(Index::new(keys)),
)
}
self.sort_by(|(_, v1), (_, v2)| cmp(v1, v2))
}

/// Sort the results by global node id
fn sort_by_id(&self) -> NodeState<'graph, Self::OwnedValue, Self::BaseGraph, Self::Graph> {
let mut state: Vec<_> = self
.par_iter()
.map(|(n, v)| (n.id(), n.node, v.borrow().clone()))
.collect();
state.par_sort_by(|(l_id, l_n, _), (r_id, r_n, _)| (l_id, l_n).cmp(&(r_id, r_n)));

let mut keys = Vec::with_capacity(state.len());
let mut values = Vec::with_capacity(state.len());
state
.into_par_iter()
.map(|(_, n, v)| (n, v))
.unzip_into_vecs(&mut keys, &mut values);

NodeState::new(
self.base_graph().clone(),
self.graph().clone(),
values.into(),
Some(Index::new(keys)),
)
self.sort_by(|(n1, _), (n2, _)| n1.id().cmp(&n2.id()))
}

/// Retrieves the top-k elements from the `AlgorithmResult` based on its values.
Expand All @@ -157,7 +154,7 @@ pub trait NodeStateOps<'graph>:
|(_, v1), (_, v2)| cmp(v1.borrow(), v2.borrow()),
k,
);
let (keys, values): (Vec<_>, Vec<_>) = values
let (keys, values): (IndexSet<_, ahash::RandomState>, Vec<_>) = values
.into_iter()
.map(|(n, v)| (n.node, v.borrow().clone()))
.unzip();
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/db/graph/views/node_subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ impl<'graph, G: GraphViewOps<'graph>> NodeSubgraph<G> {
.into_iter()
.flat_map(|v| graph.internalise_node(v.as_node_ref()));
let nodes = if graph.nodes_filtered() {
Index::new(nodes.filter(|n| graph.has_node(*n)))
Index::from_iter(nodes.filter(|n| graph.has_node(*n)))
} else {
Index::new(nodes)
Index::from_iter(nodes)
};
Self { graph, nodes }
}
Expand Down
2 changes: 1 addition & 1 deletion raphtory/src/db/graph/views/window_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<'graph, G: GraphViewOps<'graph>> ListOps for WindowedGraph<G> {
fn node_list(&self) -> NodeList {
if self.window_is_empty() {
NodeList::List {
nodes: Index::new(vec![]),
nodes: Index::default(),
}
} else {
self.graph.node_list()
Expand Down

0 comments on commit 760df60

Please sign in to comment.