bulker: add support for postgres configuration source
absorbb committed Jan 3, 2024
1 parent 771f252 commit 1af5d38
Showing 7 changed files with 255 additions and 14 deletions.
8 changes: 8 additions & 0 deletions bulkerapp/app/app_config.go
@@ -27,10 +27,18 @@ type Config struct {
// ConfigSource source of destinations configs. Can be:
// - `file://...` for destinations config in yaml format
// - `redis` or `redis://redis_url` to load configs from redis `enrichedConnections` key
// - `postgresql://postgres_url` to load configs from a postgres database
// - `env://PREFIX` to load each destination config from an environment variable named like `PREFIX_ID` where ID is the destination id
//
// Default: `env://BULKER_DESTINATION`
ConfigSource string `mapstructure:"CONFIG_SOURCE"`
// ConfigSourceSQLQuery SQL query used to load connections when the `postgresql` config source is selected
ConfigSourceSQLQuery string `mapstructure:"CONFIG_SOURCE_SQL_QUERY" default:"select * from enriched_connections"`
// CacheDir dir for config source data
CacheDir string `mapstructure:"CACHE_DIR"`
// ConfigRefreshPeriodSec how often the config source checks for new configs, in seconds. Supported by the `postgresql` config source
ConfigRefreshPeriodSec int `mapstructure:"CONFIG_REFRESH_PERIOD_SEC" default:"5"`

// RedisURL that will be used by default by all services that need Redis
RedisURL string `mapstructure:"REDIS_URL"`
RedisTLSCA string `mapstructure:"REDIS_TLS_CA"`
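For reference, a postgres-backed deployment could be configured roughly like this (a minimal sketch; the connection string and values below are illustrative assumptions, and the BULKER_ prefix is the one suggested by the `env://BULKER_DESTINATION` default above, not something introduced by this commit):

BULKER_CONFIG_SOURCE=postgresql://bulker:secret@pg.example.com:5432/jitsu
BULKER_CONFIG_SOURCE_SQL_QUERY="select * from enriched_connections"
BULKER_CACHE_DIR=/var/cache/bulker
BULKER_CONFIG_REFRESH_PERIOD_SEC=5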
6 changes: 6 additions & 0 deletions bulkerapp/app/configuration_source.go
@@ -60,6 +60,12 @@ func InitConfigurationSource(config *Config) (ConfigurationSource, error) {
if err != nil {
return nil, fmt.Errorf("❗error creating yaml configuration source from config file: %s: %v", filePath, err)
}
} else if strings.HasPrefix(cfgSource, "postgres") {
var err error
configurationSource, err = NewPostgresConfigurationSource(config)
if err != nil {
return nil, fmt.Errorf("❗️error while init postgres configuration source: %s: %v", cfgSource, err)
}
} else if strings.HasPrefix(cfgSource, "redis://") || strings.HasPrefix(cfgSource, "rediss://") {
var err error
configurationSource, err = NewRedisConfigurationSource(config)
231 changes: 231 additions & 0 deletions bulkerapp/app/postgres_configuration_source.go
@@ -0,0 +1,231 @@
package app

import (
"context"
"encoding/json"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jitsucom/bulker/bulkerapp/metrics"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/pg"
"github.com/jitsucom/bulker/jitsubase/safego"
jsoniter "github.com/json-iterator/go"
"io"
"os"
"path"
"sync/atomic"
"time"
)

const SQLLastUpdatedQuery = `select * from last_updated`

type PostgresConfigurationSource struct {
appbase.Service
dbpool *pgxpool.Pool
changesChan chan bool
refreshPeriodSec int
inited atomic.Bool
cacheDir string
sqlQuery string
connections atomic.Pointer[map[string]*DestinationConfig]
lastModified atomic.Pointer[time.Time]
closed chan struct{}
}

type RepositoryCache struct {
Connections map[string]*DestinationConfig `json:"destinations"`
}

func NewPostgresConfigurationSource(appconfig *Config) (*PostgresConfigurationSource, error) {
base := appbase.NewServiceBase("repository")
dbpool, err := pg.NewPGPool(appconfig.ConfigSource)
if err != nil {
return nil, base.NewError("Unable to create postgres connection pool: %v\n", err)
}
r := &PostgresConfigurationSource{
Service: base,
dbpool: dbpool,
refreshPeriodSec: appconfig.ConfigRefreshPeriodSec,
changesChan: make(chan bool, 1),
cacheDir: appconfig.CacheDir,
sqlQuery: appconfig.ConfigSourceSQLQuery,
closed: make(chan struct{}),
}
r.refresh(false)
r.start()
return r, nil
}

func (r *PostgresConfigurationSource) loadCached() {
file, err := os.Open(path.Join(r.cacheDir, "repository.json"))
if err != nil {
r.Fatalf("Error opening cached repository: %v\nCannot serve without repository. Exiting...", err)
return
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
r.Fatalf("Error getting cached repository info: %v\nCannot serve without repository. Exitting...", err)
return
}
fileSize := stat.Size()
if fileSize == 0 {
r.Fatalf("Cached repository is empty\nCannot serve without repository. Exitting...")
return
}
payload, err := io.ReadAll(file)
if err != nil {
r.Fatalf("Error reading cached script: %v\nCannot serve without repository. Exitting...", err)
return
}
repositoryCache := RepositoryCache{}
err = jsoniter.Unmarshal(payload, &repositoryCache)
if err != nil {
r.Fatalf("Error unmarshalling cached repository: %v\nCannot serve without repository. Exitting...", err)
return
}
r.connections.Store(&repositoryCache.Connections)
r.inited.Store(true)
r.Infof("Loaded cached repository data: %d bytes, last modified: %v", fileSize, stat.ModTime())
}

func (r *PostgresConfigurationSource) storeCached(payload RepositoryCache) {
filePath := path.Join(r.cacheDir, "repository.json")
err := os.MkdirAll(r.cacheDir, 0755)
if err != nil {
r.Errorf("Cannot write cached repository to %s: cannot make dir: %v", filePath, err)
return
}
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
r.Errorf("Cannot write cached repository to %s: %v", filePath, err)
return
}
defer file.Close()
err = json.NewEncoder(file).Encode(payload)
if err != nil {
r.Errorf("Cannot write cached repository to %s: %v", filePath, err)
return
}
err = file.Sync()
if err != nil {
r.Errorf("Cannot write cached script to %s: %v", filePath, err)
return
}
}

func (r *PostgresConfigurationSource) refresh(notify bool) {
start := time.Now()
connections := map[string]*DestinationConfig{}
var err error
defer func() {
if err != nil {
r.Errorf("Error refreshing repository: %v", err)
metrics.ConfigurationSourceError("error").Inc()
if !r.inited.Load() {
if r.cacheDir != "" {
r.loadCached()
} else {
r.Fatalf("Cannot load cached repository. No CACHE_DIR is set. Cannot serve without repository. Exitting...")
}
}
} else {
r.Debugf("Refreshed in %v", time.Now().Sub(start))
}
}()
ifModifiedSince := r.lastModified.Load()
var lastModified time.Time
err = r.dbpool.QueryRow(context.Background(), SQLLastUpdatedQuery).Scan(&lastModified)
if errors.Is(err, pgx.ErrNoRows) || (err == nil && lastModified.IsZero()) {
//Failed to load repository last updated date. Probably database has no records yet.
err = nil
r.connections.Store(&connections)
r.inited.Store(true)
return
} else if err != nil {
err = r.NewError("Error querying last updated: %v", err)
return
}
if ifModifiedSince != nil && lastModified.Compare(*ifModifiedSince) <= 0 {
return
}
r.Infof("Config updated: %s previous update date: %s`", lastModified, ifModifiedSince)

rows, err := r.dbpool.Query(context.Background(), r.sqlQuery)
if err != nil {
err = r.NewError("Error querying connections: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var connectionId string
var connectionConfig string
var tp string
err = rows.Scan(&connectionId, &tp, &connectionConfig)
if err != nil {
err = r.NewError("Error scanning row: %v", err)
return
}
//r.Infof("Stream %s: %s", streamId, streamConfig)
c := DestinationConfig{}
err = jsoniter.UnmarshalFromString(connectionConfig, &c)
if err != nil {
metrics.ConfigurationSourceError("parse_error").Inc()
r.Errorf("failed to parse config for destination %s: %s: %v", connectionId, connectionConfig, err)
}
if c.UsesBulker {
connections[connectionId] = &c
}
}
r.connections.Store(&connections)
r.inited.Store(true)
r.lastModified.Store(&lastModified)
if r.cacheDir != "" {
r.storeCached(RepositoryCache{Connections: connections})
}
if notify {
select {
case r.changesChan <- true:
//notify listener if it is listening
default:
}
}
}

func (r *PostgresConfigurationSource) start() {
safego.RunWithRestart(func() {
ticker := time.NewTicker(time.Duration(r.refreshPeriodSec) * time.Second)
for {
select {
case <-ticker.C:
r.refresh(true)
case <-r.closed:
ticker.Stop()
return
}
}
})
}

func (r *PostgresConfigurationSource) Close() error {
close(r.closed)
close(r.changesChan)
r.dbpool.Close()
return nil
}

func (r *PostgresConfigurationSource) ChangesChannel() <-chan bool {
return r.changesChan
}

func (r *PostgresConfigurationSource) GetDestinationConfigs() []*DestinationConfig {
connections := *r.connections.Load()
results := make([]*DestinationConfig, 0, len(connections))
for _, connection := range connections {
results = append(results, connection)
}
return results
}

func (r *PostgresConfigurationSource) GetDestinationConfig(id string) *DestinationConfig {
connections := *r.connections.Load()
return connections[id]
}
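Note on the expected schema: refresh() first scans a single timestamp from SQLLastUpdatedQuery, and each row returned by ConfigSourceSQLQuery is scanned as (id, type, config), where config holds the DestinationConfig JSON. A backing schema satisfying the default `select * from ...` queries might look roughly like this (a sketch inferred from the Scan calls above; column names and types are assumptions, only the table names and the column order are fixed by the defaults):

-- single-column relation (or view) holding the latest change timestamp
create table last_updated (last_updated timestamptz not null);

-- one row per connection; select * must yield exactly (id, type, config) in this order
create table enriched_connections (
    id     text primary key,
    type   text,
    config jsonb  -- DestinationConfig JSON; a text column works as well
);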
8 changes: 4 additions & 4 deletions bulkerapp/app/redis_configuration_source.go
@@ -137,12 +137,12 @@ func (rcs *RedisConfigurationSource) load(notify bool) error {
defer conn.Close()
configsById, err := redis.StringMap(conn.Do("HGETALL", redisDestinationsKey))
if err != nil {
metrics.RedisConfigurationSourceError(RedisError(err)).Inc()
metrics.ConfigurationSourceError(RedisError(err)).Inc()
return rcs.NewError("failed to load destinations by key: %s : %v", redisDestinationsKey, err)
}
newHash, err := utils.HashAny(configsById)
if err != nil {
metrics.RedisConfigurationSourceError("hash_error").Inc()
metrics.ConfigurationSourceError("hash_error").Inc()
return rcs.NewError("failed generate hash of redis config: %v", err)
}
if newHash == rcs.currentHash {
@@ -154,15 +154,14 @@ func (rcs *RedisConfigurationSource) load(notify bool) error {
dstCfg := DestinationConfig{}
err := utils.ParseObject(config, &dstCfg)
if err != nil {
metrics.RedisConfigurationSourceError("parse_error").Inc()
metrics.ConfigurationSourceError("parse_error").Inc()
rcs.Errorf("failed to parse config for destination %s: %s: %v", id, config, err)
} else if dstCfg.UsesBulker {
dstCfg.Config.Id = id
rcs.Debugf("parsed config for destination %s.", id, dstCfg)
newDsts[id] = &dstCfg
}
}
metrics.RedisConfigurationSourceDestinations.Set(float64(len(newDsts)))
rcs.Lock()
rcs.destinations = newDsts
rcs.currentHash = newHash
@@ -225,6 +224,7 @@ func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfi

func (rcs *RedisConfigurationSource) Close() error {
close(rcs.refreshChan)
close(rcs.changesChan)
return rcs.redisPool.Close()
}

14 changes: 4 additions & 10 deletions bulkerapp/metrics/metrics.go
@@ -126,21 +126,15 @@ var (
return consumerRuns.WithLabelValues(topicId, mode, destinationId, tableName, status)
}

redisConfigurationSourceError = promauto.NewCounterVec(prometheus.CounterOpts{
configurationSourceError = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "bulkerapp",
Subsystem: "redis_configuration",
Subsystem: "configuration",
Name: "error",
}, []string{"errorType"})
RedisConfigurationSourceError = func(errorType string) prometheus.Counter {
return redisConfigurationSourceError.WithLabelValues(errorType)
ConfigurationSourceError = func(errorType string) prometheus.Counter {
return configurationSourceError.WithLabelValues(errorType)
}

RedisConfigurationSourceDestinations = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "bulkerapp",
Subsystem: "redis_configuration",
Name: "destinations",
})

repositoryDestinations = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "bulkerapp",
Subsystem: "repository",
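With the counter renamed, errors from both the redis and the new postgres configuration sources share a single metric; given the usual Prometheus joining of namespace, subsystem and name, it should be exposed along the lines of (sample output, not taken from this commit):

bulkerapp_configuration_error{errorType="parse_error"} 1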
1 change: 1 addition & 0 deletions ingest/repository.go
@@ -213,6 +213,7 @@ func (r *Repository) start() {
case <-ticker.C:
r.refresh()
case <-r.closed:
ticker.Stop()
return
}
}
1 change: 1 addition & 0 deletions ingest/script.go
@@ -197,6 +197,7 @@ func (s *Script) start() {
case <-ticker.C:
s.refresh()
case <-s.closed:
ticker.Stop()
return
}
}
