Skip to content

Commit

Permalink
make group_by parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ljeub-pometry committed Jan 13, 2025
1 parent 73460b6 commit d5bced2
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ rand_distr = "0.4.3"
rustc-hash = "2.0.0"
twox-hash = "2.1.0"
lock_api = { version = "0.4.11", features = ["arc_lock", "serde"] }
dashmap = { version = "6.0.1", features = ["serde"] }
dashmap = { version = "6.0.1", features = ["serde", "rayon"] }
enum_dispatch = "0.3.12"
glam = "0.29.0"
quad-rand = "0.2.1"
Expand Down
17 changes: 10 additions & 7 deletions raphtory/src/db/api/state/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,26 @@ use crate::{
},
prelude::{GraphViewOps, NodeStateOps},
};
use dashmap::DashMap;
use raphtory_api::core::entities::VID;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use rayon::prelude::*;
use std::{hash::Hash, sync::Arc};

/// Nodes partitioned into groups, keyed by a value of type `V`.
///
/// The constructor receives `(VID, V)` pairs and collects every node that
/// shares the same value into a single group, so each distinct value appears
/// once in `groups`.
#[derive(Clone, Debug)]
pub struct NodeGroups<V, G> {
// One `(key, node index)` entry per distinct key; each `Index<VID>` is built
// from the node ids that were collected for that key.
groups: Arc<[(V, Index<VID>)]>,
// The graph value handed to the constructor, stored alongside the groups.
graph: G,
}

impl<'graph, V: Hash + Eq, G: GraphViewOps<'graph>> NodeGroups<V, G> {
pub(crate) fn new(values: impl Iterator<Item = (VID, V)>, graph: G) -> Self {
let mut groups: HashMap<V, Vec<VID>> = HashMap::new();
for (node, v) in values {
impl<'graph, V: Hash + Eq + Send + Sync + Clone, G: GraphViewOps<'graph>> NodeGroups<V, G> {
pub(crate) fn new(values: impl ParallelIterator<Item = (VID, V)>, graph: G) -> Self {
let groups: DashMap<V, Vec<VID>, ahash::RandomState> = DashMap::default();
values.for_each(|(node, v)| {
groups.entry(v).or_insert_with(Vec::new).push(node);
}
});

let groups = groups
.into_iter()
.into_par_iter()
.map(|(k, v)| (k, Index::new(v)))
.collect();
Self { groups, graph }
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/db/api/state/node_state_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ pub trait NodeStateOps<'graph>:
values.into_iter().nth(median_index)
}

fn group_by<V: Hash + Eq, F: Fn(&Self::OwnedValue) -> V + Sync>(
fn group_by<V: Hash + Eq + Send + Sync + Clone, F: Fn(&Self::OwnedValue) -> V + Sync>(
&self,
group_fn: F,
) -> NodeGroups<V, Self::Graph> {
NodeGroups::new(
self.iter()
self.par_iter()
.map(|(node, v)| (node.node, group_fn(v.borrow()))),
self.graph().clone(),
)
Expand Down

0 comments on commit d5bced2

Please sign in to comment.