From ca391bb1b975b95f0803dfd94b2ea69c73308fc2 Mon Sep 17 00:00:00 2001 From: Dirk Faust Date: Mon, 24 Apr 2023 14:19:08 +0200 Subject: [PATCH 1/3] Align header reads --- docker.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/docker.go b/docker.go index be1646a6c0..1f73fe6ac5 100644 --- a/docker.go +++ b/docker.go @@ -669,7 +669,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { return default: h := make([]byte, 8) - _, err := r.Read(h) + _, err := io.ReadFull(r, h) if err != nil { // proper type matching requires https://go-review.googlesource.com/c/go/+/250357/ (go 1.16) if strings.Contains(err.Error(), "use of closed network connection") { @@ -677,9 +677,14 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) goto BEGIN } - // this explicitly ignores errors - // because we want to keep procesing even if one of our reads fails - continue + if errors.Is(err, context.DeadlineExceeded) { + // Probably safe to continue here + continue + } + _, _ = fmt.Fprintf(os.Stderr, "container log error: %+v", err) + // if we would continue here, the next header-read will result into random data... + <-stop + return } count := binary.BigEndian.Uint32(h[4:]) @@ -697,11 +702,17 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { logTypes := []string{"", StdoutLog, StderrLog} b := make([]byte, count) - _, err = r.Read(b) + _, err = io.ReadFull(r, b) if err != nil { // TODO: add-logger: use logger to log out this error _, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error()) - continue + if errors.Is(err, context.DeadlineExceeded) { + // Probably safe to continue here + continue + } + // we can not continie here as the next read most likely will not be the next header + <-stop + return } for _, c := range c.consumers { c.Accept(Log{ From e3fec54e60376b4f3ac807a0fce749aeb956607b Mon Sep 17 00:00:00 2001 From: Dirk Faust Date: Tue, 25 Apr 2023 10:50:01 +0200 Subject: [PATCH 2/3] remove stop channel usage --- docker.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docker.go b/docker.go index 1f73fe6ac5..6cc6ff9f3e 100644 --- a/docker.go +++ b/docker.go @@ -683,7 +683,6 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { } _, _ = fmt.Fprintf(os.Stderr, "container log error: %+v", err) // if we would continue here, the next header-read will result into random data... - <-stop return } @@ -710,8 +709,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // Probably safe to continue here continue } - // we can not continie here as the next read most likely will not be the next header - <-stop + // we can not continue here as the next read most likely will not be the next header return } for _, c := range c.consumers { From 49fe828dbe139aa953be694b0334e238e34d245f Mon Sep 17 00:00:00 2001 From: Dirk Faust Date: Tue, 2 May 2023 12:36:17 +0200 Subject: [PATCH 3/3] Inform about stopped log consumer --- docker.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docker.go b/docker.go index 6cc6ff9f3e..66ddcd4bd6 100644 --- a/docker.go +++ b/docker.go @@ -53,6 +53,8 @@ const ( Podman = "podman" ReaperDefault = "reaper_default" // Default network name when bridge is not available packagePath = "github.com/testcontainers/testcontainers-go" + + logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync" ) // DockerContainer represents a container started using Docker @@ -681,7 +683,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // Probably safe to continue here continue } - _, _ = fmt.Fprintf(os.Stderr, "container log error: %+v", err) + _, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage) // if we would continue here, the next header-read will result into random data... return } @@ -710,6 +712,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { continue } // we can not continue here as the next read most likely will not be the next header + _, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage) return } for _, c := range c.consumers {