Skip to content

Commit

Permalink
Unlocked storage 2 (#1662)
Browse files Browse the repository at this point in the history
* 1 fatal error

* fixed fatal error

* able to make neighbours iterators out of Entry

* make TPropOps non Copy

* LockedView<TProp> needs to be TPropOps

* add edge_tuples to Entry<NodeStore>

* changed name in NodeStorageOps to return cows

* actually add the unlocked and the variants files

* NodesStorageRef compiles

* NodesStorage compiles

* NodeOwnedEntry compiles

* fixed compilation on non storage feature

* still needing to implement EdgeStorageRef

* node storage additions work with reference

* add node additions_ref

* tests pass

* replace EdgeStoreRef with EdgeStoreEntry

* change the benchmarks to reduce the sample size for materialize

* fix materialize for Persistent graph and add global locks only when we need all the nodes

* redo the nodes to move to unlocked

* ready to merge

* removed some useless stuff

* clippy fix

* fmt

* inline active

* increase sample size in graph_ops bench and fix earliest_time

* fix the build issues

* fix build errors

* revert back to NodeStorageRef and always lock when we get all nodes

* refactor Node storage remove graph_ops from GH benches

* deactivate storage

* fix node_owned_entry.rs
  • Loading branch information
fabianmurariu authored Jun 20, 2024
1 parent cad9bc0 commit 5703639
Show file tree
Hide file tree
Showing 35 changed files with 1,040 additions and 469 deletions.
46 changes: 23 additions & 23 deletions raphtory-benchmark/benches/base.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::common::{bootstrap_graph, run_analysis_benchmarks, run_large_ingestion_benchmarks};
use crate::common::{bootstrap_graph, run_large_ingestion_benchmarks};
use common::run_graph_ops_benches;
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use raphtory::{graph_loader::example::lotr_graph::lotr_graph, prelude::*};

Expand All @@ -21,29 +22,28 @@ pub fn base(c: &mut Criterion) {
// Make an option of None
run_large_ingestion_benchmarks(&mut large_group, || bootstrap_graph(10000), None);
large_group.finish();
let mut graph_group = c.benchmark_group("lotr_graph");

let graph = lotr_graph();
run_analysis_benchmarks(&mut graph_group, || graph.clone(), None);
graph_group.finish();
let mut graph_window_group_100 = c.benchmark_group("lotr_graph_window_100");
graph_window_group_100.sample_size(10);
run_analysis_benchmarks(
&mut graph_window_group_100,
|| graph.window(i64::MIN, i64::MAX),
None,
);
graph_window_group_100.finish();
let mut graph_window_group_10 = c.benchmark_group("lotr_graph_window_10");
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 10;
graph_window_group_10.sample_size(10);
run_analysis_benchmarks(
&mut graph_window_group_10,
|| graph.window(start, latest + 1),
None,
);
graph_window_group_10.finish();

let layered_graph = Graph::new();

for layer in (0..10).map(|i| i.to_string()) {
for edge in graph.edges() {
for t in edge.history() {
layered_graph
.add_edge(
t,
edge.src().name().clone(),
edge.dst().name().clone(),
NO_PROPS,
Some(&layer),
)
.expect("Error: Unable to add edge");
}
}
}

run_graph_ops_benches(c, "lotr", graph, layered_graph)
}

criterion_group!(benches, base);
Expand Down
150 changes: 135 additions & 15 deletions raphtory-benchmark/benches/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#![allow(dead_code)]

use criterion::{
black_box, measurement::WallTime, BatchSize, Bencher, BenchmarkGroup, BenchmarkId,
};
use rand::{distributions::Uniform, seq::*, Rng};
use raphtory::{
core::entities::{LayerIds, VID},
db::api::view::StaticGraphViewOps,
prelude::*,
black_box, measurement::WallTime, BatchSize, Bencher, BenchmarkGroup, BenchmarkId, Criterion,
};
use rand::{distributions::Uniform, seq::*, Rng, SeedableRng};
use raphtory::{db::api::view::StaticGraphViewOps, prelude::*};
use std::collections::HashSet;

fn make_index_gen() -> Box<dyn Iterator<Item = u64>> {
Expand Down Expand Up @@ -421,20 +417,144 @@ pub fn run_analysis_benchmarks<F, G>(
},
);

bench(group, "materialize", parameter, |b: &mut Bencher| {
b.iter(|| {
let mg = graph.materialize();
black_box(mg)
})
});

bench(
group,
"max_neighbour_degree",
parameter,
|b: &mut Bencher| {
let v = graph.node(VID(0)).expect("graph should not be empty");
let v = graph
.nodes()
.into_iter()
.next()
.expect("graph should not be empty");
b.iter(|| v.neighbours().degree().max())
},
);
}

