Skip to content

Commit

Permalink
allow setting unix socket via guc (#50)
Browse files Browse the repository at this point in the history
* bump toml

* add guc

* add test

* trim log statements

* add migrations
  • Loading branch information
ChuckHend authored Feb 13, 2024
1 parent 2c5664e commit d822e39
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pg_later"
version = "0.0.15"
version = "0.1.0"
edition = "2021"
publish = false

Expand Down
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Execute SQL statements now and check the results later."
homepage = "https://github.com/tembo-io/pg_later"
documentation = "https://github.com/tembo-io/pg_later"
categories = ["orchestration"]
version = "0.0.15"
version = "0.1.0"

[build]
postgres_version = "15"
Expand Down
Empty file.
Empty file added sql/pg_later--0.0.15--0.1.0.sql
Empty file.
2 changes: 2 additions & 0 deletions src/bgw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ use sqlx::Postgres;
use std::time::Duration;

use crate::executor::{query_to_json, Job};
use crate::guc::init_guc;
use crate::util;
use anyhow::Result;

pub const PGMQ_QUEUE_NAME: &str = "pg_later_jobs";

#[pg_guard]
pub extern "C" fn _PG_init() {
init_guc();
BackgroundWorkerBuilder::new("PG Later Background Worker")
.set_function("background_worker_main")
.set_library("pg_later")
Expand Down
47 changes: 47 additions & 0 deletions src/guc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use anyhow::Result;
use core::ffi::CStr;
use pgrx::*;

pub static PGLATER_SOCKET_URL: GucSetting<Option<&CStr>> = GucSetting::<Option<&CStr>>::new(None);

// initialize GUCs
pub fn init_guc() {
GucRegistry::define_string_guc(
"pglater.host",
"unix socket url for Postgres",
"unix socket path to the Postgres instance. Optional. Can also be set in environment variable PGLATER_SOCKET_URL.",
&PGLATER_SOCKET_URL,
GucContext::Suset, GucFlags::default());
}

// for handling of GUCs that can be error prone
#[derive(Debug)]
pub enum PglaterGUC {
Host,
}

/// a convenience function to get this project's GUCs
pub fn get_guc(guc: PglaterGUC) -> Option<String> {
let val = match guc {
PglaterGUC::Host => PGLATER_SOCKET_URL.get(),
};
if let Some(cstr) = val {
if let Ok(s) = handle_cstr(cstr) {
Some(s)
} else {
error!("failed to convert CStr to str");
}
} else {
info!("no value set for GUC: {:?}", guc);
None
}
}

#[allow(dead_code)]
fn handle_cstr(cstr: &CStr) -> Result<String> {
if let Ok(s) = cstr.to_str() {
Ok(s.to_owned())
} else {
Err(anyhow::anyhow!("failed to convert CStr to str"))
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod api;
mod bgw;
mod clf;
mod executor;
mod guc;
mod util;

/// This module is required by `cargo pgrx test` invocations.
Expand Down
67 changes: 51 additions & 16 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use anyhow::Result;
use pgrx::prelude::*;
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::{Pool, Postgres};
use std::env;

use anyhow::Result;

use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use url::{ParseError, Url};

use crate::guc;

#[derive(Clone, Debug)]
pub struct Config {
pub pg_conn_str: String,
pub vectorize_socket_url: Option<String>,
pub env_socket_url: Option<String>,
pub guc_host: Option<String>,
}

impl Default for Config {
Expand All @@ -20,12 +21,13 @@ impl Default for Config {
"DATABASE_URL",
"postgresql://postgres:[email protected]:5432/postgres",
),
vectorize_socket_url: env::var("PGLATER_SOCKET_URL").ok(),
env_socket_url: env::var("PGLATER_SOCKET_URL").ok(),
guc_host: guc::get_guc(guc::PglaterGUC::Host),
}
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PostgresSocketConnection {
user: Option<String>,
dbname: Option<String>,
Expand Down Expand Up @@ -72,6 +74,7 @@ pub async fn get_pg_conn() -> Result<Pool<Postgres>> {
pub fn get_pgc_socket_opt(socket_conn: PostgresSocketConnection) -> Result<PgConnectOptions> {
let mut opts = PgConnectOptions::new();
opts = opts.socket(socket_conn.host.expect("missing socket host"));
log!("socket options: {:?}", opts);
if socket_conn.port.is_some() {
opts = opts.port(socket_conn.port.expect("missing socket port"));
} else {
Expand All @@ -89,7 +92,6 @@ pub fn get_pgc_socket_opt(socket_conn: PostgresSocketConnection) -> Result<PgCon
}
if socket_conn.password.is_some() {
opts = opts.password(&socket_conn.password.expect("missing socket password"));
} else {
}
Ok(opts)
}
Expand All @@ -107,17 +109,50 @@ fn get_pgc_tcp_opt(url: Url) -> Result<PgConnectOptions> {

pub fn get_pg_options() -> Result<PgConnectOptions> {
let cfg = Config::default();
match cfg.vectorize_socket_url {
Some(socket_url) => {
log!("PGLATER_SOCKET_URL={:?}", socket_url);
let socket_conn = PostgresSocketConnection::from_unix_socket_string(&socket_url)
.expect("failed to parse socket url");

let guc_host: Option<String> = cfg.guc_host;
let env_socket: Option<String> = cfg.env_socket_url;
let env_url: String = cfg.pg_conn_str;

match (guc_host.as_ref(), env_socket.as_ref()) {
(Some(guc), _) => {
log!("pg-later: connecting with value from pglater.host");
let socket_conn = PostgresSocketConnection::from_unix_socket_string(&guc)
.expect("invalid value in pglater.host");
get_pgc_socket_opt(socket_conn)
}
(None, Some(env)) => {
log!("pg-later: connecting with value from env PGLATER_SOCKET_URL");
let socket_conn = PostgresSocketConnection::from_unix_socket_string(&env)
.expect("invalid value in env PGLATER_SOCKET_URL");
get_pgc_socket_opt(socket_conn)
}
None => {
log!("DATABASE_URL={}", cfg.pg_conn_str);
let url = Url::parse(&cfg.pg_conn_str)?;
(None, None) => {
log!("pg-later: connecting with value from env DATABASE_URL");
let url = Url::parse(&env_url)?;
get_pgc_tcp_opt(url)
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_connection_parsing_socket() {
let expected = PostgresSocketConnection {
user: Some("me".to_string()),
dbname: Some("pg_later_test".to_string()),
host: Some("/home/me/.pgrx".to_string()),
password: Some("pw".to_string()),
port: Some(5432),
};

let parsed = PostgresSocketConnection::from_unix_socket_string(
"postgresql:///?user=me&host=/home/me/.pgrx&password=pw&port=5432&dbname=pg_later_test",
)
.unwrap();
assert_eq!(parsed, expected);
}
}

0 comments on commit d822e39

Please sign in to comment.