-
Notifications
You must be signed in to change notification settings - Fork 266
/
diff.go
152 lines (133 loc) · 3.86 KB
/
diff.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package iavl
import (
"bytes"
"github.com/cosmos/iavl/proto"
)
// KVPair is an alias of proto.KVPair, making it available as iavl.KVPair.
// Within this file it is built with the fields Key, Value and Delete.
type KVPair = proto.KVPair

// ChangeSet is an alias of proto.ChangeSet, making it available as iavl.ChangeSet.
type ChangeSet = proto.ChangeSet
// KVPairReceiver is the callback type passed to `extractStateChanges`; it is invoked once
// for every `KVPair` of the produced stream. A non-nil error returned by the callback stops
// the extraction and is handed back to the caller of `extractStateChanges` unchanged.
type KVPairReceiver func(pair *KVPair) error
// extractStateChanges extracts the state changes between two versions of the tree
// and streams them to `receiver` as `KVPair`s.
//
// It first traverses the `root` tree until the first `sharedNode`, recording the new leaf
// nodes met on the way, then traverses the `prevRoot` tree until that same `sharedNode` to
// find the orphaned leaf nodes. Orphaned and new leaf nodes are compared by key to produce
// insertions, updates and deletions; the procedure then repeats for the next shared node.
// A node of the new tree is treated as shared when its version is not greater than
// `prevVersion`.
//
// NOTE(review): the key comparison assumes both iterators yield leaves in ascending key
// order — confirm against NodeIterator.
//
// The algorithm doesn't run in strictly constant memory, but it tries its best to keep
// only minimal intermediate state in memory.
//
// The first error reported by either iterator or by `receiver` aborts the extraction and
// is returned unwrapped. Iterator errors are checked as soon as the corresponding
// traversal step stops, so that an interrupted traversal is not mistaken for an exhausted
// tree and no misleading pairs are streamed to `receiver` afterwards.
func (ndb *nodeDB) extractStateChanges(prevVersion int64, prevRoot, root []byte, receiver KVPairReceiver) error {
	curIter, err := NewNodeIterator(root, ndb)
	if err != nil {
		return err
	}

	prevIter, err := NewNodeIterator(prevRoot, ndb)
	if err != nil {
		return err
	}

	var (
		// current shared node between two versions
		sharedNode *Node

		// newly added leaf nodes recorded during the traversal to the `sharedNode`,
		// they are compared with the orphaned nodes found to produce the change set stream.
		newLeaves []*Node
	)

	// consumeNewLeaves consumes the remaining `newLeaves` nodes and produces an
	// insertion `KVPair` for each of them.
	consumeNewLeaves := func() error {
		for _, node := range newLeaves {
			if err := receiver(&KVPair{
				Key:   node.key,
				Value: node.value,
			}); err != nil {
				return err
			}
		}

		// keep the backing array for the next round
		newLeaves = newLeaves[:0]
		return nil
	}

	// advanceSharedNode forwards `curIter` until the next `sharedNode`,
	// `sharedNode` will be `nil` if the new version is exhausted.
	// It also records the new leaf nodes met during the traversal.
	advanceSharedNode := func() error {
		if err := consumeNewLeaves(); err != nil {
			return err
		}

		sharedNode = nil
		for curIter.Valid() {
			node := curIter.GetNode()
			shared := node.nodeKey.version <= prevVersion
			// skip sub-tree of shared nodes
			curIter.Next(shared)
			if shared {
				sharedNode = node
				break
			}
			if node.isLeaf() {
				newLeaves = append(newLeaves, node)
			}
		}

		// Fail fast: if the traversal stopped because the iterator failed (presumably
		// Valid() reports false in that case — confirm against NodeIterator),
		// `sharedNode` is nil and every remaining leaf of the previous version would
		// otherwise be streamed as a deletion before the error is noticed.
		return curIter.Error()
	}
	if err := advanceSharedNode(); err != nil {
		return err
	}

	// addOrphanedLeave receives a new orphaned leaf node found in the previous version and
	// compares it with the current `newLeaves` to produce the `iavl.KVPair` stream.
	addOrphanedLeave := func(orphaned *Node) error {
		for len(newLeaves) > 0 {
			newLeave := newLeaves[0]
			switch bytes.Compare(orphaned.key, newLeave.key) {
			case 1:
				// the new key sorts before the orphaned one:
				// consume the new node as an insertion and continue
				newLeaves = newLeaves[1:]
				if err := receiver(&KVPair{
					Key:   newLeave.key,
					Value: newLeave.value,
				}); err != nil {
					return err
				}
				continue

			case -1:
				// removal, don't consume new nodes
				return receiver(&KVPair{
					Delete: true,
					Key:    orphaned.key,
				})

			case 0:
				// update, consume the new node and stop
				newLeaves = newLeaves[1:]
				return receiver(&KVPair{
					Key:   newLeave.key,
					Value: newLeave.value,
				})
			}
		}

		// no new leaves left to compare with: removal
		return receiver(&KVPair{
			Delete: true,
			Key:    orphaned.key,
		})
	}

	// Traverse `prevIter` to find orphaned nodes in the previous version,
	// and compare them with `newLeaves` to generate the `KVPair` stream.
	for prevIter.Valid() {
		node := prevIter.GetNode()
		shared := sharedNode != nil && (node == sharedNode || bytes.Equal(node.hash, sharedNode.hash))
		// skip sub-tree of shared nodes
		prevIter.Next(shared)
		if shared {
			if err := advanceSharedNode(); err != nil {
				return err
			}
		} else if node.isLeaf() {
			if err := addOrphanedLeave(node); err != nil {
				return err
			}
		}
	}

	// Report a failure of the previous-version iterator before flushing: the leftover
	// `newLeaves` may pair with orphaned leaves that were never reached, so streaming
	// them as insertions would be misleading.
	if err := prevIter.Error(); err != nil {
		return err
	}

	if err := consumeNewLeaves(); err != nil {
		return err
	}

	return curIter.Error()
}