pub fn run_materialize<F, G>(
group: &mut BenchmarkGroup<WallTime>,
make_graph: F,
parameter: Option<usize>,
) where
F: Fn() -> G,
G: StaticGraphViewOps,
{
let graph = make_graph();
bench(group, "materialize", parameter, |b: &mut Bencher| {
b.iter(|| {
let mg = graph.materialize();
black_box(mg)
})
});
}

pub fn run_graph_ops_benches(
c: &mut Criterion,
graph_name: &str,
graph: Graph,
layered_graph: Graph,
) {
let mut graph_group = c.benchmark_group(graph_name);
let make_graph = || graph.clone();
run_analysis_benchmarks(&mut graph_group, make_graph, None);
graph_group.finish();

bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);

let group_name = format!("{graph_name}_window_100");
let make_graph = || graph.window(i64::MIN, i64::MAX);
let mut graph_window_group_100 = c.benchmark_group(group_name);
// graph_window_group_100.sample_size(10);
run_analysis_benchmarks(&mut graph_window_group_100, make_graph, None);
graph_window_group_100.finish();

bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);

// graph windowed
let group_name = format!("{graph_name}_graph_window_10");
let mut graph_window_group_10 = c.benchmark_group(group_name);
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 10;
// graph_window_group_10.sample_size(10);
let make_graph = || graph.window(start, latest + 1);
run_analysis_benchmarks(&mut graph_window_group_10, make_graph, None);
graph_window_group_10.finish();
bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);

// subgraph
let mut rng = rand::rngs::StdRng::seed_from_u64(73);
let nodes = graph
.nodes()
.into_iter()
.choose_multiple(&mut rng, graph.count_nodes() / 10)
.into_iter()
.map(|n| n.id())
.collect::<Vec<_>>();
let subgraph = graph.subgraph(nodes);
let group_name = format!("{graph_name}_subgraph_10pc");
let mut subgraph_10 = c.benchmark_group(group_name);
// subgraph_10.sample_size(10);

let make_graph = || subgraph.clone();
run_analysis_benchmarks(&mut subgraph_10, make_graph, None);
subgraph_10.finish();
bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);

// subgraph windowed
let group_name = format!("{graph_name}_subgraph_10pc_windowed");
let mut subgraph_10_windowed = c.benchmark_group(group_name);

let make_graph = || subgraph.window(start, latest + 1);
run_analysis_benchmarks(&mut subgraph_10_windowed, make_graph, None);
subgraph_10_windowed.finish();
bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);

// layered graph windowed
let graph = layered_graph;
let group_name = format!("{graph_name}_graph_window_50_layered");
let mut graph_window_layered_group_50 = c.benchmark_group(group_name);
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 2;
graph_window_layered_group_50.sample_size(10);
let make_graph = || {
graph
.window(start, latest + 1)
.layers(["0", "1", "2", "3", "4"])
.unwrap()
};
run_analysis_benchmarks(&mut graph_window_layered_group_50, make_graph, None);
graph_window_layered_group_50.finish();
bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);

let graph = graph.persistent_graph();

let group_name = format!("{graph_name}_persistent_window_50_layered");
let mut graph_window_layered_group_50 = c.benchmark_group(group_name);
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 2;
graph_window_layered_group_50.sample_size(10);
let make_graph = || {
graph
.window(start, latest + 1)
.layers(["0", "1", "2", "3", "4"])
.unwrap()
};
run_analysis_benchmarks(&mut graph_window_layered_group_50, make_graph, None);
graph_window_layered_group_50.finish();
bench_materialise(&format!("{graph_name}_materialise"), c, make_graph);
}

