Skip to content

Commit

Permalink
feature/optimize (#13)
Browse files Browse the repository at this point in the history
* feat: move print stdout to separate functions
* feat: configure k8s client qps and burst
* fix: remove race condition with duplicate closing
* fix: correctly print log following to stdout
* docs: add new args
* feat: bump version
  • Loading branch information
derdanne authored Feb 1, 2023
1 parent 51d103c commit 9a812a2
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 62 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ The `pod` query is a regular expression so you could provide `"web-\w"` to tail
| `--graylog-cacerts` | `/etc/ssl/certs/ca-certificates.crt` | Specify CA Certs file for Graylog Server https endpoint |
| `--graylog-insecure` | `false` | Do not verify Graylog server certificate |
| `--client-timeout` | `3600` | Specify Kubernetes watch client timeout in seconds |
| `--client-qps` | `100` | Specify Kubernetes client qps |
| `--client-burst` | `200` | Specify Kubernetes client burst |
| `--exit-after` | | Specify after how much seconds the program will exit (time duration like 24h, 30m, or 2h). Will not exit if unspecified. |
| `template` | | Template to use for log lines, leave empty to use --output flag |

Expand Down
10 changes: 9 additions & 1 deletion cmd/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/fatih/color"
)

const version = "2.5.0"
const version = "2.6.0"

type Options struct {
container string
Expand Down Expand Up @@ -66,6 +66,8 @@ type Options struct {
graylogCacerts string
graylogInsecure bool
clientTimeout int64
clientQPS float32
clientBurst int
exitAfter time.Duration
}

Expand All @@ -83,6 +85,8 @@ var opts = &Options{
graylogCacerts: "/etc/ssl/certs/ca-certificates.crt",
graylogInsecure: false,
clientTimeout: 3600,
clientQPS: 100,
clientBurst: 200,
exitAfter: 0,
}

Expand Down Expand Up @@ -120,6 +124,8 @@ func Run() {
cmd.Flags().StringVar(&opts.graylogCacerts, "graylog-cacerts", opts.graylogCacerts, "Specify CA Certs file for Graylog Server https endpoint")
cmd.Flags().BoolVar(&opts.graylogInsecure, "graylog-insecure", opts.graylogInsecure, "Do not verify Graylog server certificate")
cmd.Flags().Int64VarP(&opts.clientTimeout, "client-timeout", "T", opts.clientTimeout, "Specify Kubernetes watch client timeout in seconds")
cmd.Flags().Float32Var(&opts.clientQPS, "client-qps", opts.clientQPS, "Specify Kubernetes client QPS")
cmd.Flags().IntVar(&opts.clientBurst, "client-burst", opts.clientBurst, "Specify Kubernetes client Burst")
cmd.Flags().DurationVarP(&opts.exitAfter, "exit-after", "X", opts.exitAfter, "Specify after how much time the program will exit. Default is not to exit")

// Specify custom bash completion function
Expand Down Expand Up @@ -331,6 +337,8 @@ func parseConfig(args []string) (*stern.Config, error) {
GraylogCacerts: opts.graylogCacerts,
GraylogInsecure: opts.graylogInsecure,
ClientTimeout: opts.clientTimeout,
ClientQPS: opts.clientQPS,
ClientBurst: opts.clientBurst,
ExitAfter: opts.exitAfter,
}, nil
}
Expand Down
7 changes: 5 additions & 2 deletions kubernetes/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ func NewClientConfig(configPath string, contextName string) clientcmd.ClientConf
}

