Skip to content

Commit

Permalink
Fix bugs in DeltaFIFO
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Jun 15, 2016
1 parent 3c822c0 commit ccd42e9
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 15 deletions.
35 changes: 29 additions & 6 deletions pkg/client/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,20 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
// Don't provide a second report of the same deletion.
return nil
}
} else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
// TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects.
return nil
} else {
// We only want to skip the "deletion" action if the object doesn't
// exist in knownObjects and it doesn't have corresponding item in items.
// Note that even if there is a "deletion" action in items, we can ignore it,
// because it will be deduped automatically in "queueActionLocked"
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// Presumably, this was deleted when a relist happened.
// Don't provide a second report of the same deletion.
// TODO(lavalamp): This may be racy-- we aren't properly locked
// with knownObjects.
return nil
}
}

return f.queueActionLocked(Deleted, obj)
Expand Down Expand Up @@ -270,13 +278,28 @@ func isDeletionDup(a, b *Delta) *Delta {
return b
}

// willObjectBeDeletedLocked returns true only if the last delta for the
// given object is Delete. Caller must lock first.
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
deltas := f.items[id]
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}

// queueActionLocked appends to the delta list for the object, calling
// f.deltaCompressor if needed. Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}

// If object is supposed to be deleted (last event is Deleted),
// then we should ignore Sync events, because it would result in
// recreation of this object.
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}

newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if f.deltaCompressor != nil {
Expand Down
62 changes: 53 additions & 9 deletions pkg/client/cache/delta_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@ func testPop(f *DeltaFIFO) testFifoObject {
}

// keyLookupFunc adapts a raw function to be a KeyLookup.
type keyLookupFunc func() []string
type keyLookupFunc func() []testFifoObject

// ListKeys just calls kl.
func (kl keyLookupFunc) ListKeys() []string {
return kl()
result := []string{}
for _, fifoObj := range kl() {
result = append(result, fifoObj.name)
}
return result
}

// GetByKey returns the key if it exists in the list returned by kl.
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
for _, v := range kl() {
if v == key {
return key, true, nil
if v.name == key {
return v, true, nil
}
}
return nil, false, nil
Expand Down Expand Up @@ -173,8 +177,8 @@ func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
keyLookupFunc(func() []string {
return []string{"foo", "bar", "baz"}
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
)
f.Add(mkFifoObj("foo", 10))
Expand Down Expand Up @@ -220,12 +224,52 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
}
}

func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5)}
}),
)
f.Delete(mkFifoObj("foo", 10))
f.Resync()

deltas := f.items["foo"]
if len(deltas) != 1 {
t.Fatalf("unexpected deltas length: %v", deltas)
}
if deltas[0].Type != Deleted {
t.Errorf("unexpected delta: %v", deltas[0])
}
}

func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{}
}),
)
f.Add(mkFifoObj("foo", 5))
f.Delete(mkFifoObj("foo", 6))

deltas := f.items["foo"]
if len(deltas) != 2 {
t.Fatalf("unexpected deltas length: %v", deltas)
}
if deltas[len(deltas)-1].Type != Deleted {
t.Errorf("unexpected delta: %v", deltas[len(deltas)-1])
}
}

func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
nil,
keyLookupFunc(func() []string {
return []string{"foo", "bar", "baz"}
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
)
f.Delete(mkFifoObj("baz", 10))
Expand All @@ -236,7 +280,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
{{Sync, mkFifoObj("foo", 5)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}},
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
}

for _, expected := range expectedList {
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/cache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Queue interface {
}

// Helper function for popping from Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}) error {
Expand Down

0 comments on commit ccd42e9

Please sign in to comment.