diff --git a/fly/cmd/historical_uptime/main.go b/fly/cmd/historical_uptime/main.go index ec59f6a8..5d0be54f 100644 --- a/fly/cmd/historical_uptime/main.go +++ b/fly/cmd/historical_uptime/main.go @@ -267,6 +267,7 @@ func main() { // Add channel capacity checks go monitorChannelCapacity(rootCtx, logger, "obsvC", obsvC) + go monitorChannelCapacity(rootCtx, logger, "batchObsvC", batchObsvC) // Heartbeat updates heartbeatC := make(chan *gossipv1.Heartbeat, 50) diff --git a/fly/pkg/historical_uptime/helpers.go b/fly/pkg/historical_uptime/helpers.go index 9edeb87e..fb8801fc 100644 --- a/fly/pkg/historical_uptime/helpers.go +++ b/fly/pkg/historical_uptime/helpers.go @@ -45,7 +45,7 @@ func InitializeMissingObservationsCount(logger *zap.Logger, messages []*types.Me func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObservations map[string]map[string]int, messageObservations map[types.MessageID][]*types.Observation) { // Keep track of processed observations to avoid duplicates - processed := make(map[string]map[string]bool) + processed := make(map[string]map[string]struct{}) for messageID, observations := range messageObservations { chainID, err := messageID.ChainID() @@ -63,15 +63,16 @@ func DecrementMissingObservationsCount(logger *zap.Logger, guardianMissingObserv // Check if we've already processed this guardian for this message if processed[string(messageID)] == nil { - processed[string(messageID)] = make(map[string]bool) + processed[string(messageID)] = make(map[string]struct{}) } - if processed[string(messageID)][guardianName] { + + if _, exists := processed[string(messageID)][guardianName]; exists { logger.Warn("Duplicate observation", zap.String("messageID", string(messageID)), zap.String("guardian", guardianName)) continue } // Mark as processed - processed[string(messageID)][guardianName] = true + processed[string(messageID)][guardianName] = struct{}{} // Safely decrement the count if guardianMissingObservations[guardianName] == nil {