Skip to content
This repository has been archived by the owner on Aug 4, 2021. It is now read-only.

Commit

Permalink
Monitor Prometheus liveness when PgAdvisoryLock is used
Browse files Browse the repository at this point in the history
Resign if the Prometheus instance is not sending any data within the configured timeout. If there is no data coming in from Prometheus, the adapter should not attempt to become a leader. Once Prometheus requests start coming in, the adapter should resume attempts to become a leader.
  • Loading branch information
niksajakovljevic committed Sep 27, 2018
1 parent e27a36a commit 44b0619
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 135 deletions.
36 changes: 30 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"sync/atomic"
"time"

"github.com/timescale/prometheus-postgresql-adapter/log"
Expand Down Expand Up @@ -50,10 +51,12 @@ type config struct {
readOnly bool
haGroupLockId int
restElection bool
prometheusTimeout time.Duration
}

const (
tickInterval = time.Second
tickInterval = time.Second
promLivenessCheck = time.Second
)

var (
Expand Down Expand Up @@ -93,8 +96,9 @@ var (
},
[]string{"path"},
)
writeThroughput = util.NewThroughputCalc(tickInterval)
elector *util.Elector
writeThroughput = util.NewThroughputCalc(tickInterval)
elector *util.Elector
lastRequestUnixNano = time.Now().UnixNano()
)

func init() {
Expand Down Expand Up @@ -147,7 +151,9 @@ func parseFlags() *config {
flag.StringVar(&cfg.telemetryPath, "web.telemetry-path", "/metrics", "Address to listen on for web endpoints.")
flag.StringVar(&cfg.logLevel, "log.level", "debug", "The log level to use [ \"error\", \"warn\", \"info\", \"debug\" ].")
flag.BoolVar(&cfg.readOnly, "read.only", false, "Read-only mode. Don't write to database.")
flag.IntVar(&cfg.haGroupLockId, "leader-election.pg-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock")
flag.IntVar(&cfg.haGroupLockId, "leader-election.pg-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock.")
flag.DurationVar(&cfg.prometheusTimeout, "leader-election.pg-advisory-lock.prometheus-timeout", -1, "Adapter will resign if there are no requests from Prometheus within a given timeout (0 means no timeout). "+
"Note: make sure that only one Prometheus instance talks to the adapter. Timeout value should be co-related with Prometheus scrape interval but add enough `slack` to prevent random flips.")
flag.BoolVar(&cfg.restElection, "leader-election.rest", false, "Enable REST interface for the leader election")
flag.Parse()

Expand Down Expand Up @@ -190,16 +196,33 @@ func initElector(cfg *config, db *sql.DB) *util.Elector {
os.Exit(1)
}
if cfg.restElection {
return util.NewElector(util.NewRestElection(), false)
return util.NewElector(util.NewRestElection())
}
if cfg.haGroupLockId != 0 {
if cfg.prometheusTimeout == -1 {
log.Error("msg", "Prometheus timeout configuration must be set when using PG advisory lock")
os.Exit(1)
}
lock, err := util.NewPgAdvisoryLock(cfg.haGroupLockId, db)
if err != nil {
log.Error("msg", "Error creating advisory lock", "haGroupLockId", cfg.haGroupLockId, "err", err)
os.Exit(1)
}
scheduledElector := util.NewScheduledElector(lock)
log.Info("msg", "Initialized leader election based on PostgreSQL advisory lock")
return util.NewElector(lock, true)
if cfg.prometheusTimeout != 0 {
go func() {
ticker := time.NewTicker(promLivenessCheck)
for {
select {
case <-ticker.C:
lastReq := atomic.LoadInt64(&lastRequestUnixNano)
scheduledElector.PrometheusLivenessCheck(lastReq, cfg.prometheusTimeout)
}
}
}()
}
return &scheduledElector.Elector
} else {
log.Warn("msg", "No adapter leader election. Group lock id is not set. Possible duplicate write load if running adapter in high-availability mode")
return nil
Expand Down Expand Up @@ -338,6 +361,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
}

func sendSamples(w writer, samples model.Samples) error {
atomic.StoreInt64(&lastRequestUnixNano, time.Now().UnixNano())
begin := time.Now()
shouldWrite := true
var err error
Expand Down
10 changes: 8 additions & 2 deletions postgresql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (c *Client) Write(samples model.Samples) error {
return err
}


if copyTable == fmt.Sprintf("%s_tmp", c.cfg.table) {
stmtLabels, err := tx.Prepare(fmt.Sprintf(sqlInsertLabels, c.cfg.table, c.cfg.table))
if err != nil {
Expand Down Expand Up @@ -281,7 +280,6 @@ func (c *Client) Write(samples model.Samples) error {
}
}


err = copyStmt.Close()
if err != nil {
log.Error("msg", "Error on COPY Close when writing samples", "err", err)
Expand Down Expand Up @@ -317,6 +315,14 @@ func createOrderedKeys(m *map[string]string) []string {
return keys
}

// Close shuts down the client's underlying database connection, if one
// was ever opened, logging any error returned by the close. It is safe
// to call on a client whose DB field was never set.
func (c *Client) Close() {
	if c.DB == nil {
		return
	}
	if err := c.DB.Close(); err != nil {
		log.Error("msg", err.Error())
	}
}

func (l *sampleLabels) Scan(value interface{}) error {
if value == nil {
l = &sampleLabels{}
Expand Down
242 changes: 138 additions & 104 deletions postgresql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/timescale/prometheus-postgresql-adapter/log"
"github.com/timescale/prometheus-postgresql-adapter/util"
"testing"
"time"
)

var (
Expand Down Expand Up @@ -59,127 +60,160 @@ func TestBuildCommand(t *testing.T) {
}

func TestWriteCommand(t *testing.T) {
dbSetup(t)
withDB(t, func(db *sql.DB, t *testing.T) {
cfg := &Config{}
ParseFlags(cfg)
cfg.database = *database

cfg := &Config{}
ParseFlags(cfg)
cfg.database = *database
c := NewClient(cfg)

c := NewClient(cfg)

sample := []*model.Sample{
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
"label1": "1",
sample := []*model.Sample{
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
"label1": "1",
},
Value: 123.1,
Timestamp: 1234567,
},
Value: 123.1,
Timestamp: 1234567,
},
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
"label1": "1",
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
"label1": "1",
},
Value: 123.2,
Timestamp: 1234568,
},
Value: 123.2,
Timestamp: 1234568,
},
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric",
},
Value: 123.2,
Timestamp: 1234569,
},
Value: 123.2,
Timestamp: 1234569,
},
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric_2",
"label1": "1",
{
Metric: model.Metric{
model.MetricNameLabel: "test_metric_2",
"label1": "1",
},
Value: 123.4,
Timestamp: 1234570,
},
Value: 123.4,
Timestamp: 1234570,
},
}

c.Write(sample)
}

