Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat : setup scripts #181

Merged
merged 9 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- setup functions added for cloud and db
- upgrade ETH L1 bridge for withdrawals to work
- added makefile and submodules
- Endpoints for triggering processing and verification jobs
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] }
aws-credential-types = { version = "1.2.1", features = [
"hardcoded-credentials",
] }
aws-sdk-eventbridge.workspace = true
heemankv marked this conversation as resolved.
Show resolved Hide resolved
aws-sdk-s3 = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] }
aws-sdk-sqs = { workspace = true }
Expand Down
10 changes: 10 additions & 0 deletions crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use aws_sdk_sns::Client;
use utils::settings::env::EnvSettingsProvider;
use utils::settings::Settings;

use crate::alerts::aws_sns::config::AWSSNSConfig;
Expand Down Expand Up @@ -32,4 +33,13 @@ impl Alerts for AWSSNS {
self.client.publish().topic_arn(self.topic_arn.clone()).message(message_body).send().await?;
Ok(())
}

async fn setup_alerts(&self) -> color_eyre::Result<()> {
let settings_provider = EnvSettingsProvider {};
let response =
self.client.create_topic().name(settings_provider.get_settings_or_panic("SNS_TOPIC_NAME")).send().await?;
let topic_arn = response.topic_arn().unwrap_or_default();
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
log::debug!("SNS topic created. Topic ARN: {}", topic_arn);
heemankv marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}
}
5 changes: 5 additions & 0 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ pub mod aws_sns;
pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
async fn setup_alerts(&self) -> color_eyre::Result<()>;
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
async fn setup(&self) -> color_eyre::Result<()> {
self.setup_alerts().await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DataStorage for AWSS3 {
Ok(())
}

async fn build_test_bucket(&self, bucket_name: &str) -> Result<()> {
async fn build_bucket(&self, bucket_name: &str) -> Result<()> {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
self.client.create_bucket().bucket(bucket_name).send().await?;
Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion crates/orchestrator/src/data_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use utils::settings::Settings;
pub trait DataStorage: Send + Sync {
async fn get_data(&self, key: &str) -> Result<Bytes>;
async fn put_data(&self, data: Bytes, key: &str) -> Result<()>;
async fn build_test_bucket(&self, bucket_name: &str) -> Result<()>;
async fn build_bucket(&self, bucket_name: &str) -> Result<()>;
async fn setup(&self, bucket_name: &str) -> Result<()> {
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
self.build_bucket(bucket_name).await
}
}

/// **DataStorageConfig** : Trait method to represent the config struct needed for
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub mod metrics;
pub mod queue;
/// Contains the routes for the service
pub mod routes;
/// Contains setup functions to set up db and cloud.
pub mod setup;
/// Contains telemetry collection services. (Metrics/Logs/Traces)
pub mod telemetry;
#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ use crate::jobs::JobError;
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError>;
async fn create_and_setup_queues(&self) -> EyreResult<()>;
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
async fn setup(&self) -> EyreResult<()> {
self.create_and_setup_queues().await?;
Ok(())
}
}

pub async fn init_consumers(config: Arc<Config>) -> Result<(), JobError> {
Expand Down
177 changes: 173 additions & 4 deletions crates/orchestrator/src/queue/sqs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@ use std::collections::HashMap;
use std::time::Duration;

use async_trait::async_trait;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::meta::region::RegionProviderChain;
use aws_config::{from_env, SdkConfig};
use aws_credential_types::provider::ProvideCredentials;
use aws_sdk_eventbridge::config::Region;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_sqs::types::QueueAttributeName;
use aws_sdk_sqs::Client;
use color_eyre::Result;
use lazy_static::lazy_static;
use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer};
use omniqueue::{Delivery, QueueError};
use utils::env_utils::get_env_var_or_panic;

use crate::queue::job_queue::{
DATA_SUBMISSION_JOB_PROCESSING_QUEUE, DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, JOB_HANDLE_FAILURE_QUEUE,
PROOF_REGISTRATION_JOB_PROCESSING_QUEUE, PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, PROVING_JOB_PROCESSING_QUEUE,
PROVING_JOB_VERIFICATION_QUEUE, SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE,
UPDATE_STATE_JOB_PROCESSING_QUEUE, UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE,
WorkerTriggerMessage, WorkerTriggerType, DATA_SUBMISSION_JOB_PROCESSING_QUEUE,
DATA_SUBMISSION_JOB_VERIFICATION_QUEUE, JOB_HANDLE_FAILURE_QUEUE, PROOF_REGISTRATION_JOB_PROCESSING_QUEUE,
PROOF_REGISTRATION_JOB_VERIFICATION_QUEUE, PROVING_JOB_PROCESSING_QUEUE, PROVING_JOB_VERIFICATION_QUEUE,
SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE, UPDATE_STATE_JOB_PROCESSING_QUEUE,
UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE,
};
use crate::queue::QueueProvider;
pub struct SqsQueue;
Expand Down Expand Up @@ -54,6 +63,166 @@ impl QueueProvider for SqsQueue {
let mut consumer = get_consumer(queue_url).await?;
consumer.receive().await
}

async fn create_and_setup_queues(&self) -> Result<()> {
// Setting up queues
let client = Self::create_sqs_client().await;
let dlq_arn = Self::create_dlq(&client).await?;
log::debug!("DLQ created. DLQ ARN: {}", dlq_arn);

for queue in Self::get_queue_configs() {
let redrive_policy = if queue.needs_dlq {
Some(format!(r#"{{"deadLetterTargetArn":"{}","maxReceiveCount":"{}"}}"#, dlq_arn, MAX_RECEIVE_COUNT))
} else {
None
};

let queue_url = Self::create_queue(&client, &queue.name, redrive_policy.as_deref()).await?;
log::info!("Created queue: {} at URL: {}", queue.name, queue_url);
}

// Setting up event bridge :
Self::setup_event_bridge().await?;

Ok(())
}
}

#[derive(Debug)]
struct QueueConfig {
name: String,
needs_dlq: bool,
}

const VISIBILITY_TIMEOUT: i32 = 1800; // 30 minutes in seconds
const DLQ_NAME: &str = "madara_orchestrator_job_handle_failure_queue";
const MAX_RECEIVE_COUNT: &str = "5";

impl SqsQueue {
async fn get_aws_configs() -> SdkConfig {
let region_provider = RegionProviderChain::default_provider();
from_env().region(region_provider).load().await
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
}
async fn create_sqs_client() -> Client {
Client::new(&Self::get_aws_configs().await)
}

async fn create_event_bridge_client() -> aws_sdk_eventbridge::Client {
let region_provider = Region::new(get_env_var_or_panic("AWS_REGION"));
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
let creds = EnvironmentVariableCredentialsProvider::new().provide_credentials().await.unwrap();
let config = from_env().region(region_provider).credentials_provider(creds).load().await;
aws_sdk_eventbridge::Client::new(&config)
}

fn get_queue_configs() -> Vec<QueueConfig> {
vec![
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
QueueConfig { name: "madara_orchestrator_snos_job_processing_queue".to_string(), needs_dlq: true },
QueueConfig { name: "madara_orchestrator_snos_job_verification_queue".to_string(), needs_dlq: true },
QueueConfig { name: "madara_orchestrator_proving_job_processing_queue".to_string(), needs_dlq: true },
QueueConfig { name: "madara_orchestrator_proving_job_verification_queue".to_string(), needs_dlq: true },
QueueConfig {
name: "madara_orchestrator_data_submission_job_processing_queue".to_string(),
needs_dlq: true,
},
QueueConfig {
name: "madara_orchestrator_data_submission_job_verification_queue".to_string(),
needs_dlq: true,
},
QueueConfig { name: "madara_orchestrator_update_state_job_processing_queue".to_string(), needs_dlq: true },
QueueConfig {
name: "madara_orchestrator_update_state_job_verification_queue".to_string(),
needs_dlq: true,
},
QueueConfig { name: "madara_orchestrator_worker_trigger_queue".to_string(), needs_dlq: true },
]
}

async fn create_queue(client: &Client, queue_name: &str, redrive_policy: Option<&str>) -> Result<String> {
let mut attributes = HashMap::new();
attributes.insert(QueueAttributeName::VisibilityTimeout, VISIBILITY_TIMEOUT.to_string());
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
if let Some(policy) = redrive_policy {
attributes.insert(QueueAttributeName::RedrivePolicy, policy.to_string());
}
let response = client.create_queue().queue_name(queue_name).set_attributes(Some(attributes)).send().await?;
Ok(response.queue_url().unwrap().to_string())
}

async fn get_queue_arn(client: &Client, queue_url: &str) -> Result<String> {
let attributes = client
.get_queue_attributes()
.queue_url(queue_url)
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;

Ok(attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap().to_string())
}

async fn create_dlq(client: &Client) -> Result<String> {
let dlq_url = Self::create_queue(client, DLQ_NAME, None).await?;
Self::get_queue_arn(client, &dlq_url).await
}

async fn setup_event_bridge() -> Result<()> {
for trigger_type in [
WorkerTriggerType::Snos,
WorkerTriggerType::Proving,
WorkerTriggerType::DataSubmission,
WorkerTriggerType::UpdateState,
] {
Self::setup_event_bridge_for_trigger_type(trigger_type.clone()).await?;
log::info!("Event Bridge trigger created for trigger type: {:?}", trigger_type);
}
Ok(())
}

async fn setup_event_bridge_for_trigger_type(worker_trigger_type: WorkerTriggerType) -> Result<()> {
let rule_name = "worker_trigger_scheduled";
let client = Self::create_event_bridge_client().await;
let sqs_client = Self::create_sqs_client().await;

client
.put_rule()
.name(rule_name)
.schedule_expression("rate(1 minute)")
.state(RuleState::Enabled)
.send()
.await?;
let queue_url = sqs_client.get_queue_url().queue_name(WORKER_TRIGGER_QUEUE).send().await?;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(queue_url.queue_url.unwrap())
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved

// Create a sample WorkerTriggerMessage
let message = WorkerTriggerMessage { worker: worker_trigger_type.clone() };
let event_detail = serde_json::to_string(&message)?;

// Create the EventBridge target with the input transformer
let input_transformer = InputTransformer::builder()
.input_paths_map("$.time", "time")
.input_template(event_detail.to_string())
.build()?;

client
.put_targets()
.rule(rule_name)
.targets(
Target::builder()
.id(format!("worker-trigger-target-{:?}", worker_trigger_type))
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.send()
.await?;

Ok(())
}
}

/// To fetch the queue URL from the environment variables
Expand Down
62 changes: 62 additions & 0 deletions crates/orchestrator/src/setup/mod.rs
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
apoorvsadana marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::process::Command;
use std::sync::Arc;

use utils::env_utils::get_env_var_or_panic;
use utils::settings::env::EnvSettingsProvider;
use utils::settings::Settings;

use crate::alerts::aws_sns::AWSSNS;
use crate::alerts::Alerts;
use crate::config::{get_aws_config, ProviderConfig};
use crate::data_storage::aws_s3::AWSS3;
use crate::data_storage::DataStorage;
use crate::queue::QueueProvider;

pub async fn setup_cloud() -> color_eyre::Result<()> {
log::info!("Setting up cloud.");
let settings_provider = EnvSettingsProvider {};
let provider_config = Arc::new(ProviderConfig::AWS(Box::new(get_aws_config(&settings_provider).await)));

log::info!("Setting up data storage.");
match get_env_var_or_panic("DATA_STORAGE").as_str() {
"s3" => {
let s3 = Box::new(AWSS3::new_with_settings(&settings_provider, provider_config.clone()).await);
s3.setup(&settings_provider.get_settings_or_panic("AWS_S3_BUCKET_NAME")).await?
}
_ => panic!("Unsupported Storage Client"),
}
log::info!("Data storage setup completed ✅");

log::info!("Setting up queues and event bridge.");
match get_env_var_or_panic("QUEUE_PROVIDER").as_str() {
"sqs" => {
let sqs = Box::new(crate::queue::sqs::SqsQueue {});
sqs.setup().await?
}
_ => panic!("Unsupported Queue Client"),
}
log::info!("Queues and Event Bridge setup completed ✅");

log::info!("Setting up alerts.");
match get_env_var_or_panic("ALERTS").as_str() {
"sns" => {
let sns = Box::new(AWSSNS::new_with_settings(&settings_provider, provider_config).await);
sns.setup().await?
}
_ => panic!("Unsupported Alert Client"),
}
log::info!("Alerts setup completed ✅");

Ok(())
}

pub async fn setup_db() -> color_eyre::Result<()> {
// We run the js script in the folder root:
log::info!("Setting up database.");

Command::new("node").arg("migrate-mongo-config.js").output()?;

log::info!("Database setup completed ✅");

Ok(())
}
4 changes: 1 addition & 3 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,7 @@ pub mod implement_client {
ConfigType::Actual => {
let storage = get_storage_client(provider_config).await;
match get_env_var_or_panic("DATA_STORAGE").as_str() {
"s3" => {
storage.as_ref().build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap()
}
"s3" => storage.as_ref().build_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(),
_ => panic!("Unsupported Storage Client"),
}
storage
Expand Down
2 changes: 1 addition & 1 deletion e2e-tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,6 @@ pub async fn put_job_data_in_db_proving(mongo_db: &MongoDbServer, l2_block_numbe
/// To set up s3 files needed for e2e test (test_orchestrator_workflow)
#[allow(clippy::borrowed_box)]
pub async fn setup_s3(s3_client: &Box<dyn DataStorage + Send + Sync>) -> color_eyre::Result<()> {
s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap();
s3_client.build_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap();
Ok(())
}
Loading