Skip to content

Commit

Permalink
simple change to reduce memory usage (#1668)
Browse files Browse the repository at this point in the history
* simple change to reduce memory usage

* remove EdgeLayerRef from edge_store.rs

* rename x to data

* rename l to data

* remove useless wrapper struct
  • Loading branch information
fabianmurariu authored Jun 20, 2024
1 parent 5703639 commit b89065e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 60 deletions.
121 changes: 67 additions & 54 deletions raphtory/src/core/entities/edges/edge_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ pub struct EdgeStore {
pub(crate) eid: EID,
pub(crate) src: VID,
pub(crate) dst: VID,
pub(crate) layers: Vec<EdgeLayer>, // each layer has its own set of properties
pub(crate) additions: Vec<TimeIndex<TimeIndexEntry>>,
pub(crate) deletions: Vec<TimeIndex<TimeIndexEntry>>,
pub(crate) data: Vec<EdgeData>,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
pub struct EdgeData {
pub(crate) layer: EdgeLayer,
pub(crate) additions: TimeIndex<TimeIndexEntry>,
pub(crate) deletions: TimeIndex<TimeIndexEntry>,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
Expand Down Expand Up @@ -95,32 +100,28 @@ impl EdgeStore {
}

pub fn internal_num_layers(&self) -> usize {
self.layers
.len()
.max(self.additions.len())
.max(self.deletions.len())
self.data.len()
}

fn get_or_allocate_layer(&mut self, layer_id: usize) -> &mut EdgeLayer {
if self.layers.len() <= layer_id {
self.layers.resize_with(layer_id + 1, Default::default);
if self.data.len() <= layer_id {
self.data.resize_with(layer_id + 1, Default::default);
}
&mut self.layers[layer_id]
&mut self.data[layer_id].layer
}

pub fn has_layer_inner(&self, layer_id: usize) -> bool {
self.additions
.get(layer_id)
self.get_additions(layer_id)
.filter(|t_index| !t_index.is_empty())
.is_some()
|| self
.deletions
.get(layer_id)
.get_deletions(layer_id)
.filter(|t_index| !t_index.is_empty())
.is_some()
}

pub fn layer_iter(&self) -> impl Iterator<Item = &EdgeLayer> + '_ {
self.layers.iter()
pub fn layer_iter(&self) -> impl Iterator<Item = &EdgeData> + '_ {
self.data.iter()
}

/// Iterate over (layer_id, additions, deletions) triplets for edge
Expand Down Expand Up @@ -148,14 +149,14 @@ impl EdgeStore {
.into_dyn_boxed(),
LayerIds::One(id) => Box::new(iter::once((
*id,
self.additions.get(*id).unwrap_or(&TimeIndex::Empty),
self.deletions.get(*id).unwrap_or(&TimeIndex::Empty),
self.get_additions(*id).unwrap_or(&TimeIndex::Empty),
self.get_deletions(*id).unwrap_or(&TimeIndex::Empty),
))),
LayerIds::Multiple(ids) => Box::new(ids.iter().map(|id| {
(
*id,
self.additions.get(*id).unwrap_or(&TimeIndex::Empty),
self.deletions.get(*id).unwrap_or(&TimeIndex::Empty),
self.get_additions(*id).unwrap_or(&TimeIndex::Empty),
self.get_deletions(*id).unwrap_or(&TimeIndex::Empty),
)
})),
}
Expand All @@ -167,11 +168,11 @@ impl EdgeStore {
) -> BoxedLIter<'a, &TimeIndex<TimeIndexEntry>> {
match layers {
LayerIds::None => iter::empty().into_dyn_boxed(),
LayerIds::All => self.additions.iter().into_dyn_boxed(),
LayerIds::One(id) => self.additions.get(*id).into_iter().into_dyn_boxed(),
LayerIds::All => self.iter_additions().into_dyn_boxed(),
LayerIds::One(id) => self.get_additions(*id).into_iter().into_dyn_boxed(),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.additions.get(*id))
.flat_map(|id| self.get_additions(*id))
.into_dyn_boxed(),
}
}
Expand All @@ -182,21 +183,20 @@ impl EdgeStore {
) -> BoxedLIter<'a, &TimeIndex<TimeIndexEntry>> {
match layers {
LayerIds::None => iter::empty().into_dyn_boxed(),
LayerIds::All => self.deletions.iter().into_dyn_boxed(),
LayerIds::One(id) => self.deletions.get(*id).into_iter().into_dyn_boxed(),
LayerIds::All => self.iter_deletions().into_dyn_boxed(),
LayerIds::One(id) => self.get_deletions(*id).into_iter().into_dyn_boxed(),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.deletions.get(*id))
.flat_map(|id| self.get_deletions(*id))
.into_dyn_boxed(),
}
}