db, err := sql.Open("postgres", fmt.Sprintf("host=localhost dbname=%s user=postgres sslmode=disable", *database))
if err != nil {
t.Fatal(err)
}
c.Write(sample)

var cnt int
err = db.QueryRow("SELECT count(*) FROM metrics").Scan(&cnt)
if err != nil {
t.Fatal(err)
}
var cnt int
err := c.DB.QueryRow("SELECT count(*) FROM metrics").Scan(&cnt)
if err != nil {
t.Fatal(err)
}

if cnt != 4 {
t.Fatal("Wrong cnt: ", cnt)
}
if cnt != 4 {
t.Fatal("Wrong cnt: ", cnt)
}
c.Close()
})
}

func TestPgAdvisoryLock(t *testing.T) {
db := dbSetup(t)
lock, err := util.NewPgAdvisoryLock(1, db)
if err != nil {
t.Fatal(err)
}
if !lock.Locked() {
t.Error("Couldn't obtain the lock")
}

newLock, err := util.NewPgAdvisoryLock(1, db)
if err != nil {
t.Fatal(err)
}
if newLock.Locked() {
t.Error("Lock should have already been taken")
}

if err = lock.Release(); err != nil {
t.Errorf("Failed to release a lock. Error: %v", err)
}

if lock.Locked() {
t.Error("Should be unlocked after release")
}

newLock.TryLock()

if !newLock.Locked() {
t.Error("New lock should take over")
}
withDB(t, func(db *sql.DB, t *testing.T) {
lock, err := util.NewPgAdvisoryLock(1, db)
if err != nil {
t.Fatal(err)
}
if !lock.Locked() {
t.Error("Couldn't obtain the lock")
}

newLock, err := util.NewPgAdvisoryLock(1, db)
if err != nil {
t.Fatal(err)
}
if newLock.Locked() {
t.Error("Lock should have already been taken")
}

if err = lock.Release(); err != nil {
t.Errorf("Failed to release a lock. Error: %v", err)
}

if lock.Locked() {
t.Error("Should be unlocked after release")
}

newLock.TryLock()

if !newLock.Locked() {
t.Error("New lock should take over")
}
})
}

