Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wait): log sub match callback #2929

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion docs/features/wait/log.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
The Log wait strategy will check if a string occurs in the container logs for a desired number of times, and allows to set the following conditions:

- the string to be waited for in the container log.
- the number of occurrences of the string to wait for, default is `1`.
- the number of occurrences of the string to wait for, default is `1` (ignored for Submatch).
- look for the string using a regular expression, default is `false`.
- the startup timeout to be used in seconds, default is 60 seconds.
- the poll interval to be used in milliseconds, default is 100 milliseconds.
- the regular expression submatch callback, default nil (occurrences is ignored).

```golang
req := ContainerRequest{
Expand All @@ -33,3 +34,40 @@ req := ContainerRequest{
WaitingFor: wait.ForLog(`.*MySQL Community Server`).AsRegexp(),
}
```

Using regular expression with submatch:

```golang
var host, port string
req := ContainerRequest{
Image: "ollama/ollama:0.1.25",
ExposedPorts: []string{"11434/tcp"},
WaitingFor: wait.ForLog(`Listening on (.*:\d+) \(version\s(.*)\)`).Submatch(func(pattern string, submatches [][][]byte) error {
var err error
for _, matches := range submatches {
if len(matches) != 3 {
err = fmt.Errorf("`%s` matched %d times, expected %d", pattern, len(matches), 3)
continue
}
host, port, err = net.SplitHostPort(string(matches[1]))
if err != nil {
return wait.NewPermanentError(fmt.Errorf("split host port: %w", err))
}

// Host and port successfully extracted from log.
return nil
}

if err != nil {
// Return the last error encountered.
return err
}

return fmt.Errorf("address and version not found: `%s` no matches", pattern)
}),
}
```

If the return from a Submatch callback function is a `wait.PermanentError` the
wait will stop and the error will be returned. Use `wait.NewPermanentError(err error)`
to achieve this.
111 changes: 90 additions & 21 deletions wait/log.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package wait

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"regexp"
"strings"
"time"
)

Expand All @@ -14,6 +16,21 @@ var (
_ StrategyTimeout = (*LogStrategy)(nil)
)

// PermanentError is a special error that will stop the wait and return an error.
type PermanentError struct {
err error
}

// Error implements the error interface.
func (e *PermanentError) Error() string {
return e.err.Error()
}

// NewPermanentError creates a new PermanentError.
func NewPermanentError(err error) *PermanentError {
return &PermanentError{err: err}
}

// LogStrategy will wait until a given log entry shows up in the docker logs
type LogStrategy struct {
// all Strategies should have a startupTimeout to avoid waiting infinitely
Expand All @@ -24,6 +41,18 @@ type LogStrategy struct {
IsRegexp bool
Occurrence int
PollInterval time.Duration

// check is the function that will be called to check if the log entry is present.
check func([]byte) error

// submatchCallback is a callback that will be called with the sub matches of the regexp.
submatchCallback func(pattern string, matches [][][]byte) error

// re is the optional compiled regexp.
re *regexp.Regexp

// log byte slice version of [LogStrategy.Log] used for count checks.
log []byte
}

// NewLogStrategy constructs with polling interval of 100 milliseconds and startup timeout of 60 seconds by default
Expand All @@ -46,6 +75,18 @@ func (ws *LogStrategy) AsRegexp() *LogStrategy {
return ws
}

// Submatch configures a function that will be called with the result of
// [regexp.Regexp.FindAllSubmatch], allowing the caller to process the results.
// If the callback returns nil, the strategy will be considered successful.
// Returning a [PermanentError] will stop the wait and return an error, otherwise
// it will retry until the timeout is reached.
// [LogStrategy.Occurrence] is ignored if this option is set.
func (ws *LogStrategy) Submatch(callback func(pattern string, matches [][][]byte) error) *LogStrategy {
ws.submatchCallback = callback

return ws
}

// WithStartupTimeout can be used to change the default startup timeout
func (ws *LogStrategy) WithStartupTimeout(timeout time.Duration) *LogStrategy {
ws.timeout = &timeout
Expand Down Expand Up @@ -89,57 +130,85 @@ func (ws *LogStrategy) WaitUntilReady(ctx context.Context, target StrategyTarget
timeout = *ws.timeout
}

switch {
case ws.submatchCallback != nil:
ws.re = regexp.MustCompile(ws.Log)
ws.check = ws.checkSubmatch
case ws.IsRegexp:
ws.re = regexp.MustCompile(ws.Log)
ws.check = ws.checkRegexp
default:
ws.log = []byte(ws.Log)
ws.check = ws.checkCount
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

length := 0

LOOP:
var lastLen int
var lastError error
for {
select {
case <-ctx.Done():
return ctx.Err()
return errors.Join(lastError, ctx.Err())
default:
checkErr := checkTarget(ctx, target)

reader, err := target.Logs(ctx)
if err != nil {
// TODO: fix as this will wait for timeout if the logs are not available.
time.Sleep(ws.PollInterval)
continue
}

b, err := io.ReadAll(reader)
if err != nil {
// TODO: fix as this will wait for timeout if the logs are not readable.
time.Sleep(ws.PollInterval)
continue
}

logs := string(b)

switch {
case length == len(logs) && checkErr != nil:
if lastLen == len(b) && checkErr != nil {
// Log length hasn't changed so we're not making progress.
return checkErr
case checkLogsFn(ws, b):
break LOOP
default:
length = len(logs)
}

if err := ws.check(b); err != nil {
var errPermanent *PermanentError
if errors.As(err, &errPermanent) {
return err
}

lastError = err
lastLen = len(b)
time.Sleep(ws.PollInterval)
continue
}

return nil
}
}
}

// checkCount checks if the log entry is present in the logs using a string count.
func (ws *LogStrategy) checkCount(b []byte) error {
if count := bytes.Count(b, ws.log); count < ws.Occurrence {
return fmt.Errorf("%q matched %d times, expected %d", ws.Log, count, ws.Occurrence)
}

return nil
}

func checkLogsFn(ws *LogStrategy, b []byte) bool {
if ws.IsRegexp {
re := regexp.MustCompile(ws.Log)
occurrences := re.FindAll(b, -1)

return len(occurrences) >= ws.Occurrence
// checkRegexp checks if the log entry is present in the logs using a regexp count.
func (ws *LogStrategy) checkRegexp(b []byte) error {
if matches := ws.re.FindAll(b, -1); len(matches) < ws.Occurrence {
return fmt.Errorf("`%s` matched %d times, expected %d", ws.Log, len(matches), ws.Occurrence)
}

logs := string(b)
return strings.Count(logs, ws.Log) >= ws.Occurrence
return nil
}

// checkSubmatch checks if the log entry is present in the logs using a regexp sub match callback.
func (ws *LogStrategy) checkSubmatch(b []byte) error {
return ws.submatchCallback(ws.Log, ws.re.FindAllSubmatch(b, -1))
}
Loading
Loading