Skip to content

Commit

Permalink
feat: Public GC function of oci.Store (#656)
Browse files Browse the repository at this point in the history
Part of #472
Signed-off-by: Xiaoxuan Wang <[email protected]>
  • Loading branch information
wangxiaoxuan273 authored Jan 11, 2024
1 parent c34d275 commit 3b1dd0e
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 0 deletions.
93 changes: 93 additions & 0 deletions content/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"

Expand Down Expand Up @@ -454,6 +455,77 @@ func (s *Store) writeIndexFile() error {
return os.WriteFile(s.indexPath, indexJSON, 0666)
}

// reloadIndex reloads the index and updates metadata by creating a new store.
func (s *Store) reloadIndex(ctx context.Context) error {
newStore, err := NewWithContext(ctx, s.root)
if err != nil {
return err
}
s.index = newStore.index
s.storage = newStore.storage
s.tagResolver = newStore.tagResolver
s.graph = newStore.graph
return nil
}

// GC removes garbage from Store. Unsaved index will be lost. To prevent unexpected
// loss, call SaveIndex() before GC or set AutoSaveIndex to true.
// The garbage to be cleaned are:
// - unreferenced (dangling) blobs in Store which have no predecessors
// - garbage blobs in the storage whose metadata is not stored in Store
func (s *Store) GC(ctx context.Context) error {
s.sync.Lock()
defer s.sync.Unlock()

// get reachable nodes by reloading the index
err := s.reloadIndex(ctx)
if err != nil {
return fmt.Errorf("unable to reload index: %w", err)
}
reachableNodes := s.graph.DigestSet()

// clean up garbage blobs in the storage
rootpath := filepath.Join(s.root, ocispec.ImageBlobsDir)
algDirs, err := os.ReadDir(rootpath)
if err != nil {
return err
}
for _, algDir := range algDirs {
if !algDir.IsDir() {
continue
}
alg := algDir.Name()
// skip unsupported directories
if !isKnownAlgorithm(alg) {
continue
}
algPath := path.Join(rootpath, alg)
digestEntries, err := os.ReadDir(algPath)
if err != nil {
return err
}
for _, digestEntry := range digestEntries {
if err := isContextDone(ctx); err != nil {
return err
}
dgst := digestEntry.Name()
blobDigest := digest.NewDigestFromEncoded(digest.Algorithm(alg), dgst)
if err := blobDigest.Validate(); err != nil {
// skip irrelevant content
continue
}
if !reachableNodes.Contains(blobDigest) {
// remove the blob from storage if it does not exist in Store
err = os.Remove(path.Join(algPath, dgst))
if err != nil {
return err
}
}
}
}
return nil
}

// unsafeStore is used to bypass lock restrictions in Delete.
type unsafeStore struct {
*Store
Expand All @@ -467,6 +539,17 @@ func (s *unsafeStore) Predecessors(ctx context.Context, node ocispec.Descriptor)
return s.graph.Predecessors(ctx, node)
}

// isContextDone returns an error if the context is done.
// Reference: https://pkg.go.dev/context#Context
func isContextDone(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

// validateReference validates ref.
func validateReference(ref string) error {
if ref == "" {
Expand All @@ -476,3 +559,13 @@ func validateReference(ref string) error {
// TODO: may enforce more strict validation if needed.
return nil
}

// isKnownAlgorithm checks is a string is a supported hash algorithm
func isKnownAlgorithm(alg string) bool {
switch digest.Algorithm(alg) {
case digest.SHA256, digest.SHA512, digest.SHA384:
return true
default:
return false
}
}
206 changes: 206 additions & 0 deletions content/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
Expand Down Expand Up @@ -2844,6 +2845,199 @@ func TestStore_UntagErrorPath(t *testing.T) {
}
}

func TestStore_GC(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
manifest := ocispec.Manifest{
Config: config,
Subject: subject,
Layers: layers,
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
}
generateImageIndex := func(manifests ...ocispec.Descriptor) {
index := ocispec.Index{
Manifests: manifests,
}
indexJSON, err := json.Marshal(index)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
}
generateArtifactManifest := func(blobs ...ocispec.Descriptor) {
var manifest spec.Artifact
manifest.Blobs = append(manifest.Blobs, blobs...)
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(spec.MediaTypeArtifactManifest, manifestJSON)
}

appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) // Blob 1
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer")) // Blob 2, dangling layer
generateManifest(descs[0], nil, descs[1]) // Blob 3, valid manifest
generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer
generateArtifactManifest(descs[4]) // blob 6, dangling artifact
generateManifest(descs[0], &descs[5], descs[1]) // Blob 7, referrer of a dangling manifest
appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 3")) // Blob 8, dangling layer
generateArtifactManifest(descs[6]) // blob 9, dangling artifact
generateImageIndex(descs[7], descs[5]) // blob 10, dangling image index
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 1")) // Blob 11, garbage layer 1
generateManifest(descs[0], nil, descs[4]) // Blob 12, garbage manifest 1
appendBlob(ocispec.MediaTypeImageConfig, []byte("garbage config")) // Blob 13, garbage config
appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2
generateManifest(descs[6], nil, descs[7]) // Blob 15, garbage manifest 2
generateManifest(descs[0], &descs[13], descs[1]) // Blob 16, referrer of a garbage manifest

// push blobs 0 - blobs 10 into s
for i := 0; i <= 10; i++ {
err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// remove blobs 4 - blobs 10 from index.json
for i := 4; i <= 10; i++ {
s.tagResolver.Untag(string(descs[i].Digest))
}
s.SaveIndex()

// push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata
// doesn't exist in s
for i := 11; i < len(blobs); i++ {
err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
t.Errorf("failed to push test content to src: %d: %v", i, err)
}
}

// confirm that all the blobs are in the storage
for i := 11; i < len(blobs); i++ {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("descs[%d] should exist", i)
}
}

// perform GC
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}

// verify existence
wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false}
for i, wantValue := range wantExistence {
exists, err := s.Exists(ctx, descs[i])
if err != nil {
t.Fatal(err)
}
if exists != wantValue {
t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists)
}
}
}

