From 484d89e700dd8016df25d70b9b7b4ceb8001e2b9 Mon Sep 17 00:00:00 2001 From: Adam Hendel Date: Tue, 21 Nov 2023 15:22:30 -0600 Subject: [PATCH] check if queue already exists (#28) * fix .table bug * unused import * add upgrade script --- Cargo.toml | 2 +- Trunk.toml | 2 +- sql/vectorize--0.6.0--0.6.1.sql | 0 src/api.rs | 4 +--- src/init.rs | 23 ++++++++++++++++------- tests/util.rs | 2 +- 6 files changed, 20 insertions(+), 13 deletions(-) create mode 100644 sql/vectorize--0.6.0--0.6.1.sql diff --git a/Cargo.toml b/Cargo.toml index e4dd8e3..d0132d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "vectorize" -version = "0.6.0" +version = "0.6.1" edition = "2021" publish = false diff --git a/Trunk.toml b/Trunk.toml index 62f3c17..107d83b 100644 --- a/Trunk.toml +++ b/Trunk.toml @@ -6,7 +6,7 @@ description = "The simplest way to orchestrate vector search on Postgres." homepage = "https://github.com/tembo-io/pg_vectorize" documentation = "https://github.com/tembo-io/pg_vectorize" categories = ["orchestration", "machine_learning"] -version = "0.6.0" +version = "0.6.1" [build] postgres_version = "15" diff --git a/sql/vectorize--0.6.0--0.6.1.sql b/sql/vectorize--0.6.0--0.6.1.sql new file mode 100644 index 0000000..e69de29 diff --git a/src/api.rs b/src/api.rs index 00b909b..c1b7d29 100644 --- a/src/api.rs +++ b/src/api.rs @@ -130,9 +130,7 @@ fn table( // TODO: first batch update // initialize cron let _ = init::init_cron(&schedule, &job_name); // handle this error - Ok(format!( - "{schema}.{table}.{columns:?}.{transformer}.{search_alg}" - )) + Ok(format!("Successfully created job: {job_name}")) } #[pg_extern] diff --git a/src/init.rs b/src/init.rs index d9bf4c6..cb8ce0b 100644 --- a/src/init.rs +++ b/src/init.rs @@ -2,7 +2,7 @@ use crate::{query::check_input, types, types::TableMethod, types::Transformer}; use pgrx::prelude::*; use std::collections::HashMap; -use anyhow::Result; +use anyhow::{Context, Result}; use lazy_static::lazy_static; lazy_static! { @@ -18,12 +18,21 @@ lazy_static! { pub fn init_pgmq(transformer: &Transformer) -> Result<()> { let qname = QUEUE_MAPPING.get(transformer).expect("invalid transformer"); - let ran: Result<_, spi::Error> = Spi::connect(|mut c| { - let _r = c.update(&format!("SELECT pgmq.create('{qname}');"), None, None)?; - Ok(()) - }); - if let Err(e) = ran { - error!("error creating job queue: {}", e); + // check if queue already created: + let queue_exists: bool = Spi::get_one(&format!( + "SELECT EXISTS (SELECT 1 FROM pgmq.meta WHERE queue_name = '{qname}');", + ))? + .context("error checking if queue exists")?; + if queue_exists { + return Ok(()); + } else { + let ran: Result<_, spi::Error> = Spi::connect(|mut c| { + let _r = c.update(&format!("SELECT pgmq.create('{qname}');"), None, None)?; + Ok(()) + }); + if let Err(e) = ran { + error!("error creating job queue: {}", e); + } } Ok(()) } diff --git a/tests/util.rs b/tests/util.rs index fd9a610..045c71a 100644 --- a/tests/util.rs +++ b/tests/util.rs @@ -2,7 +2,7 @@ pub mod common { use log::LevelFilter; use sqlx::postgres::{PgConnectOptions, PgPoolOptions}; use sqlx::ConnectOptions; - use sqlx::{FromRow, Pool, Postgres, Row}; + use sqlx::{Pool, Postgres, Row}; use url::{ParseError, Url}; pub async fn connect(url: &str) -> Pool {