From 9a812a2cdee797dffbfb0d8eaab2cbf401d2b59e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Klockenk=C3=A4mper?= Date: Wed, 1 Feb 2023 21:32:10 +0100 Subject: [PATCH] feature/optimize (#13) * feat: move print stdout to separate functions * feat: configure k8s client qps and burst * fix: remove race condition with duplicate closing * fix: correctly print log following to stdout * docs: add new args * feat: bump version --- README.md | 2 + cmd/cli.go | 10 +++- kubernetes/clientset.go | 7 ++- stern/config.go | 2 + stern/main.go | 9 ++-- stern/tail.go | 102 ++++++++++++++++++---------------------- 6 files changed, 70 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index e2b59b41..8271b484 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,8 @@ The `pod` query is a regular expression so you could provide `"web-\w"` to tail | `--graylog-cacerts` | `/etc/ssl/certs/ca-certificates.crt` | Specify CA Certs file for Graylog Server https endpoint | | `--graylog-insecure` | `false` | Do not verify Graylog server certificate | | `--client-timeout` | `3600` | Specify Kubernetes watch client timeout in seconds | +| `--client-qps` | `100` | Specify Kubernetes client qps | +| `--client-burst` | `200` | Specify Kubernetes client burst | | `--exit-after` | | Specify after how much seconds the program will exit (time duration like 24h, 30m, or 2h). Will not exit if unspecified. | | `template` | | Template to use for log lines, leave empty to use --output flag | diff --git a/cmd/cli.go b/cmd/cli.go index 156239e7..1141cb3d 100644 --- a/cmd/cli.go +++ b/cmd/cli.go @@ -36,7 +36,7 @@ import ( "github.com/fatih/color" ) -const version = "2.5.0" +const version = "2.6.0" type Options struct { container string @@ -66,6 +66,8 @@ type Options struct { graylogCacerts string graylogInsecure bool clientTimeout int64 + clientQPS float32 + clientBurst int exitAfter time.Duration } @@ -83,6 +85,8 @@ var opts = &Options{ graylogCacerts: "/etc/ssl/certs/ca-certificates.crt", graylogInsecure: false, clientTimeout: 3600, + clientQPS: 100, + clientBurst: 200, exitAfter: 0, } @@ -120,6 +124,8 @@ func Run() { cmd.Flags().StringVar(&opts.graylogCacerts, "graylog-cacerts", opts.graylogCacerts, "Specify CA Certs file for Graylog Server https endpoint") cmd.Flags().BoolVar(&opts.graylogInsecure, "graylog-insecure", opts.graylogInsecure, "Do not verify Graylog server certificate") cmd.Flags().Int64VarP(&opts.clientTimeout, "client-timeout", "T", opts.clientTimeout, "Specify Kubernetes watch client timeout in seconds") + cmd.Flags().Float32Var(&opts.clientQPS, "client-qps", opts.clientQPS, "Specify Kubernetes client QPS") + cmd.Flags().IntVar(&opts.clientBurst, "client-burst", opts.clientBurst, "Specify Kubernetes client Burst") cmd.Flags().DurationVarP(&opts.exitAfter, "exit-after", "X", opts.exitAfter, "Specify after how much time the program will exit. Default is not to exit") // Specify custom bash completion function @@ -331,6 +337,8 @@ func parseConfig(args []string) (*stern.Config, error) { GraylogCacerts: opts.graylogCacerts, GraylogInsecure: opts.graylogInsecure, ClientTimeout: opts.clientTimeout, + ClientQPS: opts.clientQPS, + ClientBurst: opts.clientBurst, ExitAfter: opts.exitAfter, }, nil } diff --git a/kubernetes/clientset.go b/kubernetes/clientset.go index 974cf2a8..b8925605 100644 --- a/kubernetes/clientset.go +++ b/kubernetes/clientset.go @@ -45,9 +45,12 @@ func NewClientConfig(configPath string, contextName string) clientcmd.ClientConf } // NewClientSet returns a new Kubernetes client for a client config -func NewClientSet(clientConfig clientcmd.ClientConfig) (*kubernetes.Clientset, error) { +func NewClientSet(clientConfig clientcmd.ClientConfig, clientQPS float32, clientBurst int) (*kubernetes.Clientset, error) { c, err := clientConfig.ClientConfig() - + c.QPS = clientQPS + c.Burst = clientBurst + c.DisableCompression = false + c.UserAgent = "Go-http-client/2.0: stern-logs" if err != nil { return nil, errors.Wrap(err, "failed to get client config") } diff --git a/stern/config.go b/stern/config.go index 12431480..7c6c3b86 100644 --- a/stern/config.go +++ b/stern/config.go @@ -48,5 +48,7 @@ type Config struct { GraylogCacerts string GraylogInsecure bool ClientTimeout int64 + ClientQPS float32 + ClientBurst int ExitAfter time.Duration } diff --git a/stern/main.go b/stern/main.go index 136f310d..4178cf6d 100644 --- a/stern/main.go +++ b/stern/main.go @@ -123,7 +123,7 @@ func Run(ctx context.Context, config *Config) error { rand.Seed(time.Now().UnixNano()) clientTimeoutSeconds := int64(config.ClientTimeout) clientConfig := kubernetes.NewClientConfig(config.KubeConfig, config.ContextName) - clientset, err := kubernetes.NewClientSet(clientConfig) + clientset, err := kubernetes.NewClientSet(clientConfig, config.ClientQPS, config.ClientBurst) if err != nil { return err } @@ -227,9 +227,9 @@ OUTER: if existing != nil { if existing.Active { continue - } else { // cleanup failed tail to restart + } else { tailsMutex.Lock() - // tails[id].Close() + tails[id].Close() delete(tails, id) tailsMutex.Unlock() } @@ -270,7 +270,8 @@ OUTER: continue } tailsMutex.Lock() - // tails[id].Close() + tails[id].Close() + tails[id].PrintClose() delete(tails, id) tailsMutex.Unlock() } diff --git a/stern/tail.go b/stern/tail.go index 29eb6e39..b0aaf96a 100644 --- a/stern/tail.go +++ b/stern/tail.go @@ -111,8 +111,6 @@ func (t *Tail) buildMessage(full string, level int32, customExtras map[string]in timestamp, err := time.Parse(time.RFC3339Nano, full[0:30]) timeunix = float64(timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0 if err != nil { - // fmt.Fprintf(os.Stderr, full) - // os.Stderr.WriteString(fmt.Sprintf("time parse failed: " + full[0:30] + "\n")) timeunix = float64(time.Now().Unix()/int64(time.Millisecond)) / 1000.0 } @@ -166,7 +164,7 @@ func (t *Tail) sendGelfMessageHttp(gm *gelf.Message, httpClient *http.Client, ca } if resp.StatusCode != 202 { - return fmt.Errorf("GELF HTTP message delivery failed with status: %v", resp.Status) + return fmt.Errorf("GELF HTTP message delivery failed with status: %v. Message was: %s", resp.Status, gm.Short) } return nil @@ -208,8 +206,6 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC // start tailing logs go func() { - defer close(t.closed) - timestamps := t.Options.Timestamps if t.Options.GraylogServer != "" { timestamps = true @@ -225,75 +221,46 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC stream, err := req.Stream(ctx) if err != nil { - //fmt.Println(errors.Wrapf(err, "Error opening stream to %s/%s: %s\n", t.Namespace, t.PodName, t.ContainerName)) - g := color.New(color.FgHiYellow, color.Bold).SprintFunc() - p := t.podColor.SprintFunc() - c := t.containerColor.SprintFunc() - if t.Options.Namespace { - logC <- fmt.Sprintf("%s %s %s > %s: %s\n", g("~"), p(t.Namespace), p(t.PodName), c(t.ContainerName), err) - } else { - logC <- fmt.Sprintf("%s %s > %s: %s\n", g("~"), p(t.PodName), c(t.ContainerName), err) - } + t.PrintStreamNotReady(err) t.Active = false return } - - g := color.New(color.FgHiGreen, color.Bold).SprintFunc() - p := t.podColor.SprintFunc() - c := t.containerColor.SprintFunc() - if t.Options.Namespace { - logC <- fmt.Sprintf("%s %s %s > %s\n", g("+"), p(t.Namespace), p(t.PodName), c(t.ContainerName)) - } else { - logC <- fmt.Sprintf("%s %s > %s\n", g("+"), p(t.PodName), c(t.ContainerName)) - } + t.PrintOpen() + defer stream.Close() go func() { <-t.closed - r := color.New(color.FgHiRed, color.Bold).SprintFunc() - p := t.podColor.SprintFunc() - if t.Options.Namespace { - logC <- fmt.Sprintf("%s %s %s\n", r("-"), p(t.Namespace), p(t.PodName)) - } else { - logC <- fmt.Sprintf("%s %s\n", r("-"), p(t.PodName)) - } + stream.Close() t.Active = false }() - reader := bufio.NewReader(stream) + OUTER: for { - select { - case <-ctx.Done(): - return - default: - } - line, err := reader.ReadBytes('\n') if err != nil { return } logLine := string(line) - // print log message - matched := true - for _, re := range t.Options.Exclude { - if re.MatchString(logLine) { - matched = false - return + for _, rex := range t.Options.Exclude { + if rex.MatchString(logLine) { + continue OUTER } } - for _, re := range t.Options.Include { - if re.MatchString(logLine) { - matched = true - return + if len(t.Options.Include) != 0 { + matches := false + for _, rin := range t.Options.Include { + if rin.MatchString(logLine) { + matches = true + break + } + } + if !matches { + continue OUTER } } - - if !matched { - return - } - if t.Options.GraylogServer != "" { customExtras := map[string]interface{}{ "Namespace": t.Namespace, @@ -320,11 +287,11 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC if t.Options.GraylogTransport != "tcp" { if err := t.sendGelfMessageHttp(gm, httpClient, caCertPool); err != nil { - os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err)) + os.Stderr.WriteString(fmt.Sprintf("Error sending GELF [%s: %s - %s] message: %s", t.Namespace, t.PodName, t.ContainerName, err)) } } else { if err := t.sendGelfMessageTcp(gm, gelfWriter); err != nil { - os.Stderr.WriteString(fmt.Sprintf("Error sending GELF message: %s", err)) + os.Stderr.WriteString(fmt.Sprintf("Error sending GELF [%s: %s - %s] message: %s", t.Namespace, t.PodName, t.ContainerName, err)) } } } @@ -360,6 +327,10 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface, gelfWriter *gelf.TC // Close stops tailing func (t *Tail) Close() { + close(t.closed) +} + +func (t *Tail) PrintClose() { r := color.New(color.FgHiRed, color.Bold).SprintFunc() p := t.podColor.SprintFunc() if t.Options.Namespace { @@ -367,7 +338,28 @@ func (t *Tail) Close() { } else { fmt.Fprintf(os.Stderr, "%s %s\n", r("-"), p(t.PodName)) } - close(t.closed) +} + +func (t *Tail) PrintOpen() { + g := color.New(color.FgHiGreen, color.Bold).SprintFunc() + p := t.podColor.SprintFunc() + c := t.containerColor.SprintFunc() + if t.Options.Namespace { + fmt.Fprintf(os.Stderr, "%s %s %s > %s\n", g("+"), p(t.Namespace), p(t.PodName), c(t.ContainerName)) + } else { + fmt.Fprintf(os.Stderr, "%s %s > %s\n", g("+"), p(t.PodName), c(t.ContainerName)) + } +} + +func (t *Tail) PrintStreamNotReady(err error) { + g := color.New(color.FgHiYellow, color.Bold).SprintFunc() + p := t.podColor.SprintFunc() + c := t.containerColor.SprintFunc() + if t.Options.Namespace { + fmt.Fprintf(os.Stderr, "%s %s %s > %s: %s\n", g("~"), p(t.Namespace), p(t.PodName), c(t.ContainerName), err) + } else { + fmt.Fprintf(os.Stderr, "%s %s > %s: %s\n", g("~"), p(t.PodName), c(t.ContainerName), err) + } } // Log is the object which will be used together with the template to generate