diff --git a/docker.go b/docker.go index f4458bd3fe..8d350dfbbf 100644 --- a/docker.go +++ b/docker.go @@ -49,7 +49,7 @@ const ( ReaperDefault = "reaper_default" // Default network name when bridge is not available packagePath = "github.com/testcontainers/testcontainers-go" - logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync" + logRestartedForOutOfSyncMessage = "headers out of sync, will retry" ) // DockerContainer represents a container started using Docker @@ -66,7 +66,7 @@ type DockerContainer struct { terminationSignal chan bool consumers []LogConsumer raw *types.ContainerJSON - stopProducer chan bool + stopProducer context.CancelFunc logger Logging lifecycleHooks []ContainerLifecycleHooks } @@ -615,9 +615,9 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { return errors.New("log producer already started") } - c.stopProducer = make(chan bool) + ctx, c.stopProducer = context.WithCancel(ctx) - go func(stop <-chan bool) { + go func() { since := "" // if the socket is closed we will make additional logs request with updated Since timestamp BEGIN: @@ -628,26 +628,38 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { Since: since, } - ctx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options) if err != nil { - // if we can't get the logs, panic, we can't return an error to anything - // from within this goroutine - panic(err) + // if we can't get the logs, retry in one second. + if ctx.Err() != nil { + // context done. + return + } + + c.logger.Printf("cannot get logs for container %q: %v", c.ID, err) + + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + } + + goto BEGIN } defer c.provider.Close() for { select { - case <-stop: - err := r.Close() - if err != nil { - // we can't close the read closer, this should never happen - panic(err) + case <-ctx.Done(): + if ctx.Err() != nil { + err := r.Close() + if err != nil { + // we can't close the read closer, this should never happen + panic(err) + } } - return + + continue default: h := make([]byte, 8) _, err := io.ReadFull(r, h) @@ -658,13 +670,18 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) goto BEGIN } - if errors.Is(err, context.DeadlineExceeded) { - // Probably safe to continue here - continue - } - _, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage) + + c.logger.Printf("read log header: %+v. %s", err, logRestartedForOutOfSyncMessage) + // if we would continue here, the next header-read will result into random data... - return + // we need to restart the whole request. + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + } + + goto BEGIN } count := binary.BigEndian.Uint32(h[4:]) @@ -673,7 +690,13 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { } logType := h[0] if logType > 2 { - _, _ = fmt.Fprintf(os.Stderr, "received invalid log type: %d", logType) + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("received invalid log type: %d", logType) + } + // sometimes docker returns logType = 3 which is an undocumented log type, so treat it as stdout logType = 1 } @@ -684,15 +707,25 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { b := make([]byte, count) _, err = io.ReadFull(r, b) if err != nil { - // TODO: add-logger: use logger to log out this error - _, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error()) - if errors.Is(err, context.DeadlineExceeded) { - // Probably safe to continue here - continue + select { + case <-ctx.Done(): + return + default: + c.logger.Printf("error occurred reading log with known length %s", err.Error()) } - // we can not continue here as the next read most likely will not be the next header - _, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage) - return + + // if we would continue here, the next header-read will result into random data... + // we need to restart the whole request. + + c.logger.Printf("read log message: %+v. %s", err, logRestartedForOutOfSyncMessage) + + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + } + + goto BEGIN } for _, c := range c.consumers { c.Accept(Log{ @@ -702,7 +735,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { } } } - }(c.stopProducer) + }() return nil } @@ -711,7 +744,8 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error { // and sending them to each added LogConsumer func (c *DockerContainer) StopLogProducer() error { if c.stopProducer != nil { - c.stopProducer <- true + // Cancel the producer's context. + c.stopProducer() c.stopProducer = nil } return nil