Skip to content

Commit

Permalink
feat: Add event trigger to handle job cleanup on table drop in vector…
Browse files Browse the repository at this point in the history
…ize schema (#178)

* feat: Add event trigger to handle job cleanup on table drop in vectorize schema

Signed-off-by: Akhilender <[email protected]>

* fix: made requested changes-1

* fix: added corresponding integration test

* fix: minor-fix in meta.sql file

* fix: integration_test test_event_trigger_on_table_drop

* fix: remove debugging notices

---------

Signed-off-by: Akhilender <[email protected]>
  • Loading branch information
akhilender-bongirwar authored Dec 13, 2024
1 parent 4b76996 commit 4a365d3
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
27 changes: 27 additions & 0 deletions extension/sql/meta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,33 @@ GRANT SELECT ON ALL SEQUENCES IN SCHEMA vectorize TO pg_monitor;
ALTER DEFAULT PRIVILEGES IN SCHEMA vectorize GRANT SELECT ON TABLES TO pg_monitor;
ALTER DEFAULT PRIVILEGES IN SCHEMA vectorize GRANT SELECT ON SEQUENCES TO pg_monitor;

-- Event-trigger function: when tables are dropped, delete the vectorize jobs
-- that were registered against them.
--
-- Intended to be fired from an `sql_drop` event trigger. For every dropped
-- object of type 'table', rows in vectorize.job whose params->>'table' and
-- params->>'schema' match the dropped table are removed.
--
-- The schema and table names are taken from the dedicated `schema_name` and
-- `object_name` columns of pg_event_trigger_dropped_objects() rather than by
-- splitting `object_identity` on '.': `object_identity` is quote-decorated
-- (e.g. public."My Table") and may itself contain dots inside quoted names,
-- so splitting it yields wrong values for any identifier that needs quoting.
-- NOTE(review): this assumes params stores the raw, unquoted names -- confirm
-- against the code that writes vectorize.job.
CREATE OR REPLACE FUNCTION handle_table_drop()
RETURNS event_trigger AS $$
DECLARE
    obj RECORD;
BEGIN
    FOR obj IN
        SELECT object_type, schema_name, object_name
        FROM pg_event_trigger_dropped_objects()
    LOOP
        IF obj.object_type = 'table' THEN
            -- Perform cleanup: delete the associated job from the vectorize.job table
            DELETE FROM vectorize.job
            WHERE params ->> 'table' = obj.object_name
            AND params ->> 'schema' = obj.schema_name;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Drop any previous definition first so this script can be re-run safely;
-- CREATE EVENT TRIGGER has no OR REPLACE form.
DROP EVENT TRIGGER IF EXISTS vectorize_job_drop_trigger;

-- Run handle_table_drop() on the sql_drop event, restricted to commands whose
-- tag is DROP TABLE.
-- NOTE(review): tables removed by other commands (e.g. DROP SCHEMA ... CASCADE)
-- do not match this tag filter, so their jobs are not cleaned up here.
CREATE EVENT TRIGGER vectorize_job_drop_trigger
ON sql_drop
WHEN TAG IN ('DROP TABLE')
EXECUTE FUNCTION handle_table_drop();

INSERT INTO vectorize.prompts (prompt_type, sys_prompt, user_prompt)
VALUES (
Expand Down
63 changes: 63 additions & 0 deletions extension/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,3 +860,66 @@ async fn test_cohere() {
.unwrap();
assert_eq!(search_results.len(), 3);
}

/// Verifies that dropping a vectorized table removes its row from `vectorize.job`,
/// and that dropping an unrelated table leaves that row in place.
///
/// Every assertion is scoped to the job registered for this test's own table
/// (matched on `params->>'table'` / `params->>'schema'`) instead of the total row
/// count of `vectorize.job`, so jobs left behind by other tests sharing the
/// database cannot make it fail. The unrelated-table drop is performed while the
/// job still exists; otherwise that check could never fail.
#[ignore]
#[tokio::test]
async fn test_event_trigger_on_table_drop() {
    // Selects the job rows registered for one table/schema pair.
    const JOBS_FOR_TABLE_SQL: &str =
        "SELECT * FROM vectorize.job WHERE params->>'table' = $1 AND params->>'schema' = $2";

    let conn = common::init_database().await;
    let mut rng = rand::thread_rng();
    let test_num = rng.gen_range(1..100000);
    let test_table_name = format!("products_test_{}", test_num);
    let job_name = format!("job_{}", test_num);

    // Initialize the test table and job
    common::init_test_table(&test_table_name, &conn).await;
    common::init_embedding_svc_url(&conn).await;

    sqlx::query(&format!(
        "SELECT vectorize.table(
            job_name => '{job_name}',
            \"table\" => '{test_table_name}',
            primary_key => 'product_id',
            columns => ARRAY['product_name'],
            transformer => 'sentence-transformers/all-MiniLM-L6-v2'
        );"
    ))
    .execute(&conn)
    .await
    .expect("failed to initialize vectorize job");

    // The job must exist before anything is dropped.
    let jobs_before = sqlx::query(JOBS_FOR_TABLE_SQL)
        .bind(&test_table_name)
        .bind("public")
        .fetch_all(&conn)
        .await
        .expect("Failed to fetch job");
    assert_eq!(
        jobs_before.len(),
        1,
        "Expected exactly one job for the test table before dropping it"
    );

    // Drop a non-associated table while the job still exists and verify no action is taken
    let unrelated_table_name = format!("unrelated_test_{}", test_num);
    common::init_test_table(&unrelated_table_name, &conn).await;
    sqlx::query(&format!("DROP TABLE {unrelated_table_name};"))
        .execute(&conn)
        .await
        .expect("Failed to drop the unrelated test table");

    let jobs_after_unrelated_drop = sqlx::query(JOBS_FOR_TABLE_SQL)
        .bind(&test_table_name)
        .bind("public")
        .fetch_all(&conn)
        .await
        .expect("Failed to fetch job");
    assert_eq!(
        jobs_after_unrelated_drop.len(),
        1,
        "vectorize.job should remain unaffected by unrelated table drops"
    );

    // Drop the test table; the event trigger should clean up its job
    sqlx::query(&format!("DROP TABLE {test_table_name} CASCADE;"))
        .execute(&conn)
        .await
        .expect("Failed to drop the test table");

    // Check that the job was deleted
    let jobs_after_drop = sqlx::query(JOBS_FOR_TABLE_SQL)
        .bind(&test_table_name)
        .bind("public")
        .fetch_all(&conn)
        .await
        .expect("Failed to fetch job");
    assert!(
        jobs_after_drop.is_empty(),
        "Job was not deleted after table drop"
    );
}

0 comments on commit 4a365d3

Please sign in to comment.