From 44b0619528a93aa02d2b1d836f67d97e96fa822e Mon Sep 17 00:00:00 2001 From: niksa Date: Tue, 25 Sep 2018 17:42:33 +0200 Subject: [PATCH] Monitor Prometheus liveness when PgAdvisoryLock is used Resign if Prometheus instance is not sending any data within configured timeout. If there is no data coming in from Prometheus, the adapter should not attempt to become a leader. Once Prometheus requests start coming in, the adapter should resume attempts to become a leader.. --- main.go | 36 +++++- postgresql/client.go | 10 +- postgresql/client_test.go | 242 ++++++++++++++++++++++---------------- util/election.go | 91 ++++++++++---- util/lock.go | 7 +- 5 files changed, 251 insertions(+), 135 deletions(-) diff --git a/main.go b/main.go index 02dad5f..aab6025 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "sync/atomic" "time" "github.com/timescale/prometheus-postgresql-adapter/log" @@ -50,10 +51,12 @@ type config struct { readOnly bool haGroupLockId int restElection bool + prometheusTimeout time.Duration } const ( - tickInterval = time.Second + tickInterval = time.Second + promLivenessCheck = time.Second ) var ( @@ -93,8 +96,9 @@ var ( }, []string{"path"}, ) - writeThroughput = util.NewThroughputCalc(tickInterval) - elector *util.Elector + writeThroughput = util.NewThroughputCalc(tickInterval) + elector *util.Elector + lastRequestUnixNano = time.Now().UnixNano() ) func init() { @@ -147,7 +151,9 @@ func parseFlags() *config { flag.StringVar(&cfg.telemetryPath, "web.telemetry-path", "/metrics", "Address to listen on for web endpoints.") flag.StringVar(&cfg.logLevel, "log.level", "debug", "The log level to use [ \"error\", \"warn\", \"info\", \"debug\" ].") flag.BoolVar(&cfg.readOnly, "read.only", false, "Read-only mode. Don't write to database.") - flag.IntVar(&cfg.haGroupLockId, "leader-election.pg-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock") + flag.IntVar(&cfg.haGroupLockId, "leader-election.pg-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock.") + flag.DurationVar(&cfg.prometheusTimeout, "leader-election.pg-advisory-lock.prometheus-timeout", -1, "Adapter will resign if there are no requests from Prometheus within a given timeout (0 means no timeout). "+ + "Note: make sure that only one Prometheus instance talks to the adapter. Timeout value should be co-related with Prometheus scrape interval but add enough `slack` to prevent random flips.") flag.BoolVar(&cfg.restElection, "leader-election.rest", false, "Enable REST interface for the leader election") flag.Parse() @@ -190,16 +196,33 @@ func initElector(cfg *config, db *sql.DB) *util.Elector { os.Exit(1) } if cfg.restElection { - return util.NewElector(util.NewRestElection(), false) + return util.NewElector(util.NewRestElection()) } if cfg.haGroupLockId != 0 { + if cfg.prometheusTimeout == -1 { + log.Error("msg", "Prometheus timeout configuration must be set when using PG advisory lock") + os.Exit(1) + } lock, err := util.NewPgAdvisoryLock(cfg.haGroupLockId, db) if err != nil { log.Error("msg", "Error creating advisory lock", "haGroupLockId", cfg.haGroupLockId, "err", err) os.Exit(1) } + scheduledElector := util.NewScheduledElector(lock) log.Info("msg", "Initialized leader election based on PostgreSQL advisory lock") - return util.NewElector(lock, true) + if cfg.prometheusTimeout != 0 { + go func() { + ticker := time.NewTicker(promLivenessCheck) + for { + select { + case <-ticker.C: + lastReq := atomic.LoadInt64(&lastRequestUnixNano) + scheduledElector.PrometheusLivenessCheck(lastReq, cfg.prometheusTimeout) + } + } + }() + } + return &scheduledElector.Elector } else { log.Warn("msg", "No adapter leader election. Group lock id is not set. Possible duplicate write load if running adapter in high-availability mode") return nil @@ -338,6 +361,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples { } func sendSamples(w writer, samples model.Samples) error { + atomic.StoreInt64(&lastRequestUnixNano, time.Now().UnixNano()) begin := time.Now() shouldWrite := true var err error diff --git a/postgresql/client.go b/postgresql/client.go index a246683..d3b2bb3 100644 --- a/postgresql/client.go +++ b/postgresql/client.go @@ -244,7 +244,6 @@ func (c *Client) Write(samples model.Samples) error { return err } - if copyTable == fmt.Sprintf("%s_tmp", c.cfg.table) { stmtLabels, err := tx.Prepare(fmt.Sprintf(sqlInsertLabels, c.cfg.table, c.cfg.table)) if err != nil { @@ -281,7 +280,6 @@ func (c *Client) Write(samples model.Samples) error { } } - err = copyStmt.Close() if err != nil { log.Error("msg", "Error on COPY Close when writing samples", "err", err) @@ -317,6 +315,14 @@ func createOrderedKeys(m *map[string]string) []string { return keys } +func (c *Client) Close() { + if c.DB != nil { + if err := c.DB.Close(); err != nil { + log.Error("msg", err.Error()) + } + } +} + func (l *sampleLabels) Scan(value interface{}) error { if value == nil { l = &sampleLabels{} diff --git a/postgresql/client_test.go b/postgresql/client_test.go index d67c778..283b964 100644 --- a/postgresql/client_test.go +++ b/postgresql/client_test.go @@ -9,6 +9,7 @@ import ( "github.com/timescale/prometheus-postgresql-adapter/log" "github.com/timescale/prometheus-postgresql-adapter/util" "testing" + "time" ) var ( @@ -59,127 +60,160 @@ func TestBuildCommand(t *testing.T) { } func TestWriteCommand(t *testing.T) { - dbSetup(t) + withDB(t, func(db *sql.DB, t *testing.T) { + cfg := &Config{} + ParseFlags(cfg) + cfg.database = *database - cfg := &Config{} - ParseFlags(cfg) - cfg.database = *database + c := NewClient(cfg) - c := NewClient(cfg) - - sample := []*model.Sample{ - { - Metric: model.Metric{ - model.MetricNameLabel: "test_metric", - "label1": "1", + sample := []*model.Sample{ + { + Metric: model.Metric{ + model.MetricNameLabel: "test_metric", + "label1": "1", + }, + Value: 123.1, + Timestamp: 1234567, }, - Value: 123.1, - Timestamp: 1234567, - }, - { - Metric: model.Metric{ - model.MetricNameLabel: "test_metric", - "label1": "1", + { + Metric: model.Metric{ + model.MetricNameLabel: "test_metric", + "label1": "1", + }, + Value: 123.2, + Timestamp: 1234568, }, - Value: 123.2, - Timestamp: 1234568, - }, - { - Metric: model.Metric{ - model.MetricNameLabel: "test_metric", + { + Metric: model.Metric{ + model.MetricNameLabel: "test_metric", + }, + Value: 123.2, + Timestamp: 1234569, }, - Value: 123.2, - Timestamp: 1234569, - }, - { - Metric: model.Metric{ - model.MetricNameLabel: "test_metric_2", - "label1": "1", + { + Metric: model.Metric{ + model.MetricNameLabel: "test_metric_2", + "label1": "1", + }, + Value: 123.4, + Timestamp: 1234570, }, - Value: 123.4, - Timestamp: 1234570, - }, - } - - c.Write(sample) + } - db, err := sql.Open("postgres", fmt.Sprintf("host=localhost dbname=%s user=postgres sslmode=disable", *database)) - if err != nil { - t.Fatal(err) - } + c.Write(sample) - var cnt int - err = db.QueryRow("SELECT count(*) FROM metrics").Scan(&cnt) - if err != nil { - t.Fatal(err) - } + var cnt int + err := c.DB.QueryRow("SELECT count(*) FROM metrics").Scan(&cnt) + if err != nil { + t.Fatal(err) + } - if cnt != 4 { - t.Fatal("Wrong cnt: ", cnt) - } + if cnt != 4 { + t.Fatal("Wrong cnt: ", cnt) + } + c.Close() + }) } func TestPgAdvisoryLock(t *testing.T) { - db := dbSetup(t) - lock, err := util.NewPgAdvisoryLock(1, db) - if err != nil { - t.Fatal(err) - } - if !lock.Locked() { - t.Error("Couldn't obtain the lock") - } - - newLock, err := util.NewPgAdvisoryLock(1, db) - if err != nil { - t.Fatal(err) - } - if newLock.Locked() { - t.Error("Lock should have already been taken") - } - - if err = lock.Release(); err != nil { - t.Errorf("Failed to release a lock. Error: %v", err) - } - - if lock.Locked() { - t.Error("Should be unlocked after release") - } - - newLock.TryLock() - - if !newLock.Locked() { - t.Error("New lock should take over") - } + withDB(t, func(db *sql.DB, t *testing.T) { + lock, err := util.NewPgAdvisoryLock(1, db) + if err != nil { + t.Fatal(err) + } + if !lock.Locked() { + t.Error("Couldn't obtain the lock") + } + + newLock, err := util.NewPgAdvisoryLock(1, db) + if err != nil { + t.Fatal(err) + } + if newLock.Locked() { + t.Error("Lock should have already been taken") + } + + if err = lock.Release(); err != nil { + t.Errorf("Failed to release a lock. Error: %v", err) + } + + if lock.Locked() { + t.Error("Should be unlocked after release") + } + + newLock.TryLock() + + if !newLock.Locked() { + t.Error("New lock should take over") + } + }) } func TestElector(t *testing.T) { - db := dbSetup(t) - lock1, err := util.NewPgAdvisoryLock(1, db) - if err != nil { - t.Error(err) - } - elector1 := util.NewElector(lock1, false) - leader, _ := elector1.Elect() - if !leader { - t.Error("Failed to become a leader") - } - - lock2, err := util.NewPgAdvisoryLock(1, db) - if err != nil { - t.Error(err) - } - elector2 := util.NewElector(lock2, false) - leader, _ = elector2.Elect() - if leader { - t.Error("Shouldn't be possible") - } + withDB(t, func(db *sql.DB, t *testing.T) { + lock1, err := util.NewPgAdvisoryLock(2, db) + if err != nil { + t.Error(err) + } + elector1 := util.NewElector(lock1) + leader, _ := elector1.BecomeLeader() + if !leader { + t.Error("Failed to become a leader") + } + + lock2, err := util.NewPgAdvisoryLock(2, db) + if err != nil { + t.Error(err) + } + elector2 := util.NewElector(lock2) + leader, _ = elector2.BecomeLeader() + if leader { + t.Error("Shouldn't be possible") + } + + elector1.Resign() + leader, _ = elector2.BecomeLeader() + if !leader { + t.Error("Should become a leader") + } + }) +} - elector1.Resign() - leader, _ = elector2.Elect() - if !leader { - t.Error("Should become a leader") - } +func TestPromethuesLivenessCheck(t *testing.T) { + withDB(t, func(db *sql.DB, t *testing.T) { + lock1, err := util.NewPgAdvisoryLock(3, db) + if err != nil { + t.Error(err) + } + elector := util.NewScheduledElector(lock1) + leader, _ := elector.Elect() + if !leader { + t.Error("Failed to become a leader") + } + elector.PrometheusLivenessCheck(0, 0) + leader, _ = lock1.IsLeader() + if leader { + t.Error("Shouldn't be a leader") + } + if !elector.IsPausedScheduledElection() { + t.Error("Scheduled election should be paused") + } + elector.PrometheusLivenessCheck(time.Now().UnixNano(), time.Hour) + if elector.IsPausedScheduledElection() { + t.Error("Scheduled election shouldn't be paused anymore") + } + }) +} +func withDB(t *testing.T, f func(db *sql.DB, t *testing.T)) { + db := dbSetup(t) + defer func() { + if db != nil { + db.Close() + } + }() + f(db, t) } func dbSetup(t *testing.T) *sql.DB { diff --git a/util/election.go b/util/election.go index 5cc8120..db2c2ee 100644 --- a/util/election.go +++ b/util/election.go @@ -10,9 +10,7 @@ import ( "time" ) -const ( - electionInterval = time.Millisecond * 300 -) +const electionInterval = time.Millisecond * 300 // Election defines an interface for adapter leader election. // If you are running Prometheus in HA mode where each Prometheus instance sends data to corresponding adapter you probably @@ -28,20 +26,11 @@ type Election interface { // Elector is `Election` wrapper that provides cross-cutting concerns(eg. logging) and some common features shared among all election implementations. type Elector struct { - election Election - scheduled bool - ticker *time.Ticker + election Election } -func NewElector(election Election, scheduled bool) *Elector { - var t *time.Ticker - if scheduled { - t = time.NewTicker(electionInterval) - } - elector := &Elector{election: election, ticker: t, scheduled: scheduled} - if scheduled { - go elector.scheduledElection() - } +func NewElector(election Election) *Elector { + elector := &Elector{election: election} return elector } @@ -69,26 +58,79 @@ func (e *Elector) Resign() error { if err != nil { log.Error("err", "Failed to resign", "err", err) } else { - log.Info("msg", "Instance is not a leader anymore") + log.Info("msg", "Instance is no longer a leader") } return err } -func (e *Elector) scheduledElection() { +// ScheduledElector triggers election on scheduled interval. Currently used in combination with PgAdvisoryLock +type ScheduledElector struct { + Elector + ticker *time.Ticker + pausedScheduledElection bool +} + +func NewScheduledElector(election Election) *ScheduledElector { + scheduledElector := &ScheduledElector{Elector: Elector{election}, ticker: time.NewTicker(electionInterval)} + go scheduledElector.scheduledElection() + return scheduledElector +} + +func (se *ScheduledElector) pauseScheduledElection() { + se.pausedScheduledElection = true +} + +func (se *ScheduledElector) resumeScheduledElection() { + se.pausedScheduledElection = false +} + +func (se *ScheduledElector) IsPausedScheduledElection() bool { + return se.pausedScheduledElection +} + +func (se *ScheduledElector) PrometheusLivenessCheck(lastRequestUnixNano int64, timeout time.Duration) { + elapsed := time.Now().Sub(time.Unix(0, lastRequestUnixNano)) + leader, err := se.IsLeader() + if err != nil { + log.Error("msg", err.Error()) + } + if leader { + if elapsed > timeout { + log.Warn("msg", "Prometheus timeout exceeded", "timeout", timeout) + se.pauseScheduledElection() + log.Warn("msg", "Scheduled election is paused. Instance is removed from election pool.") + err := se.Resign() + if err != nil { + log.Error("msg", err.Error()) + } + } + } else { + if se.IsPausedScheduledElection() && elapsed < timeout { + log.Info("msg", "Prometheus seems alive. Resuming scheduled election.") + se.resumeScheduledElection() + } + } +} + +func (se *ScheduledElector) scheduledElection() { for { select { - case <-e.ticker.C: - e.Elect() + case <-se.ticker.C: + if !se.pausedScheduledElection { + se.Elect() + } else { + log.Debug("msg", "Scheduled election is paused. Instance can't become a leader until scheduled election is resumed (Prometheus comes up again)") + } } } } -func (e *Elector) Elect() (bool, error) { - leader, err := e.IsLeader() +func (se *ScheduledElector) Elect() (bool, error) { + leader, err := se.IsLeader() if err != nil { log.Error("msg", "Leader check failed", "err", err) } else if !leader { - leader, err = e.BecomeLeader() + leader, err = se.BecomeLeader() if err != nil { log.Error("msg", "Failed while becoming a leader", "err", err) } @@ -98,6 +140,8 @@ func (e *Elector) Elect() (bool, error) { // RestElection is a REST interface allowing to plug in any external leader election mechanism. // Remote service can use REST endpoints to manage leader election thus block or allow writes. +// Using RestElection over PgAdvisoryLock is encouraged as it is more robust and gives more control over +// the election process, however it does require additional engineering effort. type RestElection struct { leader bool mutex sync.RWMutex @@ -113,6 +157,7 @@ func (r *RestElection) handleLeader() http.HandlerFunc { return func(response http.ResponseWriter, request *http.Request) { switch request.Method { case http.MethodGet: + // leader check leader, err := r.IsLeader() if err != nil { log.Error("msg", "Failed on leader check", "err", err) @@ -135,6 +180,7 @@ func (r *RestElection) handleLeader() http.HandlerFunc { } switch flag { case 0: + // resign err = r.Resign() if err != nil { log.Error("err", err) @@ -143,6 +189,7 @@ func (r *RestElection) handleLeader() http.HandlerFunc { } fmt.Fprintf(response, "%v", true) case 1: + // become a leader leader, err := r.BecomeLeader() if err != nil { log.Error("msg", "Failed to become a leader", "err", err) diff --git a/util/lock.go b/util/lock.go index 64856e8..4011547 100644 --- a/util/lock.go +++ b/util/lock.go @@ -17,7 +17,12 @@ const ( // PgAdvisoryLock is implementation of leader election based on PostgreSQL advisory locks. All adapters withing a HA group are trying // to obtain an advisory lock for particular group. The one who holds the lock can write to the database. Due to the fact // that Prometheus HA setup provides no consistency guarantees this implementation is best effort in regards -// to metrics that is written (some duplicates or data loss are possible during failover) +// to metrics that is written (some duplicates or data loss are possible during fail-over) +// `leader-election.pg-advisory-lock.prometheus-timeout` config must be set when using PgAdvisoryLock. It will +// trigger leader resign (if instance is a leader) and will prevent an instance to become a leader if there are no requests coming +// from Prometheus within a given timeout. Make sure to provide a reasonable value for the timeout (should be co-related with +// Prometheus scrape interval, eg. 2x or 3x more then scrape interval to prevent leader flipping). +// Recommended architecture when using PgAdvisoryLock is to have one adapter instance for one Prometheus instance. type PgAdvisoryLock struct { conn *sql.Conn mutex sync.RWMutex