From ada9d428c123d117be6ff3a19c9886d95e47bb07 Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Thu, 8 Jun 2023 16:24:36 +0200
Subject: [PATCH 01/10] fix: synchronize consumer in tests

The consumer's Accept method is running in a different goroutine than
the main test. We need to synchronize access to the messages slice.

Some tests waited for Ack before accessing Msgs and were properly
synchronized. TestContainerLogWithErrClosed accessed Msgs while the
consumer was running and the memory access was not synchronized.

Test_StartStop did not wait for the expected lines to appear before
stopping the log producer, so there was nothing to guarantee that at
least one line would be read. The test passed because currently the
producer does not interrupt outstanding HTTP requests to fetch logs, so
it implicitly waited for some logs to be received. We now wait for the
mlem line to appear before stopping the log producer.
---
 logconsumer_test.go | 97 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 73 insertions(+), 24 deletions(-)

diff --git a/logconsumer_test.go b/logconsumer_test.go
index da43790ad1..b16efbf254 100644
--- a/logconsumer_test.go
+++ b/logconsumer_test.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"net/http"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -20,8 +21,18 @@ import (
 const lastMessage = "DONE"
 
 type TestLogConsumer struct {
-	Msgs []string
-	Ack  chan bool
+	mu         sync.Mutex
+	msgs       []string
+	Ack        chan bool
+	waitingFor string
+	ackWait    chan bool
+}
+
+func NewTestLogConsumer() *TestLogConsumer {
+	return &TestLogConsumer{
+		msgs: []string{},
+		Ack:  make(chan bool),
+	}
 }
 
 func (g *TestLogConsumer) Accept(l Log) {
@@ -31,7 +42,41 @@ func (g *TestLogConsumer) Accept(l Log) {
 		return
 	}
 
-	g.Msgs = append(g.Msgs, s)
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	g.msgs = append(g.msgs, s)
+	if g.waitingFor != "" && s == fmt.Sprintf("echo %s\n", g.waitingFor) {
+		close(g.ackWait)
+		g.waitingFor = ""
+	}
+}
+
+// WaitFor waits for s to appear in the output.
+// It returns an error if another wait is already in progress or the
+// context is canceled.
+func (g *TestLogConsumer) WaitFor(ctx context.Context, s string) error {
+	g.mu.Lock()
+	if g.waitingFor != "" {
+		g.mu.Unlock()
+		return fmt.Errorf("already waiting")
+	}
+	g.waitingFor = s
+	g.ackWait = make(chan bool)
+	g.mu.Unlock()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-g.ackWait:
+		return nil
+	}
+}
+
+// Msgs returns messages received so far.
+// The caller must not modify the contents of the slice.
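// The full (three-index) slice expression used by Msgs below,
// g.msgs[0:len(g.msgs):len(g.msgs)], caps the returned slice's capacity at
// its length, so a caller that appends to the snapshot forces a reallocation
// instead of writing into the backing array that Accept keeps appending to.
// A small illustration (hypothetical values, not part of this patch):
//
//	s := []string{"a", "b", "c"}
//	capped := s[0:2:2]      // len 2, cap 2
//	_ = append(capped, "x") // reallocates; s[2] is still "c"
//	plain := s[0:2]         // len 2, cap 3
//	_ = append(plain, "x")  // reuses s's backing array; s[2] is now "x"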
+func (g *TestLogConsumer) Msgs() []string { + g.mu.Lock() + v := g.msgs[0:len(g.msgs):len(g.msgs)] + g.mu.Unlock() + return v } func Test_LogConsumerGetsCalled(t *testing.T) { @@ -56,12 +101,9 @@ func Test_LogConsumerGetsCalled(t *testing.T) { ep, err := c.Endpoint(ctx, "http") require.NoError(t, err) - g := TestLogConsumer{ - Msgs: []string{}, - Ack: make(chan bool), - } + g := NewTestLogConsumer() - c.FollowOutput(&g) + c.FollowOutput(g) err = c.StartLogProducer(ctx) require.NoError(t, err) @@ -81,7 +123,7 @@ func Test_LogConsumerGetsCalled(t *testing.T) { t.Fatal("never received final log message") } assert.Nil(t, c.StopLogProducer()) - assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs) + assert.Equal(t, []string{"ready\n", "echo hello\n", "echo there\n"}, g.Msgs()) terminateContainerOnEnd(t, ctx, c) } @@ -173,11 +215,11 @@ func Test_MultipleLogConsumers(t *testing.T) { ep, err := c.Endpoint(ctx, "http") require.NoError(t, err) - first := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)} - second := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)} + first := NewTestLogConsumer() + second := NewTestLogConsumer() - c.FollowOutput(&first) - c.FollowOutput(&second) + c.FollowOutput(first) + c.FollowOutput(second) err = c.StartLogProducer(ctx) require.NoError(t, err) @@ -192,8 +234,8 @@ func Test_MultipleLogConsumers(t *testing.T) { <-second.Ack assert.Nil(t, c.StopLogProducer()) - assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs) - assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs) + assert.Equal(t, []string{"ready\n", "echo mlem\n"}, first.Msgs()) + assert.Equal(t, []string{"ready\n", "echo mlem\n"}, second.Msgs()) assert.Nil(t, c.Terminate(ctx)) } @@ -219,9 +261,9 @@ func Test_StartStop(t *testing.T) { ep, err := c.Endpoint(ctx, "http") require.NoError(t, err) - g := TestLogConsumer{Msgs: []string{}, Ack: make(chan bool)} + g := NewTestLogConsumer() - c.FollowOutput(&g) + c.FollowOutput(g) require.NoError(t, c.StopLogProducer(), "nothing should happen even if the producer is not started") require.NoError(t, c.StartLogProducer(ctx)) @@ -230,6 +272,11 @@ func Test_StartStop(t *testing.T) { _, err = http.Get(ep + "/stdout?echo=mlem") require.NoError(t, err) + waitCtx, cancelWait := context.WithTimeout(ctx, 5*time.Second) + err = g.WaitFor(waitCtx, "mlem") + cancelWait() + require.NoError(t, err) + require.NoError(t, c.StopLogProducer()) require.NoError(t, c.StartLogProducer(ctx)) @@ -248,7 +295,7 @@ func Test_StartStop(t *testing.T) { "ready\n", "echo mlem\n", "echo mlem2\n", - }, g.Msgs) + }, g.Msgs()) assert.Nil(t, c.Terminate(ctx)) } @@ -339,7 +386,7 @@ func TestContainerLogWithErrClosed(t *testing.T) { // Gather the initial container logs time.Sleep(time.Second * 1) - existingLogs := len(consumer.Msgs) + existingLogs := len(consumer.Msgs()) hitNginx := func() { i, _, err := dind.Exec(ctx, []string{"wget", "--spider", "localhost:" + port.Port()}) @@ -350,10 +397,11 @@ func TestContainerLogWithErrClosed(t *testing.T) { hitNginx() time.Sleep(time.Second * 1) - if len(consumer.Msgs)-existingLogs != 1 { - t.Fatalf("logConsumer should have 1 new log message, instead has: %v", consumer.Msgs[existingLogs:]) + logs2 := consumer.Msgs() + if len(logs2)-existingLogs != 1 { + t.Fatalf("logConsumer should have 1 new log message, instead has: %v", logs2[existingLogs:]) } - existingLogs = len(consumer.Msgs) + existingLogs = len(consumer.Msgs()) iptableArgs := []string{ "INPUT", "-p", "tcp", "--dport", "2375", @@ -373,10 +421,11 @@ func 
TestContainerLogWithErrClosed(t *testing.T) {
 	hitNginx()
 	hitNginx()
 	time.Sleep(time.Second * 1)
-	if len(consumer.Msgs)-existingLogs != 2 {
+	logs3 := consumer.Msgs()
+	if len(logs3)-existingLogs != 2 {
 		t.Fatalf(
 			"LogConsumer should have 2 new log messages after detecting closed connection and"+
-				" re-requesting logs. Instead has:\n%s", consumer.Msgs[existingLogs:],
+				" re-requesting logs. Instead has:\n%s", logs3[existingLogs:],
 		)
 	}
 }

From 075b0f3ef06eccc2030a5fa22445ac6825b01d40 Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Tue, 21 Mar 2023 15:58:49 +0100
Subject: [PATCH 02/10] fix: don't panic when fetching logs takes more than
 5 seconds

This removes the panic that occurred when the logs endpoint took more
than 5 seconds to respond. The panic happened at least with podman when
no new logs appeared while using the follow and since parameters.

We keep retrying until the context is canceled (the retry request would
fail anyway with a canceled context) or the producer is stopped,
whichever comes first. This makes the retry behavior consistent with
the handling of closed connections.

Outstanding HTTP calls for fetching logs are now interrupted when a
producer is stopped. Previously, the consumer and StopLogProducer()
waited for the HTTP call to complete.

This should fix https://github.com/testcontainers/testcontainers-go/issues/946
---
 docker.go | 27 +++++++++++++++------------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/docker.go b/docker.go
index a9f2d583db..9a589ae4bd 100644
--- a/docker.go
+++ b/docker.go
@@ -69,7 +69,7 @@ type DockerContainer struct {
 	terminationSignal chan bool
 	consumers         []LogConsumer
 	raw               *types.ContainerJSON
-	stopProducer      chan bool
+	stopProducer      context.CancelFunc
 	logger            Logging
 	lifecycleHooks    []ContainerLifecycleHooks
 }
@@ -632,9 +632,9 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 		return errors.New("log producer already started")
 	}
 
-	c.stopProducer = make(chan bool)
+	ctx, c.stopProducer = context.WithCancel(ctx)
 
-	go func(stop <-chan bool) {
+	go func() {
 		since := ""
 		// if the socket is closed we will make additional logs request with updated Since timestamp
 	BEGIN:
@@ -645,20 +645,22 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 			Since:      since,
 		}
 
-		ctx, cancel := context.WithTimeout(ctx, time.Second*5)
-		defer cancel()
-
 		r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
 		if err != nil {
-			// if we can't get the logs, panic, we can't return an error to anything
-			// from within this goroutine
-			panic(err)
+			// if we can't get the logs, retry in one second.
+			c.logger.Printf("cannot get logs for container %q: %v", c.ID, err)
+			if ctx.Err() != nil {
+				// context done.
+				return
+			}
+			time.Sleep(1 * time.Second)
+			goto BEGIN
 		}
 		defer c.provider.Close()
 
 		for {
 			select {
-			case <-stop:
+			case <-ctx.Done():
 				err := r.Close()
 				if err != nil {
 					// we can't close the read closer, this should never happen
 					panic(err)
 				}
 				return
@@ -719,7 +721,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 			}
 		}
 	}
-	}(c.stopProducer)
+	}()
 
 	return nil
 }
@@ -728,7 +730,8 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 // and sending them to each added LogConsumer
 func (c *DockerContainer) StopLogProducer() error {
 	if c.stopProducer != nil {
-		c.stopProducer <- true
+		// Cancel the producer's context.
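		// Canceling here is what makes stopping interrupt the in-flight HTTP
		// call: the same ctx is threaded into ContainerLogs above, so the
		// producer no longer waits for a response. In isolation, the pattern
		// is roughly the following sketch (hypothetical names; assumes
		// "context" and "net/http" imports):
		//
		//	ctx, stop := context.WithCancel(context.Background())
		//	req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		//	go func() {
		//		resp, err := http.DefaultClient.Do(req) // aborted when stop() runs
		//		if err != nil {
		//			return // ctx.Err() != nil means we were stopped, not a failure
		//		}
		//		defer resp.Body.Close()
		//		// ... consume the stream until ctx is done ...
		//	}()
		//	stop() // unblocks Do immediately instead of waiting for the server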
+		c.stopProducer()
 		c.stopProducer = nil
 	}
 	return nil

From 0b6d915cbfb09a2028ca9d5a7f0b2279df418150 Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Wed, 23 Aug 2023 10:02:25 +0200
Subject: [PATCH 03/10] Add a comment explaining synchronization

---
 logconsumer_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/logconsumer_test.go b/logconsumer_test.go
index 2539ab7669..5f8e4163c2 100644
--- a/logconsumer_test.go
+++ b/logconsumer_test.go
@@ -42,6 +42,8 @@ func (g *TestLogConsumer) Accept(l Log) {
 		return
 	}
 
+	// Accept is called from a different goroutine than WaitFor.
+	// We need to synchronize access and notify the waiting goroutine so that it always sees the updated msgs.
 	g.mu.Lock()
 	defer g.mu.Unlock()
 	g.msgs = append(g.msgs, s)

From bc8715ed4b5f4ce7c7901acddffdb96d746fa983 Mon Sep 17 00:00:00 2001
From: Martin Sucha
Date: Wed, 23 Aug 2023 10:55:16 +0200
Subject: [PATCH 04/10] Handle errors in log producer gracefully

If the context is done, we close the log producer. That is not an
error: the context cancellation signals that the producer should stop.

If there is a non-context error during the HTTP call or while reading
the response, we retry the HTTP request after one second.

Previously, the error handling was inconsistent:

- an error while reading HTTP response headers would retry the HTTP
  request
- but an error while reading the body would just end the log producer

With this commit, the error handling should be more consistent.
---
 docker.go | 35 +++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/docker.go b/docker.go
index 7e084a88ee..98a043ef30 100644
--- a/docker.go
+++ b/docker.go
@@ -51,7 +51,7 @@ const (
 	ReaperDefault = "reaper_default" // Default network name when bridge is not available
 	packagePath   = "github.com/testcontainers/testcontainers-go"
 
-	logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync"
+	logRestartedForOutOfSyncMessage = "headers out of sync, will retry"
 )
 
 // DockerContainer represents a container started using Docker
@@ -633,25 +633,27 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 		r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
 		if err != nil {
 			// if we can't get the logs, retry in one second.
-			c.logger.Printf("cannot get logs for container %q: %v", c.ID, err)
 			if ctx.Err() != nil {
 				// context done.
 				return
 			}
+			c.logger.Printf("cannot get logs for container %q: %v", c.ID, err)
 			time.Sleep(1 * time.Second)
 			goto BEGIN
 		}
 		defer c.provider.Close()
 
 		for {
-			select {
-			case <-ctx.Done():
+			if ctx.Err() != nil {
 				err := r.Close()
 				if err != nil {
 					// we can't close the read closer, this should never happen
 					panic(err)
 				}
-				return
+			}
+			select {
+			case <-ctx.Done():
+				continue
 			default:
 				h := make([]byte, 8)
 				_, err := io.ReadFull(r, h)
@@ -662,13 +664,15 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
 					since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
 					goto BEGIN
 				}
-				if errors.Is(err, context.DeadlineExceeded) {
-					// Probably safe to continue here
+				if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+					// If the outer context is done, loop will exit in the next iteration.
 					continue
 				}
-				_, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage)
+				_, _ = fmt.Fprintf(os.Stderr, "read log header: %+v. %s", err, logRestartedForOutOfSyncMessage)
 				// if we would continue here, the next header-read will result into random data...
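				// "Restarting the whole request" means jumping back to BEGIN
				// after a pause; a later commit in this series makes that
				// pause cancelable. The cancelable wait can be factored into
				// a helper, sketched here under the hypothetical name
				// sleepCtx (assumes "context" and "time" imports):
				//
				//	func sleepCtx(ctx context.Context, d time.Duration) error {
				//		t := time.NewTimer(d)
				//		defer t.Stop()
				//		select {
				//		case <-ctx.Done():
				//			return ctx.Err() // producer stopped: exit instead of retrying
				//		case <-t.C:
				//			return nil // pause elapsed: safe to retry
				//		}
				//	}
				//
				// Each retry site would then read:
				// if sleepCtx(ctx, time.Second) != nil { return }; goto BEGIN.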
- return + // we need to restart the whole request. + time.Sleep(1 * time.Second) + goto BEGIN } count := binary.BigEndian.Uint32(h[4:]) @@ -690,13 +694,16 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { if err != nil { // TODO: add-logger: use logger to log out this error _, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error()) - if errors.Is(err, context.DeadlineExceeded) { - // Probably safe to continue here + + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // If the outer context is done, loop will exit in the next iteration. continue } - // we can not continue here as the next read most likely will not be the next header - _, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage) - return + // if we would continue here, the next header-read will result into random data... + // we need to restart the whole request. + _, _ = fmt.Fprintf(os.Stderr, "read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) + time.Sleep(1 * time.Second) + goto BEGIN } for _, c := range c.consumers { c.Accept(Log{ From 1cdb8ce740b0d808731578e1054b57674dfcb6f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 17 Oct 2023 17:56:48 +0200 Subject: [PATCH 05/10] chore: use time.After instead of sleep --- docker.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/docker.go b/docker.go index 2d903c3d6d..aed1d54a0d 100644 --- a/docker.go +++ b/docker.go @@ -636,7 +636,13 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { return } c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) - time.Sleep(1 * time.Second) + + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + } + goto BEGIN } defer c.provider.Close() @@ -669,7 +675,12 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { _, _ = fmt.Fprintf(os.Stderr, "read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + } + goto BEGIN } @@ -700,7 +711,13 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. _, _ = fmt.Fprintf(os.Stderr, "read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) - time.Sleep(1 * time.Second) + + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + } + goto BEGIN } for _, c := range c.consumers { From 9e4b5a1ca76c94f4918755abcea36197bc8b3e89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 18 Oct 2023 07:42:21 +0200 Subject: [PATCH 06/10] chore: print logs honouring context cancellation --- docker.go | 43 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/docker.go b/docker.go index aed1d54a0d..9a0a0642b2 100644 --- a/docker.go +++ b/docker.go @@ -635,7 +635,15 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // context done. 
return } - c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) + + // we need a separate select here in order to be able to print the error + // and this cannot be done in the same select that checks for the timeout. + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) + } select { case <-ctx.Done(): @@ -672,7 +680,14 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // If the outer context is done, loop will exit in the next iteration. continue } - _, _ = fmt.Fprintf(os.Stderr, "read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) + + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) + } + // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. select { @@ -690,7 +705,13 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { } logType := h[0] if logType > 2 { - _, _ = fmt.Fprintf(os.Stderr, "received invalid log type: %d", logType) + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("received invalid log type: %d", logType) + } + // sometimes docker returns logType = 3 which is an undocumented log type, so treat it as stdout logType = 1 } @@ -701,8 +722,12 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { b := make([]byte, count) _, err = io.ReadFull(r, b) if err != nil { - // TODO: add-logger: use logger to log out this error - _, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error()) + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("error occurred reading log with known length %s", err.Error()) + } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { // If the outer context is done, loop will exit in the next iteration. @@ -710,7 +735,13 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { } // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. - _, _ = fmt.Fprintf(os.Stderr, "read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) + + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("read log message: %+v. 
%s", err, logRestartedForOutOfSyncMessage) + } select { case <-ctx.Done(): From 7453e4aebd55170c17434073ba060d3501f7211c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 24 Oct 2023 09:58:06 +0200 Subject: [PATCH 07/10] chore: merge context done conditions --- docker.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docker.go b/docker.go index ddf2628c67..75cafcfabd 100644 --- a/docker.go +++ b/docker.go @@ -656,15 +656,16 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { defer c.provider.Close() for { - if ctx.Err() != nil { - err := r.Close() - if err != nil { - // we can't close the read closer, this should never happen - panic(err) - } - } select { case <-ctx.Done(): + if ctx.Err() != nil { + err := r.Close() + if err != nil { + // we can't close the read closer, this should never happen + panic(err) + } + } + continue default: h := make([]byte, 8) From e14344453b2e1f913e0c0dafa1b17674d39c4fbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 24 Oct 2023 11:49:53 +0200 Subject: [PATCH 08/10] chore: remove context cancelation checks, as the error comes from an operation that does not do anything with context --- docker.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docker.go b/docker.go index 75cafcfabd..310ae45140 100644 --- a/docker.go +++ b/docker.go @@ -730,10 +730,6 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { c.logger.Printf("error occurred reading log with known length %s", err.Error()) } - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - // If the outer context is done, loop will exit in the next iteration. - continue - } // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. From 8c9fbd327d446bd23748e7d4b5a90a8fe4f2d9c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 24 Oct 2023 11:51:20 +0200 Subject: [PATCH 09/10] chore: remove context cancelation checks, as the error comes from an operation that does not do anything with context --- docker.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docker.go b/docker.go index 310ae45140..ad8d61e4df 100644 --- a/docker.go +++ b/docker.go @@ -677,10 +677,6 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) goto BEGIN } - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - // If the outer context is done, loop will exit in the next iteration. - continue - } select { case <-ctx.Done(): From 51d6f8bdeff364eb8b7fd72c217848ffc3f3d26e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 25 Oct 2023 10:48:30 +0200 Subject: [PATCH 10/10] chore: simplify print --- docker.go | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/docker.go b/docker.go index ad8d61e4df..8d350dfbbf 100644 --- a/docker.go +++ b/docker.go @@ -636,14 +636,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { return } - // we need a separate select here in order to be able to print the error - // and this cannot be done in the same select that checks for the timeout. 
- select { - case <-ctx.Done(): - return - default: - c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) - } + c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) select { case <-ctx.Done(): @@ -678,12 +671,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { goto BEGIN } - select { - case <-ctx.Done(): - return - default: - c.logger.Printf("read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) - } + c.logger.Printf("read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. @@ -729,12 +717,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // if we would continue here, the next header-read will result into random data... // we need to restart the whole request. - select { - case <-ctx.Done(): - return - default: - c.logger.Printf("read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) - } + c.logger.Printf("read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) select { case <-ctx.Done():
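The producer loop in these patches decodes Docker's multiplexed log stream:
each frame starts with an 8-byte header whose first byte is the stream type
(0 stdin, 1 stdout, 2 stderr) and whose last four bytes hold the payload
length as a big-endian uint32, followed by the payload itself. A minimal
standalone decoder of that frame format, as a sketch (demuxFrame is a
hypothetical name, not part of these patches):

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
		"io"
	)

	// demuxFrame reads one frame: an 8-byte header (stream type, three
	// padding bytes, big-endian payload length), then the payload itself.
	func demuxFrame(r io.Reader) (streamType byte, payload []byte, err error) {
		h := make([]byte, 8)
		if _, err := io.ReadFull(r, h); err != nil {
			return 0, nil, err
		}
		n := binary.BigEndian.Uint32(h[4:8])
		payload = make([]byte, n)
		if _, err := io.ReadFull(r, payload); err != nil {
			return 0, nil, err
		}
		return h[0], payload, nil
	}

	func main() {
		// One stdout frame carrying "hello\n".
		frame := append([]byte{1, 0, 0, 0, 0, 0, 0, 6}, "hello\n"...)
		typ, msg, err := demuxFrame(bytes.NewReader(frame))
		fmt.Println(typ, string(msg), err) // 1 hello <nil>
	}

If a read fails partway through a frame, the stream position is unknown,
which is why the patches restart the whole request with an updated Since
value instead of continuing to read from the same response.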