Skip to content

Commit

Permalink
refactor: process job updated to update panic message in the db
Browse files Browse the repository at this point in the history
  • Loading branch information
mohiiit committed Nov 8, 2024
1 parent d88efb8 commit c470f82
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 24 deletions.
22 changes: 14 additions & 8 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,20 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
tracing::error!(job_id = ?id, error = ?e, "Failed to process job");
return move_job_to_failed(&job, config.clone(), format!("Processing failed: {}", e)).await;
}
Err(_) => {
tracing::error!(job_id = ?id, "Job handler panicked during processing");
let _ =
move_job_to_failed(&job, config.clone(), "Job handler panicked during processing".to_string()).await;
panic!(
"Job handler panicked during processing of job with id: {} and internal id: {}",
id, job.internal_id
);
Err(panic) => {
let panic_msg = panic
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic.downcast_ref::<&str>().copied())
.unwrap_or("Unknown panic message");

tracing::error!(job_id = ?id, panic_msg = %panic_msg, "Job handler panicked during processing");
return move_job_to_failed(
&job,
config.clone(),
format!("Job handler panicked in job with id: {} and panic message: {}", id, panic_msg),
)
.await;
}
};
tracing::debug!(job_id = ?id, "Incrementing process attempt count in metadata");
Expand Down
24 changes: 8 additions & 16 deletions crates/orchestrator/src/tests/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;

use futures::FutureExt;
use mockall::predicate::eq;
use mongodb::bson::doc;
use omniqueue::QueueError;
Expand Down Expand Up @@ -227,25 +225,19 @@ async fn process_job_handles_panic() {
let ctx = mock_factory::get_job_handler_context();
ctx.expect().times(1).with(eq(JobType::SnosRun)).return_once(move |_| Arc::clone(&job_handler));

let async_result = AssertUnwindSafe(process_job(job_item.id, services.config.clone())).catch_unwind().await;

// Verify that it panicked
assert!(async_result.is_err());
let err = async_result.unwrap_err();
let panic_msg = err.downcast_ref::<String>().expect("Panic message should be a string");
assert_eq!(
*panic_msg,
format!(
"Job handler panicked during processing of job with id: {} and internal id: {}",
job_item.id, job_item.internal_id
)
);
assert!(process_job(job_item.id, services.config.clone()).await.is_ok());

// DB checks - verify the job was moved to failed state
let job_in_db = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap();
assert_eq!(job_in_db.status, JobStatus::Failed);
assert!(
job_in_db.metadata.get(JOB_METADATA_FAILURE_REASON).unwrap().contains("Job handler panicked during processing")
job_in_db.metadata.get(JOB_METADATA_FAILURE_REASON).unwrap().contains(
format!(
"Job handler panicked in job with id: {} and panic message: Simulated panic in process_job",
job_item.id
)
.as_str()
)
);
}

Expand Down

0 comments on commit c470f82

Please sign in to comment.