diff --git a/extension/sql/meta.sql b/extension/sql/meta.sql index 0570668..322a67f 100644 --- a/extension/sql/meta.sql +++ b/extension/sql/meta.sql @@ -20,6 +20,33 @@ GRANT SELECT ON ALL SEQUENCES IN SCHEMA vectorize TO pg_monitor; ALTER DEFAULT PRIVILEGES IN SCHEMA vectorize GRANT SELECT ON TABLES TO pg_monitor; ALTER DEFAULT PRIVILEGES IN SCHEMA vectorize GRANT SELECT ON SEQUENCES TO pg_monitor; +CREATE OR REPLACE FUNCTION handle_table_drop() +RETURNS event_trigger AS $$ +DECLARE + obj RECORD; + schema_name TEXT; + table_name TEXT; +BEGIN + FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP + IF obj.object_type = 'table' THEN + schema_name := split_part(obj.object_identity, '.', 1); + table_name := split_part(obj.object_identity, '.', 2); + + -- Perform cleanup: delete the associated job from the vectorize.job table + DELETE FROM vectorize.job + WHERE params ->> 'table' = table_name + AND params ->> 'schema' = schema_name; + END IF; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +DROP EVENT TRIGGER IF EXISTS vectorize_job_drop_trigger; + +CREATE EVENT TRIGGER vectorize_job_drop_trigger +ON sql_drop +WHEN TAG IN ('DROP TABLE') +EXECUTE FUNCTION handle_table_drop(); INSERT INTO vectorize.prompts (prompt_type, sys_prompt, user_prompt) VALUES ( diff --git a/extension/tests/integration_tests.rs b/extension/tests/integration_tests.rs index 6361682..8828b59 100644 --- a/extension/tests/integration_tests.rs +++ b/extension/tests/integration_tests.rs @@ -860,3 +860,66 @@ async fn test_cohere() { .unwrap(); assert_eq!(search_results.len(), 3); } + +#[ignore] +#[tokio::test] +async fn test_event_trigger_on_table_drop() { + let conn = common::init_database().await; + let mut rng = rand::thread_rng(); + let test_num = rng.gen_range(1..100000); + let test_table_name = format!("products_test_{}", test_num); + let job_name = format!("job_{}", test_num); + + // Initialize the test table and job + common::init_test_table(&test_table_name, &conn).await; + common::init_embedding_svc_url(&conn).await; + + let _ = sqlx::query(&format!( + "SELECT vectorize.table( + job_name => '{job_name}', + \"table\" => '{test_table_name}', + primary_key => 'product_id', + columns => ARRAY['product_name'], + transformer => 'sentence-transformers/all-MiniLM-L6-v2' + );" + )) + .execute(&conn) + .await + .expect("failed to initialize vectorize job"); + + // Check the job table before dropping the test table + let job_count_before = common::row_count("vectorize.job", &conn).await; + assert_eq!(job_count_before, 1); + + // Drop the test table + let drop_result = sqlx::query(&format!("DROP TABLE {test_table_name} CASCADE;")) + .execute(&conn) + .await; + assert!(drop_result.is_ok(), "Failed to drop the test table"); + + // Debug: Check job table after dropping the test table + let job_count_after = common::row_count("vectorize.job", &conn).await; + assert_eq!(job_count_after, 0, "Job entry was not removed after table drop"); + + // Check if the job was deleted + let deleted_job = sqlx::query("SELECT * FROM vectorize.job WHERE params->>'table' = $1 AND params->>'schema' = $2") + .bind(test_table_name) + .bind("public") + .fetch_optional(&conn) + .await + .expect("Failed to fetch job"); + + assert!(deleted_job.is_none(), "Job was not deleted after table drop"); + + // Attempt to drop a non-associated table and verify no action is taken + let unrelated_table_name = format!("unrelated_test_{}", test_num); + common::init_test_table(&unrelated_table_name, &conn).await; + let _ = sqlx::query(&format!("DROP TABLE {unrelated_table_name};")) + .execute(&conn) + .await + .expect("Failed to drop the unrelated test table"); + + // Ensure vectorize.job is unaffected + let final_job_count = common::row_count("vectorize.job", &conn).await; + assert_eq!(final_job_count, 0, "vectorize.job should remain unaffected by unrelated table drops"); +}