diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 44743aae..2f2250b4 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -642,28 +642,27 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { JobError::Other(OtherError(e)) })?; - let result = process_job(job.id, config.clone()).await; - - if let Err(e) = &result { + add_job_to_process_queue(job.id, &job.job_type, config.clone()).await.map_err(|e| { tracing::error!( log_type = "error", category = "general", function_type = "retry_job", block_no = %internal_id, error = %e, - "General retry job failed for block" - ); - } else { - tracing::info!( - log_type = "completed", - category = "general", - function_type = "retry_job", - block_no = %internal_id, - "General retry job completed for block" + "Failed to add job to process queue" ); - } + JobError::Other(OtherError(e)) + })?; + + tracing::info!( + log_type = "completed", + category = "general", + function_type = "retry_job", + block_no = %internal_id, + "Successfully queued job for retry" + ); - result + Ok(()) } /// Terminates the job and updates the status of the job in the DB. diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index c34fe0ff..c1b3fb72 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -16,7 +16,8 @@ use crate::jobs::constants::{ use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::{ - create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, JobError, MockJob, + create_job, handle_job_failure, increment_key_in_metadata, process_job, retry_job, verify_job, Job, JobError, + MockJob, }; use crate::queue::job_queue::QueueNameForJobType; use crate::queue::QueueType; @@ -759,3 +760,72 @@ async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType assert_eq!(job_fetched, job_expected); } + +#[rstest] +#[tokio::test] +async fn test_retry_job_adds_to_process_queue() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + // Create a failed job + let job_item = build_job_item(JobType::DataSubmission, JobStatus::Failed, 1); + services.config.database().create_job(job_item.clone()).await.unwrap(); + let job_id = job_item.id; + + // Retry the job + assert!(retry_job(job_id, services.config.clone()).await.is_ok()); + + // Verify job status was updated to PendingRetry + let updated_job = services.config.database().get_job_by_id(job_id).await.unwrap().unwrap(); + assert_eq!(updated_job.status, JobStatus::PendingRetry); + + // Wait for message to be processed + tokio::time::sleep(Duration::from_secs(5)).await; + + // Verify message was added to process queue + let consumed_messages = + services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await.unwrap(); + + let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); + assert_eq!(consumed_message_payload.id, job_id); +} + +#[rstest] +#[case::pending_verification(JobStatus::PendingVerification)] +#[case::completed(JobStatus::Completed)] +#[case::created(JobStatus::Created)] +#[tokio::test] +async fn test_retry_job_invalid_status(#[case] initial_status: JobStatus) { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + // Create a job with non-Failed status + let job_item = build_job_item(JobType::DataSubmission, initial_status.clone(), 1); + services.config.database().create_job(job_item.clone()).await.unwrap(); + let job_id = job_item.id; + + // Attempt to retry the job + let result = retry_job(job_id, services.config.clone()).await; + assert!(result.is_err()); + + if let Err(error) = result { + assert_matches!(error, JobError::InvalidStatus { .. }); + } + + // Verify job status was not changed + let job = services.config.database().get_job_by_id(job_id).await.unwrap().unwrap(); + assert_eq!(job.status, initial_status); + + // Wait briefly to ensure no messages were added + tokio::time::sleep(Duration::from_secs(5)).await; + + // Verify no message was added to process queue + let queue_result = services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await; + assert_matches!(queue_result, Err(QueueError::NoData)); +} diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index d0918559..2b513caf 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -17,6 +17,7 @@ use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::{Job, MockJob}; use crate::queue::init_consumers; +use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType}; use crate::tests::config::{ConfigType, TestConfigBuilder}; #[fixture] @@ -126,35 +127,32 @@ async fn test_trigger_retry_job_when_failed(#[future] setup_trigger: (SocketAddr let job_type = JobType::DataSubmission; let job_item = build_job_item(job_type.clone(), JobStatus::Failed, 1); - let mut job_handler = MockJob::new(); - - // Expect process_job to be called once for failed jobs - job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); - job_handler.expect_verification_polling_delay_seconds().return_const(1u64); - config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; - let job_handler: Arc> = Arc::new(Box::new(job_handler)); - let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); - let client = hyper::Client::new(); let response = client .request(Request::builder().uri(format!("http://{}/jobs/{}/retry", addr, job_id)).body(Body::empty()).unwrap()) .await .unwrap(); - assert_eq!(response.status(), 200); + // Verify job was added to process queue + let queue_message = config.queue().consume_message_from_queue(job_type.process_queue_name()).await.unwrap(); + + let message_payload: JobQueueMessage = queue_message.payload_serde_json().unwrap().unwrap(); + assert_eq!(message_payload.id, job_id); + + // Verify job status changed to PendingRetry let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database"); assert_eq!(job_fetched.id, job_item.id); - assert_eq!(job_fetched.status, JobStatus::PendingVerification); + assert_eq!(job_fetched.status, JobStatus::PendingRetry); } #[rstest] #[case::pending_verification_job(JobStatus::PendingVerification)] #[case::completed_job(JobStatus::Completed)] +#[case::created_job(JobStatus::Created)] #[tokio::test] async fn test_trigger_retry_job_not_allowed( #[future] setup_trigger: (SocketAddr, Arc), @@ -164,7 +162,6 @@ async fn test_trigger_retry_job_not_allowed( let job_type = JobType::DataSubmission; let job_item = build_job_item(job_type.clone(), initial_status.clone(), 1); - config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; @@ -174,10 +171,16 @@ async fn test_trigger_retry_job_not_allowed( .await .unwrap(); + // Verify request was rejected assert_eq!(response.status(), 400); + // Verify job status hasn't changed let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database"); assert_eq!(job_fetched.status, initial_status); + + // Verify no message was added to the queue + let queue_result = config.queue().consume_message_from_queue(job_type.process_queue_name()).await; + assert!(queue_result.is_err(), "Queue should be empty - no message should be added for non-Failed jobs"); } #[rstest]