Skip to content

Commit

Permalink
Merge pull request #1742 from jackh726/scheduled_jobs
Browse files Browse the repository at this point in the history
Some scheduling cleanup and automatic types planning stream opening
  • Loading branch information
Mark-Simulacrum authored Nov 22, 2023
2 parents 0a68321 + b9d17bc commit df11506
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 89 deletions.
47 changes: 41 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::handlers::jobs::handle_job;
use crate::{db::jobs::*, handlers::Context};
use crate::{db::jobs::*, handlers::Context, jobs::jobs};
use anyhow::Context as _;
use chrono::Utc;
use native_tls::{Certificate, TlsConnector};
Expand Down Expand Up @@ -188,16 +187,32 @@ pub async fn schedule_jobs(db: &DbClient, jobs: Vec<JobSchedule>) -> anyhow::Res
let mut upcoming = job.schedule.upcoming(Utc).take(1);

if let Some(scheduled_at) = upcoming.next() {
if let Err(_) = get_job_by_name_and_scheduled_at(&db, &job.name, &scheduled_at).await {
// mean there's no job already in the db with that name and scheduled_at
insert_job(&db, &job.name, &scheduled_at, &job.metadata).await?;
}
schedule_job(db, job.name, job.metadata, scheduled_at).await?;
}
}

Ok(())
}

pub async fn schedule_job(
db: &DbClient,
job_name: &str,
job_metadata: serde_json::Value,
when: chrono::DateTime<Utc>,
) -> anyhow::Result<()> {
let all_jobs = jobs();
if !all_jobs.iter().any(|j| j.name() == job_name) {
anyhow::bail!("Job {} does not exist in the current job list.", job_name);
}

if let Err(_) = get_job_by_name_and_scheduled_at(&db, job_name, &when).await {
// mean there's no job already in the db with that name and scheduled_at
insert_job(&db, job_name, &when, &job_metadata).await?;
}

Ok(())
}

pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<()> {
let jobs = get_jobs_to_execute(&db).await.unwrap();
tracing::trace!("jobs to execute: {:#?}", jobs);
Expand All @@ -220,6 +235,26 @@ pub async fn run_scheduled_jobs(ctx: &Context, db: &DbClient) -> anyhow::Result<
Ok(())
}

// Try to handle a specific job
async fn handle_job(
ctx: &Context,
name: &String,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
for job in jobs() {
if &job.name() == &name {
return job.run(ctx, metadata).await;
}
}
tracing::trace!(
"handle_job fell into default case: (name={:?}, metadata={:?})",
name,
metadata
);

Ok(())
}

