From ccd42e923608481ed1977b0cb9282ccbf620ff17 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 15 Jun 2016 15:05:24 +0200 Subject: [PATCH] Fix bugs in DeltaFIFO --- pkg/client/cache/delta_fifo.go | 35 +++++++++++++--- pkg/client/cache/delta_fifo_test.go | 62 ++++++++++++++++++++++++----- pkg/client/cache/fifo.go | 2 + 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index 8ff9d7a36c960..3cb077faf7e6a 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -189,12 +189,20 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { // Don't provide a second report of the same deletion. return nil } - } else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists { - // Presumably, this was deleted when a relist happened. - // Don't provide a second report of the same deletion. - // TODO(lavalamp): This may be racy-- we aren't properly locked - // with knownObjects. - return nil + } else { + // We only want to skip the "deletion" action if the object doesn't + // exist in knownObjects and it doesn't have corresponding item in items. + // Note that even if there is a "deletion" action in items, we can ignore it, + // because it will be deduped automatically in "queueActionLocked" + _, exists, err := f.knownObjects.GetByKey(id) + _, itemsExist := f.items[id] + if err == nil && !exists && !itemsExist { + // Presumably, this was deleted when a relist happened. + // Don't provide a second report of the same deletion. + // TODO(lavalamp): This may be racy-- we aren't properly locked + // with knownObjects. + return nil + } } return f.queueActionLocked(Deleted, obj) @@ -270,6 +278,13 @@ func isDeletionDup(a, b *Delta) *Delta { return b } +// willObjectBeDeletedLocked returns true only if the last delta for the +// given object is Delete. Caller must lock first. +func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool { + deltas := f.items[id] + return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted +} + // queueActionLocked appends to the delta list for the object, calling // f.deltaCompressor if needed. Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { @@ -277,6 +292,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err if err != nil { return KeyError{obj, err} } + + // If object is supposed to be deleted (last event is Deleted), + // then we should ignore Sync events, because it would result in + // recreation of this object. + if actionType == Sync && f.willObjectBeDeletedLocked(id) { + return nil + } + newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if f.deltaCompressor != nil { diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index 2bead89a848ea..fb6e2a4a6782e 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -28,18 +28,22 @@ func testPop(f *DeltaFIFO) testFifoObject { } // keyLookupFunc adapts a raw function to be a KeyLookup. -type keyLookupFunc func() []string +type keyLookupFunc func() []testFifoObject // ListKeys just calls kl. func (kl keyLookupFunc) ListKeys() []string { - return kl() + result := []string{} + for _, fifoObj := range kl() { + result = append(result, fifoObj.name) + } + return result } // GetByKey returns the key if it exists in the list returned by kl. func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) { for _, v := range kl() { - if v == key { - return key, true, nil + if v.name == key { + return v, true, nil } } return nil, false, nil @@ -173,8 +177,8 @@ func TestDeltaFIFO_enqueueingWithLister(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, nil, - keyLookupFunc(func() []string { - return []string{"foo", "bar", "baz"} + keyLookupFunc(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), ) f.Add(mkFifoObj("foo", 10)) @@ -220,12 +224,52 @@ func TestDeltaFIFO_addReplace(t *testing.T) { } } +func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + keyLookupFunc(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + ) + f.Delete(mkFifoObj("foo", 10)) + f.Resync() + + deltas := f.items["foo"] + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + nil, + keyLookupFunc(func() []testFifoObject { + return []testFifoObject{} + }), + ) + f.Add(mkFifoObj("foo", 5)) + f.Delete(mkFifoObj("foo", 6)) + + deltas := f.items["foo"] + if len(deltas) != 2 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[len(deltas)-1].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[len(deltas)-1]) + } +} + func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, nil, - keyLookupFunc(func() []string { - return []string{"foo", "bar", "baz"} + keyLookupFunc(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} }), ) f.Delete(mkFifoObj("baz", 10)) @@ -236,7 +280,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { {{Sync, mkFifoObj("foo", 5)}}, // Since "bar" didn't have a delete event and wasn't in the Replace list // it should get a tombstone key with the right Obj. - {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}}, + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, } for _, expected := range expectedList { diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 17e5292977525..eaa35e62cb679 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -45,6 +45,8 @@ type Queue interface { } // Helper function for popping from Queue. +// WARNING: Do NOT use this function in non-test code to avoid races +// unless you really really really really know what you are doing. func Pop(queue Queue) interface{} { var result interface{} queue.Pop(func(obj interface{}) error {