From 314b339d9f10c1c59078ff01924b3fce3c31aef6 Mon Sep 17 00:00:00 2001
From: Scott Leggett
Date: Fri, 21 Jul 2023 14:59:24 +0800
Subject: [PATCH] fix: handle multiple requests for the same logs

---
 internal/k8s/client.go |  6 ++--
 internal/k8s/logs.go   | 62 +++++++++++++++++++++++-------------------
 2 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/internal/k8s/client.go b/internal/k8s/client.go
index f0bbc632..bc656b3e 100644
--- a/internal/k8s/client.go
+++ b/internal/k8s/client.go
@@ -21,9 +21,9 @@ var timeoutSeconds = int64(timeout / time.Second)
 
 // Client is a k8s client.
 type Client struct {
-	config    *rest.Config
-	clientset *kubernetes.Clientset
-	logCIDs   sync.Map
+	config       *rest.Config
+	clientset    *kubernetes.Clientset
+	logStreamIDs sync.Map
 }
 
 // NewClient creates a new kubernetes API client.
diff --git a/internal/k8s/logs.go b/internal/k8s/logs.go
index 350a7d68..5cba5dc5 100644
--- a/internal/k8s/logs.go
+++ b/internal/k8s/logs.go
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"golang.org/x/exp/slices"
 	"golang.org/x/sync/errgroup"
 	corev1 "k8s.io/api/core/v1"
@@ -49,9 +50,9 @@ func linewiseCopy(ctx context.Context, prefix string, logs chan<- string,
 // for each container.
 // readLogs returns immediately, and relies on ctx cancellation to ensure the
 // goroutines it starts are cleaned up.
-func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
-	p *corev1.Pod, containerName string, follow bool, tailLines int64,
-	logs chan<- string) error {
+func (c *Client) readLogs(ctx context.Context, requestID string,
+	egSend *errgroup.Group, p *corev1.Pod, containerName string, follow bool,
+	tailLines int64, logs chan<- string) error {
 	var cStatuses []corev1.ContainerStatus
 	// if containerName is not specified, send logs for all containers
 	if containerName == "" {
@@ -69,7 +70,9 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
 	}
 	for _, cStatus := range cStatuses {
 		// skip setting up another log stream if container is already being logged
-		if _, exists := c.logCIDs.LoadOrStore(cStatus.ContainerID, true); exists {
+		_, exists := c.logStreamIDs.LoadOrStore(
+			requestID+cStatus.ContainerID, true)
+		if exists {
 			continue
 		}
 		// set up stream for a single container
@@ -89,21 +92,20 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
 		cName := cStatus.Name
 		cID := cStatus.ContainerID
 		egSend.Go(func() error {
-			defer c.logCIDs.Delete(cID)
+			defer c.logStreamIDs.Delete(requestID + cID)
 			linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, cName), logs,
 				logStream)
-			// The k8s API sometimes sends an event showing a healthy pod even though
-			// the pod is terminating. This seems to occur sometimes on scale down of
-			// a deployment. When this happens there is a race that occurs where the
-			// existing log stream closes (linewiseCopy() returns), then the
-			// "healthy" event comes in and the container log streaming is started
-			// again, only to return immediately. This can result in duplicated log
-			// lines being returned.
-			// To hack around this behaviour, pause here before exiting. This means
-			// that the container ID is retained in c.logCIDs for a brief period
-			// after logs stop streaming which causes "healthy pod" events from the
-			// k8s API to be ignored for that period and thereby avoiding duplicate
-			// log lines being returned.
+			// The k8s API sometimes sends an event showing a healthy pod _after_ the
+			// logStream has closed. This seems to occur sometimes on scale down of a
+			// deployment. When this happens there is a race where linewiseCopy()
+			// returns, then the "healthy" event comes in and the container log
+			// streaming is started again, only to return immediately. This can result
+			// in duplicated log lines being returned on the logs channel. To hack
+			// around this behaviour, pause here before exiting. This means that the
+			// stream ID is retained in c.logStreamIDs for a brief period after logs
+			// stop streaming, which causes "healthy pod" events from the k8s API to
+			// be ignored for that period, thereby avoiding duplicate log lines being
+			// returned.
 			time.Sleep(time.Second)
 			return nil
 		})
@@ -114,8 +116,8 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
 // podEventHandler receives pod objects from the podInformer and, if they are
 // in a ready state, starts streaming logs from them.
 func (c *Client) podEventHandler(ctx context.Context,
-	cancel context.CancelFunc, egSend *errgroup.Group, container string,
-	follow bool, tailLines int64, logs chan<- string, obj any) {
+	cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
+	container string, follow bool, tailLines int64, logs chan<- string, obj any) {
 	// panic if obj is not a pod, since we specifically use a pod informer
 	pod := obj.(*corev1.Pod)
 	if !slices.ContainsFunc(pod.Status.Conditions,
@@ -126,7 +128,7 @@ func (c *Client) podEventHandler(ctx context.Context,
 		return // pod not ready
 	}
 	egSend.Go(func() error {
-		readLogsErr := c.readLogs(ctx, egSend, pod, container, follow,
+		readLogsErr := c.readLogs(ctx, requestID, egSend, pod, container, follow,
 			tailLines, logs)
 		if readLogsErr != nil {
 			cancel()
@@ -145,7 +147,7 @@ func (c *Client) podEventHandler(ctx context.Context,
 // When the caller calls Run() on the returned informer, it will start watching
 // for events and sending to the logs channel.
 func (c *Client) newPodInformer(ctx context.Context,
-	cancel context.CancelFunc, egSend *errgroup.Group,
+	cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
 	namespace, deployment, container string, follow bool, tailLines int64,
 	logs chan<- string) (cache.SharedIndexInformer, error) {
 	// get the deployment
@@ -168,16 +170,16 @@ func (c *Client) newPodInformer(ctx context.Context,
 		// in a ready state when initially added, it doesn't start log streaming
 		// for those.
 		AddFunc: func(obj any) {
-			c.podEventHandler(ctx, cancel, egSend, container, follow, tailLines,
-				logs, obj)
+			c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
+				tailLines, logs, obj)
 		},
 		// UpdateFunc handles events for pod state changes. When new pods are added
 		// (e.g. deployment is scaled) it repeatedly receives events until the pod
 		// is in its final healthy state. For that reason, the podEventHandler()
 		// inspects the pod state before initiating log streaming.
 		UpdateFunc: func(_, obj any) {
-			c.podEventHandler(ctx, cancel, egSend, container, follow, tailLines,
-				logs, obj)
+			c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
+				tailLines, logs, obj)
 		},
 	})
 	if err != nil {
@@ -195,6 +197,10 @@ func (c *Client) newPodInformer(ctx context.Context,
 func (c *Client) Logs(ctx context.Context, cancel context.CancelFunc,
 	namespace, deployment, container string, follow bool, tailLines int64,
 	stdio io.ReadWriter) error {
+	// Generate a requestID value to uniquely distinguish between simultaneous
+	// calls to this function. This requestID is used in readLogs() to
+	// distinguish entries in c.logStreamIDs.
+	requestID := uuid.New().String()
 	// clamp tailLines
 	if tailLines < 1 {
 		tailLines = defaultTailLines
@@ -249,8 +255,8 @@ func (c *Client) Logs(ctx context.Context, cancel context.CancelFunc,
 		// If following the logs, start a goroutine which watches for new (and
 		// existing) pods in the deployment and starts streaming logs from them.
 		egSend.Go(func() error {
-			podInformer, err := c.newPodInformer(ctx, cancel, &egSend, namespace,
-				deployment, container, follow, tailLines, logs)
+			podInformer, err := c.newPodInformer(ctx, cancel, requestID, &egSend,
+				namespace, deployment, container, follow, tailLines, logs)
 			if err != nil {
 				cancel()
 				return fmt.Errorf("couldn't construct new pod informer: %v", err)
@@ -264,7 +270,7 @@ func (c *Client) Logs(ctx context.Context, cancel context.CancelFunc,
 		for i := range pods.Items {
 			pod := pods.Items[i] // copy loop var so it can be referenced in the closure
 			egSend.Go(func() error {
-				readLogsErr := c.readLogs(ctx, &egSend, &pod, container, follow,
+				readLogsErr := c.readLogs(ctx, requestID, &egSend, &pod, container, follow,
 					tailLines, logs)
 				if readLogsErr != nil {
 					cancel()
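
For illustration only (not part of the patch): a minimal standalone sketch of
the deduplication pattern this change introduces, keying the sync.Map on
requestID+containerID so that simultaneous log requests for the same container
each get their own stream while duplicate "pod ready" events within one request
are skipped. It assumes github.com/google/uuid as the patch does; the
streamTracker type and its tryStart/finish methods are illustrative names, not
identifiers from this repository.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"
)

// streamTracker deduplicates log streams per (request, container) pair,
// mirroring how the patched readLogs() keys c.logStreamIDs on
// requestID+containerID.
type streamTracker struct {
	ids sync.Map
}

// tryStart reports whether a stream may be started for this request and
// container, recording the pair so later duplicate events in the same
// request are skipped.
func (t *streamTracker) tryStart(requestID, containerID string) bool {
	_, exists := t.ids.LoadOrStore(requestID+containerID, true)
	return !exists
}

// finish releases the pair after a short grace period, mirroring the
// time.Sleep in readLogs() that keeps late "healthy pod" events from
// restarting a stream that just closed.
func (t *streamTracker) finish(requestID, containerID string) {
	time.Sleep(time.Second)
	t.ids.Delete(requestID + containerID)
}

func main() {
	var t streamTracker
	container := "containerd://abc123" // illustrative container ID
	// Two simultaneous requests for the same container: each may stream,
	// because the key includes the per-request ID.
	reqA, reqB := uuid.New().String(), uuid.New().String()
	fmt.Println(t.tryStart(reqA, container)) // true
	fmt.Println(t.tryStart(reqB, container)) // true
	// A duplicate "pod ready" event within request A is ignored.
	fmt.Println(t.tryStart(reqA, container)) // false
	t.finish(reqA, container)
	// After the grace period, request A could stream this container again.
	fmt.Println(t.tryStart(reqA, container)) // true
}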