Skip to content

Commit

Permalink
added support for Arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianmurariu committed Jan 8, 2025
1 parent 065818f commit eda883b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 38 deletions.
7 changes: 6 additions & 1 deletion raphtory-graphql/src/model/graph/property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ fn prop_to_gql(prop: &Prop) -> GqlValue {
),
Prop::DTime(t) => GqlValue::Number(t.timestamp_millis().into()),
Prop::NDTime(t) => GqlValue::Number(t.and_utc().timestamp_millis().into()),
Prop::Array(a) => GqlValue::Binary(a.clone().into()),
Prop::Array(a) => GqlValue::List(
a.iter_prop()
.into_iter()
.flat_map(|p_iter| p_iter.map(|p| prop_to_gql(&p)))
.collect(),
),
Prop::Document(d) => GqlValue::String(d.content.to_owned()), // TODO: return GqlValue::Object ??
}
}
Expand Down
4 changes: 1 addition & 3 deletions raphtory/src/algorithms/metrics/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
prelude::{EdgeViewOps, GraphViewOps, NodeViewOps},
};
use ordered_float::OrderedFloat;
use raphtory_api::core::PropType;

/// Computes the net sum of weights for a given node based on edge direction.
///
Expand Down Expand Up @@ -108,8 +107,7 @@ pub fn balance<G: StaticGraphViewOps>(
let min = sum(0);
ctx.agg(min);

let mut weight_type = Some(PropType::U8);
weight_type = match graph.edge_meta().temporal_prop_meta().get_id(&name) {
let weight_type = match graph.edge_meta().temporal_prop_meta().get_id(&name) {
Some(weight_id) => graph.edge_meta().temporal_prop_meta().get_dtype(weight_id),
None => graph
.edge_meta()
Expand Down
79 changes: 45 additions & 34 deletions raphtory/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow_array::{ArrayRef, ArrowPrimitiveType, PrimitiveArray, RecordBatch};
use arrow_buffer::{ArrowNativeType, ScalarBuffer};
use chrono::{DateTime, NaiveDateTime, Utc};
use itertools::Itertools;
use raphtory_api::core::storage::arc_str::ArcStr;
use raphtory_api::{core::storage::arc_str::ArcStr, iter::{BoxedLIter, IntoDynBoxed}};
use serde::{Deserialize, Serialize, Serializer};
use serde_json::{json, Value};
use std::{
Expand Down Expand Up @@ -159,39 +159,50 @@ impl PropArray {
}
}

// pub fn iter_prop(&self) -> Option<BoxedLIter<Prop>> {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Int32Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::I32(v.unwrap_or_default())).into_dyn_boxed()
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Float64Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::F64(v.unwrap_or_default())).into_dyn_boxed()
// })
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Float32Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::F32(v.unwrap_or_default())).into_dyn_boxed()
// })
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt64Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::U64(v.unwrap_or_default())).into_dyn_boxed()
// })
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt32Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::U32(v.unwrap_or_default())).into_dyn_boxed()
// })
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Int64Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::I64(v.unwrap_or_default())).into_dyn_boxed()
// })
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt16Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::U16(v.unwrap_or_default())).into_dyn_boxed()
// })
// }).or_else(|| {
// self.0.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt8Type>>().map(|arr| {
// arr.into_iter().map(|v| Prop::U8(v.unwrap_or_default())).into_dyn_boxed()
// })
// })
// }
pub fn as_array_ref(&self) -> Option<&ArrayRef> {
match self {
PropArray::Array(arr) => Some(arr),
_ => None,
}
}

pub fn iter_prop(&self) -> Option<BoxedLIter<Prop>> {

let arr = self.as_array_ref()?;

arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Int32Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::I32(v.unwrap_or_default())).into_dyn_boxed()
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Float64Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::F64(v.unwrap_or_default())).into_dyn_boxed()
})
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Float32Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::F32(v.unwrap_or_default())).into_dyn_boxed()
})
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt64Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::U64(v.unwrap_or_default())).into_dyn_boxed()
})
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt32Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::U32(v.unwrap_or_default())).into_dyn_boxed()
})
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::Int64Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::I64(v.unwrap_or_default())).into_dyn_boxed()
})
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt16Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::U16(v.unwrap_or_default())).into_dyn_boxed()
})
}).or_else(|| {
arr.as_any().downcast_ref::<PrimitiveArray<arrow_array::types::UInt8Type>>().map(|arr| {
arr.into_iter().map(|v| Prop::U8(v.unwrap_or_default())).into_dyn_boxed()
})
})

}
}

impl Serialize for PropArray {
Expand Down

0 comments on commit eda883b

Please sign in to comment.