fix: handle multiple requests for the same logs
smlx committed Jul 21, 2023
1 parent 72fcc6d commit 314b339
Showing 2 changed files with 37 additions and 31 deletions.
6 changes: 3 additions & 3 deletions internal/k8s/client.go
@@ -21,9 +21,9 @@ var timeoutSeconds = int64(timeout / time.Second)

// Client is a k8s client.
type Client struct {
config *rest.Config
clientset *kubernetes.Clientset
logCIDs sync.Map
config *rest.Config
clientset *kubernetes.Clientset
logStreamIDs sync.Map
}

// NewClient creates a new kubernetes API client.
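The rename above is the heart of the fix: stream bookkeeping is now keyed by requestID plus container ID instead of container ID alone, so two simultaneous requests for the same deployment's logs no longer collide in the map. A minimal sketch of that keying scheme, assuming only the sync.Map usage visible in this diff (claimStream is a hypothetical helper name; the commit inlines this logic in readLogs()):

package k8s

import "sync"

// Client is a k8s client; fields other than the renamed logStreamIDs map
// are omitted from this sketch.
type Client struct {
	logStreamIDs sync.Map
}

// claimStream reports whether this request may start streaming the given
// container. Because the key includes the requestID, a second concurrent
// request for the same container gets its own claim and its own log
// stream instead of being skipped.
func (c *Client) claimStream(requestID, containerID string) bool {
	_, exists := c.logStreamIDs.LoadOrStore(requestID+containerID, true)
	return !exists
}

With the previous containerID-only key, the second of two concurrent requests would find the container already claimed and never receive any log lines, which is the behaviour the commit message addresses.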
62 changes: 34 additions & 28 deletions internal/k8s/logs.go
@@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
@@ -49,9 +50,9 @@ func linewiseCopy(ctx context.Context, prefix string, logs chan<- string,
// for each container.
// readLogs returns immediately, and relies on ctx cancellation to ensure the
// goroutines it starts are cleaned up.
func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
p *corev1.Pod, containerName string, follow bool, tailLines int64,
logs chan<- string) error {
func (c *Client) readLogs(ctx context.Context, requestID string,
egSend *errgroup.Group, p *corev1.Pod, containerName string, follow bool,
tailLines int64, logs chan<- string) error {
var cStatuses []corev1.ContainerStatus
// if containerName is not specified, send logs for all containers
if containerName == "" {
@@ -69,7 +70,9 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
}
for _, cStatus := range cStatuses {
// skip setting up another log stream if container is already being logged
if _, exists := c.logCIDs.LoadOrStore(cStatus.ContainerID, true); exists {
_, exists := c.logStreamIDs.LoadOrStore(
requestID+cStatus.ContainerID, true)
if exists {
continue
}
// set up stream for a single container
@@ -89,21 +92,20 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
cName := cStatus.Name
cID := cStatus.ContainerID
egSend.Go(func() error {
defer c.logCIDs.Delete(cID)
defer c.logStreamIDs.Delete(requestID + cID)
linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, cName), logs,
logStream)
// The k8s API sometimes sends an event showing a healthy pod even though
// the pod is terminating. This seems to occur sometimes on scale down of
// a deployment. When this happens there is a race that occurs where the
// existing log stream closes (linewiseCopy() returns), then the
// "healthy" event comes in and the container log streaming is started
// again, only to return immediately. This can result in duplicated log
// lines being returned.
// To hack around this behaviour, pause here before exiting. This means
// that the container ID is retained in c.logCIDs for a brief period
// after logs stop streaming which causes "healthy pod" events from the
// k8s API to be ignored for that period and thereby avoiding duplicate
// log lines being returned.
// The k8s API sometimes sends an event showing a healthy pod _after_ the
// logStream has closed. This seems to occur sometimes on scale down of a
// deployment. When this happens there is a race that occurs where
// linewiseCopy() returns, then the "healthy" event comes in and the
// container log streaming is started again, only to return immediately.
// This can result in duplicated log lines being returned on the logs
// channel. To hack around this behaviour, pause here before exiting.
// This means that the container ID is retained in c.logStreamIDs for a brief
// period after logs stop streaming which causes "healthy pod" events
// from the k8s API to be ignored for that period and thereby avoiding
// duplicate log lines being returned.
time.Sleep(time.Second)
return nil
})
@@ -114,8 +116,8 @@ func (c *Client) readLogs(ctx context.Context, egSend *errgroup.Group,
// podEventHandler receives pod objects from the podInformer and, if they are
// in a ready state, starts streaming logs from them.
func (c *Client) podEventHandler(ctx context.Context,
cancel context.CancelFunc, egSend *errgroup.Group, container string,
follow bool, tailLines int64, logs chan<- string, obj any) {
cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
container string, follow bool, tailLines int64, logs chan<- string, obj any) {
// panic if obj is not a pod, since we specifically use a pod informer
pod := obj.(*corev1.Pod)
if !slices.ContainsFunc(pod.Status.Conditions,
@@ -126,7 +128,7 @@ func (c *Client) podEventHandler(ctx context.Context,
return // pod not ready
}
egSend.Go(func() error {
readLogsErr := c.readLogs(ctx, egSend, pod, container, follow,
readLogsErr := c.readLogs(ctx, requestID, egSend, pod, container, follow,
tailLines, logs)
if readLogsErr != nil {
cancel()
@@ -145,7 +147,7 @@ func (c *Client) podEventHandler(ctx context.Context,
// When the caller calls Run() on the returned informer, it will start watching
// for events and sending to the logs channel.
func (c *Client) newPodInformer(ctx context.Context,
cancel context.CancelFunc, egSend *errgroup.Group,
cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
namespace, deployment, container string, follow bool, tailLines int64,
logs chan<- string) (cache.SharedIndexInformer, error) {
// get the deployment
Expand All @@ -168,16 +170,16 @@ func (c *Client) newPodInformer(ctx context.Context,
// in a ready state when initially added, it doesn't start log streaming
// for those.
AddFunc: func(obj any) {
c.podEventHandler(ctx, cancel, egSend, container, follow, tailLines,
logs, obj)
c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
tailLines, logs, obj)
},
// UpdateFunc handles events for pod state changes. When new pods are added
// (e.g. deployment is scaled) it repeatedly receives events until the pod
// is in its final healthy state. For that reason, the podEventHandler()
// inspects the pod state before initiating log streaming.
UpdateFunc: func(_, obj any) {
c.podEventHandler(ctx, cancel, egSend, container, follow, tailLines,
logs, obj)
c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
tailLines, logs, obj)
},
})
if err != nil {
@@ -195,6 +197,10 @@ func (c *Client) newPodInformer(ctx context.Context,
func (c *Client) Logs(ctx context.Context, cancel context.CancelFunc,
namespace, deployment, container string, follow bool, tailLines int64,
stdio io.ReadWriter) error {
// Generate a requestID value to uniquely distinguish between simultaneous
// calls to this function. This requestID is used in readLogs() to
// distinguish entries in c.logStreamIDs.
requestID := uuid.New().String()
// clamp tailLines
if tailLines < 1 {
tailLines = defaultTailLines
@@ -249,8 +255,8 @@ func (c *Client) Logs(ctx context.Context, cancel context.CancelFunc,
// If following the logs, start a goroutine which watches for new (and
// existing) pods in the deployment and starts streaming logs from them.
egSend.Go(func() error {
podInformer, err := c.newPodInformer(ctx, cancel, &egSend, namespace,
deployment, container, follow, tailLines, logs)
podInformer, err := c.newPodInformer(ctx, cancel, requestID, &egSend,
namespace, deployment, container, follow, tailLines, logs)
if err != nil {
cancel()
return fmt.Errorf("couldn't construct new pod informer: %v", err)
@@ -264,7 +270,7 @@ func (c *Client) Logs(ctx context.Context, cancel context.CancelFunc,
for i := range pods.Items {
pod := pods.Items[i] // copy loop var so it can be referenced in the closure
egSend.Go(func() error {
readLogsErr := c.readLogs(ctx, &egSend, &pod, container, follow,
readLogsErr := c.readLogs(ctx, requestID, &egSend, &pod, container, follow,
tailLines, logs)
if readLogsErr != nil {
cancel()
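Tying the two changes together: each Logs() call mints a fresh requestID via uuid.New().String(), and every per-container goroutine claims its requestID+containerID entry before streaming, releasing it only after a one-second pause so that the late "healthy pod" events described in the readLogs() comment are ignored. A rough sketch of that per-stream lifecycle under those assumptions (streamOneContainer, ids and copyLines are illustrative names, not the commit's API):

package k8s

import (
	"context"
	"sync"
	"time"
)

// streamOneContainer sketches the per-container lifecycle that readLogs()
// sets up within a single request. ids plays the role of Client.logStreamIDs
// and copyLines stands in for linewiseCopy().
func streamOneContainer(ctx context.Context, ids *sync.Map, requestID,
	containerID string, copyLines func(context.Context)) {
	key := requestID + containerID
	// Skip if this request is already streaming this container.
	if _, exists := ids.LoadOrStore(key, true); exists {
		return
	}
	defer func() {
		// Hold the claim for a moment after the stream closes so that a
		// stale "healthy pod" event from the k8s API cannot immediately
		// restart the stream and emit duplicate log lines.
		time.Sleep(time.Second)
		ids.Delete(key)
	}()
	copyLines(ctx) // returns when the container's log stream closes
}

Scoping the pause-protected claim to a single request keeps the duplicate-line suppression of the original code while letting a second request stream the same container at the same time.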
