From f39abd45c1cbd2d428e33ccb2d5df13bfbe65d38 Mon Sep 17 00:00:00 2001 From: Jay Kothari Date: Tue, 3 Oct 2023 16:03:09 -0400 Subject: [PATCH] basic setup --- .github/workflows/extension_ci.yml | 227 ++++++++--------------- Cargo.toml | 5 +- README.md | 96 +++------- clerk_fdw.control | 5 - prometheus_fdw.control | 5 + src/lib.rs | 282 ++++++++++++----------------- 6 files changed, 224 insertions(+), 396 deletions(-) delete mode 100644 clerk_fdw.control create mode 100644 prometheus_fdw.control diff --git a/.github/workflows/extension_ci.yml b/.github/workflows/extension_ci.yml index 6eb6a2f..f2a1d7f 100644 --- a/.github/workflows/extension_ci.yml +++ b/.github/workflows/extension_ci.yml @@ -1,155 +1,80 @@ -name: Clerk FDW Extension +# name: Prometheus FDW Extension -defaults: - run: - shell: bash - working-directory: ./ +# defaults: +# run: +# shell: bash +# working-directory: ./ -on: - pull_request: - branches: - - main - push: - branches: - - main - release: - types: - - created +# on: +# pull_request: +# branches: +# - main +# push: +# branches: +# - main +# release: +# types: +# - created -jobs: - lint: - name: Run linters - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v3 - - name: Install Rust minimal nightly with clippy and rustfmt - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: nightly - components: rustfmt, clippy - - uses: Swatinem/rust-cache@v2 - with: - prefix-key: "clerk_fdw-extension-lint" - # Update cache directories as needed - cache-directories: | - /home/runner/.pgrx - - uses: ./.github/actions/pgx-init - with: - working-directory: ./ - - name: Cargo format - run: cargo +nightly fmt --all --check - - name: Clippy - run: cargo clippy +# jobs: +# lint: +# name: Run linters +# runs-on: ubuntu-22.04 +# steps: +# - uses: actions/checkout@v3 +# - name: Install Rust minimal nightly with clippy and rustfmt +# uses: actions-rs/toolchain@v1 +# with: +# profile: minimal +# toolchain: nightly +# components: rustfmt, clippy +# - uses: Swatinem/rust-cache@v2 +# with: +# prefix-key: "prometheus_fdw-extension-lint" +# # Update cache directories as needed +# cache-directories: | +# /home/runner/.pgrx +# - uses: ./.github/actions/pgx-init +# with: +# working-directory: ./ +# - name: Cargo format +# run: cargo +nightly fmt --all --check +# - name: Clippy +# run: cargo clippy - # figure out how to test work - # test: - # name: Run tests - # runs-on: ubuntu-22.04 - # steps: - # - uses: actions/checkout@v2 - # - name: Install Rust stable toolchain - # uses: actions-rs/toolchain@v1 - # with: - # toolchain: stable - # - uses: Swatinem/rust-cache@v2 - # with: - # prefix-key: "pgmq-extension-test" - # workspaces: | - # pgmq - # # Additional directories to cache - # cache-directories: | - # /home/runner/.pgrx - # - uses: ./.github/actions/pgx-init - # with: - # working-directory: ./ - # - name: test - # run: | - # sudo apt-get update && sudo apt-get install -y postgresql-server-dev-14 - # git clone https://github.com/pgpartman/pg_partman.git && \ - # cd pg_partman && \ - # sudo make install && cd ../ - # cp /usr/share/postgresql/14/extension/pg_partman* ~/.pgrx/15.4/pgrx-install/share/postgresql/extension/ - # cp /usr/lib/postgresql/14/lib/pg_partman_bgw.so ~/.pgrx/15.4/pgrx-install/lib/postgresql/ - # rm -rf ./target/pgrx-test-data-* || true - # pg_version=$(stoml Cargo.toml features.default) - # cargo pgrx run ${pg_version} --pgcli || true - # cargo pgrx test ${pg_version} - - publish: - # only publish release events - if: github.event_name == 'release' - name: trunk publish - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - name: Install Rust stable toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - - uses: Swatinem/rust-cache@v2 - with: - prefix-key: "clerk_fdw-extension-test" - cache-directories: | - /home/runner/.pgrx - - name: Install stoml and pg-trunk - shell: bash - run: | - set -xe - wget https://github.com/freshautomations/stoml/releases/download/v0.7.1/stoml_linux_amd64 &> /dev/null - mv stoml_linux_amd64 stoml - chmod +x stoml - sudo mv stoml /usr/local/bin/ - cargo install pg-trunk - - name: trunk build - working-directory: ./ - run: trunk build - - name: trunk publish - working-directory: ./ - env: - TRUNK_API_TOKEN: ${{ secrets.TRUNK_AUTH_TOKEN }} - run: | - ext_ver=$(stoml Cargo.toml package.version) - ext_repo=$(stoml Cargo.toml package.repository) - trunk publish clerk_fdw --version ${ext_ver} --file .trunk/clerk_fdw-${ext_ver}.tar.gz --description "Foreign Data wrapper for clerk" --homepage "https://github.com/tembo-io/clerk_fdw" --repository "https://github.com/tembo-io/clerk_fdw" --license "PostgreSQL" --category connectors - # build_and_push: - # name: Build and push images - # needs: - # - publish - # runs-on: - # - self-hosted - # - dind - # - large-8x8 - # outputs: - # short_sha: ${{ steps.versions.outputs.SHORT_SHA }} - # steps: - # - name: Check out the repo - # uses: actions/checkout@v3 - # - name: Install stoml and pg-trunk - # shell: bash - # run: | - # set -xe - # wget https://github.com/freshautomations/stoml/releases/download/v0.7.1/stoml_linux_amd64 &> /dev/null - # mv stoml_linux_amd64 stoml - # chmod +x stoml - # sudo mv stoml /usr/local/bin/ - # - name: Set version strings - # id: versions - # run: | - # echo "SHORT_SHA=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT - # echo "TAG_VER=$(/usr/local/bin/stoml Cargo.toml package.version)" >> $GITHUB_OUTPUT - # - name: Build and upload image - # run: | - # docker build --build-arg="PGMQ_VER=${{ steps.versions.outputs.TAG_VER }}" -t pgmq-pg images/pgmq-pg - # docker tag pgmq-pg quay.io/tembo/pgmq-pg:v${{ steps.versions.outputs.TAG_VER }} - # docker tag pgmq-pg quay.io/tembo/pgmq-pg:latest - # - name: Login to Quay - # uses: docker/login-action@v2 - # with: - # registry: quay.io/tembo - # username: ${{ secrets.QUAY_USER_TEMBO }} - # password: ${{ secrets.QUAY_PASSWORD_TEMBO }} - # - name: Push image - # run: | - # docker push quay.io/tembo/pgmq-pg:v${{ steps.versions.outputs.TAG_VER }} - # docker push quay.io/tembo/pgmq-pg:latest +# publish: +# # only publish release events +# if: github.event_name == 'release' +# name: trunk publish +# runs-on: ubuntu-22.04 +# steps: +# - uses: actions/checkout@v2 +# - name: Install Rust stable toolchain +# uses: actions-rs/toolchain@v1 +# with: +# toolchain: stable +# - uses: Swatinem/rust-cache@v2 +# with: +# prefix-key: "prometheus_fdw-extension-test" +# cache-directories: | +# /home/runner/.pgrx +# - name: Install stoml and pg-trunk +# shell: bash +# run: | +# set -xe +# wget https://github.com/freshautomations/stoml/releases/download/v0.7.1/stoml_linux_amd64 &> /dev/null +# mv stoml_linux_amd64 stoml +# chmod +x stoml +# sudo mv stoml /usr/local/bin/ +# cargo install pg-trunk +# - name: trunk build +# working-directory: ./ +# run: trunk build +# - name: trunk publish +# working-directory: ./ +# env: +# TRUNK_API_TOKEN: ${{ secrets.TRUNK_AUTH_TOKEN }} +# run: | +# ext_ver=$(stoml Cargo.toml package.version) +# ext_repo=$(stoml Cargo.toml package.repository) +# trunk publish prometheus_fdw --version ${ext_ver} --file .trunk/prometheus_fdw-${ext_ver}.tar.gz --description "Foreign Data wrapper for prometheus" --homepage "https://github.com/tembo-io/prometheus_fdw" --repository "https://github.com/tembo-io/prometheus_fdw" --license "PostgreSQL" --category connectors \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 556e4b5..6446d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "clerk_fdw" -version = "0.2.4" +name = "prometheus_fdw" +version = "0.0.0" edition = "2021" [lib] @@ -17,7 +17,6 @@ pg_test = [] [dependencies] chrono = "0.4.26" -clerk-rs = "0.1.7" pgrx = "=0.9.7" reqwest = "0.11.18" reqwest-middleware = "0.2.3" diff --git a/README.md b/README.md index 8871a42..eb9dec8 100644 --- a/README.md +++ b/README.md @@ -1,95 +1,53 @@ -## Clerk_fdw - -This section is about how to use the clerk foreign data wrapper in the data warehouse to collect valuable analytics +## Prometheus_fdw ### Pre-requisistes -- have the v0.2.2 of `clerk_fdw` extension enabled in your instance +- have the v0.2.2 of `prometheus_fdw` extension enabled in your instance + +`create extension prometheus_fdw;` Create the foreign data wrapper: ``` -create foreign data wrapper clerk_wrapper - handler clerk_fdw_handler - validator clerk_fdw_validator; +create foreign data wrapper prometheus_wrapper + handler prometheus_fdw_handler + validator prometheus_fdw_validator; ``` -Connect to clerk using your credentials: - ``` -create server my_clerk_server - foreign data wrapper clerk_wrapper - options ( - api_key '') +create server my_prometheus_server + foreign data wrapper prometheus_wrapper; ``` Create Foreign Table: -### User table - -This table will store information about the users. -Note: The current limit is 500 users. This should be increased in future versions. +### Metric Labels Table ``` -create foreign table clerk_users ( - user_id text, - first_name text, - last_name text, - email text, - gender text, - created_at bigint, - updated_at bigint, - last_sign_in_at bigint, - phone_numbers bigint, - username text - ) - server my_clerk_server - options ( - object 'users' - ); - -``` - -### Organization Table - -This table will store information about the organizations. -Note: The current limit is 500 organizations. This should be increased in future versions. - -``` -create foreign table clerk_organizations ( - organization_id text, - name text, - slug text, - created_at bigint, - updated_at bigint, - created_by text +CREATE FOREIGN TABLE IF NOT EXISTS metric_labels ( + metric_id BIGINT, + metric_name TEXT NOT NULL, + metric_name_label TEXT NOT NULL, + metric_labels jsonb ) -server my_clerk_server -options ( - object 'organizations' +SERVER my_prometheus_server +OPTIONS ( + object 'metric_labels' ); ``` -### Junction Table +### Metrics Value Table -This table connects the `clerk_users` and `clerk_orgs`. It lists out all users and their roles in each organization. +NOTE: NEED TO ADD PARTIONTION TO THIS TABLE ``` -create foreign table clerk_organization_memberships ( - user_id text, - organization_id text, - role text -) -server my_clerk_server +CREATE FOREIGN TABLE IF NOT EXISTS metric_values ( + metric_id BIGINT, + metric_time TIMESTAMPTZ, + metric_value FLOAT8 + ) +server my_prometheus_server options ( - object 'organization_memberships' + object 'metric_values' ); ``` - -NOTE: There is a 0.5 second sleep timer between each request so that we do not overload clerk servers. The reponse might take a while and it is reccomended that you store the information in a local table for quick access. - -Query from the Foreign Table: -`select * from clerk_users` - -To get all members of an organization: -`select * from organization_memberships where organization_id='org_id';` diff --git a/clerk_fdw.control b/clerk_fdw.control deleted file mode 100644 index 189209b..0000000 --- a/clerk_fdw.control +++ /dev/null @@ -1,5 +0,0 @@ -comment = 'clerk_fdw: A foreign data wrapper for the clerk.com API' -default_version = '@CARGO_VERSION@' -module_pathname = '$libdir/clerk_fdw' -relocatable = false -superuser = true diff --git a/prometheus_fdw.control b/prometheus_fdw.control new file mode 100644 index 0000000..1f1f461 --- /dev/null +++ b/prometheus_fdw.control @@ -0,0 +1,5 @@ +comment = 'prometheus_fdw: A foreign data wrapper for prometheus' +default_version = '@CARGO_VERSION@' +module_pathname = '$libdir/prometheus_fdw' +relocatable = false +superuser = true diff --git a/src/lib.rs b/src/lib.rs index 068bd28..330e7c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,48 +93,27 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { let mut result = Vec::new(); match obj { - "users" => { + "metric_labels" => { result = body_to_rows( resp, "data", vec![ - ("id", "user_id", "string"), - ("first_name", "first_name", "string"), - ("last_name", "last_name", "string"), - ("email_addresses", "email", "string"), - ("gender", "gender", "string"), - ("created_at", "created_at", "i64"), - ("updated_at", "updated_at", "i64"), - ("last_sign_in_at", "last_sign_in_at", "i64"), - ("phone_numbers", "phone_numbers", "i64"), - ("username", "username", "string"), + ("id", "metric_id", "i64"), + ("metric_name", "metric_name", "string"), + ("metric_name_label", "metric_name_label", "string"), + ("metric_labels", "metric_labels", "json"), ], tgt_cols, ); } - "organizations" => { + "metric_values" => { result = body_to_rows( resp, "data", vec![ - ("id", "organization_id", "string"), - ("name", "name", "string"), - ("slug", "slug", "string"), - ("created_at", "created_at", "i64"), - ("updated_at", "updated_at", "i64"), - ("created_by", "created_by", "string"), - ], - tgt_cols, - ); - } - "organization_memberships" => { - result = body_to_rows( - resp, - "data", - vec![ - ("public_user_data.user_id", "user_id", "string"), - ("organization.id", "organization_id", "string"), - ("role", "role", "string"), + ("id", "metric_id", "i64"), + ("timestamp", "timestamp", "i64"), + ("value", "value", "i64"), ], tgt_cols, ); @@ -148,88 +127,144 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { } #[wrappers_fdw( - version = "0.2.4", + version = "0.0.0", author = "Jay Kothari", website = "https://tembo.io" )] -pub(crate) struct ClerkFdw { +pub(crate) struct PrometheusFdw { rt: Runtime, - token: Option, client: Option, scan_result: Option>, tgt_cols: Vec, } -impl ClerkFdw { - const DEFAULT_BASE_URL: &'static str = "https://api.clerk.com/v1"; +impl PrometheusFdw { + const DEFAULT_BASE_URL: &'static str = + "https://prometheus-control-1.use1.dev.plat.cdb-svc.com/"; + + fn map_operator(op: &str) -> &str { + match op { + "=" => "=\"", + "!=" => "!=\"", + ">" => ">\"", + "<" => "<\"", + ">=" => ">=\"", + "<=" => "<=\"", + _ => { + println!("unsupported operator: {}", op); + "\"" + } + } + } + + fn value_to_promql_string(value: &supabase_wrappers::interface::Value) -> String { + match value { + supabase_wrappers::interface::Value::Cell(cell) => match cell { + supabase_wrappers::interface::Cell::String(s) => s.clone(), + supabase_wrappers::interface::Cell::I8(i) => i.to_string(), + supabase_wrappers::interface::Cell::I16(i) => i.to_string(), + supabase_wrappers::interface::Cell::I32(i) => i.to_string(), + supabase_wrappers::interface::Cell::I64(i) => i.to_string(), + supabase_wrappers::interface::Cell::F32(f) => f.to_string(), + supabase_wrappers::interface::Cell::F64(f) => f.to_string(), + supabase_wrappers::interface::Cell::Bool(b) => b.to_string(), + supabase_wrappers::interface::Cell::Date(d) => d.to_string(), + supabase_wrappers::interface::Cell::Timestamp(ts) => ts.to_string(), + supabase_wrappers::interface::Cell::Json(j) => { + match serde_json::to_string(j) { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to serialize JsonB to String: {}", e); + String::new() // Return an empty string on error + } + } + } + supabase_wrappers::interface::Cell::Numeric(n) => n.to_string(), + }, + supabase_wrappers::interface::Value::Array(cells) => { + // Join the string representations of the cells with commas + cells + .iter() + .map(|cell| match cell { + supabase_wrappers::interface::Cell::String(s) => s.clone(), + supabase_wrappers::interface::Cell::I8(i) => i.to_string(), + supabase_wrappers::interface::Cell::I16(i) => i.to_string(), + supabase_wrappers::interface::Cell::I32(i) => i.to_string(), + supabase_wrappers::interface::Cell::I64(i) => i.to_string(), + supabase_wrappers::interface::Cell::F32(f) => f.to_string(), + supabase_wrappers::interface::Cell::F64(f) => f.to_string(), + supabase_wrappers::interface::Cell::Bool(b) => b.to_string(), + supabase_wrappers::interface::Cell::Date(d) => d.to_string(), + supabase_wrappers::interface::Cell::Timestamp(ts) => ts.to_string(), + supabase_wrappers::interface::Cell::Json(j) => { + match serde_json::to_string(j) { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to serialize JsonB to String: {}", e); + String::new() // Return an empty string on error + } + } + } + supabase_wrappers::interface::Cell::Numeric(n) => n.to_string(), + }) + .collect::>() + .join(",") + } + } + } - // TODO: will have to incorportate offset at some point - const PAGE_SIZE: usize = 500; + fn build_url(&self, obj: &str, _options: &HashMap, quals: &[Qual]) -> String { + let base_url = "https://prometheus-control-1.use1.dev.plat.cdb-svc.com/api/v1/query"; - fn build_url(&self, obj: &str, options: &HashMap) -> String { match obj { - "users" => { - let base_url = Self::DEFAULT_BASE_URL.to_owned(); - let ret = format!("{}/users?limit={}", base_url, Self::PAGE_SIZE,); - ret - } - "organizations" => { - let base_url = Self::DEFAULT_BASE_URL.to_owned(); - let ret = format!("{}/organizations?limit={}", base_url, Self::PAGE_SIZE,); - ret - } - "organization_memberships" => { - let base_url = Self::DEFAULT_BASE_URL.to_owned(); - let org_id = options - .get("organization_id") - .expect("Organization ID required"); - let ret = format!( - "{}/organizations/{}/memberships?limit={}", - base_url, - org_id, - Self::PAGE_SIZE - ); - ret + "metric_labels" | "metric_values" => { + // Find the metric_name filter from quals + let metric_name_filter = quals + .iter() + .find(|qual| qual.field == "metric_name" && qual.operator == "="); + + // If a metric_name filter is found, build the query URL + if let Some(metric_name_qual) = metric_name_filter { + let metric_name = Self::value_to_promql_string(&metric_name_qual.value); + let ret = format!("{}?query={}", base_url, metric_name); + ret + } else { + println!("No metric_name filter found in quals"); + "".to_string() + } } _ => { - warning!("unsupported object: {:#?}", obj); - return "".to_string(); + println!("unsupported object: {:#?}", obj); + "".to_string() } } } + + // Helper function to map SQL operators to PromQL operators } -impl ForeignDataWrapper for ClerkFdw { - fn new(options: &HashMap) -> Self { +impl ForeignDataWrapper for PrometheusFdw { + fn new(_options: &HashMap) -> Self { let mut ret = Self { rt: create_async_runtime(), - token: None, client: None, tgt_cols: Vec::new(), scan_result: None, }; - let token = if let Some(access_token) = options.get("api_key") { - access_token.to_owned() - } else { - warning!("Cannot find api_key in options"); - let access_token = env::var("CLERK_API_KEY").unwrap(); - access_token - }; - - ret.token = Some(token); - // create client let client = reqwest::Client::new(); ret.client = Some(client); + warning!("created client"); + ret } fn begin_scan( &mut self, - _quals: &[Qual], + quals: &[Qual], columns: &[Column], _sorts: &[Sort], _limit: &Option, @@ -242,102 +277,13 @@ impl ForeignDataWrapper for ClerkFdw { self.scan_result = None; self.tgt_cols = columns.to_vec(); - let api_key = self.token.as_ref().unwrap(); + let api_key = "".to_string(); if let Some(client) = &self.client { let mut result = Vec::new(); - if obj == "organization_memberships" { - // Get all organizations first - let org_url = self.build_url("organizations", options); - - self.rt.block_on(async { - let org_resp = client - .get(&org_url) - .header("Authorization", format!("Bearer {}", api_key)) - .send() - .await; - - if let Ok(org_res) = org_resp { - if org_res.status().is_success() { - let org_body = org_res.text().await.unwrap(); - let org_json: JsonValue = serde_json::from_str(&org_body).unwrap(); - - if let Some(org_data) = - org_json.get("data").and_then(|data| data.as_array()) - { - for org in org_data { - if let Some(org_id) = org.get("id").and_then(|id| id.as_str()) { - // Build the URL for memberships using org_id - let membership_url = format!( - "{}/organizations/{}/memberships?limit={}", - Self::DEFAULT_BASE_URL, - org_id, - Self::PAGE_SIZE - ); - - let membership_resp = client - .get(&membership_url) - .header("Authorization", format!("Bearer {}", api_key)) - .send() - .await; - - match membership_resp { - Ok(mem_res) => { - if mem_res.status().is_success() { - let mem_body = mem_res.text().await.unwrap(); - let mem_json: JsonValue = - serde_json::from_str(&mem_body).unwrap(); - // info!("mem_json: {:#?}", mem_json); - - let mut rows = resp_to_rows( - &obj, - &mem_json, - &self.tgt_cols[..], - ); - result.append(&mut rows); - } - } - Err(_) => continue, - }; - - // Introduce a delay of 0.05 seconds - std::thread::sleep(std::time::Duration::from_millis(50)); - } - } - } - } - } - }); - } else { - let url = self.build_url(&obj, options); - - // this is where i need to make changes - self.rt.block_on(async { - let resp = client - .get(&url) - .header("Authorization", format!("Bearer {}", api_key)) - .send() - .await; - - match resp { - Ok(res) => { - if res.status().is_success() { - let body = res.text().await.unwrap(); - let json: JsonValue = serde_json::from_str(&body).unwrap(); - let mut rows = resp_to_rows(&obj, &json, &self.tgt_cols[..]); - result.append(&mut rows); - } else { - warning!("Failed request with status: {}", res.status()); - } - } - Err(error) => { - warning!("Error: {:#?}", error); - return; - } - }; - }); - } + let url = self.build_url("metric_labels", options, quals); + warning!("url: {}", url); self.scan_result = Some(result); }