Refactor mobile_verifier main run function (#796)
* Refactor mobile-verifier start sequence to separate daemons

* Refactor daemons to return a TaskManager
bbalser authored Apr 24, 2024
1 parent f4a60da commit 36ca129
Showing 10 changed files with 546 additions and 342 deletions.
395 changes: 95 additions & 300 deletions mobile_verifier/src/cli/server.rs

Large diffs are not rendered by default.
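
The server.rs diff is not rendered above, so the following is a hedged sketch, not the actual change, of how the refactored run function presumably composes the per-daemon task managers. Only the create_managed_task signatures and the TaskManager::builder() API visible in the other files of this commit are taken as given; the database connect helper, the trailing start() call, and all variable names are assumptions for illustration.

    // Hypothetical shape of the refactored run() in mobile_verifier/src/cli/server.rs.
    // The connect helper and the start() call are assumptions; the create_managed_task
    // calls mirror the signatures added elsewhere in this commit.
    pub async fn run(settings: &Settings) -> anyhow::Result<()> {
        let pool = settings.database.connect("mobile-verifier").await?; // assumed helper

        TaskManager::builder()
            // Each daemon now contributes its own bundle of tasks:
            .add_task(DataSessionIngestor::create_managed_task(pool.clone(), settings).await?)
            // CoverageDaemon, CbrsHeartbeatDaemon, WifiHeartbeatDaemon, and the rest are
            // added the same way, each handed its file stores, sinks, and clients.
            .build()
            .start()
            .await
    }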

75 changes: 69 additions & 6 deletions mobile_verifier/src/coverage.rs
@@ -1,14 +1,17 @@
use crate::{
boosting_oracles::{assignment::HexAssignments, BoostedHexAssignments, HexBoostData},
boosting_oracles::{BoostedHexAssignments, HexAssignments, HexBoostData},
heartbeats::{HbType, KeyType, OwnedKeyType},
IsAuthorized,
IsAuthorized, Settings,
};
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use file_store::{
coverage::{self, CoverageObjectIngestReport},
file_info_poller::FileInfoStream,
file_sink::FileSinkClient,
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::{self, FileSinkClient},
file_source,
file_upload::FileUpload,
traits::TimestampEncode,
FileStore, FileType,
};
use futures::{
stream::{BoxStream, Stream, StreamExt},
@@ -39,7 +42,7 @@ use std::{
sync::Arc,
time::Instant,
};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;
use uuid::Uuid;

@@ -75,6 +78,66 @@ pub struct CoverageDaemon {
}

impl CoverageDaemon {
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_upload: FileUpload,
file_store: FileStore,
auth_client: AuthorizationClient,
hex_boost_data: HexBoostData,
) -> anyhow::Result<impl ManagedTask> {
let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new(
FileType::CoverageObject,
settings.store_base_path(),
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_coverage_object"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
.await?;

// Oracle boosting reports
let (oracle_boosting_reports, oracle_boosting_reports_server) =
file_sink::FileSinkBuilder::new(
FileType::OracleBoostingReport,
settings.store_base_path(),
file_upload,
concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
.await?;

let (coverage_objs, coverage_objs_server) =
file_source::continuous_source::<CoverageObjectIngestReport, _>()
.state(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CoverageObjectIngestReport.to_string())
.create()
.await?;

// let hex_boost_data = boosting_oracles::make_hex_boost_data(settings, geofence)?;
let coverage_daemon = CoverageDaemon::new(
pool,
auth_client,
hex_boost_data,
coverage_objs,
valid_coverage_objs,
oracle_boosting_reports,
)
.await?;

Ok(TaskManager::builder()
.add_task(valid_coverage_objs_server)
.add_task(oracle_boosting_reports_server)
.add_task(coverage_objs_server)
.add_task(coverage_daemon)
.build())
}

pub async fn new(
pool: PgPool,
auth_client: AuthorizationClient,
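
A note on the design here: create_managed_task declares a return type of impl ManagedTask but returns the result of TaskManager::builder()...build(), which means TaskManager itself implements ManagedTask. That is what lets each daemon bundle its file-sink servers, its file-source server, and its own worker loop into a single task that the caller's top-level manager supervises. A minimal sketch of the pattern follows; Deps, build_sink, and MyDaemon are placeholders, not real items in this codebase.

    // The nesting pattern this commit adopts in every daemon (placeholder names).
    pub async fn create_managed_task(deps: Deps) -> anyhow::Result<impl ManagedTask> {
        let (sink, sink_server) = build_sink(&deps).await?; // hypothetical sink constructor
        let daemon = MyDaemon::new(deps, sink);             // hypothetical daemon

        // The built TaskManager is returned as impl ManagedTask, so the caller can add
        // this whole bundle to its own TaskManager as one task.
        Ok(TaskManager::builder()
            .add_task(sink_server)
            .add_task(daemon)
            .build())
    }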
36 changes: 33 additions & 3 deletions mobile_verifier/src/data_session.rs
@@ -1,17 +1,24 @@
use chrono::{DateTime, Utc};
use file_store::{file_info_poller::FileInfoStream, mobile_transfer::ValidDataTransferSession};
use file_store::{
file_info_poller::{FileInfoStream, LookbackBehavior},
file_source,
mobile_transfer::ValidDataTransferSession,
FileStore, FileType,
};
use futures::{
stream::{Stream, StreamExt, TryStreamExt},
TryFutureExt,
};
use helium_crypto::PublicKeyBinary;
use helium_proto::ServiceProvider;
use rust_decimal::Decimal;
use sqlx::{PgPool, Postgres, Row, Transaction};
use sqlx::{PgPool, Pool, Postgres, Row, Transaction};
use std::{collections::HashMap, ops::Range, time::Instant};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

use crate::Settings;

pub struct DataSessionIngestor {
pub receiver: Receiver<FileInfoStream<ValidDataTransferSession>>,
pub pool: PgPool,
@@ -32,6 +39,29 @@ pub struct ServiceProviderDataSession {
pub type HotspotMap = HashMap<PublicKeyBinary, HotspotReward>;

impl DataSessionIngestor {
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
) -> anyhow::Result<impl ManagedTask> {
let data_transfer_ingest = FileStore::from_settings(&settings.data_transfer_ingest).await?;
// data transfers
let (data_session_ingest, data_session_ingest_server) =
file_source::continuous_source::<ValidDataTransferSession, _>()
.state(pool.clone())
.store(data_transfer_ingest)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::ValidDataTransferSession.to_string())
.create()
.await?;

let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest);

Ok(TaskManager::builder()
.add_task(data_session_ingest_server)
.add_task(data_session_ingestor)
.build())
}

pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
receiver: Receiver<FileInfoStream<ValidDataTransferSession>>,
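
The file_source::continuous_source wiring above is repeated, with different report types, in every daemon this commit touches. For orientation, the same call is shown below with comments describing what each builder step appears to do; the descriptions are inferred from usage in this diff rather than from file_store documentation.

    // Annotated copy of the wiring above; comments are inferences, not documented behavior.
    let (data_session_ingest, data_session_ingest_server) =
        file_source::continuous_source::<ValidDataTransferSession, _>()
            .state(pool.clone())         // db pool the poller uses to track progress (assumption)
            .store(data_transfer_ingest) // the FileStore to poll for new files
            .lookback(LookbackBehavior::StartAfter(settings.start_after())) // skip files older than start_after
            .prefix(FileType::ValidDataTransferSession.to_string())         // match files by this prefix
            .create()
            .await?;
    // data_session_ingest is the Receiver handed to the daemon; data_session_ingest_server
    // is the polling task, which must be added to the TaskManager.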
55 changes: 49 additions & 6 deletions mobile_verifier/src/heartbeats/cbrs.rs
@@ -3,24 +3,28 @@ use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
heartbeats::LocationCache,
GatewayResolver,
GatewayResolver, Settings,
};

use chrono::{DateTime, Duration, Utc};
use file_store::{
file_info_poller::FileInfoStream, file_sink::FileSinkClient,
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::FileSinkClient,
file_source,
heartbeat::CbrsHeartbeatIngestReport,
FileStore, FileType,
};
use futures::{stream::StreamExt, TryFutureExt};
use retainer::Cache;
use sqlx::{Pool, Postgres};
use std::{
sync::Arc,
time::{self, Instant},
};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

pub struct HeartbeatDaemon<GIR, GFV> {
pub struct CbrsHeartbeatDaemon<GIR, GFV> {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_info_resolver: GIR,
heartbeats: Receiver<FileInfoStream<CbrsHeartbeatIngestReport>>,
@@ -32,11 +36,50 @@ pub struct HeartbeatDaemon<GIR, GFV> {
geofence: GFV,
}

impl<GIR, GFV> HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> CbrsHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
{
#[allow(clippy::too_many_arguments)]
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_store: FileStore,
gateway_resolver: GIR,
valid_heartbeats: FileSinkClient,
seniority_updates: FileSinkClient,
geofence: GFV,
) -> anyhow::Result<impl ManagedTask> {
// CBRS Heartbeats
let (cbrs_heartbeats, cbrs_heartbeats_server) =
file_source::continuous_source::<CbrsHeartbeatIngestReport, _>()
.state(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CbrsHeartbeatIngestReport.to_string())
.queue_size(1)
.create()
.await?;

let cbrs_heartbeat_daemon = CbrsHeartbeatDaemon::new(
pool,
gateway_resolver,
cbrs_heartbeats,
settings.modeled_coverage_start(),
settings.max_asserted_distance_deviation,
settings.max_distance_from_coverage,
valid_heartbeats,
seniority_updates,
geofence,
);

Ok(TaskManager::builder()
.add_task(cbrs_heartbeats_server)
.add_task(cbrs_heartbeat_daemon)
.build())
}

#[allow(clippy::too_many_arguments)]
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
@@ -151,7 +194,7 @@ where
}
}

impl<GIR, GFV> ManagedTask for HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> ManagedTask for CbrsHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
54 changes: 48 additions & 6 deletions mobile_verifier/src/heartbeats/wifi.rs
@@ -3,23 +3,27 @@ use crate::{
coverage::{CoverageClaimTimeCache, CoverageObjectCache},
geofence::GeofenceValidator,
heartbeats::LocationCache,
GatewayResolver,
GatewayResolver, Settings,
};
use chrono::{DateTime, Duration, Utc};
use file_store::{
file_info_poller::FileInfoStream, file_sink::FileSinkClient,
file_info_poller::{FileInfoStream, LookbackBehavior},
file_sink::FileSinkClient,
file_source,
wifi_heartbeat::WifiHeartbeatIngestReport,
FileStore, FileType,
};
use futures::{stream::StreamExt, TryFutureExt};
use retainer::Cache;
use sqlx::{Pool, Postgres};
use std::{
sync::Arc,
time::{self, Instant},
};
use task_manager::ManagedTask;
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

pub struct HeartbeatDaemon<GIR, GFV> {
pub struct WifiHeartbeatDaemon<GIR, GFV> {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_info_resolver: GIR,
heartbeats: Receiver<FileInfoStream<WifiHeartbeatIngestReport>>,
@@ -31,11 +35,49 @@ pub struct HeartbeatDaemon<GIR, GFV> {
geofence: GFV,
}

impl<GIR, GFV> HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> WifiHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
{
#[allow(clippy::too_many_arguments)]
pub async fn create_managed_task(
pool: Pool<Postgres>,
settings: &Settings,
file_store: FileStore,
gateway_resolver: GIR,
valid_heartbeats: FileSinkClient,
seniority_updates: FileSinkClient,
geofence: GFV,
) -> anyhow::Result<impl ManagedTask> {
// Wifi Heartbeats
let (wifi_heartbeats, wifi_heartbeats_server) =
file_source::continuous_source::<WifiHeartbeatIngestReport, _>()
.state(pool.clone())
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::WifiHeartbeatIngestReport.to_string())
.create()
.await?;

let wifi_heartbeat_daemon = WifiHeartbeatDaemon::new(
pool,
gateway_resolver,
wifi_heartbeats,
settings.modeled_coverage_start(),
settings.max_asserted_distance_deviation,
settings.max_distance_from_coverage,
valid_heartbeats,
seniority_updates,
geofence,
);

Ok(TaskManager::builder()
.add_task(wifi_heartbeats_server)
.add_task(wifi_heartbeat_daemon)
.build())
}

#[allow(clippy::too_many_arguments)]
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
@@ -143,7 +185,7 @@ where
}
}

impl<GIR, GFV> ManagedTask for HeartbeatDaemon<GIR, GFV>
impl<GIR, GFV> ManagedTask for WifiHeartbeatDaemon<GIR, GFV>
where
GIR: GatewayResolver,
GFV: GeofenceValidator<Heartbeat>,
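
Both heartbeat daemons now accept valid_heartbeats and seniority_updates as FileSinkClient arguments instead of constructing them, which suggests the sinks are built once, presumably in server.rs, and shared across the CBRS and WiFi paths. A hedged sketch of that composition, assuming FileSinkClient can be cloned and with all variable names invented for illustration:

    // Hypothetical composition in server.rs: the two heartbeat daemons share clones of
    // the same sink clients (variable names and clonability are assumptions).
    let cbrs_heartbeats = CbrsHeartbeatDaemon::create_managed_task(
        pool.clone(),
        settings,
        ingest_store.clone(),
        gateway_resolver.clone(),
        valid_heartbeats.clone(),
        seniority_updates.clone(),
        cbrs_geofence,
    )
    .await?;
    let wifi_heartbeats = WifiHeartbeatDaemon::create_managed_task(
        pool.clone(),
        settings,
        ingest_store,
        gateway_resolver,
        valid_heartbeats,
        seniority_updates,
        wifi_geofence,
    )
    .await?;
    // Both bundles, along with the sinks' server tasks, are then added to the top-level TaskManager.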
(The remaining changed files in this commit are not shown.)
