diff --git a/daemon/images/image_exporter.go b/daemon/images/image_exporter.go index 0a863dd6049cb..0c41d80e61eab 100644 --- a/daemon/images/image_exporter.go +++ b/daemon/images/image_exporter.go @@ -16,7 +16,7 @@ import ( // outStream is the writer which the images are written to. func (i *ImageService) ExportImage(ctx context.Context, names []string, outStream io.Writer) error { imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i) - return imageExporter.Save(names, outStream) + return imageExporter.Save(ctx, names, outStream) } func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Container, fn func(root string) error) error { @@ -46,5 +46,5 @@ func (i *ImageService) PerformWithBaseFS(ctx context.Context, c *container.Conta // ball containing images and metadata. func (i *ImageService) LoadImage(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) error { imageExporter := tarexport.NewTarExporter(i.imageStore, i.layerStore, i.referenceStore, i) - return imageExporter.Load(inTar, outStream, quiet) + return imageExporter.Load(ctx, inTar, outStream, quiet) } diff --git a/image/image.go b/image/image.go index 9bfa8602f281e..77b6731f5bc15 100644 --- a/image/image.go +++ b/image/image.go @@ -1,6 +1,7 @@ package image // import "github.com/docker/docker/image" import ( + "context" "encoding/json" "errors" "io" @@ -279,9 +280,9 @@ func NewHistory(author, comment, createdBy string, isEmptyLayer bool) History { // Exporter provides interface for loading and saving images type Exporter interface { - Load(io.ReadCloser, io.Writer, bool) error + Load(context.Context, io.ReadCloser, io.Writer, bool) error // TODO: Load(net.Context, io.ReadCloser, <- chan StatusMessage) error - Save([]string, io.Writer) error + Save(context.Context, []string, io.Writer) error } // NewFromJSON creates an Image configuration from json. diff --git a/image/tarexport/load.go b/image/tarexport/load.go index fe09f7191ee84..afbfe0989d370 100644 --- a/image/tarexport/load.go +++ b/image/tarexport/load.go @@ -11,12 +11,14 @@ import ( "reflect" "runtime" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/distribution/reference" "github.com/docker/distribution" "github.com/docker/docker/api/types/events" "github.com/docker/docker/image" v1 "github.com/docker/docker/image/v1" + "github.com/docker/docker/internal/ioutils" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/chrootarchive" @@ -28,7 +30,13 @@ import ( "github.com/opencontainers/go-digest" ) -func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) error { +func (l *tarexporter) Load(ctx context.Context, inTar io.ReadCloser, outStream io.Writer, quiet bool) (outErr error) { + ctx, span := tracing.StartSpan(ctx, "tarexport.Load") + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + var progressOutput progress.Output if !quiet { progressOutput = streamformatter.NewJSONProgressOutput(outStream, false) @@ -41,9 +49,10 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) } defer os.RemoveAll(tmpDir) - if err := chrootarchive.Untar(inTar, tmpDir, nil); err != nil { + if err := untar(ctx, inTar, tmpDir); err != nil { return err } + // read manifest, if no file then load in legacy mode manifestPath, err := safePath(tmpDir, manifestFileName) if err != nil { @@ -72,6 +81,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) var imageRefCount int for _, m := range manifest { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } configPath, err := safePath(tmpDir, m.Config) if err != nil { return err @@ -95,6 +109,11 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) } for i, diffID := range img.RootFS.DiffIDs { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } layerPath, err := safePath(tmpDir, m.Layers[i]) if err != nil { return err @@ -103,7 +122,7 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) r.Append(diffID) newLayer, err := l.lss.Get(r.ChainID()) if err != nil { - newLayer, err = l.loadLayer(layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput) + newLayer, err = l.loadLayer(ctx, layerPath, rootFS, diffID.String(), m.LayerSources[diffID], progressOutput) if err != nil { return err } @@ -155,6 +174,15 @@ func (l *tarexporter) Load(inTar io.ReadCloser, outStream io.Writer, quiet bool) return nil } +func untar(ctx context.Context, inTar io.ReadCloser, tmpDir string) error { + _, trace := tracing.StartSpan(ctx, "chrootarchive.Untar") + defer trace.End() + + err := chrootarchive.Untar(ioutils.NewCtxReader(ctx, inTar), tmpDir, nil) + trace.SetStatus(err) + return err +} + func (l *tarexporter) setParentID(id, parentID image.ID) error { img, err := l.is.Get(id) if err != nil { @@ -170,7 +198,14 @@ func (l *tarexporter) setParentID(id, parentID image.ID) error { return l.is.SetParent(id, parentID) } -func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (layer.Layer, error) { +func (l *tarexporter) loadLayer(ctx context.Context, filename string, rootFS image.RootFS, id string, foreignSrc distribution.Descriptor, progressOutput progress.Output) (_ layer.Layer, outErr error) { + ctx, span := tracing.StartSpan(ctx, "loadLayer") + span.SetAttributes(tracing.Attribute("image.id", id)) + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + // We use sequential file access to avoid depleting the standby list on Windows. // On Linux, this equates to a regular os.Open. rawTar, err := sequential.Open(filename) @@ -193,7 +228,7 @@ func (l *tarexporter) loadLayer(filename string, rootFS image.RootFS, id string, r = rawTar } - inflatedLayerData, err := archive.DecompressStream(r) + inflatedLayerData, err := archive.DecompressStream(ioutils.NewCtxReader(ctx, r)) if err != nil { return nil, err } @@ -332,7 +367,7 @@ func (l *tarexporter) legacyLoadImage(oldID, sourceDir string, loadedMap map[str if err != nil { return err } - newLayer, err := l.loadLayer(layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput) + newLayer, err := l.loadLayer(context.TODO(), layerPath, *rootFS, oldID, distribution.Descriptor{}, progressOutput) if err != nil { return err } diff --git a/image/tarexport/save.go b/image/tarexport/save.go index 103c8b37662d0..c9d1e5de70b2e 100644 --- a/image/tarexport/save.go +++ b/image/tarexport/save.go @@ -11,12 +11,14 @@ import ( "time" "github.com/containerd/containerd/images" + "github.com/containerd/containerd/tracing" "github.com/containerd/log" "github.com/distribution/reference" "github.com/docker/distribution" "github.com/docker/docker/api/types/events" "github.com/docker/docker/image" v1 "github.com/docker/docker/image/v1" + "github.com/docker/docker/internal/ioutils" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/system" @@ -42,20 +44,20 @@ type saveSession struct { savedConfigs map[string]struct{} } -func (l *tarexporter) Save(names []string, outStream io.Writer) error { - images, err := l.parseNames(names) +func (l *tarexporter) Save(ctx context.Context, names []string, outStream io.Writer) error { + images, err := l.parseNames(ctx, names) if err != nil { return err } // Release all the image top layer references defer l.releaseLayerReferences(images) - return (&saveSession{tarexporter: l, images: images}).save(outStream) + return (&saveSession{tarexporter: l, images: images}).save(ctx, outStream) } // parseNames will parse the image names to a map which contains image.ID to *imageDescriptor. // Each imageDescriptor holds an image top layer reference named 'layerRef'. It is taken here, should be released later. -func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescriptor, rErr error) { +func (l *tarexporter) parseNames(ctx context.Context, names []string) (desc map[image.ID]*imageDescriptor, rErr error) { imgDescr := make(map[image.ID]*imageDescriptor) defer func() { if rErr != nil { @@ -92,6 +94,12 @@ func (l *tarexporter) parseNames(names []string) (desc map[image.ID]*imageDescri } for _, name := range names { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + ref, err := reference.ParseAnyReference(name) if err != nil { return nil, err @@ -179,7 +187,7 @@ func (l *tarexporter) releaseLayerReferences(imgDescr map[image.ID]*imageDescrip return nil } -func (s *saveSession) save(outStream io.Writer) error { +func (s *saveSession) save(ctx context.Context, outStream io.Writer) error { s.savedConfigs = make(map[string]struct{}) s.savedLayers = make(map[layer.DiffID]distribution.Descriptor) @@ -199,7 +207,13 @@ func (s *saveSession) save(outStream io.Writer) error { var manifestDescriptors []ocispec.Descriptor for id, imageDescr := range s.images { - foreignSrcs, err := s.saveImage(id) + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + foreignSrcs, err := s.saveImage(ctx, id) if err != nil { return err } @@ -370,17 +384,34 @@ func (s *saveSession) save(outStream io.Writer) error { return errors.Wrap(err, "error writing oci index file") } + return s.writeTar(ctx, tempDir, outStream) +} + +func (s *saveSession) writeTar(ctx context.Context, tempDir string, outStream io.Writer) error { + ctx, span := tracing.StartSpan(ctx, "writeTar") + defer span.End() + fs, err := archive.Tar(tempDir, archive.Uncompressed) if err != nil { + span.SetStatus(err) return err } defer fs.Close() - _, err = io.Copy(outStream, fs) + _, err = ioutils.CopyCtx(ctx, outStream, fs) + + span.SetStatus(err) return err } -func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Descriptor, error) { +func (s *saveSession) saveImage(ctx context.Context, id image.ID) (_ map[layer.DiffID]distribution.Descriptor, outErr error) { + ctx, span := tracing.StartSpan(ctx, "saveImage") + span.SetAttributes(tracing.Attribute("image.id", id.String())) + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + img := s.images[id].image if len(img.RootFS.DiffIDs) == 0 { return nil, fmt.Errorf("empty export - not implemented") @@ -390,6 +421,11 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc var layers []layer.DiffID var foreignSrcs map[layer.DiffID]distribution.Descriptor for i, diffID := range img.RootFS.DiffIDs { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } v1ImgCreated := time.Unix(0, 0) v1Img := image.V1Image{ // This is for backward compatibility used for @@ -412,7 +448,7 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc } v1Img.OS = img.OS - src, err := s.saveConfigAndLayer(rootFS.ChainID(), v1Img, img.Created) + src, err := s.saveConfigAndLayer(ctx, rootFS.ChainID(), v1Img, img.Created) if err != nil { return nil, err } @@ -457,7 +493,17 @@ func (s *saveSession) saveImage(id image.ID) (map[layer.DiffID]distribution.Desc return foreignSrcs, nil } -func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (distribution.Descriptor, error) { +func (s *saveSession) saveConfigAndLayer(ctx context.Context, id layer.ChainID, legacyImg image.V1Image, createdTime *time.Time) (_ distribution.Descriptor, outErr error) { + ctx, span := tracing.StartSpan(ctx, "saveConfigAndLayer") + span.SetAttributes( + tracing.Attribute("layer.id", id.String()), + tracing.Attribute("image.id", legacyImg.ID), + ) + defer span.End() + defer func() { + span.SetStatus(outErr) + }() + outDir := filepath.Join(s.outDir, ocispec.ImageBlobsDir) if _, ok := s.savedConfigs[legacyImg.ID]; !ok { @@ -512,7 +558,7 @@ func (s *saveSession) saveConfigAndLayer(id layer.ChainID, legacyImg image.V1Ima digester := digest.Canonical.Digester() digestedArch := io.TeeReader(arch, digester.Hash()) - tarSize, err := io.Copy(tarFile, digestedArch) + tarSize, err := ioutils.CopyCtx(ctx, tarFile, digestedArch) if err != nil { return distribution.Descriptor{}, err } diff --git a/internal/ioutils/copy.go b/internal/ioutils/copy.go new file mode 100644 index 0000000000000..e24fe7ee5a0a8 --- /dev/null +++ b/internal/ioutils/copy.go @@ -0,0 +1,57 @@ +package ioutils + +import ( + "context" + "io" +) + +// CopyCtx copies from src to dst until either EOF is reached on src or a context is cancelled. +// The writer is not closed when the context is cancelled. +// +// After CopyCtx exits due to context cancellation, the goroutine that performed +// the copy may still be running if either the reader or writer blocks. +func CopyCtx(ctx context.Context, dst io.Writer, src io.Reader) (n int64, err error) { + copyDone := make(chan struct{}) + + src = &readerCtx{ctx: ctx, r: src} + + go func() { + n, err = io.Copy(dst, src) + close(copyDone) + }() + + select { + case <-ctx.Done(): + return -1, ctx.Err() + case <-copyDone: + } + + return n, err +} + +type readerCtx struct { + ctx context.Context + r io.Reader +} + +// NewCtxReader wraps the given reader with a reader that doesn't proceed with +// reading if the context is done. +// +// Note: Read will still block if the underlying reader blocks. +func NewCtxReader(ctx context.Context, r io.Reader) io.Reader { + return &readerCtx{ctx: ctx, r: r} +} + +func (r *readerCtx) Read(p []byte) (n int, err error) { + if err := r.ctx.Err(); err != nil { + return 0, err + } + + n, outErr := r.r.Read(p) + + if err := r.ctx.Err(); err != nil { + return 0, err + } + + return n, outErr +} diff --git a/internal/ioutils/copy_test.go b/internal/ioutils/copy_test.go new file mode 100644 index 0000000000000..0f7574c07b483 --- /dev/null +++ b/internal/ioutils/copy_test.go @@ -0,0 +1,35 @@ +package ioutils + +import ( + "bytes" + "context" + "testing" + "time" +) + +type blockingReader struct{} + +func (r blockingReader) Read(p []byte) (int, error) { + time.Sleep(time.Second) + return 0, nil +} + +func TestCopyCtx(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*5) + defer cancel() + + dst := new(bytes.Buffer) + + finished := make(chan struct{}) + + go func() { + CopyCtx(ctx, dst, blockingReader{}) + close(finished) + }() + + select { + case <-finished: + case <-time.After(time.Millisecond * 100): + t.Fatal("CopyCtx did not return after context was cancelled") + } +}