Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat : setup scripts #181

Merged
merged 9 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- setup functions added for cloud and db
- panic handling in process job
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] }
aws-credential-types = { version = "1.2.1", features = [
"hardcoded-credentials",
] }
aws-sdk-eventbridge.workspace = true
heemankv marked this conversation as resolved.
Show resolved Hide resolved
aws-sdk-s3 = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ impl Alerts for AWSSNS {
self.client.publish().topic_arn(self.topic_arn.clone()).message(message_body).send().await?;
Ok(())
}

async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> {
let response = self.client.create_topic().name(topic_name).send().await?;
let topic_arn = response.topic_arn().expect("Topic Not found");
log::info!("SNS topic created. Topic ARN: {}", topic_arn);
heemankv marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}
}
7 changes: 7 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use mockall::automock;
use utils::settings::Settings;

pub mod aws_sns;

Expand All @@ -8,4 +9,10 @@ pub mod aws_sns;
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>;
async fn setup(&self, settings_provider: Box<dyn Settings>) -> color_eyre::Result<()> {
let sns_topic_name = settings_provider.get_settings_or_panic("ALERT_TOPIC_NAME");
self.create_alert(&sns_topic_name).await?;
Ok(())
}
}
111 changes: 111 additions & 0 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::time::Duration;

use async_trait::async_trait;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_sqs::types::QueueAttributeName;

use crate::cron::Cron;
use crate::setup::SetupConfig;

pub struct AWSEventBridge {}

#[async_trait]
#[allow(unreachable_patterns)]
impl Cron for AWSEventBridge {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
event_bridge_client
.put_rule()
.name(&trigger_rule_name)
.schedule_expression(duration_to_rate_string(cron_time))
.state(RuleState::Enabled)
.send()
.await?;

Ok(())
}
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
let sqs_client = aws_sdk_sqs::Client::new(config);
let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(queue_url.queue_url.unwrap())
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();

// Create the EventBridge target with the input transformer
let input_transformer =
InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?;

event_bridge_client
.put_targets()
.rule(trigger_rule_name)
.targets(
Target::builder()
.id(uuid::Uuid::new_v4().to_string())
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.send()
.await?;

Ok(())
}
}

fn duration_to_rate_string(duration: Duration) -> String {
let total_secs = duration.as_secs();
let total_mins = duration.as_secs() / 60;
let total_hours = duration.as_secs() / 3600;
let total_days = duration.as_secs() / 86400;

if total_days > 0 {
format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" })
} else if total_hours > 0 {
format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" })
} else if total_mins > 0 {
format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" })
} else {
format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" })
}
}

#[cfg(test)]
mod event_bridge_utils_test {
use rstest::rstest;

use super::*;

#[rstest]
fn test_duration_to_rate_string() {
assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)");
assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)");
assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)");
assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)");
assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)");
}
}
57 changes: 57 additions & 0 deletions crates/orchestrator/src/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use async_trait::async_trait;
use lazy_static::lazy_static;

use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType};
use crate::setup::SetupConfig;

pub mod event_bridge;

lazy_static! {
pub static ref CRON_DURATION: Duration = Duration::from_mins(1);
// TODO : we can take this from clap.
pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue");
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
pub static ref WORKER_TRIGGERS: Vec<WorkerTriggerType> = vec![
WorkerTriggerType::Snos,
WorkerTriggerType::Proving,
WorkerTriggerType::DataSubmission,
WorkerTriggerType::UpdateState
];
pub static ref WORKER_TRIGGER_RULE_NAME: String = String::from("worker_trigger_scheduled");
}

#[async_trait]
pub trait Cron {
async fn create_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn add_cron_target_queue(
&self,
config: &SetupConfig,
target_queue_name: String,
message: String,
trigger_rule_name: String,
) -> color_eyre::Result<()>;
async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> {
self.create_cron(&config, *CRON_DURATION, WORKER_TRIGGER_RULE_NAME.clone()).await?;
for triggers in WORKER_TRIGGERS.iter() {
self.add_cron_target_queue(
&config,
TARGET_QUEUE_NAME.clone(),
get_worker_trigger_message(triggers.clone())?,
WORKER_TRIGGER_RULE_NAME.clone(),
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
)
.await?;
}
Ok(())
}
}

fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result<String> {
let message = WorkerTriggerMessage { worker: worker_trigger_type };
Ok(serde_json::to_string(&message)?)
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DataStorage for AWSS3 {
Ok(())
}

async fn build_test_bucket(&self, bucket_name: &str) -> Result<()> {
async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
self.client.create_bucket().bucket(bucket_name).send().await?;
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion crates/orchestrator/src/data_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ use utils::settings::Settings;
pub trait DataStorage: Send + Sync {
async fn get_data(&self, key: &str) -> Result<Bytes>;
async fn put_data(&self, data: Bytes, key: &str) -> Result<()>;
async fn build_test_bucket(&self, bucket_name: &str) -> Result<()>;
async fn create_bucket(&self, bucket_name: &str) -> Result<()>;
async fn setup(&self, settings_provider: Box<dyn Settings>) -> Result<()> {
let bucket_name = settings_provider.get_settings_or_panic("STORAGE_BUCKET_NAME");
self.create_bucket(&bucket_name).await
}
}

/// **DataStorageConfig** : Trait method to represent the config struct needed for
Expand Down
5 changes: 5 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#![feature(duration_constructors)]

/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
/// Controllers for the routes
pub mod controllers;
pub mod cron;
/// Contains the trait that implements the fetching functions
/// for blob and SNOS data from cloud for a particular block.
pub mod data_storage;
Expand All @@ -20,6 +23,8 @@ pub mod metrics;
pub mod queue;
/// Contains the routes for the service
pub mod routes;
/// Contains setup functions to set up db and cloud.
pub mod setup;
/// Contains telemetry collection services. (Metrics/Logs/Traces)
pub mod telemetry;
#[cfg(test)]
Expand Down
81 changes: 80 additions & 1 deletion crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,82 @@ use std::time::Duration;

use async_trait::async_trait;
use color_eyre::Result as EyreResult;
use lazy_static::lazy_static;
use mockall::automock;
use omniqueue::{Delivery, QueueError};

use crate::config::Config;
use crate::jobs::JobError;
use crate::setup::SetupConfig;

#[derive(Clone)]
pub struct DlqConfig<'a> {
pub max_receive_count: i32,
pub dlq_name: &'a str,
}

#[derive(Clone)]
pub struct QueueConfig<'a> {
pub name: String,
pub visibility_timeout: i32,
pub dlq_config: Option<DlqConfig<'a>>,
}

lazy_static! {
pub static ref JOB_HANDLE_FAILURE_QUEUE: String = String::from("madara_orchestrator_job_handle_failure_queue");
pub static ref QUEUES: Vec<QueueConfig<'static>> = vec![
QueueConfig {
name: String::from("madara_orchestrator_snos_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_snos_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_processing_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_verification_queue"),
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: &JOB_HANDLE_FAILURE_QUEUE })
},
QueueConfig {
name: String::from("madara_orchestrator_job_handle_failure_queue"),
visibility_timeout: 300,
dlq_config: None
},
QueueConfig {
name: String::from("madara_orchestrator_worker_trigger_queue"),
visibility_timeout: 300,
dlq_config: None
},
];
}

/// Queue Provider Trait
///
Expand All @@ -21,7 +92,15 @@ use crate::jobs::JobError;
#[async_trait]
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError>;
async fn consume_message_from_queue(&self, queue: String) -> Result<Delivery, QueueError>;
async fn create_queue<'a>(&self, queue_config: &QueueConfig<'a>, config: &SetupConfig) -> EyreResult<()>;
async fn setup(&self, config: SetupConfig) -> EyreResult<()> {
// Creating the queues :
for queue in QUEUES.iter() {
self.create_queue(queue, &config).await?;
}
Ok(())
}
}

pub async fn init_consumers(config: Arc<Config>) -> Result<(), JobError> {
Expand Down
Loading
Loading