Skip to content

Commit

Permalink
refactor: adding retry jobs directly to the queue now
Browse files Browse the repository at this point in the history
  • Loading branch information
Mohiiit committed Jan 6, 2025
1 parent 2373eb7 commit bc710fd
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 28 deletions.
27 changes: 13 additions & 14 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,28 +642,27 @@ pub async fn retry_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
JobError::Other(OtherError(e))
})?;

let result = process_job(job.id, config.clone()).await;

if let Err(e) = &result {
add_job_to_process_queue(job.id, &job.job_type, config.clone()).await.map_err(|e| {
tracing::error!(
log_type = "error",
category = "general",
function_type = "retry_job",
block_no = %internal_id,
error = %e,
"General retry job failed for block"
);
} else {
tracing::info!(
log_type = "completed",
category = "general",
function_type = "retry_job",
block_no = %internal_id,
"General retry job completed for block"
"Failed to add job to process queue"
);
}
JobError::Other(OtherError(e))
})?;

tracing::info!(
log_type = "completed",
category = "general",
function_type = "retry_job",
block_no = %internal_id,
"Successfully queued job for retry"
);

result
Ok(())
}

/// Terminates the job and updates the status of the job in the DB.
Expand Down
72 changes: 71 additions & 1 deletion crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::jobs::constants::{
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{
create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, JobError, MockJob,
create_job, handle_job_failure, increment_key_in_metadata, process_job, retry_job, verify_job, Job, JobError,
MockJob,
};
use crate::queue::job_queue::QueueNameForJobType;
use crate::queue::QueueType;
Expand Down Expand Up @@ -759,3 +760,72 @@ async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType

assert_eq!(job_fetched, job_expected);
}

#[rstest]
#[tokio::test]
async fn test_retry_job_adds_to_process_queue() {
let services = TestConfigBuilder::new()
.configure_database(ConfigType::Actual)
.configure_queue_client(ConfigType::Actual)
.build()
.await;

// Create a failed job
let job_item = build_job_item(JobType::DataSubmission, JobStatus::Failed, 1);
services.config.database().create_job(job_item.clone()).await.unwrap();
let job_id = job_item.id;

// Retry the job
assert!(retry_job(job_id, services.config.clone()).await.is_ok());

// Verify job status was updated to PendingRetry
let updated_job = services.config.database().get_job_by_id(job_id).await.unwrap().unwrap();
assert_eq!(updated_job.status, JobStatus::PendingRetry);

// Wait for message to be processed
tokio::time::sleep(Duration::from_secs(5)).await;

// Verify message was added to process queue
let consumed_messages =
services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await.unwrap();

let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap();
assert_eq!(consumed_message_payload.id, job_id);
}

#[rstest]
#[case::pending_verification(JobStatus::PendingVerification)]
#[case::completed(JobStatus::Completed)]
#[case::created(JobStatus::Created)]
#[tokio::test]
async fn test_retry_job_invalid_status(#[case] initial_status: JobStatus) {
let services = TestConfigBuilder::new()
.configure_database(ConfigType::Actual)
.configure_queue_client(ConfigType::Actual)
.build()
.await;

// Create a job with non-Failed status
let job_item = build_job_item(JobType::DataSubmission, initial_status.clone(), 1);
services.config.database().create_job(job_item.clone()).await.unwrap();
let job_id = job_item.id;

// Attempt to retry the job
let result = retry_job(job_id, services.config.clone()).await;
assert!(result.is_err());

if let Err(error) = result {
assert_matches!(error, JobError::InvalidStatus { .. });
}

// Verify job status was not changed
let job = services.config.database().get_job_by_id(job_id).await.unwrap().unwrap();
assert_eq!(job.status, initial_status);

// Wait briefly to ensure no messages were added
tokio::time::sleep(Duration::from_secs(5)).await;

// Verify no message was added to process queue
let queue_result = services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await;
assert_matches!(queue_result, Err(QueueError::NoData));
}
29 changes: 16 additions & 13 deletions crates/orchestrator/src/tests/server/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{Job, MockJob};
use crate::queue::init_consumers;
use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType};
use crate::tests::config::{ConfigType, TestConfigBuilder};

#[fixture]
Expand Down Expand Up @@ -126,35 +127,32 @@ async fn test_trigger_retry_job_when_failed(#[future] setup_trigger: (SocketAddr
let job_type = JobType::DataSubmission;

let job_item = build_job_item(job_type.clone(), JobStatus::Failed, 1);
let mut job_handler = MockJob::new();

// Expect process_job to be called once for failed jobs
job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string()));
job_handler.expect_verification_polling_delay_seconds().return_const(1u64);

config.database().create_job(job_item.clone()).await.unwrap();
let job_id = job_item.clone().id;

let job_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(job_handler));
let ctx = mock_factory::get_job_handler_context();
ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler));

let client = hyper::Client::new();
let response = client
.request(Request::builder().uri(format!("http://{}/jobs/{}/retry", addr, job_id)).body(Body::empty()).unwrap())
.await
.unwrap();

assert_eq!(response.status(), 200);

// Verify job was added to process queue
let queue_message = config.queue().consume_message_from_queue(job_type.process_queue_name()).await.unwrap();

let message_payload: JobQueueMessage = queue_message.payload_serde_json().unwrap().unwrap();
assert_eq!(message_payload.id, job_id);

// Verify job status changed to PendingRetry
let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database");
assert_eq!(job_fetched.id, job_item.id);
assert_eq!(job_fetched.status, JobStatus::PendingVerification);
assert_eq!(job_fetched.status, JobStatus::PendingRetry);
}

#[rstest]
#[case::pending_verification_job(JobStatus::PendingVerification)]
#[case::completed_job(JobStatus::Completed)]
#[case::created_job(JobStatus::Created)]
#[tokio::test]
async fn test_trigger_retry_job_not_allowed(
#[future] setup_trigger: (SocketAddr, Arc<Config>),
Expand All @@ -164,7 +162,6 @@ async fn test_trigger_retry_job_not_allowed(
let job_type = JobType::DataSubmission;

let job_item = build_job_item(job_type.clone(), initial_status.clone(), 1);

config.database().create_job(job_item.clone()).await.unwrap();
let job_id = job_item.clone().id;

Expand All @@ -174,10 +171,16 @@ async fn test_trigger_retry_job_not_allowed(
.await
.unwrap();

// Verify request was rejected
assert_eq!(response.status(), 400);

// Verify job status hasn't changed
let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database");
assert_eq!(job_fetched.status, initial_status);

// Verify no message was added to the queue
let queue_result = config.queue().consume_message_from_queue(job_type.process_queue_name()).await;
assert!(queue_result.is_err(), "Queue should be empty - no message should be added for non-Failed jobs");
}

#[rstest]
Expand Down

0 comments on commit bc710fd

Please sign in to comment.