From d28fa552f99046dd3a61145d7b087e17d1c162d3 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Wed, 18 Sep 2024 11:23:41 +0200 Subject: [PATCH 1/3] `HA`: Use `sync/atomic#Pointer` instead of our own wrapper Go 1.19 introduced `sync/atomic#Pointer` among other things, so we no longer need to use the Atomic wrapper from our Icinga Go library. --- pkg/icingadb/ha.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 6460ac32d..1bb04b17c 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -7,7 +7,6 @@ import ( "encoding/hex" "github.com/google/uuid" "github.com/icinga/icinga-go-library/backoff" - "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/retry" @@ -19,6 +18,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" "sync" + "sync/atomic" "time" ) @@ -35,7 +35,7 @@ type haState struct { // HA provides high availability and indicates whether a Takeover or Handover must be made. type HA struct { - state com.Atomic[haState] + state atomic.Pointer[haState] ctx context.Context cancelCtx context.CancelFunc instanceId types.Binary @@ -71,6 +71,8 @@ func NewHA(ctx context.Context, db *database.DB, heartbeat *icingaredis.Heartbea done: make(chan struct{}), } + ha.state.Store(&haState{}) + go ha.controller() return ha @@ -121,7 +123,8 @@ func (h *HA) Takeover() chan string { // State returns the status quo. 
func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool) { - state, _ := h.state.Load() + state := h.state.Load() + return state.responsibleTsMilli, state.responsible, state.otherResponsible } @@ -428,9 +431,12 @@ func (h *HA) realize( h.signalTakeover(takeover) } else if otherResponsible { - if state, _ := h.state.Load(); !state.otherResponsible { - state.otherResponsible = true - h.state.Store(state) + if state := h.state.Load(); !state.otherResponsible { + // Dereference pointer to create a copy of the value it points to. + // Ensures that any modifications do not directly affect the original data unless explicitly stored back. + newState := *state + newState.otherResponsible = true + h.state.Store(&newState) } } @@ -496,7 +502,7 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar // signalHandover gives up HA.responsible and notifies the HA.Handover chan. func (h *HA) signalHandover(reason string) { if h.responsible { - h.state.Store(haState{ + h.state.Store(&haState{ responsibleTsMilli: time.Now().UnixMilli(), responsible: false, otherResponsible: false, @@ -514,7 +520,7 @@ func (h *HA) signalHandover(reason string) { // signalTakeover claims HA.responsible and notifies the HA.Takeover chan. func (h *HA) signalTakeover(reason string) { if !h.responsible { - h.state.Store(haState{ + h.state.Store(&haState{ responsibleTsMilli: time.Now().UnixMilli(), responsible: true, otherResponsible: false, From 7cf54c1b252e31f2764c0dc2f7c4a794e660c131 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 24 Oct 2024 10:37:04 +0200 Subject: [PATCH 2/3] `telemetry`: Use `sync/atomic#Pointer` instead of our own wrapper Go 1.19 introduced `sync/atomic#Pointer` among other things, so we no longer need to use the Atomic wrapper from our Icinga Go library. 
--- cmd/icingadb/main.go | 3 ++- pkg/icingaredis/telemetry/heartbeat.go | 9 ++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index bd94686d7..dc0098aba 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -116,6 +116,7 @@ func run() int { ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) telemetryLogger := logs.GetChildLogger("telemetry") + telemetry.LastSuccessfulSync.Store(&telemetry.SuccessfulSync{}) telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat) telemetry.WriteStats(ctx, rc, telemetryLogger) } @@ -250,7 +251,7 @@ func run() int { logger := logs.GetChildLogger("config-sync") if synctx.Err() == nil { - telemetry.LastSuccessfulSync.Store(telemetry.SuccessfulSync{ + telemetry.LastSuccessfulSync.Store(&telemetry.SuccessfulSync{ FinishMilli: syncEnd.UnixMilli(), DurationMilli: elapsed.Milliseconds(), }) diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go index 041a696d0..61bf44ad0 100644 --- a/pkg/icingaredis/telemetry/heartbeat.go +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -3,7 +3,6 @@ package telemetry import ( "context" "fmt" - "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/redis" @@ -81,7 +80,7 @@ func GetCurrentDbConnErr() (string, int64) { var OngoingSyncStartMilli int64 // LastSuccessfulSync is to be updated by the main() function. 
-var LastSuccessfulSync com.Atomic[SuccessfulSync] +var LastSuccessfulSync atomic.Pointer[SuccessfulSync] var boolToStr = map[bool]string{false: "0", true: "1"} var startTime = time.Now().UnixMilli() @@ -101,7 +100,7 @@ func StartHeartbeat( heartbeat := heartbeat.LastReceived() responsibleTsMilli, responsible, otherResponsible := ha.State() ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) - sync, _ := LastSuccessfulSync.Load() + lastSync := LastSuccessfulSync.Load() dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr() now := time.Now() @@ -117,8 +116,8 @@ func StartHeartbeat( "ha-responsible-ts": strconv.FormatInt(responsibleTsMilli, 10), "ha-other-responsible": boolToStr[otherResponsible], "sync-ongoing-since": strconv.FormatInt(ongoingSyncStart, 10), - "sync-success-finish": strconv.FormatInt(sync.FinishMilli, 10), - "sync-success-duration": strconv.FormatInt(sync.DurationMilli, 10), + "sync-success-finish": strconv.FormatInt(lastSync.FinishMilli, 10), + "sync-success-duration": strconv.FormatInt(lastSync.DurationMilli, 10), } ctx, cancel := context.WithDeadline(ctx, tick.Time.Add(interval)) From 95f2763a73dc9a711d39db2acfa044e7ca362475 Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Thu, 24 Oct 2024 11:43:55 +0200 Subject: [PATCH 3/3] `telemetry`: Fix `atomic.Pointer` initialisation responsibility Return `atomic.Pointer`, so that initialisation is not the responsibility of another package. --- cmd/icingadb/main.go | 6 +++--- pkg/icingaredis/telemetry/heartbeat.go | 13 ++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index dc0098aba..4c0bb8f7e 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -101,6 +101,7 @@ func run() int { // the heartbeat is not read while HA gets stuck when updating the instance table. 
var heartbeat *icingaredis.Heartbeat var ha *icingadb.HA + var telemetrySyncStats *atomic.Pointer[telemetry.SuccessfulSync] { rc, err := cmd.Redis(logs.GetChildLogger("redis")) if err != nil { @@ -116,8 +117,7 @@ func run() int { ha = icingadb.NewHA(ctx, db, heartbeat, logs.GetChildLogger("high-availability")) telemetryLogger := logs.GetChildLogger("telemetry") - telemetry.LastSuccessfulSync.Store(&telemetry.SuccessfulSync{}) - telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat) + telemetrySyncStats = telemetry.StartHeartbeat(ctx, rc, telemetryLogger, ha, heartbeat) telemetry.WriteStats(ctx, rc, telemetryLogger) } // Closing ha on exit ensures that this instance retracts its heartbeat @@ -251,7 +251,7 @@ func run() int { logger := logs.GetChildLogger("config-sync") if synctx.Err() == nil { - telemetry.LastSuccessfulSync.Store(&telemetry.SuccessfulSync{ + telemetrySyncStats.Store(&telemetry.SuccessfulSync{ FinishMilli: syncEnd.UnixMilli(), DurationMilli: elapsed.Milliseconds(), }) diff --git a/pkg/icingaredis/telemetry/heartbeat.go b/pkg/icingaredis/telemetry/heartbeat.go index 61bf44ad0..c29b15335 100644 --- a/pkg/icingaredis/telemetry/heartbeat.go +++ b/pkg/icingaredis/telemetry/heartbeat.go @@ -79,16 +79,17 @@ func GetCurrentDbConnErr() (string, int64) { // OngoingSyncStartMilli is to be updated by the main() function. var OngoingSyncStartMilli int64 -// LastSuccessfulSync is to be updated by the main() function. -var LastSuccessfulSync atomic.Pointer[SuccessfulSync] - var boolToStr = map[bool]string{false: "0", true: "1"} var startTime = time.Now().UnixMilli() // StartHeartbeat periodically writes heartbeats to Redis for being monitored by Icinga 2. +// It returns an atomic pointer to SuccessfulSync, +// which contains synchronisation statistics that the caller should update. 
func StartHeartbeat( ctx context.Context, client *redis.Client, logger *logging.Logger, ha ha, heartbeat *icingaredis.Heartbeat, -) { +) *atomic.Pointer[SuccessfulSync] { + var syncStats atomic.Pointer[SuccessfulSync] + syncStats.Store(&SuccessfulSync{}) goMetrics := NewGoMetrics() const interval = time.Second @@ -100,7 +101,7 @@ func StartHeartbeat( heartbeat := heartbeat.LastReceived() responsibleTsMilli, responsible, otherResponsible := ha.State() ongoingSyncStart := atomic.LoadInt64(&OngoingSyncStartMilli) - lastSync := LastSuccessfulSync.Load() + lastSync := syncStats.Load() dbConnErr, dbConnErrSinceMilli := GetCurrentDbConnErr() now := time.Now() @@ -144,6 +145,8 @@ func StartHeartbeat( silenceUntil = time.Time{} } }) + + return &syncStats } type goMetrics struct {