Skip to content

Commit

Permalink
pgpool. manually set search_path (schema parameter)
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Dec 30, 2023
1 parent bf48e43 commit ea9b0d8
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 15 deletions.
3 changes: 2 additions & 1 deletion ingest/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jitsucom/bulker/eventslog"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/logging"
"github.com/jitsucom/bulker/jitsubase/pg"
"github.com/jitsucom/bulker/jitsubase/utils"
"github.com/jitsucom/bulker/kafkabase"
"net/http"
Expand All @@ -34,7 +35,7 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
if err != nil {
return err
}
a.dbpool, err = pgxpool.New(context.Background(), a.config.DatabaseURL)
a.dbpool, err = pg.NewPGPool(a.config.DatabaseURL)
if err != nil {
return fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
Expand Down
2 changes: 1 addition & 1 deletion ingest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/gin-gonic/gin v1.9.1
github.com/jackc/pgx/v5 v5.4.3
github.com/jackc/pgx/v5 v5.5.1
github.com/json-iterator/go v1.1.12
github.com/penglongli/gin-metrics v0.1.10
github.com/prometheus/client_golang v1.17.0
Expand Down
2 changes: 1 addition & 1 deletion ingest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
Expand Down
6 changes: 5 additions & 1 deletion jitsubase/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.5.0
github.com/jackc/pgx/v5 v5.5.1
github.com/joomcode/errorx v1.1.1
github.com/json-iterator/go v1.1.12
github.com/mitchellh/hashstructure/v2 v2.0.2
Expand All @@ -31,6 +32,9 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -54,9 +58,9 @@ require (
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
5 changes: 5 additions & 0 deletions jitsubase/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/joomcode/errorx v1.1.1 h1:/LFG/qSk1gUTuZjs+qlyOJEpcVjD9DXgBNFhdZkQrjY=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
Expand Down Expand Up @@ -341,6 +345,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
35 changes: 35 additions & 0 deletions jitsubase/pg/pgpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pg

import (
"context"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"regexp"
)

var schemaRegex = regexp.MustCompile(`(?:search_path|schema)=([^$]+)`)

func extractSchema(url string) string {
parts := schemaRegex.FindStringSubmatch(url)
if len(parts) == 2 {
return parts[1]
} else {
return ""
}
}

func NewPGPool(url string) (*pgxpool.Pool, error) {
pgCfg, err := pgxpool.ParseConfig(url)
if err != nil {
return nil, fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
schema := extractSchema(url)
if schema != "" {
pgCfg.ConnConfig.RuntimeParams["search_path"] = schema
}
dbpool, err := pgxpool.NewWithConfig(context.Background(), pgCfg)
if err != nil {
return nil, fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
return dbpool, nil
}
3 changes: 2 additions & 1 deletion sync-controller/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/pg"
"net/http"
"time"
)
Expand All @@ -24,7 +25,7 @@ func (a *Context) InitContext(settings *appbase.AppSettings) error {
if err != nil {
return err
}
a.dbpool, err = pgxpool.New(context.Background(), a.config.DatabaseURL)
a.dbpool, err = pg.NewPGPool(a.config.DatabaseURL)
if err != nil {
return fmt.Errorf("Unable to create postgres connection pool: %v\n", err)
}
Expand Down
3 changes: 2 additions & 1 deletion sync-controller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ go 1.21

require (
github.com/gin-gonic/gin v1.9.1
github.com/jackc/pgx/v5 v5.4.3
github.com/hjson/hjson-go/v4 v4.3.1
github.com/jackc/pgx/v5 v5.5.1
github.com/mitchellh/mapstructure v1.5.0
github.com/spf13/viper v1.17.0
k8s.io/api v0.28.3
Expand Down
3 changes: 2 additions & 1 deletion sync-controller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hjson/hjson-go/v4 v4.3.1 h1:wfmDwHGxjzmYKXRFL0Qr9nonY/Xxe5y7IalwjlY7ekA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down
2 changes: 1 addition & 1 deletion sync-sidecar/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
github.com/google/uuid v1.5.0
github.com/jackc/pgx/v5 v5.4.3
github.com/jackc/pgx/v5 v5.5.1
)

require (
Expand Down
2 changes: 1 addition & 1 deletion sync-sidecar/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down
5 changes: 2 additions & 3 deletions sync-sidecar/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package main

import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/jitsubase/pg"
"github.com/jitsucom/bulker/sync-sidecar/db"
"io"
"net/url"
Expand Down Expand Up @@ -116,7 +115,7 @@ type ReadSideCar struct {

func (s *ReadSideCar) Run() {
var err error
s.dbpool, err = pgxpool.New(context.Background(), s.databaseURL)
s.dbpool, err = pg.NewPGPool(s.databaseURL)
if err != nil {
s.panic("Unable to create postgres connection pool: %v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions sync-sidecar/spec_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package main

import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/jitsubase/pg"
"github.com/jitsucom/bulker/sync-sidecar/db"
"os"
"strings"
Expand All @@ -19,7 +18,7 @@ type SpecCatalogSideCar struct {

func (s *SpecCatalogSideCar) Run() {
var err error
s.dbpool, err = pgxpool.New(context.Background(), s.databaseURL)
s.dbpool, err = pg.NewPGPool(s.databaseURL)
if err != nil {
s.panic("Unable to create postgres connection pool: %v", err)
}
Expand Down

0 comments on commit ea9b0d8

Please sign in to comment.