Skip to content

Commit

Permalink
check if queue already exists (#28)
Browse files Browse the repository at this point in the history
* fix .table bug

* unused import

* add upgrade script
  • Loading branch information
ChuckHend authored Nov 21, 2023
1 parent 67c3132 commit 484d89e
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "vectorize"
version = "0.6.0"
version = "0.6.1"
edition = "2021"
publish = false

Expand Down
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "The simplest way to orchestrate vector search on Postgres."
homepage = "https://github.com/tembo-io/pg_vectorize"
documentation = "https://github.com/tembo-io/pg_vectorize"
categories = ["orchestration", "machine_learning"]
version = "0.6.0"
version = "0.6.1"

[build]
postgres_version = "15"
Expand Down
Empty file added sql/vectorize--0.6.0--0.6.1.sql
Empty file.
4 changes: 1 addition & 3 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ fn table(
// TODO: first batch update
// initialize cron
let _ = init::init_cron(&schedule, &job_name); // handle this error
Ok(format!(
"{schema}.{table}.{columns:?}.{transformer}.{search_alg}"
))
Ok(format!("Successfully created job: {job_name}"))
}

#[pg_extern]
Expand Down
23 changes: 16 additions & 7 deletions src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{query::check_input, types, types::TableMethod, types::Transformer};
use pgrx::prelude::*;
use std::collections::HashMap;

use anyhow::Result;
use anyhow::{Context, Result};
use lazy_static::lazy_static;

lazy_static! {
Expand All @@ -18,12 +18,21 @@ lazy_static! {

pub fn init_pgmq(transformer: &Transformer) -> Result<()> {
let qname = QUEUE_MAPPING.get(transformer).expect("invalid transformer");
let ran: Result<_, spi::Error> = Spi::connect(|mut c| {
let _r = c.update(&format!("SELECT pgmq.create('{qname}');"), None, None)?;
Ok(())
});
if let Err(e) = ran {
error!("error creating job queue: {}", e);
// check if queue already created:
let queue_exists: bool = Spi::get_one(&format!(
"SELECT EXISTS (SELECT 1 FROM pgmq.meta WHERE queue_name = '{qname}');",
))?
.context("error checking if queue exists")?;
if queue_exists {
return Ok(());
} else {
let ran: Result<_, spi::Error> = Spi::connect(|mut c| {
let _r = c.update(&format!("SELECT pgmq.create('{qname}');"), None, None)?;
Ok(())
});
if let Err(e) = ran {
error!("error creating job queue: {}", e);
}
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod common {
use log::LevelFilter;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::ConnectOptions;
use sqlx::{FromRow, Pool, Postgres, Row};
use sqlx::{Pool, Postgres, Row};
use url::{ParseError, Url};

pub async fn connect(url: &str) -> Pool<Postgres> {
Expand Down

0 comments on commit 484d89e

Please sign in to comment.