func TestStore_GCErrorPath(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob")) // Blob 0

// push the valid blob
err = s.Push(ctx, descs[0], bytes.NewReader(blobs[0]))
if err != nil {
t.Error("failed to push test content to src")
}

// write random contents
algPath := path.Join(tempDir, "blobs")
dgstPath := path.Join(algPath, "sha256")
if err := os.WriteFile(path.Join(algPath, "other"), []byte("random"), 0444); err != nil {
t.Fatal("error calling WriteFile(), error =", err)
}
if err := os.WriteFile(path.Join(dgstPath, "other2"), []byte("random2"), 0444); err != nil {
t.Fatal("error calling WriteFile(), error =", err)
}

// perform GC
if err = s.GC(ctx); err != nil {
t.Fatal(err)
}

appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob 2")) // Blob 1

// push the valid blob
err = s.Push(ctx, descs[1], bytes.NewReader(blobs[1]))
if err != nil {
t.Error("failed to push test content to src")
}

// unknown algorithm
if err := os.Mkdir(path.Join(algPath, "sha666"), 0777); err != nil {
t.Fatal(err)
}
if err = s.GC(ctx); err != nil {
t.Fatal("this error should be silently ignored")
}

// os.Remove() error
badDigest := digest.FromBytes([]byte("bad digest")).Encoded()
badPath := path.Join(algPath, "sha256", badDigest)
if err := os.Mkdir(badPath, 0777); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(path.Join(badPath, "whatever"), []byte("extra content"), 0444); err != nil {
t.Fatal("error calling WriteFile(), error =", err)
}
if err = s.GC(ctx); err == nil {
t.Fatal("expect an error when os.Remove()")
}
}

func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descriptor) bool {
if len(actual) != len(expected) {
return false
Expand All @@ -2863,3 +3057,15 @@ func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descript
}
return true
}

func Test_isContextDone(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
if err := isContextDone(ctx); err != nil {
t.Errorf("expect error = %v, got %v", nil, err)
}
cancel()
if err := isContextDone(ctx); err != context.Canceled {
t.Errorf("expect error = %v, got %v", context.Canceled, err)
}
}
10 changes: 10 additions & 0 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"sync"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
Expand Down Expand Up @@ -147,6 +148,15 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {
return danglings
}

// DigestSet returns the set of node digest in memory.
func (m *Memory) DigestSet() set.Set[digest.Digest] {
s := set.New[digest.Digest]()
for desc := range m.nodes {
s.Add(desc.Digest)
}
return s
}

// index indexes predecessors for each direct successor of the given node.
func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
successors, err := content.Successors(ctx, fetcher, node)
Expand Down
Loading

0 comments on commit 3b1dd0e

Please sign in to comment.