Skip to content

Commit

Permalink
Enhance HA "Taking over", "Handing over" logging
Browse files Browse the repository at this point in the history
The reason for a switch in the HA roles was not always directly clear.
This change now introduces additional debug logging, indicating the
reasoning for either taking over or handing over the HA responsibility.

An antagonistic logging message to "Another instance is active" was
introduced, indicating that "[n]o other instance is active, considering
[itself] as active". However, as both those messages are containing
debug information, their log level was degraded to DEBUG. This allowed
removing the whole shouldLog logic.

Next to additional logging messages, both the takeover and handover
channel are now transporting a string to communicate the reason instead
of an empty struct{}. By doing so, both the "Taking over" and "Handing
over" log messages are enriched with reason.

While dealing with the code, some function signature documentation were
added, to ease both mine as well as the understanding of future readers.

Additionally, the error handling of the SQL query selecting active
Icinga DB instances was changed slightly to also handle wrapped
sql.ErrNoRows errors.

Closes #688.
  • Loading branch information
oxzi committed Mar 20, 2024
1 parent 8875ce1 commit cd82b1b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 44 deletions.
8 changes: 4 additions & 4 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func run() int {
hactx, cancelHactx := context.WithCancel(ctx)
for hactx.Err() == nil {
select {
case <-ha.Takeover():
logger.Info("Taking over")
case takeoverReason := <-ha.Takeover():
logger.Infow("Taking over", zap.String("reason", takeoverReason))

go func() {
for hactx.Err() == nil {
Expand Down Expand Up @@ -324,8 +324,8 @@ func run() int {
}
}
}()
case <-ha.Handover():
logger.Warn("Handing over")
case handoverReason := <-ha.Handover():
logger.Warnw("Handing over", zap.String("reason", handoverReason))

cancelHactx()
case <-hactx.Done():
Expand Down
74 changes: 34 additions & 40 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type HA struct {
heartbeat *icingaredis.Heartbeat
logger *logging.Logger
responsible bool
handover chan struct{}
takeover chan struct{}
handover chan string
takeover chan string
done chan struct{}
errOnce sync.Once
errMu sync.Mutex
Expand All @@ -64,8 +64,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
db: db,
heartbeat: heartbeat,
logger: logger,
handover: make(chan struct{}),
takeover: make(chan struct{}),
handover: make(chan string),
takeover: make(chan string),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -107,13 +107,13 @@ func (h *HA) Err() error {
return h.err
}

// Handover returns a channel with which handovers are signaled.
func (h *HA) Handover() chan struct{} {
// Handover returns a channel with which handovers and their reasons are signaled.
func (h *HA) Handover() chan string {
return h.handover
}

// Takeover returns a channel with which takeovers are signaled.
func (h *HA) Takeover() chan struct{} {
// Takeover returns a channel with which takeovers and their reasons are signaled.
func (h *HA) Takeover() chan string {
return h.takeover
}

Expand Down Expand Up @@ -141,10 +141,6 @@ func (h *HA) controller() {

oldInstancesRemoved := false

logTicker := time.NewTicker(time.Second * 60)
defer logTicker.Stop()
shouldLog := true

for {
select {
case m := <-h.heartbeat.Events():
Expand All @@ -160,7 +156,7 @@ func (h *HA) controller() {
}
if tt.Before(now.Add(-1 * timeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
h.signalHandover()
h.signalHandover("received heartbeat from the past")
h.realizeLostHeartbeat()
continue
}
Expand Down Expand Up @@ -196,23 +192,17 @@ func (h *HA) controller() {
h.environmentMu.Unlock()
}

select {
case <-logTicker.C:
shouldLog = true
default:
}

var realizeCtx context.Context
var cancelRealizeCtx context.CancelFunc
if h.responsible {
realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime())
} else {
realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
}
err = h.realize(realizeCtx, s, t, envId, shouldLog)
err = h.realize(realizeCtx, s, t, envId)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover()
h.signalHandover("context deadline exceeded")
continue
}
if err != nil {
Expand All @@ -223,11 +213,9 @@ func (h *HA) controller() {
go h.removeOldInstances(s, envId)
oldInstancesRemoved = true
}

shouldLog = false
} else {
h.logger.Error("Lost heartbeat")
h.signalHandover()
h.signalHandover("lost heartbeat")
h.realizeLostHeartbeat()
}
case <-h.heartbeat.Done():
Expand All @@ -240,7 +228,8 @@ func (h *HA) controller() {
}
}

func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
// realize a HA cycle triggered by a heartbeat event.
func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary) error {
var takeover, otherResponsible bool

err := retry.WithBackoff(
Expand Down Expand Up @@ -271,18 +260,20 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
errQuery := tx.QueryRowxContext(
ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(),
).StructScan(instance)
switch errQuery {
case nil:
switch {
case errQuery == nil:
otherResponsible = true
if shouldLog {
h.logger.Infow("Another instance is active",
zap.String("instance_id", instance.Id.String()),
zap.String("environment", envId.String()),
"heartbeat", instance.Heartbeat,
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
}
case sql.ErrNoRows:
h.logger.Debugw("Another instance is active",
zap.String("instance_id", instance.Id.String()),
zap.String("environment", envId.String()),
zap.Time("heartbeat", instance.Heartbeat.Time()),
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
case errors.Is(errQuery, sql.ErrNoRows):
takeover = true
h.logger.Debugw("No other instance is active, consider myself as active",
zap.Bool("responsible", h.responsible),
zap.String("instance_id", h.instanceId.String()),
zap.String("environment", envId.String()))
default:
return internal.CantPerformQuery(errQuery, query)
}
Expand Down Expand Up @@ -355,7 +346,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover()
h.signalTakeover("no other instance is active")
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
state.otherResponsible = true
Expand All @@ -366,6 +357,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return nil
}

// realizeLostHeartbeat updates "responsible = n" for this HA into the database.
func (h *HA) realizeLostHeartbeat() {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?")
if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) {
Expand Down Expand Up @@ -421,7 +413,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
}
}

func (h *HA) signalHandover() {
// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
func (h *HA) signalHandover(reason string) {
if h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
Expand All @@ -430,15 +423,16 @@ func (h *HA) signalHandover() {
})

select {
case h.handover <- struct{}{}:
case h.handover <- reason:
h.responsible = false
case <-h.ctx.Done():
// Noop
}
}
}

func (h *HA) signalTakeover() {
// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
func (h *HA) signalTakeover(reason string) {
if !h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
Expand All @@ -447,7 +441,7 @@ func (h *HA) signalTakeover() {
})

select {
case h.takeover <- struct{}{}:
case h.takeover <- reason:
h.responsible = true
case <-h.ctx.Done():
// Noop
Expand Down

0 comments on commit cd82b1b

Please sign in to comment.