// NewClientSet returns a new Kubernetes client for a client config
func NewClientSet(clientConfig clientcmd.ClientConfig) (*kubernetes.Clientset, error) {
func NewClientSet(clientConfig clientcmd.ClientConfig, clientQPS float32, clientBurst int) (*kubernetes.Clientset, error) {
c, err := clientConfig.ClientConfig()

c.QPS = clientQPS
c.Burst = clientBurst
c.DisableCompression = false
c.UserAgent = "Go-http-client/2.0: stern-logs"
if err != nil {
return nil, errors.Wrap(err, "failed to get client config")
}
Expand Down
2 changes: 2 additions & 0 deletions stern/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ type Config struct {
GraylogCacerts string
GraylogInsecure bool
ClientTimeout int64
ClientQPS float32
ClientBurst int
ExitAfter time.Duration
}
9 changes: 5 additions & 4 deletions stern/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func Run(ctx context.Context, config *Config) error {
rand.Seed(time.Now().UnixNano())
clientTimeoutSeconds := int64(config.ClientTimeout)
clientConfig := kubernetes.NewClientConfig(config.KubeConfig, config.ContextName)
clientset, err := kubernetes.NewClientSet(clientConfig)
clientset, err := kubernetes.NewClientSet(clientConfig, config.ClientQPS, config.ClientBurst)
if err != nil {
return err
}
Expand Down Expand Up @@ -227,9 +227,9 @@ OUTER:
if existing != nil {
if existing.Active {
continue
} else { // cleanup failed tail to restart
} else {
tailsMutex.Lock()
// tails[id].Close()
tails[id].Close()
delete(tails, id)
tailsMutex.Unlock()
}
Expand Down Expand Up @@ -270,7 +270,8 @@ OUTER:
continue
}
tailsMutex.Lock()
// tails[id].Close()
tails[id].Close()
tails[id].PrintClose()
delete(tails, id)
tailsMutex.Unlock()
}
Expand Down
102 changes: 47 additions & 55 deletions stern/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ func (t *Tail) buildMessage(full string, level int32, customExtras map[string]in
timestamp, err := time.Parse(time.RFC3339Nano, full[0:30])
timeunix = float64(timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0
if err != nil {
// fmt.Fprintf(os.Stderr, full)
// os.Stderr.WriteString(fmt.Sprintf("time parse failed: " + full[0:30] + "\n"))
timeunix = float64(time.Now().Unix()/int64(time.Millisecond)) / 1000.0
}

Expand Down Expand Up @@ -166,7 +164,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, httpClient *http.Client, ca
}

if resp.StatusCode != 202 {
return fmt.Errorf("GELF HTTP message delivery failed with status: %v", resp.Status)
return fmt.Errorf("GELF HTTP message delivery failed with status: %v. Message was: %s", resp.Status, gm.Short)
}

return nil
Expand Down Expand Up @@ -208,8 +206,6 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC

// start tailing logs
go func() {
defer close(t.closed)

timestamps := t.Options.Timestamps
if t.Options.GraylogServer != "" {
timestamps = true
Expand All @@ -225,75 +221,46 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC

stream, err := req.Stream(ctx)
if err != nil {
//fmt.Println(errors.Wrapf(err, "Error opening stream to %s/%s: %s\n", t.Namespace, t.PodName, t.ContainerName))
g := color.New(color.FgHiYellow, color.Bold).SprintFunc()
p := t.podColor.SprintFunc()
c := t.containerColor.SprintFunc()
if t.Options.Namespace {
logC <- fmt.Sprintf("%s %s %s > %s: %s\n", g("~"), p(t.Namespace), p(t.PodName), c(t.ContainerName), err)
} else {
logC <- fmt.Sprintf("%s %s > %s: %s\n", g("~"), p(t.PodName), c(t.ContainerName), err)
}
t.PrintStreamNotReady(err)
t.Active = false
return
}

g := color.New(color.FgHiGreen, color.Bold).SprintFunc()
p := t.podColor.SprintFunc()
c := t.containerColor.SprintFunc()
if t.Options.Namespace {
logC <- fmt.Sprintf("%s %s %s > %s\n", g("+"), p(t.Namespace), p(t.PodName), c(t.ContainerName))
} else {
logC <- fmt.Sprintf("%s %s > %s\n", g("+"), p(t.PodName), c(t.ContainerName))
}
t.PrintOpen()
defer stream.Close()

go func() {
<-t.closed
r := color.New(color.FgHiRed, color.Bold).SprintFunc()
p := t.podColor.SprintFunc()
if t.Options.Namespace {
logC <- fmt.Sprintf("%s %s %s\n", r("-"), p(t.Namespace), p(t.PodName))
} else {
logC <- fmt.Sprintf("%s %s\n", r("-"), p(t.PodName))
}
stream.Close()
t.Active = false
}()

reader := bufio.NewReader(stream)

OUTER:
for {
select {
case <-ctx.Done():
return
default:
}

line, err := reader.ReadBytes('\n')
if err != nil {
return
}
logLine := string(line)

// print log message
matched := true
for _, re := range t.Options.Exclude {
if re.MatchString(logLine) {
matched = false
return
for _, rex := range t.Options.Exclude {
if rex.MatchString(logLine) {
continue OUTER
}
}

for _, re := range t.Options.Include {
if re.MatchString(logLine) {
matched = true
return
if len(t.Options.Include) != 0 {
matches := false
for _, rin := range t.Options.Include {
if rin.MatchString(logLine) {
matches = true
break
}
}
if !matches {
continue OUTER
}
}

if !matched {
return
}

if t.Options.GraylogServer != "" {
customExtras := map[string]interface{}{
"Namespace": t.Namespace,
Expand All @@ -320,11 +287,11 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC

if t.Options.GraylogTransport != "tcp" {
if err := t.sendGelfMessageHttp(gm, httpClient, caCertPool); err != nil {
os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err))
os.Stderr.WriteString(fmt.Sprintf("Error sending GELF [%s: %s - %s] message: %s", t.Namespace, t.PodName, t.ContainerName, err))
}
} else {
if err := t.sendGelfMessageTcp(gm, gelfWriter); err != nil {
os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err))
os.Stderr.WriteString(fmt.Sprintf("Error sending GELF [%s: %s - %s] message: %s", t.Namespace, t.PodName, t.ContainerName, err))
}
}
}
Expand Down Expand Up @@ -360,14 +327,39 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC

// Close stops tailing
func (t *Tail) Close() {
close(t.closed)
}

func (t *Tail) PrintClose() {
r := color.New(color.FgHiRed, color.Bold).SprintFunc()
p := t.podColor.SprintFunc()
if t.Options.Namespace {
fmt.Fprintf(os.Stderr, "%s %s %s\n", r("-"), p(t.Namespace), p(t.PodName))
} else {
fmt.Fprintf(os.Stderr, "%s %s\n", r("-"), p(t.PodName))
}
close(t.closed)
}

func (t *Tail) PrintOpen() {
g := color.New(color.FgHiGreen, color.Bold).SprintFunc()
p := t.podColor.SprintFunc()
c := t.containerColor.SprintFunc()
if t.Options.Namespace {
fmt.Fprintf(os.Stderr, "%s %s %s > %s\n", g("+"), p(t.Namespace), p(t.PodName), c(t.ContainerName))
} else {
fmt.Fprintf(os.Stderr, "%s %s > %s\n", g("+"), p(t.PodName), c(t.ContainerName))
}
}

func (t *Tail) PrintStreamNotReady(err error) {
g := color.New(color.FgHiYellow, color.Bold).SprintFunc()
p := t.podColor.SprintFunc()
c := t.containerColor.SprintFunc()
if t.Options.Namespace {
fmt.Fprintf(os.Stderr, "%s %s %s > %s: %s\n", g("~"), p(t.Namespace), p(t.PodName), c(t.ContainerName), err)
} else {
fmt.Fprintf(os.Stderr, "%s %s > %s: %s\n", g("~"), p(t.PodName), c(t.ContainerName), err)
}
}

// Log is the object which will be used together with the template to generate
Expand Down

0 comments on commit 9a812a2

Please sign in to comment.