static MIGRATIONS: &[&str] = &[
"
CREATE TABLE notifications (
Expand Down
6 changes: 3 additions & 3 deletions src/db/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio_postgres::Client as DbClient;
use uuid::Uuid;

pub struct JobSchedule {
pub name: String,
pub name: &'static str,
pub schedule: Schedule,
pub metadata: serde_json::Value,
}
Expand All @@ -24,7 +24,7 @@ pub struct Job {

pub async fn insert_job(
db: &DbClient,
name: &String,
name: &str,
scheduled_at: &DateTime<Utc>,
metadata: &serde_json::Value,
) -> Result<()> {
Expand Down Expand Up @@ -76,7 +76,7 @@ pub async fn update_job_executed_at(db: &DbClient, id: &Uuid) -> Result<()> {

pub async fn get_job_by_name_and_scheduled_at(
db: &DbClient,
name: &String,
name: &str,
scheduled_at: &DateTime<Utc>,
) -> Result<Job> {
tracing::trace!(
Expand Down
15 changes: 15 additions & 0 deletions src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,21 @@ impl Issue {
.await?)
}

// returns an array of one element
pub async fn get_first100_comments(
&self,
client: &GithubClient,
) -> anyhow::Result<Vec<Comment>> {
let comment_url = format!(
"{}/issues/{}/comments?page=1&per_page=100",
self.repository().url(),
self.number,
);
Ok(client
.json::<Vec<Comment>>(client.get(&comment_url))
.await?)
}

pub async fn edit_body(&self, client: &GithubClient, body: &str) -> anyhow::Result<()> {
let edit_url = format!("{}/issues/{}", self.repository().url(), self.number);
#[derive(serde::Serialize)]
Expand Down
2 changes: 1 addition & 1 deletion src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ mod close;
pub mod docs_update;
mod github_releases;
mod glacier;
pub mod jobs;
mod major_change;
mod mentions;
mod milestone_prs;
Expand All @@ -46,6 +45,7 @@ mod review_submitted;
mod rfc_helper;
pub mod rustc_commits;
mod shortcut;
pub mod types_planning_updates;

pub async fn handle(ctx: &Context, event: &Event) -> Vec<HandlerError> {
let config = config::get(&ctx.github, event.repo()).await;
Expand Down
67 changes: 35 additions & 32 deletions src/handlers/docs_update.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! A scheduled job to post a PR to update the documentation on rust-lang/rust.
use crate::db::jobs::JobSchedule;
use crate::github::{self, GitTreeEntry, GithubClient, Issue, Repository};
use crate::jobs::Job;
use anyhow::Context;
use anyhow::Result;
use cron::Schedule;
use async_trait::async_trait;
use reqwest::Client;
use std::fmt::Write;
use std::str::FromStr;

/// This is the repository where the commits will be created.
const WORK_REPO: &str = "rustbot/rust";
Expand All @@ -28,38 +27,42 @@ const SUBMODULES: &[&str] = &[

const TITLE: &str = "Update books";

pub fn job() -> JobSchedule {
JobSchedule {
name: "docs_update".to_string(),
// Around 9am Pacific time on every Monday.
schedule: Schedule::from_str("0 00 17 * * Mon *").unwrap(),
metadata: serde_json::Value::Null,
}
}
pub struct DocsUpdateJob;

pub async fn handle_job() -> Result<()> {
// Only run every other week. Doing it every week can be a bit noisy, and
// (rarely) a PR can take longer than a week to merge (like if there are
// CI issues). `Schedule` does not allow expressing this, so check it
// manually.
//
// This is set to run the first week after a release, and the week just
// before a release. That allows getting the latest changes in the next
// release, accounting for possibly taking a few days for the PR to land.
let today = chrono::Utc::today().naive_utc();
let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
let duration = today.signed_duration_since(base);
let weeks = duration.num_weeks();
if weeks % 2 != 0 {
tracing::trace!("skipping job, this is an odd week");
return Ok(());
#[async_trait]
impl Job for DocsUpdateJob {
fn name(&self) -> &'static str {
"docs_update"
}

tracing::trace!("starting docs-update");
docs_update()
.await
.context("failed to process docs update")?;
Ok(())
async fn run(
&self,
_ctx: &super::Context,
_metadata: &serde_json::Value,
) -> anyhow::Result<()> {
// Only run every other week. Doing it every week can be a bit noisy, and
// (rarely) a PR can take longer than a week to merge (like if there are
// CI issues). `Schedule` does not allow expressing this, so check it
// manually.
//
// This is set to run the first week after a release, and the week just
// before a release. That allows getting the latest changes in the next
// release, accounting for possibly taking a few days for the PR to land.
let today = chrono::Utc::today().naive_utc();
let base = chrono::naive::NaiveDate::from_ymd(2015, 12, 10);
let duration = today.signed_duration_since(base);
let weeks = duration.num_weeks();
if weeks % 2 != 0 {
tracing::trace!("skipping job, this is an odd week");
return Ok(());
}

tracing::trace!("starting docs-update");
docs_update()
.await
.context("failed to process docs update")?;
Ok(())
}
}

pub async fn docs_update() -> Result<Option<Issue>> {
Expand Down
8 changes: 4 additions & 4 deletions src/handlers/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ use super::Context;

pub async fn handle_job(
ctx: &Context,
name: &String,
name: &str,
metadata: &serde_json::Value,
) -> anyhow::Result<()> {
match name.as_str() {
match name {
"docs_update" => super::docs_update::handle_job().await,
"rustc_commits" => {
super::rustc_commits::synchronize_commits_inner(ctx, None).await;
Ok(())
}
_ => default(&name, &metadata),
_ => default(name, &metadata),
}
}

fn default(name: &String, metadata: &serde_json::Value) -> anyhow::Result<()> {
fn default(name: &str, metadata: &serde_json::Value) -> anyhow::Result<()> {
tracing::trace!(
"handle_job fell into default case: (name={:?}, metadata={:?})",
name,
Expand Down
22 changes: 13 additions & 9 deletions src/handlers/rustc_commits.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::db::jobs::JobSchedule;
use crate::db::rustc_commits;
use crate::db::rustc_commits::get_missing_commits;
use crate::jobs::Job;
use crate::{
github::{self, Event},
handlers::Context,
};
use cron::Schedule;
use async_trait::async_trait;
use std::collections::VecDeque;
use std::convert::TryInto;
use std::str::FromStr;
use tracing as log;

const BORS_GH_ID: i64 = 3372342;
Expand Down Expand Up @@ -153,12 +152,17 @@ pub async fn synchronize_commits_inner(ctx: &Context, starter: Option<(String, u
}
}

pub fn job() -> JobSchedule {
JobSchedule {
name: "rustc_commits".to_string(),
// Every 30 minutes...
schedule: Schedule::from_str("* 0,30 * * * * *").unwrap(),
metadata: serde_json::Value::Null,
pub struct RustcCommitsJob;

#[async_trait]
impl Job for RustcCommitsJob {
fn name(&self) -> &'static str {
"rustc_commits"
}

async fn run(&self, ctx: &super::Context, _metadata: &serde_json::Value) -> anyhow::Result<()> {
synchronize_commits_inner(ctx, None).await;
Ok(())
}
}

Expand Down
Loading

0 comments on commit df11506

Please sign in to comment.