Skip to content

Commit

Permalink
feat: monitor idx cache consistency before switching (#2229)
Browse files Browse the repository at this point in the history
  • Loading branch information
ellie authored Jul 2, 2024
1 parent 255a7bc commit c3723aa
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/atuin-server-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ serde = { workspace = true }
sqlx = { workspace = true }
async-trait = { workspace = true }
uuid = { workspace = true }
metrics = "0.21.1"
futures-util = "0.3"
url = "2.5.2"
32 changes: 29 additions & 3 deletions crates/atuin-server-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use atuin_common::utils::crypto_random_string;
use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
use atuin_server_database::{Database, DbError, DbResult};
use futures_util::TryStreamExt;
use metrics::counter;
use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions;
use sqlx::Row;
Expand Down Expand Up @@ -643,16 +644,41 @@ impl Database for Postgres {
const STATUS_SQL: &str =
"select host, tag, max(idx) from store where user_id = $1 group by host, tag";

let res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
let mut res: Vec<(Uuid, String, i64)> = sqlx::query_as(STATUS_SQL)
.bind(user.id)
.fetch_all(&self.pool)
.await
.map_err(fix_error)?;
res.sort();

// We're temporarily increasing latency in order to improve confidence in the cache
// If it runs for a few days, and we confirm that cached values are equal to realtime, we
// can replace realtime with cached.
//
// But let's check so sync doesn't do Weird Things.

let mut cached_res: Vec<(Uuid, String, i64)> =
sqlx::query_as("select host, tag, idx from store_idx_cache where user_id = $1")
.bind(user.id)
.fetch_all(&self.pool)
.await
.map_err(fix_error)?;
cached_res.sort();

let mut status = RecordStatus::new();

for i in res {
status.set_raw(HostId(i.0), i.1, i.2 as u64);
let equal = res == cached_res;

if equal {
counter!("atuin_store_idx_cache_consistent", 1);
} else {
// log the values if we have an inconsistent cache
tracing::debug!(user = user.username, cache_match = equal, res = ?res, cached = ?cached_res, "record store index request");
counter!("atuin_store_idx_cache_inconsistent", 1);
};

for i in res.iter() {
status.set_raw(HostId(i.0), i.1.clone(), i.2 as u64);
}

Ok(status)
Expand Down

0 comments on commit c3723aa

Please sign in to comment.