Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement event trigger to remove jobs from vectorize.job upon table deletion #164

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions extension/sql/meta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,34 @@ CREATE TABLE vectorize.job (
last_completion TIMESTAMP WITH TIME ZONE
);

-- create an event trigger function to delete jobs when corresponding tables are dropped
CREATE OR REPLACE FUNCTION after_drop_trigger()
RETURNS event_trigger AS $$
DECLARE
dropped_table_name TEXT;
dropped_table_schema TEXT;
BEGIN
-- Get the name and schema of the table being dropped
FOR dropped_table_name, dropped_table_schema IN
SELECT objid::regclass::text, nspname
FROM pg_event_trigger_dropped_objects()
JOIN pg_class ON objid = pg_class.oid
JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
WHERE classid = 'pg_class'::regclass
LOOP
DELETE FROM vectorize.job
WHERE LOWER(params ->> 'table') = LOWER(dropped_table_name)
AND LOWER(params ->> 'schema') = LOWER(dropped_table_schema);
END LOOP;
END;
$$ LANGUAGE plpgsql;

-- create the event trigger for DROP TABLE events
CREATE EVENT TRIGGER trg_after_drop
ON sql_drop
WHEN TAG IN ('DROP TABLE')
EXECUTE FUNCTION after_drop_trigger();

CREATE TABLE vectorize.prompts (
prompt_type TEXT NOT NULL UNIQUE,
sys_prompt TEXT NOT NULL,
Expand Down
54 changes: 54 additions & 0 deletions extension/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,60 @@ async fn test_scheduled_job() {
assert_eq!(result.rows_affected(), 3);
}

#[ignore]
#[tokio::test]
async fn test_drop_table_triggers_job_deletion() {
let conn = common::init_database().await;
let mut rng = rand::thread_rng();
let test_num = rng.gen_range(1..100000);
let test_table_name = format!("drop_test_table_{}", test_num);
let job_name = format!("job_{}", test_num);

common::init_test_table(&test_table_name, &conn).await;

let insert_job_query = format!(
"INSERT INTO vectorize.job (name, index_dist_type, transformer, search_alg, params, last_completion)
VALUES ('{job_name}', 'pgv_hsnw_cosine', 'sentence-transformers/all-MiniLM-L6-v2', 'search_algorithm',
jsonb_build_object('table', '{test_table_name}', 'schema', 'public'), NOW());"
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a select vectorize.table(...) instead, so that the test stays current with the APIs around that table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varshith257 - i think replace this direct insert with vectorize.table(). It should help the test move along as it is currently failing because the search_alg column no longer exists (it was removed from the project).

sqlx::query(&insert_job_query)
.execute(&conn)
.await
.expect("failed to insert job");

// Check row count in vectorize.job before dropping the table
let rowcount_before = common::row_count("vectorize.job", &conn).await;
assert!(rowcount_before >= 1);

// Drop the test table
let drop_table_query = format!("DROP TABLE public.{test_table_name};");
sqlx::query(&drop_table_query)
.execute(&conn)
.await
.expect("failed to drop table");

// Check row count in vectorize.job after dropping the table
let rowcount_after = common::row_count("vectorize.job", &conn).await;
assert_eq!(
rowcount_after,
rowcount_before - 1,
"Job was not deleted after table drop"
);

// Verify the specific job no longer exists in vectorize.job
let job_exists: bool = sqlx::query_scalar(&format!(
"SELECT EXISTS (SELECT 1 FROM vectorize.job WHERE name = '{job_name}');"
))
.fetch_one(&conn)
.await
.expect("failed to check job existence");

assert!(
!job_exists,
"Job associated with table `{test_table_name}` was not deleted after table drop"
);
}

#[ignore]
#[tokio::test]
async fn test_scheduled_single_table() {
Expand Down
Loading