Skip to content

Commit

Permalink
Workflow for tracking PRs assignment
Browse files Browse the repository at this point in the history
General overview at: #1753

This patch implements the first part:
- a new DB table with just the fields to track how many PRs are
assigned to a contributor at any time
- Update this table everytime a PR is assigned or unassigned

No initial sync is planned at this time. Populating the table will happen over
time.
  • Loading branch information
apiraino committed Dec 13, 2023
1 parent 92c3399 commit a9800b7
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub(crate) struct Config {
pub(crate) note: Option<NoteConfig>,
pub(crate) mentions: Option<MentionsConfig>,
pub(crate) no_merges: Option<NoMergesConfig>,
pub(crate) review_work_queue: Option<ReviewWorkQueueConfig>,
}

#[derive(PartialEq, Eq, Debug, serde::Deserialize)]
Expand Down Expand Up @@ -152,6 +153,12 @@ pub(crate) struct ShortcutConfig {
_empty: (),
}

#[derive(PartialEq, Eq, Debug, serde::Deserialize)]
pub(crate) struct ReviewWorkQueueConfig {
#[serde(default)]
_empty: (),
}

#[derive(PartialEq, Eq, Debug, serde::Deserialize)]
pub(crate) struct PrioritizeConfig {
pub(crate) label: String,
Expand Down
8 changes: 8 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,5 +307,13 @@ CREATE UNIQUE INDEX jobs_name_scheduled_at_unique_index
ON jobs (
name, scheduled_at
);
",
"
CREATE table review_prefs (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
user_id BIGINT REFERENCES users(user_id),
assigned_prs INT[] NOT NULL DEFAULT array[]::INT[],
num_assigned_prs INTEGER
);
",
];
2 changes: 2 additions & 0 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mod prioritize;
mod relabel;
mod review_requested;
mod review_submitted;
mod review_work_queue;
mod rfc_helper;
pub mod rustc_commits;
mod shortcut;
Expand Down Expand Up @@ -166,6 +167,7 @@ issue_handlers! {
no_merges,
notify_zulip,
review_requested,
review_work_queue,
}