pub fn layer_ids_window_iter(&self, w: Range<i64>) -> impl Iterator<Item = usize> + '_ {
let layer_ids = self
.additions
.iter()
.iter_additions()
.enumerate()
.zip_longest(self.deletions.iter().enumerate())
.zip_longest(self.iter_deletions().enumerate())
.flat_map(move |e| match e {
EitherOrBoth::Both((i, t1), (_, t2)) => {
if t1.contains(w.clone()) || t2.contains(w.clone()) {
Expand Down Expand Up @@ -229,27 +229,23 @@ impl EdgeStore {
eid: 0.into(),
src,
dst,
layers: Vec::with_capacity(1),
additions: Vec::with_capacity(1),
deletions: Vec::with_capacity(1),
data: Vec::with_capacity(1),
}
}

pub fn layer(&self, layer_id: usize) -> Option<&EdgeLayer> {
self.layers.get(layer_id)
self.data.get(layer_id).map(|data| &data.layer)
}

/// an edge is active in a window if it has an addition event in any of the layers
pub fn active(&self, layer_ids: &LayerIds, w: Range<i64>) -> bool {
match layer_ids {
LayerIds::None => false,
LayerIds::All => self
.additions
.iter()
.iter_additions()
.any(|t_index| t_index.contains(w.clone())),
LayerIds::One(l_id) => self
.additions
.get(*l_id)
.get_additions(*l_id)
.map(|t_index| t_index.contains(w))
.unwrap_or(false),
LayerIds::Multiple(layers) => layers
Expand All @@ -261,72 +257,89 @@ impl EdgeStore {
pub fn last_deletion(&self, layer_ids: &LayerIds) -> Option<TimeIndexEntry> {
match layer_ids {
LayerIds::None => None,
LayerIds::All => self.deletions.iter().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.deletions.get(*id).and_then(|t| t.last()),
LayerIds::All => self.iter_deletions().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.get_deletions(*id).and_then(|t| t.last()),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.deletions.get(*id).and_then(|t| t.last()))
.flat_map(|id| self.get_deletions(*id).and_then(|t| t.last()))
.max(),
}
}

pub fn last_addition(&self, layer_ids: &LayerIds) -> Option<TimeIndexEntry> {
match layer_ids {
LayerIds::None => None,
LayerIds::All => self.additions.iter().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.additions.get(*id).and_then(|t| t.last()),
LayerIds::All => self.iter_additions().flat_map(|d| d.last()).max(),
LayerIds::One(id) => self.get_additions(*id).and_then(|t| t.last()),
LayerIds::Multiple(ids) => ids
.iter()
.flat_map(|id| self.additions.get(*id).and_then(|t| t.last()))
.flat_map(|id| self.get_additions(*id).and_then(|t| t.last()))
.max(),
}
}

pub fn temporal_prop_layer_inner(&self, layer_id: usize, prop_id: usize) -> Option<&TProp> {
self.layers
self.data
.get(layer_id)
.and_then(|layer| layer.temporal_property(prop_id))
.and_then(|layer| layer.layer.temporal_property(prop_id))
}

pub fn layer_mut(&mut self, layer_id: usize) -> impl DerefMut<Target = EdgeLayer> + '_ {
self.get_or_allocate_layer(layer_id)
}

pub fn deletions_mut(&mut self, layer_id: usize) -> &mut TimeIndex<TimeIndexEntry> {
if self.deletions.len() <= layer_id {
self.deletions.resize_with(layer_id + 1, Default::default);
if self.data.len() <= layer_id {
self.data.resize_with(layer_id + 1, Default::default);
}
&mut self.deletions[layer_id]
&mut self.data[layer_id].deletions
}

pub fn additions_mut(&mut self, layer_id: usize) -> &mut TimeIndex<TimeIndexEntry> {
if self.additions.len() <= layer_id {
self.additions.resize_with(layer_id + 1, Default::default);
if self.data.len() <= layer_id {
self.data.resize_with(layer_id + 1, Default::default);
}
&mut self.additions[layer_id]
&mut self.data[layer_id].additions
}

pub(crate) fn temp_prop_ids(
&self,
layer_id: Option<usize>,
) -> Box<dyn Iterator<Item = usize> + '_> {
if let Some(layer_id) = layer_id {
Box::new(self.layers.get(layer_id).into_iter().flat_map(|layer| {
Box::new(self.data.get(layer_id).into_iter().flat_map(|layer| {
layer
.layer
.props()
.into_iter()
.flat_map(|props| props.temporal_prop_ids())
}))
} else {
Box::new(
self.layers
self.data
.iter()
.flat_map(|layer| layer.props().map(|prop| prop.temporal_prop_ids()))
.flat_map(|layer| layer.layer.props().map(|prop| prop.temporal_prop_ids()))
.kmerge()
.dedup(),
)
}
}

pub fn get_additions(&self, layer_id: usize) -> Option<&TimeIndex<TimeIndexEntry>> {
self.data.get(layer_id).map(|data| &data.additions)
}

pub fn get_deletions(&self, layer_id: usize) -> Option<&TimeIndex<TimeIndexEntry>> {
self.data.get(layer_id).map(|data| &data.deletions)
}

pub fn iter_additions(&self) -> impl Iterator<Item = &TimeIndex<TimeIndexEntry>> + '_ {
self.data.iter().map(|data| &data.additions)
}

pub fn iter_deletions(&self) -> impl Iterator<Item = &TimeIndex<TimeIndexEntry>> + '_ {
self.data.iter().map(|data| &data.deletions)
}
}

impl EdgeStorageIntoOps for ArcEntry<EdgeStore> {
Expand Down
4 changes: 2 additions & 2 deletions raphtory/src/db/api/storage/edges/edge_storage_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ impl<'a> EdgeStorageOps<'a> for &'a EdgeStore {
}

fn additions(self, layer_id: usize) -> TimeIndexRef<'a> {
TimeIndexRef::Ref(self.additions.get(layer_id).unwrap_or(&TimeIndex::Empty))
TimeIndexRef::Ref(self.get_additions(layer_id).unwrap_or(&TimeIndex::Empty))
}

fn deletions(self, layer_id: usize) -> TimeIndexRef<'a> {
TimeIndexRef::Ref(self.deletions.get(layer_id).unwrap_or(&TimeIndex::Empty))
TimeIndexRef::Ref(self.get_deletions(layer_id).unwrap_or(&TimeIndex::Empty))
}

fn temporal_prop_layer(self, layer_id: usize, prop_id: usize) -> impl TPropOps<'a> + 'a {
Expand Down
8 changes: 4 additions & 4 deletions raphtory/src/db/internal/core_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ impl CoreGraphOps for InternalGraph {
entry
.layer_iter()
.next()
.and_then(|layer| layer.const_prop(prop_id).cloned())
.and_then(|data| data.layer.const_prop(prop_id).cloned())
} else {
let prop_map: HashMap<_, _> = entry
.layer_iter()
.enumerate()
.flat_map(|(id, layer)| {
layer
.flat_map(|(id, data)| {
data.layer
.const_prop(prop_id)
.map(|p| (self.inner().get_layer_name(id), p.clone()))
})
Expand Down Expand Up @@ -214,7 +214,7 @@ impl CoreGraphOps for InternalGraph {
LayerIds::None => vec![],
LayerIds::All => entry
.layer_iter()
.map(|l| l.const_prop_ids())
.map(|data| data.layer.const_prop_ids())
.kmerge()
.dedup()
.collect(),
Expand Down

0 comments on commit b89065e

Please sign in to comment.