From 63fad4d8bda2c92beccec9929b976e93401ab679 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Fri, 20 Dec 2024 15:14:37 +0000 Subject: [PATCH] feat(wait): log sub match callback (#2929) Add support for a sub match callback to wait.LogStrategy which allows containers to process the matched pattern storing details or otherwise validating them. The callback can return a PermanentError if no more retries should be attempted. --- docs/features/wait/log.md | 40 ++++++++- wait/log.go | 111 ++++++++++++++++++++----- wait/log_test.go | 169 ++++++++++++++++++++++++++------------ 3 files changed, 246 insertions(+), 74 deletions(-) diff --git a/docs/features/wait/log.md b/docs/features/wait/log.md index f1d40ff360..8466d68511 100644 --- a/docs/features/wait/log.md +++ b/docs/features/wait/log.md @@ -3,10 +3,11 @@ The Log wait strategy will check if a string occurs in the container logs for a desired number of times, and allows to set the following conditions: - the string to be waited for in the container log. -- the number of occurrences of the string to wait for, default is `1`. +- the number of occurrences of the string to wait for, default is `1` (ignored for Submatch). - look for the string using a regular expression, default is `false`. - the startup timeout to be used in seconds, default is 60 seconds. - the poll interval to be used in milliseconds, default is 100 milliseconds. +- the regular expression submatch callback, default nil (occurrences is ignored). ```golang req := ContainerRequest{ @@ -33,3 +34,40 @@ req := ContainerRequest{ WaitingFor: wait.ForLog(`.*MySQL Community Server`).AsRegexp(), } ``` + +Using regular expression with submatch: + +```golang +var host, port string +req := ContainerRequest{ + Image: "ollama/ollama:0.1.25", + ExposedPorts: []string{"11434/tcp"}, + WaitingFor: wait.ForLog(`Listening on (.*:\d+) \(version\s(.*)\)`).Submatch(func(pattern string, submatches [][][]byte) error { + var err error + for _, matches := range submatches { + if len(matches) != 3 { + err = fmt.Errorf("`%s` matched %d times, expected %d", pattern, len(matches), 3) + continue + } + host, port, err = net.SplitHostPort(string(matches[1])) + if err != nil { + return wait.NewPermanentError(fmt.Errorf("split host port: %w", err)) + } + + // Host and port successfully extracted from log. + return nil + } + + if err != nil { + // Return the last error encountered. + return err + } + + return fmt.Errorf("address and version not found: `%s` no matches", pattern) + }), +} +``` + +If the return from a Submatch callback function is a `wait.PermanentError` the +wait will stop and the error will be returned. Use `wait.NewPermanentError(err error)` +to achieve this. diff --git a/wait/log.go b/wait/log.go index 530077f909..41c96e3eb9 100644 --- a/wait/log.go +++ b/wait/log.go @@ -1,10 +1,12 @@ package wait import ( + "bytes" "context" + "errors" + "fmt" "io" "regexp" - "strings" "time" ) @@ -14,6 +16,21 @@ var ( _ StrategyTimeout = (*LogStrategy)(nil) ) +// PermanentError is a special error that will stop the wait and return an error. +type PermanentError struct { + err error +} + +// Error implements the error interface. +func (e *PermanentError) Error() string { + return e.err.Error() +} + +// NewPermanentError creates a new PermanentError. +func NewPermanentError(err error) *PermanentError { + return &PermanentError{err: err} +} + // LogStrategy will wait until a given log entry shows up in the docker logs type LogStrategy struct { // all Strategies should have a startupTimeout to avoid waiting infinitely @@ -24,6 +41,18 @@ type LogStrategy struct { IsRegexp bool Occurrence int PollInterval time.Duration + + // check is the function that will be called to check if the log entry is present. + check func([]byte) error + + // submatchCallback is a callback that will be called with the sub matches of the regexp. + submatchCallback func(pattern string, matches [][][]byte) error + + // re is the optional compiled regexp. + re *regexp.Regexp + + // log byte slice version of [LogStrategy.Log] used for count checks. + log []byte } // NewLogStrategy constructs with polling interval of 100 milliseconds and startup timeout of 60 seconds by default @@ -46,6 +75,18 @@ func (ws *LogStrategy) AsRegexp() *LogStrategy { return ws } +// Submatch configures a function that will be called with the result of +// [regexp.Regexp.FindAllSubmatch], allowing the caller to process the results. +// If the callback returns nil, the strategy will be considered successful. +// Returning a [PermanentError] will stop the wait and return an error, otherwise +// it will retry until the timeout is reached. +// [LogStrategy.Occurrence] is ignored if this option is set. +func (ws *LogStrategy) Submatch(callback func(pattern string, matches [][][]byte) error) *LogStrategy { + ws.submatchCallback = callback + + return ws +} + // WithStartupTimeout can be used to change the default startup timeout func (ws *LogStrategy) WithStartupTimeout(timeout time.Duration) *LogStrategy { ws.timeout = &timeout @@ -89,57 +130,85 @@ func (ws *LogStrategy) WaitUntilReady(ctx context.Context, target StrategyTarget timeout = *ws.timeout } + switch { + case ws.submatchCallback != nil: + ws.re = regexp.MustCompile(ws.Log) + ws.check = ws.checkSubmatch + case ws.IsRegexp: + ws.re = regexp.MustCompile(ws.Log) + ws.check = ws.checkRegexp + default: + ws.log = []byte(ws.Log) + ws.check = ws.checkCount + } + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - length := 0 - -LOOP: + var lastLen int + var lastError error for { select { case <-ctx.Done(): - return ctx.Err() + return errors.Join(lastError, ctx.Err()) default: checkErr := checkTarget(ctx, target) reader, err := target.Logs(ctx) if err != nil { + // TODO: fix as this will wait for timeout if the logs are not available. time.Sleep(ws.PollInterval) continue } b, err := io.ReadAll(reader) if err != nil { + // TODO: fix as this will wait for timeout if the logs are not readable. time.Sleep(ws.PollInterval) continue } - logs := string(b) - - switch { - case length == len(logs) && checkErr != nil: + if lastLen == len(b) && checkErr != nil { + // Log length hasn't changed so we're not making progress. return checkErr - case checkLogsFn(ws, b): - break LOOP - default: - length = len(logs) + } + + if err := ws.check(b); err != nil { + var errPermanent *PermanentError + if errors.As(err, &errPermanent) { + return err + } + + lastError = err + lastLen = len(b) time.Sleep(ws.PollInterval) continue } + + return nil } } +} + +// checkCount checks if the log entry is present in the logs using a string count. +func (ws *LogStrategy) checkCount(b []byte) error { + if count := bytes.Count(b, ws.log); count < ws.Occurrence { + return fmt.Errorf("%q matched %d times, expected %d", ws.Log, count, ws.Occurrence) + } return nil } -func checkLogsFn(ws *LogStrategy, b []byte) bool { - if ws.IsRegexp { - re := regexp.MustCompile(ws.Log) - occurrences := re.FindAll(b, -1) - - return len(occurrences) >= ws.Occurrence +// checkRegexp checks if the log entry is present in the logs using a regexp count. +func (ws *LogStrategy) checkRegexp(b []byte) error { + if matches := ws.re.FindAll(b, -1); len(matches) < ws.Occurrence { + return fmt.Errorf("`%s` matched %d times, expected %d", ws.Log, len(matches), ws.Occurrence) } - logs := string(b) - return strings.Count(logs, ws.Log) >= ws.Occurrence + return nil +} + +// checkSubmatch checks if the log entry is present in the logs using a regexp sub match callback. +func (ws *LogStrategy) checkSubmatch(b []byte) error { + return ws.submatchCallback(ws.Log, ws.re.FindAllSubmatch(b, -1)) } diff --git a/wait/log_test.go b/wait/log_test.go index 7c767c0e25..4bfbc26438 100644 --- a/wait/log_test.go +++ b/wait/log_test.go @@ -1,14 +1,17 @@ -package wait +package wait_test import ( - "bytes" "context" + "fmt" "io" + "strings" "testing" "time" "github.com/docker/docker/api/types" "github.com/stretchr/testify/require" + + "github.com/testcontainers/testcontainers-go/wait" ) const logTimeout = time.Second @@ -25,107 +28,164 @@ Donec ut libero sed arcu vehicula ultricies a non tortor. Lorem ipsum dolor sit amet, consectetur adipiscing elit.` func TestWaitForLog(t *testing.T) { - t.Run("no regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte("docker"))), + t.Run("string", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser("docker"), } - wg := NewLogStrategy("docker").WithStartupTimeout(100 * time.Millisecond) + wg := wait.NewLogStrategy("docker").WithStartupTimeout(100 * time.Millisecond) err := wg.WaitUntilReady(context.Background(), target) require.NoError(t, err) }) - t.Run("no regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte(loremIpsum))), + t.Run("regexp", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser(loremIpsum), } // get all words that start with "ip", end with "m" and has a whitespace before the "ip" - wg := NewLogStrategy(`\sip[\w]+m`).WithStartupTimeout(100 * time.Millisecond).AsRegexp() + wg := wait.NewLogStrategy(`\sip[\w]+m`).WithStartupTimeout(100 * time.Millisecond).AsRegexp() + err := wg.WaitUntilReady(context.Background(), target) + require.NoError(t, err) + }) + + t.Run("submatch/valid", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser("three matches: ip1m, ip2m, ip3m"), + } + + wg := wait.NewLogStrategy(`ip(\d)m`).WithStartupTimeout(100 * time.Millisecond).Submatch(func(pattern string, submatches [][][]byte) error { + if len(submatches) != 3 { + return wait.NewPermanentError(fmt.Errorf("%q matched %d times, expected %d", pattern, len(submatches), 3)) + } + return nil + }) + err := wg.WaitUntilReady(context.Background(), target) + require.NoError(t, err) + }) + + t.Run("submatch/permanent-error", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser("single matches: ip1m"), + } + + wg := wait.NewLogStrategy(`ip(\d)m`).WithStartupTimeout(100 * time.Millisecond).Submatch(func(pattern string, submatches [][][]byte) error { + if len(submatches) != 3 { + return wait.NewPermanentError(fmt.Errorf("%q matched %d times, expected %d", pattern, len(submatches), 3)) + } + return nil + }) + err := wg.WaitUntilReady(context.Background(), target) + require.Error(t, err) + var permanentError *wait.PermanentError + require.ErrorAs(t, err, &permanentError) + }) + + t.Run("submatch/temporary-error", func(t *testing.T) { + target := newRunningTarget() + expect := target.EXPECT() + expect.Logs(anyContext).Return(readCloser(""), nil).Once() // No matches. + expect.Logs(anyContext).Return(readCloser("ip1m, ip2m"), nil).Once() // Two matches. + expect.Logs(anyContext).Return(readCloser("ip1m, ip2m, ip3m"), nil).Once() // Three matches. + expect.Logs(anyContext).Return(readCloser("ip1m, ip2m, ip3m, ip4m"), nil) // Four matches. + + wg := wait.NewLogStrategy(`ip(\d)m`).WithStartupTimeout(400 * time.Second).Submatch(func(pattern string, submatches [][][]byte) error { + switch len(submatches) { + case 0, 2: + // Too few matches. + return fmt.Errorf("`%s` matched %d times, expected %d (temporary)", pattern, len(submatches), 3) + case 3: + // Expected number of matches should stop the wait. + return nil + default: + // Should not be triggered. + return wait.NewPermanentError(fmt.Errorf("`%s` matched %d times, expected %d (permanent)", pattern, len(submatches), 3)) + } + }) err := wg.WaitUntilReady(context.Background(), target) require.NoError(t, err) }) } func TestWaitWithExactNumberOfOccurrences(t *testing.T) { - t.Run("no regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte("kubernetes\r\ndocker\n\rdocker"))), + t.Run("string", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser("kubernetes\r\ndocker\n\rdocker"), } - wg := NewLogStrategy("docker"). + wg := wait.NewLogStrategy("docker"). WithStartupTimeout(100 * time.Millisecond). WithOccurrence(2) err := wg.WaitUntilReady(context.Background(), target) require.NoError(t, err) }) - t.Run("as regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte(loremIpsum))), + t.Run("regexp", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser(loremIpsum), } // get texts from "ip" to the next "m". // there are three occurrences of this pattern in the string: // one "ipsum mauris" and two "ipsum dolor sit am" - wg := NewLogStrategy(`ip(.*)m`).WithStartupTimeout(100 * time.Millisecond).AsRegexp().WithOccurrence(3) + wg := wait.NewLogStrategy(`ip(.*)m`).WithStartupTimeout(100 * time.Millisecond).AsRegexp().WithOccurrence(3) err := wg.WaitUntilReady(context.Background(), target) require.NoError(t, err) }) } func TestWaitWithExactNumberOfOccurrencesButItWillNeverHappen(t *testing.T) { - t.Run("no regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte("kubernetes\r\ndocker"))), + t.Run("string", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser("kubernetes\r\ndocker"), } - wg := NewLogStrategy("containerd"). + wg := wait.NewLogStrategy("containerd"). WithStartupTimeout(logTimeout). WithOccurrence(2) err := wg.WaitUntilReady(context.Background(), target) require.Error(t, err) }) - t.Run("as regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte(loremIpsum))), + t.Run("regexp", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser(loremIpsum), } // get texts from "ip" to the next "m". // there are only three occurrences matching - wg := NewLogStrategy(`do(.*)ck.+`).WithStartupTimeout(100 * time.Millisecond).AsRegexp().WithOccurrence(4) + wg := wait.NewLogStrategy(`do(.*)ck.+`).WithStartupTimeout(100 * time.Millisecond).AsRegexp().WithOccurrence(4) err := wg.WaitUntilReady(context.Background(), target) require.Error(t, err) }) } func TestWaitShouldFailWithExactNumberOfOccurrences(t *testing.T) { - t.Run("no regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte("kubernetes\r\ndocker"))), + t.Run("string", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser("kubernetes\r\ndocker"), } - wg := NewLogStrategy("docker"). + wg := wait.NewLogStrategy("docker"). WithStartupTimeout(logTimeout). WithOccurrence(2) err := wg.WaitUntilReady(context.Background(), target) require.Error(t, err) }) - t.Run("as regexp", func(t *testing.T) { - target := NopStrategyTarget{ - ReaderCloser: io.NopCloser(bytes.NewReader([]byte(loremIpsum))), + t.Run("regexp", func(t *testing.T) { + target := wait.NopStrategyTarget{ + ReaderCloser: readCloser(loremIpsum), } // get "Maecenas". // there are only one occurrence matching - wg := NewLogStrategy(`^Mae[\w]?enas\s`).WithStartupTimeout(100 * time.Millisecond).AsRegexp().WithOccurrence(2) + wg := wait.NewLogStrategy(`^Mae[\w]?enas\s`).WithStartupTimeout(100 * time.Millisecond).AsRegexp().WithOccurrence(2) err := wg.WaitUntilReady(context.Background(), target) require.Error(t, err) }) } func TestWaitForLogFailsDueToOOMKilledContainer(t *testing.T) { - target := &MockStrategyTarget{ + target := &wait.MockStrategyTarget{ LogsImpl: func(_ context.Context) (io.ReadCloser, error) { - return io.NopCloser(bytes.NewReader([]byte(""))), nil + return readCloser(""), nil }, StateImpl: func(_ context.Context) (*types.ContainerState, error) { return &types.ContainerState{ @@ -134,16 +194,16 @@ func TestWaitForLogFailsDueToOOMKilledContainer(t *testing.T) { }, } - t.Run("no regexp", func(t *testing.T) { - wg := ForLog("docker").WithStartupTimeout(logTimeout) + t.Run("string", func(t *testing.T) { + wg := wait.ForLog("docker").WithStartupTimeout(logTimeout) err := wg.WaitUntilReady(context.Background(), target) expected := "container crashed with out-of-memory (OOMKilled)" require.EqualError(t, err, expected) }) - t.Run("as regexp", func(t *testing.T) { - wg := ForLog("docker").WithStartupTimeout(logTimeout).AsRegexp() + t.Run("regexp", func(t *testing.T) { + wg := wait.ForLog("docker").WithStartupTimeout(logTimeout).AsRegexp() err := wg.WaitUntilReady(context.Background(), target) expected := "container crashed with out-of-memory (OOMKilled)" @@ -152,9 +212,9 @@ func TestWaitForLogFailsDueToOOMKilledContainer(t *testing.T) { } func TestWaitForLogFailsDueToExitedContainer(t *testing.T) { - target := &MockStrategyTarget{ + target := &wait.MockStrategyTarget{ LogsImpl: func(_ context.Context) (io.ReadCloser, error) { - return io.NopCloser(bytes.NewReader([]byte(""))), nil + return readCloser(""), nil }, StateImpl: func(_ context.Context) (*types.ContainerState, error) { return &types.ContainerState{ @@ -164,16 +224,16 @@ func TestWaitForLogFailsDueToExitedContainer(t *testing.T) { }, } - t.Run("no regexp", func(t *testing.T) { - wg := ForLog("docker").WithStartupTimeout(logTimeout) + t.Run("string", func(t *testing.T) { + wg := wait.ForLog("docker").WithStartupTimeout(logTimeout) err := wg.WaitUntilReady(context.Background(), target) expected := "container exited with code 1" require.EqualError(t, err, expected) }) - t.Run("as regexp", func(t *testing.T) { - wg := ForLog("docker").WithStartupTimeout(logTimeout).AsRegexp() + t.Run("regexp", func(t *testing.T) { + wg := wait.ForLog("docker").WithStartupTimeout(logTimeout).AsRegexp() err := wg.WaitUntilReady(context.Background(), target) expected := "container exited with code 1" @@ -182,9 +242,9 @@ func TestWaitForLogFailsDueToExitedContainer(t *testing.T) { } func TestWaitForLogFailsDueToUnexpectedContainerStatus(t *testing.T) { - target := &MockStrategyTarget{ + target := &wait.MockStrategyTarget{ LogsImpl: func(_ context.Context) (io.ReadCloser, error) { - return io.NopCloser(bytes.NewReader([]byte(""))), nil + return readCloser(""), nil }, StateImpl: func(_ context.Context) (*types.ContainerState, error) { return &types.ContainerState{ @@ -193,19 +253,24 @@ func TestWaitForLogFailsDueToUnexpectedContainerStatus(t *testing.T) { }, } - t.Run("no regexp", func(t *testing.T) { - wg := ForLog("docker").WithStartupTimeout(logTimeout) + t.Run("string", func(t *testing.T) { + wg := wait.ForLog("docker").WithStartupTimeout(logTimeout) err := wg.WaitUntilReady(context.Background(), target) expected := "unexpected container status \"dead\"" require.EqualError(t, err, expected) }) - t.Run("as regexp", func(t *testing.T) { - wg := ForLog("docker").WithStartupTimeout(logTimeout).AsRegexp() + t.Run("regexp", func(t *testing.T) { + wg := wait.ForLog("docker").WithStartupTimeout(logTimeout).AsRegexp() err := wg.WaitUntilReady(context.Background(), target) expected := "unexpected container status \"dead\"" require.EqualError(t, err, expected) }) } + +// readCloser returns an io.ReadCloser that reads from s. +func readCloser(s string) io.ReadCloser { + return io.NopCloser(strings.NewReader((s))) +}