func TestElector(t *testing.T) {
db := dbSetup(t)
lock1, err := util.NewPgAdvisoryLock(1, db)
if err != nil {
t.Error(err)
}
elector1 := util.NewElector(lock1, false)
leader, _ := elector1.Elect()
if !leader {
t.Error("Failed to become a leader")
}

lock2, err := util.NewPgAdvisoryLock(1, db)
if err != nil {
t.Error(err)
}
elector2 := util.NewElector(lock2, false)
leader, _ = elector2.Elect()
if leader {
t.Error("Shouldn't be possible")
}
withDB(t, func(db *sql.DB, t *testing.T) {
lock1, err := util.NewPgAdvisoryLock(2, db)
if err != nil {
t.Error(err)
}
elector1 := util.NewElector(lock1)
leader, _ := elector1.BecomeLeader()
if !leader {
t.Error("Failed to become a leader")
}

lock2, err := util.NewPgAdvisoryLock(2, db)
if err != nil {
t.Error(err)
}
elector2 := util.NewElector(lock2)
leader, _ = elector2.BecomeLeader()
if leader {
t.Error("Shouldn't be possible")
}

elector1.Resign()
leader, _ = elector2.BecomeLeader()
if !leader {
t.Error("Should become a leader")
}
})
}

elector1.Resign()
leader, _ = elector2.Elect()
if !leader {
t.Error("Should become a leader")
}
// TestPromethuesLivenessCheck verifies that a liveness check against a
// stale Prometheus timestamp makes the scheduled elector resign and pause
// scheduled elections, and that a fresh timestamp resumes them.
// NOTE(review): "Promethues" is a typo for "Prometheus"; the identifier is
// kept as-is to avoid renaming the test function in this change.
func TestPromethuesLivenessCheck(t *testing.T) {
	withDB(t, func(db *sql.DB, t *testing.T) {
		advisoryLock, err := util.NewPgAdvisoryLock(3, db)
		if err != nil {
			t.Error(err)
		}
		scheduled := util.NewScheduledElector(advisoryLock)
		if leader, _ := scheduled.Elect(); !leader {
			t.Error("Failed to become a leader")
		}
		// A zero last-request timestamp is long past any timeout, so the
		// elector should resign and pause scheduled elections.
		scheduled.PrometheusLivenessCheck(0, 0)
		if leader, _ := advisoryLock.IsLeader(); leader {
			t.Error("Shouldn't be a leader")
		}
		if !scheduled.IsPausedScheduledElection() {
			t.Error("Scheduled election should be paused")
		}
		// A current timestamp well inside a generous timeout should lift
		// the pause again.
		scheduled.PrometheusLivenessCheck(time.Now().UnixNano(), time.Hour)
		if scheduled.IsPausedScheduledElection() {
			t.Error("Scheduled election shouldn't be paused anymore")
		}
	})
}

// withDB prepares a test database via dbSetup, invokes f with it, and
// closes the connection afterwards no matter how f returns.
func withDB(t *testing.T, f func(db *sql.DB, t *testing.T)) {
	db := dbSetup(t)
	defer func() {
		if db == nil {
			return
		}
		db.Close()
	}()
	f(db, t)
}

func dbSetup(t *testing.T) *sql.DB {
Expand Down
Loading

0 comments on commit 44b0619

Please sign in to comment.