fn bench_materialise<F, G>(name: &str, c: &mut Criterion, make_graph: F)
where
F: Fn() -> G,
G: StaticGraphViewOps,
{
let mut mat_graph_group = c.benchmark_group(name);
mat_graph_group.sample_size(10);
run_materialize(&mut mat_graph_group, make_graph, None);
mat_graph_group.finish();
}
96 changes: 4 additions & 92 deletions raphtory-benchmark/benches/graph_ops.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use common::run_analysis_benchmarks;
use common::run_graph_ops_benches;
use criterion::{criterion_group, criterion_main, Criterion};
use rand::{seq::*, SeedableRng};
use raphtory::{
core::utils::hashing::calculate_hash,
db::api::view::*,
graph_loader::{
example::sx_superuser_graph::{sx_superuser_file, sx_superuser_graph, TEdge},
source::csv_loader::CsvLoader,
Expand All @@ -14,97 +12,11 @@ use raphtory::{
mod common;

pub fn graph(c: &mut Criterion) {
let mut graph_group = c.benchmark_group("analysis_graph");
let group_name = "analysis_graph";
let graph = sx_superuser_graph().unwrap();
run_analysis_benchmarks(&mut graph_group, || graph.clone(), None);
graph_group.finish();
let mut graph_window_group_100 = c.benchmark_group("analysis_graph_window_100");
graph_window_group_100.sample_size(10);
run_analysis_benchmarks(
&mut graph_window_group_100,
|| graph.window(i64::MIN, i64::MAX),
None,
);
graph_window_group_100.finish();
let layered_graph = layered_sx_super_user_graph(Some(10)).unwrap();

// graph windowed
let mut graph_window_group_10 = c.benchmark_group("analysis_graph_window_10");
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 10;
graph_window_group_10.sample_size(10);
run_analysis_benchmarks(
&mut graph_window_group_10,
|| graph.window(start, latest + 1),
None,
);
graph_window_group_10.finish();

// subgraph
let mut rng = rand::rngs::StdRng::seed_from_u64(73);
let nodes = graph
.nodes()
.into_iter()
.choose_multiple(&mut rng, graph.count_nodes() / 10)
.into_iter()
.map(|n| n.id())
.collect::<Vec<_>>();
let subgraph = graph.subgraph(nodes);
let mut subgraph_10 = c.benchmark_group("analysis_subgraph_10pc");
subgraph_10.sample_size(10);

run_analysis_benchmarks(&mut subgraph_10, || subgraph.clone(), None);
subgraph_10.finish();

// subgraph windowed
let mut subgraph_10_windowed = c.benchmark_group("analysis_subgraph_10pc_windowed");
subgraph_10_windowed.sample_size(10);

run_analysis_benchmarks(
&mut subgraph_10_windowed,
|| subgraph.window(start, latest + 1),
None,
);
subgraph_10_windowed.finish();

// layered graph windowed
let graph = layered_sx_super_user_graph(Some(10)).unwrap();
let mut graph_window_layered_group_50 = c.benchmark_group("analysis_graph_window_50_layered");
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 2;
graph_window_layered_group_50.sample_size(10);
run_analysis_benchmarks(
&mut graph_window_layered_group_50,
|| {
graph
.window(start, latest + 1)
.layers(["0", "1", "2", "3", "4"])
.unwrap()
},
None,
);
graph_window_layered_group_50.finish();

let graph = graph.persistent_graph();

let mut graph_window_layered_group_50 =
c.benchmark_group("persistent_analysis_graph_window_50_layered");
let latest = graph.latest_time().expect("non-empty graph");
let earliest = graph.earliest_time().expect("non-empty graph");
let start = latest - (latest - earliest) / 2;
graph_window_layered_group_50.sample_size(10);
run_analysis_benchmarks(
&mut graph_window_layered_group_50,
|| {
graph
.window(start, latest + 1)
.layers(["0", "1", "2", "3", "4"])
.unwrap()
},
None,
);
graph_window_layered_group_50.finish();
run_graph_ops_benches(c, group_name, graph, layered_graph);
}

/// Load the SX SuperUser dataset into a graph and return it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ where
.permutations(2)
.map(|e| {
u.graph()
.edge(*e.get(0).unwrap(), *e.get(1).unwrap())
.edge(*e.first().unwrap(), *e.get(1).unwrap())
.iter()
.flat_map(|edge| edge.explode())
.collect::<Vec<_>>()
Expand Down
14 changes: 5 additions & 9 deletions raphtory/src/core/entities/nodes/input_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ pub fn parse_u64_strict(input: &str) -> Option<u64> {
if !(byte_1..=MAX_U64_BYTES[0]).contains(&first) {
return None;
}
} else {
if !(byte_1..=byte_9).contains(&first) {
return None;
}
} else if !(byte_1..=byte_9).contains(&first) {
return None;
}

let mut result = (first - byte_0) as u64;
Expand All @@ -41,14 +39,12 @@ pub fn parse_u64_strict(input: &str) -> Option<u64> {
return None;
}
check_max = next_byte == max_byte;
} else {
if !(byte_0..=byte_9).contains(&next_byte) {
return None;
}
} else if !(byte_0..=byte_9).contains(&next_byte) {
return None;
}
result = result * 10 + (next_byte - byte_0) as u64;
}
return Some(result);
Some(result)
}

pub trait InputNode: Clone {
Expand Down
Loading

0 comments on commit 5703639

Please sign in to comment.