Merge pull request #243 from uselagoon/logs2
Implement log fetching
smlx authored Dec 18, 2023
2 parents 541af01 + b781495 commit 00164b5
Showing 12 changed files with 776 additions and 75 deletions.
1 change: 1 addition & 0 deletions cmd/ssh-portal/main.go
@@ -1,3 +1,4 @@
+// Package main implements the ssh-portal executable.
package main

import (
13 changes: 7 additions & 6 deletions cmd/ssh-portal/serve.go
@@ -16,11 +16,12 @@ import (

// ServeCmd represents the serve command.
type ServeCmd struct {
-NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"`
-SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"`
-HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"`
-HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"`
-HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"`
+NATSServer string `kong:"required,env='NATS_URL',help='NATS server URL (nats://... or tls://...)'"`
+SSHServerPort uint `kong:"default='2222',env='SSH_SERVER_PORT',help='Port the SSH server will listen on for SSH client connections'"`
+HostKeyECDSA string `kong:"env='HOST_KEY_ECDSA',help='PEM encoded ECDSA host key'"`
+HostKeyED25519 string `kong:"env='HOST_KEY_ED25519',help='PEM encoded Ed25519 host key'"`
+HostKeyRSA string `kong:"env='HOST_KEY_RSA',help='PEM encoded RSA host key'"`
+LogAccessEnabled bool `kong:"env='LOG_ACCESS_ENABLED',help='Allow any user who can SSH into a pod to also access its logs.'"`
}

// Run the serve command to handle SSH connection requests.
@@ -72,5 +73,5 @@ func (cmd *ServeCmd) Run(log *zap.Logger) error {
}
}
// start serving SSH connection requests
-return sshserver.Serve(ctx, log, nc, l, c, hostkeys)
+return sshserver.Serve(ctx, log, nc, l, c, hostkeys, cmd.LogAccessEnabled)
}
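
For context, a minimal sketch of how these kong tags behave: each field is populated from its command-line flag, falling back to the named environment variable. The CLI type and main function below are illustrative only, not part of this commit.

package main

import (
	"fmt"

	"github.com/alecthomas/kong"
)

// CLI mirrors the tag conventions of ServeCmd above (hypothetical example).
type CLI struct {
	NATSServer       string `kong:"required,env='NATS_URL',help='NATS server URL'"`
	LogAccessEnabled bool   `kong:"env='LOG_ACCESS_ENABLED',help='Allow pod log access.'"`
}

func main() {
	var cli CLI
	// kong.Parse fills the struct from flags and environment variables, and
	// exits with usage help on error, e.g. if the required NATS URL is missing.
	kong.Parse(&cli)
	fmt.Println(cli.NATSServer, cli.LogAccessEnabled)
}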
3 changes: 2 additions & 1 deletion go.mod
@@ -18,7 +18,9 @@ require (
go.opentelemetry.io/otel v1.21.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.16.0
+golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/oauth2 v0.15.0
+golang.org/x/sync v0.3.0
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v0.28.4
@@ -65,7 +67,6 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
-golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -181,6 +181,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
+golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
10 changes: 8 additions & 2 deletions internal/k8s/client.go
@@ -3,6 +3,7 @@
package k8s

import (
"sync"
"time"

"k8s.io/client-go/kubernetes"
@@ -14,10 +15,15 @@ const (
timeout = 90 * time.Second
)

+// timeoutSeconds defines the common timeout for k8s API operations in the type
+// required by metav1.ListOptions.
+var timeoutSeconds = int64(timeout / time.Second)

// Client is a k8s client.
type Client struct {
-config *rest.Config
-clientset *kubernetes.Clientset
+config *rest.Config
+clientset *kubernetes.Clientset
+logStreamIDs sync.Map
}

// NewClient creates a new kubernetes API client.
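
The new logStreamIDs field is the de-duplication table used by the log streaming code added below in internal/k8s/logs.go. A standalone sketch of the sync.Map semantics it relies on (the key values here are hypothetical):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var ids sync.Map
	key := "request-1" + "container-a" // requestID + containerID
	// The first goroutine to claim the key stores it and starts streaming.
	_, exists := ids.LoadOrStore(key, true)
	fmt.Println(exists) // false: no existing stream, so the caller proceeds
	// A concurrent claim on the same key is told a stream already exists.
	_, exists = ids.LoadOrStore(key, true)
	fmt.Println(exists) // true: the caller skips this container
	// Deleting the key lets a later request stream the container again.
	ids.Delete(key)
}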
3 changes: 2 additions & 1 deletion internal/k8s/finddeployment.go
@@ -13,7 +13,8 @@ func (c *Client) FindDeployment(ctx context.Context, namespace,
service string) (string, error) {
deployments, err := c.clientset.AppsV1().Deployments(namespace).
List(ctx, metav1.ListOptions{
-LabelSelector: fmt.Sprintf("lagoon.sh/service=%s", service),
+LabelSelector: fmt.Sprintf("lagoon.sh/service=%s", service),
+TimeoutSeconds: &timeoutSeconds,
})
if err != nil {
return "", fmt.Errorf("couldn't list deployments: %v", err)
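
A hedged usage sketch of FindDeployment; the wrapper function, import alias, namespace, and service name are assumptions for illustration:

// resolveDeployment maps a Lagoon service name to the deployment carrying
// the lagoon.sh/service label (hypothetical caller, placeholder arguments;
// c is assumed to be a *k8s.Client from this package).
func resolveDeployment(ctx context.Context, c *k8s.Client) (string, error) {
	return c.FindDeployment(ctx, "example-project-main", "nginx")
}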
294 changes: 294 additions & 0 deletions internal/k8s/logs.go
@@ -0,0 +1,294 @@
package k8s

import (
"bufio"
"context"
"fmt"
"io"
"sync"
"time"

"github.com/google/uuid"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

var (
// defaultTailLines is the number of log lines to tail by default if no number
// is specified
defaultTailLines int64 = 32
// maxTailLines is the maximum number of log lines to tail
maxTailLines int64 = 1024
// limitBytes defines the maximum number of bytes of logs returned from a
// single container
limitBytes int64 = 1 * 1024 * 1024 // 1MiB
)

// linewiseCopy reads \n-separated strings from logStream, strips the trailing
// \n, prepends the given prefix, and writes each line to the logs channel. It
// returns when ctx is cancelled or logStream closes.
func linewiseCopy(ctx context.Context, prefix string, logs chan<- string,
logStream io.ReadCloser) {
defer logStream.Close()
s := bufio.NewScanner(logStream)
for s.Scan() {
select {
case logs <- fmt.Sprintf("%s %s", prefix, s.Text()):
case <-ctx.Done():
return
}
}
}

// readLogs reads logs from the given pod, writing them back to the logs
// channel in a linewise manner. A goroutine is started via egSend to tail logs
// for each container. requestID is used to de-duplicate simultaneous log
// requests associated with a single call to the higher-level Logs() function.
//
// readLogs returns immediately, and relies on ctx cancellation to ensure the
// goroutines it starts are cleaned up.
func (c *Client) readLogs(ctx context.Context, requestID string,
egSend *errgroup.Group, p *corev1.Pod, containerName string, follow bool,
tailLines int64, logs chan<- string) error {
var cStatuses []corev1.ContainerStatus
// if containerName is not specified, send logs for all containers
if containerName == "" {
cStatuses = p.Status.ContainerStatuses
} else {
for _, cStatus := range p.Status.ContainerStatuses {
if containerName == cStatus.Name {
cStatuses = append(cStatuses, cStatus)
break
}
}
if len(cStatuses) == 0 {
return fmt.Errorf("couldn't find container: %s", containerName)
}
}
for _, cStatus := range cStatuses {
// skip setting up another log stream if container is already being logged
_, exists := c.logStreamIDs.LoadOrStore(requestID+cStatus.ContainerID, true)
if exists {
continue
}
// set up stream for a single container
req := c.clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name,
&corev1.PodLogOptions{
Container: cStatus.Name,
Follow: follow,
Timestamps: true,
TailLines: &tailLines,
LimitBytes: &limitBytes,
})
logStream, err := req.Stream(ctx)
if err != nil {
return fmt.Errorf("couldn't stream logs: %v", err)
}
// copy loop vars so they can be referenced in the closure
cName := cStatus.Name
cID := cStatus.ContainerID
egSend.Go(func() error {
defer c.logStreamIDs.Delete(cID)
linewiseCopy(ctx, fmt.Sprintf("[pod/%s/%s]", p.Name, cName), logs,
logStream)
// When a pod is terminating, the k8s API sometimes sends an event
// showing a healthy pod _after_ an existing logStream for the same pod
// has closed. This happens occasionally on scale-down of a deployment.
// When this occurs there is a race where linewiseCopy() returns, then
// the "healthy" event comes in and linewiseCopy() is called again, only
// to return immediately. This can result in duplicated log lines being
// returned on the logs channel.
// To hack around this behaviour, pause here before exiting. This means
// that the container ID is retained in c.logStreamIDs for a brief period
// after logs stop streaming, which causes "healthy pod" events from the
// k8s API to be ignored for that period, thereby avoiding duplicate log
// lines being returned to the caller.
time.Sleep(time.Second)
return nil
})
}
return nil
}

// podEventHandler receives pod objects from the podInformer and, if they are
// in a ready state, starts streaming logs from them.
func (c *Client) podEventHandler(ctx context.Context,
cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
container string, follow bool, tailLines int64, logs chan<- string, obj any) {
// panic if obj is not a pod, since we specifically use a pod informer
pod := obj.(*corev1.Pod)
if !slices.ContainsFunc(pod.Status.Conditions,
func(cond corev1.PodCondition) bool {
return cond.Type == corev1.ContainersReady &&
cond.Status == corev1.ConditionTrue
}) {
return // pod not ready
}
egSend.Go(func() error {
readLogsErr := c.readLogs(ctx, requestID, egSend, pod, container, follow,
tailLines, logs)
if readLogsErr != nil {
cancel()
return fmt.Errorf("couldn't read logs on new pod: %v", readLogsErr)
}
return nil
})
}

// newPodInformer sets up a k8s informer on pods in the given deployment, and
// returns the informer in an inert state. The informer is configured with
// event handlers to read logs from pods in the deployment, writing log lines
// back to the logs channel. It transparently handles the deployment scaling up
// and down (e.g. pods being added / deleted / restarted).
//
// When the caller calls Run() on the returned informer, it will start watching
// for events and sending to the logs channel.
func (c *Client) newPodInformer(ctx context.Context,
cancel context.CancelFunc, requestID string, egSend *errgroup.Group,
namespace, deployment, container string, follow bool, tailLines int64,
logs chan<- string) (cache.SharedIndexInformer, error) {
// get the deployment
d, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, deployment,
metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get deployment: %v", err)
}
// configure the informer factory, filtering on deployment selector labels
factory := informers.NewSharedInformerFactoryWithOptions(c.clientset,
time.Hour, informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = labels.SelectorFromSet(
d.Spec.Selector.MatchLabels).String()
}))
// construct the informer
podInformer := factory.Core().V1().Pods().Informer()
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc handles events for new and existing pods. Since new pods are not
// in a ready state when initially added, it doesn't start log streaming
// for those.
AddFunc: func(obj any) {
c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
tailLines, logs, obj)
},
// UpdateFunc handles events for pod state changes. When new pods are added
// (e.g. deployment is scaled up) it repeatedly receives events until the
// pod is in its final healthy state. For that reason, the
// podEventHandler() inspects the pod state before initiating log
// streaming.
UpdateFunc: func(_, obj any) {
c.podEventHandler(ctx, cancel, requestID, egSend, container, follow,
tailLines, logs, obj)
},
})
if err != nil {
return nil, fmt.Errorf("couldn't add event handlers to informer: %v", err)
}
return podInformer, nil
}

// Logs takes a target namespace, deployment, and stdio stream, and writes the
// log output of the pods of the deployment to the stdio stream. If
// container is specified, only logs of this container within the deployment
// are returned.
//
// This function exits on one of the following events:
//
// 1. It finishes sending the logs of the pods. This only occurs if
// follow=false.
// 2. ctx is cancelled (signalling that the SSH channel was closed).
// 3. An unrecoverable error occurs.
func (c *Client) Logs(ctx context.Context,
namespace, deployment, container string, follow bool, tailLines int64,
stdio io.ReadWriter) error {
// Wrap the context so we can cancel subroutines of this function on error.
childCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Generate a requestID value to uniquely distinguish between multiple calls
// to this function. This requestID is used in readLogs() to distinguish
// entries in c.logStreamIDs.
requestID := uuid.New().String()
// clamp tailLines
if tailLines < 1 {
tailLines = defaultTailLines
}
if tailLines > maxTailLines {
tailLines = maxTailLines
}
// put sending goroutines in an errgroup.Group to handle errors, and
// receiving goroutines in a waitgroup (since they have no errors)
var egSend errgroup.Group
var wgRecv sync.WaitGroup
// initialise a buffered channel for the worker goroutines to write to, and
// for this function to read log lines from
logs := make(chan string, 4)
// start a goroutine reading from the logs channel and writing back to stdio
wgRecv.Add(1)
go func() {
defer wgRecv.Done()
for {
select {
case msg := <-logs:
// ignore errors writing to stdio. this may happen if the client
// disconnects after reading off the channel but before the log can be
// written. there's nothing we can do in this case and we'll select
// ctx.Done() shortly anyway.
_, _ = fmt.Fprintln(stdio, msg)
case <-childCtx.Done():
return // context done - client went away or error within Logs()
}
}
}()
if follow {
// If following the logs, start a goroutine which watches for new (and
// existing) pods in the deployment and starts streaming logs from them.
egSend.Go(func() error {
podInformer, err := c.newPodInformer(childCtx, cancel, requestID,
&egSend, namespace, deployment, container, follow, tailLines, logs)
if err != nil {
return fmt.Errorf("couldn't construct new pod informer: %v", err)
}
podInformer.Run(childCtx.Done())
return nil
})
} else {
// If not following the logs, avoid constructing an informer. Instead just
// read the logs from all existing pods.
d, err := c.clientset.AppsV1().Deployments(namespace).Get(childCtx,
deployment, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("couldn't get deployment: %v", err)
}
pods, err := c.clientset.CoreV1().Pods(namespace).List(childCtx,
metav1.ListOptions{
LabelSelector: labels.FormatLabels(d.Spec.Selector.MatchLabels),
})
if err != nil {
return fmt.Errorf("couldn't get pods: %v", err)
}
if len(pods.Items) == 0 {
return fmt.Errorf("no pods for deployment %s", deployment)
}
for i := range pods.Items {
pod := pods.Items[i] // copy loop var so it can be referenced in the closure
egSend.Go(func() error {
readLogsErr := c.readLogs(childCtx, requestID, &egSend, &pod,
container, follow, tailLines, logs)
if readLogsErr != nil {
return fmt.Errorf("couldn't read logs on existing pods: %v", readLogsErr)
}
return nil
})
}
}
// Wait for the writes to finish, then close the logs channel, wait for the
// read goroutine to exit, and return any sendErr.
sendErr := egSend.Wait()
cancel()
wgRecv.Wait()
return sendErr
}
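
Finally, a sketch of how a caller (for example, the SSH session handler) might drive Logs; streamLogs, the namespace, and the deployment name are assumptions, not code from this PR:

// streamLogs follows a deployment's logs until ctx is cancelled, writing
// lines to session (any io.ReadWriter, such as an SSH channel). The names
// used here are placeholders.
func streamLogs(ctx context.Context, c *Client, session io.ReadWriter) error {
	// follow=true tails new lines as pods come and go; tailLines below 1 is
	// clamped to the default of 32, and values above 1024 are clamped to 1024.
	return c.Logs(ctx, "example-ns", "nginx", "", true, 100, session)
}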