Skip to content

Commit

Permalink
update: JOB_METADATA_PROCESSING_COMPLETED_AT
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Dec 15, 2024
1 parent 66e3c31 commit b35ea02
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
2 changes: 2 additions & 0 deletions crates/orchestrator/src/jobs/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub const JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO: &str = "last_failed_bl
pub const JOB_METADATA_SNOS_BLOCK: &str = "block_number_to_run";
pub const JOB_METADATA_SNOS_FACT: &str = "snos_fact";
pub const JOB_METADATA_FAILURE_REASON: &str = "failure_reason";
pub const JOB_METADATA_ERROR: &str = "error";
pub const JOB_METADATA_PROCESSING_COMPLETED_AT: &str = "processing_completed_at";
9 changes: 5 additions & 4 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use chrono::Utc;
use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use constants::{JOB_METADATA_ERROR, JOB_METADATA_FAILURE_REASON, JOB_METADATA_PROCESSING_COMPLETED_AT};
use conversion::parse_string;
use da_job::DaError;
use futures::FutureExt;
Expand Down Expand Up @@ -236,7 +236,8 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
Ok(Ok(external_id)) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
// Add the time of processing to the metadata.
job.metadata.insert("processing_completed_at".to_string(), Utc::now().timestamp_millis().to_string());
job.metadata
.insert(JOB_METADATA_PROCESSING_COMPLETED_AT.to_string(), Utc::now().timestamp_millis().to_string());
external_id
}
Ok(Err(e)) => {
Expand Down Expand Up @@ -357,7 +358,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
tracing::info!(job_id = ?id, "Job verified successfully");
match job
.metadata
.get("processing_completed_at")
.get(JOB_METADATA_PROCESSING_COMPLETED_AT)
.and_then(|time| time.parse::<i64>().ok())
.map(|start| Utc::now().timestamp_millis() - start)
{
Expand All @@ -383,7 +384,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
JobVerificationStatus::Rejected(e) => {
tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected");
let mut new_job = job.clone();
new_job.metadata.insert("error".to_string(), e);
new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e);
new_job.status = JobStatus::VerificationFailed;

let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)
Expand Down

0 comments on commit b35ea02

Please sign in to comment.