macro_rules! command_handlers {
Expand Down
164 changes: 164 additions & 0 deletions src/handlers/review_work_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use crate::{
config::ReviewWorkQueueConfig,
github::{IssuesAction, IssuesEvent},
handlers::Context,
ReviewWorkQueue,
};
use anyhow::Context as _;
use tokio_postgres::Client as DbClient;
use tracing as log;

// This module updates the PR work queue of team members
// - When a PR has been assigned, adds the PR to the work queue of team members
// - When a PR is unassigned or closed, removes the PR from the work queue of all team members

/// Get work queue for a number of team members
pub async fn get_review_candidates_by_username(
db: &DbClient,
usernames: Vec<String>,
) -> anyhow::Result<Vec<ReviewWorkQueue>> {
let q = format!(
"
SELECT u.username, r.*
FROM review_capacity_prefs r
JOIN users u on u.user_id=r.user_id
WHERE username = ANY('{{ {} }}')",
usernames.join(",")
);
let rows = db.query(&q, &[]).await.unwrap();
Ok(rows
.into_iter()
.filter_map(|row| Some(ReviewWorkQueue::from(row)))
.collect())
}

/// Get all assignees for a pull request
async fn get_pr_assignees(db: &DbClient, issue_num: i64) -> anyhow::Result<Vec<ReviewWorkQueue>> {
let q = format!(
"
SELECT u.username, r.*
FROM review_capacity_prefs r
JOIN users u on u.user_id=r.user_id
WHERE {} = ANY (assigned_prs);",
issue_num,
);

let rows = db.query(&q, &[]).await.unwrap();
Ok(rows
.into_iter()
.filter_map(|row| Some(ReviewWorkQueue::from(row)))
.collect())
}

pub(super) struct ReviewPrefsInput {}

pub(super) async fn parse_input(
_ctx: &Context,
event: &IssuesEvent,
config: Option<&ReviewWorkQueueConfig>,
) -> Result<Option<ReviewPrefsInput>, String> {
log::debug!("[review_prefs] parse_input");
let _config = match config {
Some(config) => config,
None => return Ok(None),
};

log::debug!(
"[review_prefs] now matching the action for event {:?}",
event
);
match event.action {
IssuesAction::Assigned => {
log::debug!("[review_prefs] IssuesAction::Assigned: Will add to work queue");
Ok(Some(ReviewPrefsInput {}))
}
IssuesAction::Unassigned | IssuesAction::Closed => {
log::debug!("[review_prefs] IssuesAction::Unassigned | IssuesAction::Closed: Will remove from work queue");
Ok(Some(ReviewPrefsInput {}))
}
_ => {
log::debug!("[review_prefs] Other action on PR {:?}", event.action);
Ok(None)
}
}
}

async fn update_team_member_workqueue(
db: &DbClient,
user_id: i64,
assigned_prs: &Vec<i64>,
) -> anyhow::Result<ReviewWorkQueue> {
let q = "
UPDATE review_capacity_prefs r
SET assigned_prs = $2, num_assigned_prs = $3
FROM users u
WHERE r.user_id=$1 AND u.user_id=r.user_id
RETURNING u.username, r.*";
let num_assigned_prs = assigned_prs.len() as i32;
let rec = db
.query_one(q, &[&user_id, assigned_prs, &num_assigned_prs])
.await
.context("Update DB error")?;
Ok(rec.into())
}

pub(super) async fn handle_input<'a>(
ctx: &Context,
_config: &ReviewWorkQueueConfig,
event: &IssuesEvent,
_inputs: ReviewPrefsInput,
) -> anyhow::Result<()> {
log::debug!("[review_prefs] handle_input");
let db_client = ctx.db.get().await;

// Note:
// When assigning or unassigning a PR, we don't receive the assignee(s) removed from the PR
// so we need to run two queries:

// 1) Remove the PR from everyones' work queue
let iss_num = event.issue.number as i64;
let current_assignees = get_pr_assignees(&db_client, iss_num).await.unwrap();
for mut assignee in current_assignees {
if let Some(index) = assignee
.assigned_prs
.iter()
.position(|value| *value == iss_num)
{
assignee.assigned_prs.swap_remove(index);
}
update_team_member_workqueue(&db_client, assignee.user_id, &assignee.assigned_prs)
.await
.unwrap();
}

// If the action is to unassign or close a PR, nothing else to do
if event.action == IssuesAction::Closed || event.action == IssuesAction::Unassigned {
return Ok(());
}

// 2) Add the PR to the requested team members' work queues
let usernames = event
.issue
.assignees
.iter()
.map(|x| x.login.clone())
.collect::<Vec<String>>();
let requested_assignees = get_review_candidates_by_username(&db_client, usernames)
.await
.unwrap();
for mut assignee in requested_assignees {
log::debug!(
"Adding PR#{} to work queue of team member {}",
iss_num,
&assignee.username
);
if !assignee.assigned_prs.contains(&iss_num) {
assignee.assigned_prs.push(iss_num)
}
update_team_member_workqueue(&db_client, assignee.user_id, &assignee.assigned_prs)
.await
.unwrap();
}

Ok(())
}
22 changes: 22 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::github::PullRequestDetails;
use anyhow::Context;
use handlers::HandlerError;
use interactions::ErrorComment;
use serde::Serialize;
use std::fmt;
use tracing as log;

Expand Down Expand Up @@ -122,6 +123,27 @@ impl From<anyhow::Error> for WebhookError {
}
}

#[derive(Debug, Serialize)]
pub struct ReviewWorkQueue {
pub username: String,
pub id: uuid::Uuid,
pub user_id: i64,
pub assigned_prs: Vec<i64>,
pub num_assigned_prs: Option<i32>,
}

impl From<tokio_postgres::row::Row> for ReviewWorkQueue {
fn from(row: tokio_postgres::row::Row) -> Self {
Self {
username: row.get("username"),
id: row.get("id"),
user_id: row.get("user_id"),
assigned_prs: row.get("assigned_prs"),
num_assigned_prs: row.get("num_assigned_prs"),
}
}
}

pub fn deserialize_payload<T: serde::de::DeserializeOwned>(v: &str) -> anyhow::Result<T> {
let mut deserializer = serde_json::Deserializer::from_str(&v);
let res: Result<T, _> = serde_path_to_error::deserialize(&mut deserializer);
Expand Down

0 comments on commit a9800b7

Please sign in to comment.