diff --git a/stern/main.go b/stern/main.go index 0f2ac8ea..f6402073 100644 --- a/stern/main.go +++ b/stern/main.go @@ -19,7 +19,6 @@ import ( "fmt" "math/rand" "os" - "strconv" "strings" "sync" "time" @@ -31,6 +30,8 @@ import ( // Run starts the main run loop func Run(ctx context.Context, config *Config) error { + var gelfWriter *gelf.TCPWriter + rand.Seed(time.Now().UnixNano()) clientTimeoutSeconds := int64(config.ClientTimeout) clientConfig := kubernetes.NewClientConfig(config.KubeConfig, config.ContextName) @@ -39,10 +40,6 @@ func Run(ctx context.Context, config *Config) error { return err } - var writerErr error - var gelfWriter *gelf.TCPWriter - var sleep time.Duration = time.Second * 10 - if config.ExitAfter > 0 { go func() { time.AfterFunc(config.ExitAfter, func() { @@ -52,31 +49,6 @@ func Run(ctx context.Context, config *Config) error { }() } - if config.GraylogServer != "" && config.GraylogTransport == "tcp" { - for { - gelfWriter, writerErr = gelf.NewTCPWriter(config.GraylogServer + ":" + config.GraylogPort) - if writerErr != nil { - if config.GraylogRetries--; config.GraylogRetries > 0 { - // Add some randomness to prevent creating a Thundering Herd - jitter := time.Duration(rand.Int63n(int64(sleep))) - sleep = (sleep + jitter/2) - timeNow := time.Now().Format("2006/01/02 15:04:05") - os.Stderr.WriteString(fmt.Sprintf(timeNow+" Could not connect to Graylog Server, next retry in %s. "+strconv.Itoa(config.GraylogRetries)+" retries left. \n", sleep.Round(time.Second))) - time.Sleep(sleep) - gelfWriter = nil - writerErr = nil - continue - } else { - return errors.Wrap(writerErr, "setup gelf writer failed") - } - } else { - break - } - } - gelfWriter.MaxReconnect = 30 - gelfWriter.ReconnectDelay = 5 - } - var namespace string // A specific namespace is ignored if all-namespaces is provided if config.AllNamespaces { diff --git a/stern/tail.go b/stern/tail.go index 909423df..e74beeff 100644 --- a/stern/tail.go +++ b/stern/tail.go @@ -10,9 +10,11 @@ import ( "fmt" "hash/fnv" "io/ioutil" + "math/rand" "net/http" "os" "regexp" + "strconv" "text/template" "time" @@ -141,11 +143,11 @@ func (t *Tail) buildMessage(full string, level int32, customExtras map[string]in } // sendGelfMessageHttp sends a GELF message via HTTP to the graylog server -func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport string, port string, insecure bool) error { +func (t *Tail) sendGelfMessageHttp(gm *gelf.Message) error { var httpClient = &http.Client{Timeout: time.Duration(10) * time.Second} var tlsConfig = &tls.Config{} - if transport == "https" { + if t.Options.GraylogTransport == "https" { caCert, err := ioutil.ReadFile("/etc/ssl/certs/ca-certificates.crt") if err != nil { panic(err) @@ -155,7 +157,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri tlsConfig = &tls.Config{ RootCAs: caCertPool, - InsecureSkipVerify: insecure, + InsecureSkipVerify: t.Options.GraylogInsecure, } httpClient = &http.Client{ @@ -169,7 +171,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri } } - if transport == "http" { + if t.Options.GraylogTransport == "http" { httpClient = &http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -180,7 +182,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri } } - url := transport + "://" + t.Options.GraylogServer + ":" + port + "/gelf" + url := t.Options.GraylogTransport + "://" + t.Options.GraylogServer + ":" + t.Options.GraylogPort + "/gelf" data, err := json.Marshal(gm) if err != nil { @@ -218,7 +220,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri } expBackoff := backoff.NewExponentialBackOff() - expBackoff.MaxElapsedTime = time.Duration(retries) * time.Second + expBackoff.MaxElapsedTime = time.Duration(t.Options.GraylogRetries) * time.Second err = backoff.RetryNotify(operation, expBackoff, notify) if err != nil { @@ -228,7 +230,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri return nil } -func (t *Tail) sendGelfMessageTcp(gm *gelf.Message, gelfWriter *gelf.TCPWriter, port string) error { +func (t *Tail) sendGelfMessageTcp(gm *gelf.Message, gelfWriter *gelf.TCPWriter) error { _, err := json.Marshal(gm) if err != nil { @@ -247,6 +249,37 @@ func (t *Tail) sendGelfMessageTcp(gm *gelf.Message, gelfWriter *gelf.TCPWriter, func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TCPWriter, logC chan<- string) { t.podColor, t.containerColor = determineColor(t.PodName) + if t.Options.GraylogServer != "" && t.Options.GraylogTransport == "tcp" { + + var writerErr error + var sleep time.Duration = time.Second * 10 + + for { + gelfWriter, writerErr = gelf.NewTCPWriter(t.Options.GraylogServer + ":" + t.Options.GraylogPort) + if writerErr != nil { + if t.Options.GraylogRetries--; t.Options.GraylogRetries > 0 { + // Add some randomness to prevent creating a Thundering Herd + jitter := time.Duration(rand.Int63n(int64(sleep))) + sleep = (sleep + jitter/2) + timeNow := time.Now().Format("2006/01/02 15:04:05") + os.Stderr.WriteString(fmt.Sprintf(timeNow+" Could not connect to Graylog Server, next retry in %s. "+strconv.Itoa(t.Options.GraylogRetries)+" retries left. \n", sleep.Round(time.Second))) + time.Sleep(sleep) + gelfWriter = nil + writerErr = nil + continue + } else { + os.Stderr.WriteString(fmt.Sprintf("Setup GELF TCP writer failed: %s", writerErr.Error())) + return + } + } else { + break + } + } + gelfWriter.MaxReconnect = 30 + gelfWriter.ReconnectDelay = 5 + } + + // start tailing logs go func() { defer close(t.closed) @@ -366,11 +399,11 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC gm := t.buildMessage(logLine, 3, customExtras, host) if t.Options.GraylogTransport != "tcp" { - if err := t.sendGelfMessageHttp(gm, t.Options.GraylogRetries, t.Options.GraylogTransport, t.Options.GraylogPort, t.Options.GraylogInsecure); err != nil { + if err := t.sendGelfMessageHttp(gm); err != nil { os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err)) } } else { - if err := t.sendGelfMessageTcp(gm, gelfWriter, t.Options.GraylogPort); err != nil { + if err := t.sendGelfMessageTcp(gm, gelfWriter); err != nil { os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err)) } }