Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support deletion for memory graph #606

Merged
merged 13 commits into from
Oct 16, 2023
5 changes: 5 additions & 0 deletions internal/container/set/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (s Set[T]) Contains(item T) bool {
_, ok := s[item]
return ok
}

// Delete deletes an item from the set.
func (s Set[T]) Delete(item T) {
delete(s, item)
}
8 changes: 8 additions & 0 deletions internal/container/set/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,12 @@ func TestSet(t *testing.T) {
if got, want := len(set), 2; got != want {
t.Errorf("len(Set) = %v, want %v", got, want)
}
// test deleting a key
set.Delete(key1)
if got, want := set.Contains(key1), false; got != want {
t.Errorf("Set.Contains(%s) = %v, want %v", key1, got, want)
}
if got, want := len(set), 1; got != want {
t.Errorf("len(Set) = %v, want %v", got, want)
}
}
114 changes: 67 additions & 47 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,54 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/container/set"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/status"
"oras.land/oras-go/v2/internal/syncutil"
)

// Memory is a memory based PredecessorFinder.
type Memory struct {
predecessors sync.Map // map[descriptor.Descriptor]map[descriptor.Descriptor]ocispec.Descriptor
indexed sync.Map // map[descriptor.Descriptor]any
nodes map[descriptor.Descriptor]ocispec.Descriptor // nodes saves the map keys of ocispec.Descriptor
predecessors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
successors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
lock sync.RWMutex
}

// NewMemory creates a new memory PredecessorFinder.
func NewMemory() *Memory {
return &Memory{}
return &Memory{
nodes: make(map[descriptor.Descriptor]ocispec.Descriptor),
predecessors: make(map[descriptor.Descriptor]set.Set[descriptor.Descriptor]),
successors: make(map[descriptor.Descriptor]set.Set[descriptor.Descriptor]),
}
}

// Index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) Index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return err
}

m.index(ctx, node, successors)
return nil
_, err := m.index(ctx, fetcher, node)
return err
}

// Index indexes predecessors for all the successors of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) IndexAll(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) error {
// track content status
tracker := status.NewTracker()

var fn syncutil.GoFunc[ocispec.Descriptor]
fn = func(ctx context.Context, region *syncutil.LimitedRegion, desc ocispec.Descriptor) error {
// skip the node if other go routine is working on it
_, committed := tracker.TryCommit(desc)
if !committed {
return nil
}

// skip the node if it has been indexed
key := descriptor.FromOCI(desc)
_, exists := m.indexed.Load(key)
if exists {
return nil
}

successors, err := content.Successors(ctx, fetcher, desc)
successors, err := m.index(ctx, fetcher, desc)
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
// skip the node if it does not exist
return nil
}
return err
}
m.index(ctx, desc, successors)
m.indexed.Store(key, nil)

if len(successors) > 0 {
// traverse and index successors
return syncutil.Go(ctx, nil, fn, successors...)
Expand All @@ -96,39 +82,73 @@ func (m *Memory) IndexAll(ctx context.Context, fetcher content.Fetcher, node oci

// Predecessors returns the nodes directly pointing to the current node.
// Predecessors returns nil without error if the node does not exists in the
// store.
// Like other operations, calling Predecessors() is go-routine safe. However,
// it does not necessarily correspond to any consistent snapshot of the stored
// contents.
// store. Like other operations, calling Predecessors() is go-routine safe.
// However, it does not necessarily correspond to any consistent snapshot of
// the stored contents.
func (m *Memory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
m.lock.RLock()
defer m.lock.RUnlock()

key := descriptor.FromOCI(node)
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
value, exists := m.predecessors.Load(key)
set, exists := m.predecessors[key]
if !exists {
return nil, nil
}
predecessors := value.(*sync.Map)

var res []ocispec.Descriptor
predecessors.Range(func(key, value interface{}) bool {
res = append(res, value.(ocispec.Descriptor))
return true
})
for k := range set {
res = append(res, m.nodes[k])
}
return res, nil
}

// Remove removes the node from its predecessors and successors.
func (m *Memory) Remove(ctx context.Context, node ocispec.Descriptor) error {
m.lock.Lock()
defer m.lock.Unlock()

nodeKey := descriptor.FromOCI(node)
// remove the node from its successors' predecessor list
for successorKey := range m.successors[nodeKey] {
predecessorEntry := m.predecessors[successorKey]
predecessorEntry.Delete(nodeKey)

// if none of the predecessors of the node still exists, we remove the
// predecessors entry. Otherwise, we do not remove the entry.
if len(predecessorEntry) == 0 {
delete(m.predecessors, successorKey)
}
}
delete(m.successors, nodeKey)
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
delete(m.nodes, nodeKey)
return nil
}

// index indexes predecessors for each direct successor of the given node.
// There is no data consistency issue as long as deletion is not implemented
// for the underlying storage.
func (m *Memory) index(ctx context.Context, node ocispec.Descriptor, successors []ocispec.Descriptor) {
if len(successors) == 0 {
return
func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) {
successors, err := content.Successors(ctx, fetcher, node)
if err != nil {
return nil, err
}
m.lock.Lock()
defer m.lock.Unlock()

// index the node
nodeKey := descriptor.FromOCI(node)
m.nodes[nodeKey] = node

predecessorKey := descriptor.FromOCI(node)
// for each successor, put it into the node's successors list, and
// put node into the succeesor's predecessors list
successorSet := set.New[descriptor.Descriptor]()
m.successors[nodeKey] = successorSet
for _, successor := range successors {
successorKey := descriptor.FromOCI(successor)
value, _ := m.predecessors.LoadOrStore(successorKey, &sync.Map{})
predecessors := value.(*sync.Map)
predecessors.Store(predecessorKey, node)
successorSet.Add(successorKey)
predecessorSet, exists := m.predecessors[successorKey]
if !exists {
predecessorSet = set.New[descriptor.Descriptor]()
m.predecessors[successorKey] = predecessorSet
}
predecessorSet.Add(nodeKey)
}
return successors, nil
}
Loading