Skip to content

Commit

Permalink
feat: directly read from tailOptions (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
derdanne authored Jan 24, 2023
1 parent 5948fda commit 4ac0faa
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 39 deletions.
32 changes: 2 additions & 30 deletions stern/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -31,6 +30,8 @@ import (

// Run starts the main run loop
func Run(ctx context.Context, config *Config) error {
var gelfWriter *gelf.TCPWriter

rand.Seed(time.Now().UnixNano())
clientTimeoutSeconds := int64(config.ClientTimeout)
clientConfig := kubernetes.NewClientConfig(config.KubeConfig, config.ContextName)
Expand All @@ -39,10 +40,6 @@ func Run(ctx context.Context, config *Config) error {
return err
}

var writerErr error
var gelfWriter *gelf.TCPWriter
var sleep time.Duration = time.Second * 10

if config.ExitAfter > 0 {
go func() {
time.AfterFunc(config.ExitAfter, func() {
Expand All @@ -52,31 +49,6 @@ func Run(ctx context.Context, config *Config) error {
}()
}

if config.GraylogServer != "" && config.GraylogTransport == "tcp" {
for {
gelfWriter, writerErr = gelf.NewTCPWriter(config.GraylogServer + ":" + config.GraylogPort)
if writerErr != nil {
if config.GraylogRetries--; config.GraylogRetries > 0 {
// Add some randomness to prevent creating a Thundering Herd
jitter := time.Duration(rand.Int63n(int64(sleep)))
sleep = (sleep + jitter/2)
timeNow := time.Now().Format("2006/01/02 15:04:05")
os.Stderr.WriteString(fmt.Sprintf(timeNow+" Could not connect to Graylog Server, next retry in %s. "+strconv.Itoa(config.GraylogRetries)+" retries left. \n", sleep.Round(time.Second)))
time.Sleep(sleep)
gelfWriter = nil
writerErr = nil
continue
} else {
return errors.Wrap(writerErr, "setup gelf writer failed")
}
} else {
break
}
}
gelfWriter.MaxReconnect = 30
gelfWriter.ReconnectDelay = 5
}

var namespace string
// A specific namespace is ignored if all-namespaces is provided
if config.AllNamespaces {
Expand Down
51 changes: 42 additions & 9 deletions stern/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"fmt"
"hash/fnv"
"io/ioutil"
"math/rand"
"net/http"
"os"
"regexp"
"strconv"
"text/template"
"time"

Expand Down Expand Up @@ -141,11 +143,11 @@ func (t *Tail) buildMessage(full string, level int32, customExtras map[string]in
}

// sendGelfMessageHttp sends a GELF message via HTTP to the graylog server
func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport string, port string, insecure bool) error {
func (t *Tail) sendGelfMessageHttp(gm *gelf.Message) error {
var httpClient = &http.Client{Timeout: time.Duration(10) * time.Second}
var tlsConfig = &tls.Config{}

if transport == "https" {
if t.Options.GraylogTransport == "https" {
caCert, err := ioutil.ReadFile("/etc/ssl/certs/ca-certificates.crt")
if err != nil {
panic(err)
Expand All @@ -155,7 +157,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri

tlsConfig = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: insecure,
InsecureSkipVerify: t.Options.GraylogInsecure,
}

httpClient = &http.Client{
Expand All @@ -169,7 +171,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri
}
}

if transport == "http" {
if t.Options.GraylogTransport == "http" {
httpClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand All @@ -180,7 +182,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri
}
}

url := transport + "://" + t.Options.GraylogServer + ":" + port + "/gelf"
url := t.Options.GraylogTransport + "://" + t.Options.GraylogServer + ":" + t.Options.GraylogPort + "/gelf"

data, err := json.Marshal(gm)
if err != nil {
Expand Down Expand Up @@ -218,7 +220,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri
}

expBackoff := backoff.NewExponentialBackOff()
expBackoff.MaxElapsedTime = time.Duration(retries) * time.Second
expBackoff.MaxElapsedTime = time.Duration(t.Options.GraylogRetries) * time.Second

err = backoff.RetryNotify(operation, expBackoff, notify)
if err != nil {
Expand All @@ -228,7 +230,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, retries int, transport stri
return nil
}

func (t *Tail) sendGelfMessageTcp(gm *gelf.Message, gelfWriter *gelf.TCPWriter, port string) error {
func (t *Tail) sendGelfMessageTcp(gm *gelf.Message, gelfWriter *gelf.TCPWriter) error {

_, err := json.Marshal(gm)
if err != nil {
Expand All @@ -247,6 +249,37 @@ func (t *Tail) sendGelfMessageTcp(gm *gelf.Message, gelfWriter *gelf.TCPWriter,
func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TCPWriter, logC chan<- string) {
t.podColor, t.containerColor = determineColor(t.PodName)

if t.Options.GraylogServer != "" && t.Options.GraylogTransport == "tcp" {

var writerErr error
var sleep time.Duration = time.Second * 10

for {
gelfWriter, writerErr = gelf.NewTCPWriter(t.Options.GraylogServer + ":" + t.Options.GraylogPort)
if writerErr != nil {
if t.Options.GraylogRetries--; t.Options.GraylogRetries > 0 {
// Add some randomness to prevent creating a Thundering Herd
jitter := time.Duration(rand.Int63n(int64(sleep)))
sleep = (sleep + jitter/2)
timeNow := time.Now().Format("2006/01/02 15:04:05")
os.Stderr.WriteString(fmt.Sprintf(timeNow+" Could not connect to Graylog Server, next retry in %s. "+strconv.Itoa(t.Options.GraylogRetries)+" retries left. \n", sleep.Round(time.Second)))
time.Sleep(sleep)
gelfWriter = nil
writerErr = nil
continue
} else {
os.Stderr.WriteString(fmt.Sprintf("Setup GELF TCP writer failed: %s", writerErr.Error()))
return
}
} else {
break
}
}
gelfWriter.MaxReconnect = 30
gelfWriter.ReconnectDelay = 5
}


// start tailing logs
go func() {
defer close(t.closed)
Expand Down Expand Up @@ -366,11 +399,11 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC
gm := t.buildMessage(logLine, 3, customExtras, host)

if t.Options.GraylogTransport != "tcp" {
if err := t.sendGelfMessageHttp(gm, t.Options.GraylogRetries, t.Options.GraylogTransport, t.Options.GraylogPort, t.Options.GraylogInsecure); err != nil {
if err := t.sendGelfMessageHttp(gm); err != nil {
os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err))
}
} else {
if err := t.sendGelfMessageTcp(gm, gelfWriter, t.Options.GraylogPort); err != nil {
if err := t.sendGelfMessageTcp(gm, gelfWriter); err != nil {
os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err))
}
}
Expand Down

0 comments on commit 4ac0faa

Please sign in to comment.