Skip to content

Commit

Permalink
MOD healthChecker notifiers settings
Browse files Browse the repository at this point in the history
MOD extend bots and sinks with errorMsg
  • Loading branch information
madebyrogal committed Jul 4, 2024
1 parent 9b6b3ce commit 87e88de
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 73 deletions.
107 changes: 67 additions & 40 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
reportHeartbeatMaxRetries = 30
)

var healthNotifiers = make(map[string]health.Notifier)

func main() {
// Set up context
ctx := signals.SetupSignalHandler()
Expand Down Expand Up @@ -262,8 +264,9 @@ func run(ctx context.Context) (err error) {
Index: commGroupIdx + 1,
}

scheduleBotNotifier := func(in bot.Bot) {
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
scheduleBotNotifier := func(in bot.Bot, key string) {
setHealthBotNotifier(in, key)
bots[key] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
Expand All @@ -272,72 +275,105 @@ func run(ctx context.Context) (err error) {

// Run bots
if commGroupCfg.SocketSlack.Enabled {
notifierKey := fmt.Sprintf("%s-%s", commGroupName, config.SocketSlackCommPlatformIntegration)
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating SocketSlack bot", err)
errorMsg := fmt.Sprintf("while creating SocketSlack bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(sb, notifierKey)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.CloudSlack.Enabled {
notifierKey := fmt.Sprintf("%s-%s", commGroupName, config.CloudSlackCommPlatformIntegration)
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
errorMsg := fmt.Sprintf("while creating CloudSlack bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(sb, notifierKey)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.Mattermost.Enabled {
notifierKey := fmt.Sprintf("%s-%s", commGroupName, config.MattermostCommPlatformIntegration)
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Mattermost bot", err)
errorMsg := fmt.Sprintf("while creating Mattermost bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(mb, notifierKey)
}
scheduleBotNotifier(mb)
}

if commGroupCfg.CloudTeams.Enabled {
notifierKey := fmt.Sprintf("%s-%s", commGroupName, config.CloudTeamsCommPlatformIntegration)
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudTeams bot", err)
errorMsg := fmt.Sprintf("while creating CloudTeams bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(ctb, notifierKey)
}
scheduleBotNotifier(ctb)
}

if commGroupCfg.Discord.Enabled {
notifierKey := fmt.Sprintf("%s-%s", commGroupName, config.DiscordCommPlatformIntegration)
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Discord bot", err)
errorMsg := fmt.Sprintf("while creating Discord bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(db, notifierKey)
}
scheduleBotNotifier(db)
}

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
notifierKey := fmt.Sprintf("%s-%d", config.ElasticsearchCommPlatformIntegration, commGroupIdx)
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
return reportFatalError("while creating Elasticsearch sink", err)
errorMsg := fmt.Sprintf("while creating Elasticsearch sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(es, notifierKey)
sinkNotifiers = append(sinkNotifiers, es)
}
sinkNotifiers = append(sinkNotifiers, es)
}

if commGroupCfg.Webhook.Enabled {
notifierKey := fmt.Sprintf("%s-%d", config.WebhookCommPlatformIntegration, commGroupIdx)
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
return reportFatalError("while creating Webhook sink", err)
errorMsg := fmt.Sprintf("while creating Webhook sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(wh, notifierKey)
sinkNotifiers = append(sinkNotifiers, wh)
}

sinkNotifiers = append(sinkNotifiers, wh)
}
if commGroupCfg.PagerDuty.Enabled {
notifierKey := fmt.Sprintf("%s-%d", config.PagerDutyCommPlatformIntegration, commGroupIdx)
pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
if err != nil {
return reportFatalError("while creating PagerDuty sink", err)
errorMsg := fmt.Sprintf("while creating PagerDuty sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(pd, notifierKey)
sinkNotifiers = append(sinkNotifiers, pd)
}

sinkNotifiers = append(sinkNotifiers, pd)
}
}
healthChecker.SetNotifiers(getHealthNotifiers(bots, sinkNotifiers))
healthChecker.SetNotifiers(healthNotifiers)

if conf.ConfigWatcher.Enabled {
restarter := reloader.NewRestarter(
Expand Down Expand Up @@ -496,12 +532,7 @@ func getAnalyticsReporter(disableAnalytics bool, logger logrus.FieldLogger) (ana
return nil, fmt.Errorf("while creating new Analytics Client: %w", err)
}

analyticsReporter := analytics.NewSegmentReporter(wrappedLogger, segmentCli)
if err != nil {
return nil, err
}

return analyticsReporter, nil
return analytics.NewSegmentReporter(wrappedLogger, segmentCli), nil
}

func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInterface, error) {
Expand Down Expand Up @@ -555,15 +586,15 @@ func sendHelp(ctx context.Context, s *storage.Help, clusterName string, executor

var sent []string

for key, notifier := range notifiers {
for key, notifierItem := range notifiers {
if alreadySentHelp[key] {
continue
}

help := interactive.NewHelpMessage(notifier.IntegrationName(), clusterName, executors).Build(true)
err := notifier.SendMessageToAll(ctx, help)
help := interactive.NewHelpMessage(notifierItem.IntegrationName(), clusterName, executors).Build(true)
err := notifierItem.SendMessageToAll(ctx, help)
if err != nil {
return fmt.Errorf("while sending help message for %s: %w", notifier.IntegrationName(), err)
return fmt.Errorf("while sending help message for %s: %w", notifierItem.IntegrationName(), err)
}
sent = append(sent, key)
}
Expand All @@ -585,14 +616,10 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) {
return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil
}

func getHealthNotifiers(bots map[string]bot.Bot, sinks []notifier.Sink) map[string]health.Notifier {
notifiers := make(map[string]health.Notifier)
for key, botInstance := range bots {
notifiers[key] = botInstance
}
for key, sinkInstance := range sinks {
notifiers[fmt.Sprintf("%s-%d", sinkInstance.IntegrationName(), key)] = sinkInstance
}
func setHealthBotNotifier(bot bot.HealthNotifierBot, key string) {
healthNotifiers[key] = bot
}

return notifiers
func setHealthSinkNotifier(sink sink.HealthNotifierSink, key string) {
healthNotifiers[key] = sink
}
1 change: 1 addition & 0 deletions internal/health/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type PlatformStatus struct {
Status PlatformStatusMsg `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
Reason FailureReasonMsg `json:"reason,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
}

// status defines bot agent status.
Expand Down
35 changes: 35 additions & 0 deletions pkg/bot/bot_failed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package bot

import (
"github.com/kubeshop/botkube/internal/health"
)

type HealthNotifierBot interface {
GetStatus() health.PlatformStatus
}

// FailedBot mock of bot, uses for healthChecker.
type FailedBot struct {
status health.PlatformStatusMsg
failureReason health.FailureReasonMsg
errorMsg string
}

// NewBotFailed creates a new FailedBot instance.
func NewBotFailed(failureReason health.FailureReasonMsg, errorMsg string) *FailedBot {
return &FailedBot{
status: health.StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus gets bot status.
func (b *FailedBot) GetStatus() health.PlatformStatus {
return health.PlatformStatus{
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
9 changes: 6 additions & 3 deletions pkg/bot/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Discord struct {
shutdownOnce sync.Once
status health.PlatformStatusMsg
failureReason health.FailureReasonMsg
errorMsg string
}

// discordMessage contains message details to execute command and send back the result.
Expand Down Expand Up @@ -124,7 +125,7 @@ func (b *Discord) Start(ctx context.Context) error {
// Open a websocket connection to Discord and begin listening.
err := b.api.Open()
if err != nil {
b.setFailureReason(health.FailureReasonConnectionError)
b.setFailureReason(health.FailureReasonConnectionError, fmt.Sprintf("while opening connection: %s", err.Error()))
return fmt.Errorf("while opening connection: %w", err)
}

Expand All @@ -134,7 +135,7 @@ func (b *Discord) Start(ctx context.Context) error {
}

b.log.Info("Botkube connected to Discord!")
b.setFailureReason("")
b.setFailureReason("", "")
go b.startMessageProcessor(ctx)
<-ctx.Done()
b.log.Info("Shutdown requested. Finishing...")
Expand Down Expand Up @@ -423,13 +424,14 @@ func discordError(err error, channel string) error {
return err
}

func (b *Discord) setFailureReason(reason health.FailureReasonMsg) {
func (b *Discord) setFailureReason(reason health.FailureReasonMsg, errorMsg string) {
if reason == "" {
b.status = health.StatusHealthy
} else {
b.status = health.StatusUnHealthy
}
b.failureReason = reason
b.errorMsg = errorMsg
}

// GetStatus gets bot status.
Expand All @@ -438,5 +440,6 @@ func (b *Discord) GetStatus() health.PlatformStatus {
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
11 changes: 7 additions & 4 deletions pkg/bot/mattermost.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Mattermost struct {
shutdownOnce sync.Once
status health.PlatformStatusMsg
failureReason health.FailureReasonMsg
errorMsg string
}

// mattermostMessage contains message details to execute command and send back the result
Expand Down Expand Up @@ -164,7 +165,7 @@ func (b *Mattermost) Start(ctx context.Context) error {
// Check connection to Mattermost server
err := b.checkServerConnection(ctx)
if err != nil {
b.setStatusReason(health.FailureReasonConnectionError)
b.setStatusReason(health.FailureReasonConnectionError, fmt.Sprintf("while pinging Mattermost server %q: %s", b.serverURL, err.Error()))
return fmt.Errorf("while pinging Mattermost server %q: %w", b.serverURL, err)
}

Expand All @@ -177,7 +178,7 @@ func (b *Mattermost) Start(ctx context.Context) error {
// For now, we are adding retry logic to reconnect to the server
// https://github.com/kubeshop/botkube/issues/201
b.log.Info("Botkube connected to Mattermost!")
b.setStatusReason("")
b.setStatusReason("", "")
go b.startMessageProcessor(ctx)

for {
Expand All @@ -190,7 +191,7 @@ func (b *Mattermost) Start(ctx context.Context) error {
var appErr error
b.wsClient, appErr = model.NewWebSocketClient4(b.webSocketURL, b.apiClient.AuthToken)
if appErr != nil {
b.setStatusReason(health.FailureReasonConnectionError)
b.setStatusReason(health.FailureReasonConnectionError, fmt.Sprintf("while creating WebSocket connection: %s", appErr.Error()))
return fmt.Errorf("while creating WebSocket connection: %w", appErr)
}
b.listen(ctx)
Expand Down Expand Up @@ -593,13 +594,14 @@ func postFromEvent(event *model.WebSocketEvent) (*model.Post, error) {
return post, nil
}

func (b *Mattermost) setStatusReason(reason health.FailureReasonMsg) {
func (b *Mattermost) setStatusReason(reason health.FailureReasonMsg, errorMsg string) {
if reason == "" {
b.status = health.StatusHealthy
} else {
b.status = health.StatusUnHealthy
}
b.failureReason = reason
b.errorMsg = errorMsg
}

// GetStatus gets bot status.
Expand All @@ -608,5 +610,6 @@ func (b *Mattermost) GetStatus() health.PlatformStatus {
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
Loading

0 comments on commit 87e88de

Please sign in to comment.