Skip to content

Commit

Permalink
Update TaskManager to support nesting (#794)
Browse files Browse the repository at this point in the history
* Update TaskManager to support nesting

* Refactor to support nested TaskManager better
  • Loading branch information
bbalser authored Apr 23, 2024
1 parent fe059f9 commit 992d80c
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 64 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio = { version = "1", default-features = false, features = [
"fs",
"macros",
"signal",
"sync",
"rt-multi-thread",
"rt",
"process",
Expand Down
1 change: 1 addition & 0 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl Server {
.add_task(watcher)
.add_task(updater)
.add_task(purger)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(beacon_report_sink_server)
.add_task(witness_report_sink_server)
.add_task(grpc_server)
.build()
.start()
.await
}
1 change: 1 addition & 0 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.add_task(invalidated_radio_threshold_report_sink_server)
.add_task(coverage_object_report_sink_server)
.add_task(grpc_server)
.build()
.start()
.await
}
54 changes: 45 additions & 9 deletions ingest/tests/iot_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ async fn initialize_session_and_send_beacon_and_witness() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -77,7 +81,11 @@ async fn stream_stops_after_incorrectly_signed_init_request() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -111,7 +119,11 @@ async fn stream_stops_after_incorrectly_signed_beacon() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -148,7 +160,11 @@ async fn stream_stops_after_incorrect_beacon_pubkey() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -188,7 +204,11 @@ async fn stream_stops_after_incorrectly_signed_witness() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -225,7 +245,11 @@ async fn stream_stops_after_incorrect_witness_pubkey() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -265,7 +289,11 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let pub_key = generate_keypair();
Expand Down Expand Up @@ -316,7 +344,11 @@ async fn stream_stops_if_init_not_sent_within_timeout() {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), None);
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
Expand All @@ -338,7 +370,11 @@ async fn stream_stops_on_session_timeout() {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), Some(900));
TaskManager::builder().add_task(server).start().await
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
Expand Down
1 change: 1 addition & 0 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl Daemon {
TaskManager::builder()
.add_task(grpc_server)
.add_task(db_cleaner)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl Cmd {
.add_task(verifier_daemon)
.add_task(burner)
.add_task(report_files_server)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl Server {
.add_task(pk_loader_server)
.add_task(entropy_loader_server)
.add_task(rewarder)
.build()
.start()
.await
}
Expand Down
6 changes: 5 additions & 1 deletion mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ impl Daemon {
hex_boosting_svc,
};

TaskManager::builder().add_task(grpc_server).start().await
TaskManager::builder()
.add_task(grpc_server)
.build()
.start()
.await
}
}

Expand Down
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl Cmd {
.add_task(reports_server)
.add_task(event_id_purger)
.add_task(daemon)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ impl Cmd {
.add_task(radio_threshold_ingest_server)
.add_task(invalidated_radio_threshold_ingest_server)
.add_task(data_session_ingestor)
.build()
.start()
.await
}
Expand Down
1 change: 1 addition & 0 deletions price/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl Server {
.add_task(hnt_price_generator)
.add_task(mobile_price_generator)
.add_task(iot_price_generator)
.build()
.start()
.await
}
Expand Down
Loading

0 comments on commit 992d80c

Please sign in to comment.