-
Notifications
You must be signed in to change notification settings - Fork 295
/
Copy pathpreimages.go
379 lines (355 loc) · 10.3 KB
/
preimages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
package core
import (
"encoding/csv"
"fmt"
"io"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb"
)
// ImportPreimages reads preimage records from the CSV file at path and writes
// them into the chain database. It is exported so that `main.go` can call it
// directly.
//
// Every record must have at least two fields. A regular record is
// `<hex hash>,<hex preimage>`; it is stored only when the Keccak256 hash of the
// preimage equals the given hash, otherwise it is reported on stdout and
// skipped. The record `MyBlockNumber,<decimal block number>` terminates the
// import: the block number is persisted as the last import block when it is
// greater than the stored one, and no further records are read.
//
// After a successful import with a non-zero block number, the range of blocks
// reported by FindMissingRange is passed to GeneratePreimages.
//
// An error is returned when the file cannot be opened or parsed, when EOF is
// reached without a MyBlockNumber record, or when a database operation or the
// follow-up generation fails.
func ImportPreimages(chain BlockChain, path string) error {
	reader, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("could not open file for reading: %w", err)
	}
	defer reader.Close()

	csvReader := csv.NewReader(reader)
	dbReader := chain.ChainDb()
	imported := uint64(0)
	for {
		record, err := csvReader.Read()
		if errors.Is(err, io.EOF) {
			// the terminating record was never seen
			return fmt.Errorf("MyBlockNumber field missing, cannot proceed")
		}
		if err != nil {
			return fmt.Errorf("could not read from reader: %w", err)
		}
		// guard the record[1] accesses below against short records
		if len(record) < 2 {
			fmt.Println("Malformed record: skipping", record)
			continue
		}
		if record[0] == "MyBlockNumber" {
			// a non-numeric value falls through to the hash validation below,
			// where it is rejected as a mismatch
			if blockNumber, err := strconv.ParseUint(record[1], 10, 64); err == nil {
				// set this value in database, and prometheus, if needed
				prev, err := rawdb.ReadPreimageImportBlock(dbReader)
				if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
					return fmt.Errorf("error reading last import block: %w", err)
				}
				// NOTE(review): on ErrNotFound this relies on prev being the
				// zero value - confirm against rawdb.ReadPreimageImportBlock
				if blockNumber > prev {
					if err := rawdb.WritePreimageImportBlock(dbReader, blockNumber); err != nil {
						return fmt.Errorf("error saving last import block: %w", err)
					}
				}
				// this is the last record
				imported = blockNumber
				break
			}
		}
		key := ethCommon.HexToHash(record[0])
		value := ethCommon.Hex2Bytes(record[1])
		// validate
		if crypto.Keccak256Hash(value) != key {
			fmt.Println("Data mismatch: skipping", record)
			continue
		}
		// add to database
		if err := rawdb.WritePreimages(
			dbReader, map[ethCommon.Hash][]byte{
				key: value,
			},
		); err != nil {
			return fmt.Errorf("error writing preimage for %x: %w", key, err)
		}
	}

	// now, at this point, we will have to generate missing pre-images
	if imported != 0 {
		// read errors are deliberately ignored here: the values are used as
		// returned, and FindMissingRange treats a 0/0 pair as "unset"
		genStart, _ := rawdb.ReadPreImageStartBlock(dbReader)
		genEnd, _ := rawdb.ReadPreImageEndBlock(dbReader)
		current := chain.CurrentBlock().NumberU64()
		toGenStart, toGenEnd := FindMissingRange(imported, genStart, genEnd, current)
		if toGenStart != 0 && toGenEnd != 0 {
			if err := GeneratePreimages(
				chain, toGenStart, toGenEnd,
			); err != nil {
				return fmt.Errorf("error generating: %w", err)
			}
		}
	}
	return nil
}
// ExportPreimages writes the address preimages of every account in the state
// trie of the current block to a CSV file at path. It is exported so that
// `main.go` can call it directly.
//
// Pending preimages are committed first. Each account leaf produces one record
// `<hex hashed address>,<hex address>`; leaves whose preimage is not in the
// database are logged and skipped. The final record is
// `MyBlockNumber,<current block number>`.
//
// An error is returned when preimages cannot be committed, the file cannot be
// created, written or closed, or the state/trie cannot be opened.
func ExportPreimages(chain BlockChain, path string) error {
	if err := chain.CommitPreimages(); err != nil {
		return fmt.Errorf("unable to commit preimages: %w", err)
	}
	// set up csv
	writer, err := os.Create(path)
	if err != nil {
		utils.Logger().Error().
			Msgf("unable to create file at %s due to %s", path, err)
		return fmt.Errorf(
			"unable to create file at %s due to %w",
			path, err,
		)
	}
	// Release the file on every early return. On the success path the file is
	// closed explicitly below so the error can be reported; this deferred
	// second Close then fails harmlessly and its result is ignored.
	defer writer.Close()
	csvWriter := csv.NewWriter(writer)

	// open trie
	block := chain.CurrentBlock()
	statedb, err := chain.StateAt(block.Root())
	if err != nil {
		utils.Logger().Error().
			Msgf(
				"unable to open statedb at %s due to %s",
				block.Root(), err,
			)
		return fmt.Errorf(
			"unable to open statedb at %x due to %w",
			block.Root(), err,
		)
	}
	trie, err := statedb.Database().OpenTrie(
		block.Root(),
	)
	if err != nil {
		utils.Logger().Error().
			Msgf(
				"unable to open trie at %x due to %s",
				block.Root(), err,
			)
		return fmt.Errorf(
			"unable to open trie at %x due to %w",
			block.Root(), err,
		)
	}

	// NOTE(review): the iterator is not checked for an error after the walk;
	// confirm whether a failed iteration can end the loop early.
	accountIterator := trie.NodeIterator(nil)
	dbReader := chain.ChainDb()
	for accountIterator.Next(true) {
		// the leaf nodes of the MPT represent accounts
		if !accountIterator.Leaf() {
			continue
		}
		// the leaf key is the hashed address
		hashed := accountIterator.LeafKey()
		asHash := ethCommon.BytesToHash(hashed)
		// obtain the corresponding address
		preimage := rawdb.ReadPreimage(
			dbReader, asHash,
		)
		if len(preimage) == 0 {
			utils.Logger().Warn().
				Msgf("Address not found for %x", asHash)
			continue
		}
		address := ethCommon.BytesToAddress(preimage)
		// key value format, so hash of value is first
		if err := csvWriter.Write([]string{
			fmt.Sprintf("%x", asHash.Bytes()),
			fmt.Sprintf("%x", address.Bytes()),
		}); err != nil {
			// stop early instead of walking the rest of the trie
			utils.Logger().Error().
				Msgf("unable to write csv due to %s", err)
			return fmt.Errorf("unable to write csv due to %w", err)
		}
	}
	// lastly, write the block number
	if err := csvWriter.Write(
		[]string{
			"MyBlockNumber",
			block.Number().String(),
		},
	); err != nil {
		utils.Logger().Error().
			Msgf("unable to write csv due to %s", err)
		return fmt.Errorf("unable to write csv due to %w", err)
	}
	// to disk
	csvWriter.Flush()
	if err := csvWriter.Error(); err != nil {
		utils.Logger().Error().
			Msgf("unable to write csv due to %s", err)
		return fmt.Errorf("unable to write csv due to %w", err)
	}
	// a failed Close can mean the data never reached the disk
	if err := writer.Close(); err != nil {
		utils.Logger().Error().
			Msgf("unable to close file at %s due to %s", path, err)
		return fmt.Errorf("unable to close file at %s due to %w", path, err)
	}
	return nil
}
// GeneratePreimages re-executes blocks up to and including end so that the
// preimages produced during execution are committed to the database.
//
// It walks backwards from start-1 to find the closest block for which state is
// available, then processes every following block up to end on top of that
// state. Whenever a processed block already has state in the database, the
// computed state is committed, its root is compared with the block root, and
// preimages are committed; execution then continues from the stored state.
// Finally the remaining state and preimages are committed and the generated
// range (starting block number, end) is recorded via
// rawdb.WritePreImageStartEndBlock.
//
// start must be at least 2. An error is returned when no starting block with
// state is found, a block in the range is missing, block execution fails, the
// committed root differs from the block root, or a commit/write fails.
// Progress is printed to stdout.
func GeneratePreimages(chain BlockChain, start, end uint64) error {
	if start < 2 {
		return fmt.Errorf("too low starting point %d", start)
	}
	fmt.Println("generating from", start, "to", end)

	// fetch all the blocks, from start and end both inclusive
	// then execute them - the execution will write the pre-images
	// to disk and we are good to go

	// attempt to find a block number for which we have block and state
	// with number < start
	var startingState *state.DB
	var startingBlock *types.Block
	for i := start - 1; i > 0; i-- {
		fmt.Println("finding block number", i)
		startingBlock = chain.GetBlockByNumber(i)
		if startingBlock == nil {
			fmt.Println("not found block number", i)
			// rewound too much in snapdb, so exit loop
			// although this is only designed for s2/s3 nodes in mind
			// which do not have such a snapdb
			break
		}
		fmt.Println("found block number", startingBlock.NumberU64(), startingBlock.Root().Hex())
		stateAt, err := chain.StateAt(startingBlock.Root())
		if err != nil {
			// no state for this block, try the previous one
			continue
		}
		startingState = stateAt
		break
	}
	if startingBlock == nil || startingState == nil {
		return fmt.Errorf("no eligible starting block with state found")
	}

	var endingState *state.DB
	var errProcess error
	// now execute block T+1 based on starting state
	for i := startingBlock.NumberU64() + 1; i <= end; i++ {
		if i%10000 == 0 {
			fmt.Println("processing block", i)
		}
		block := chain.GetBlockByNumber(i)
		if block == nil {
			// because we have startingBlock we must have all following
			return fmt.Errorf("block %d not found", i)
		}
		// the error is deliberately ignored: a nil stateAt simply means this
		// block has no stored state and execution continues from endingState
		stateAt, _ := chain.StateAt(block.Root())
		_, _, _, _, _, _, endingState, errProcess = chain.Processor().Process(block, startingState, *chain.GetVMConfig(), false)
		if errProcess != nil {
			return fmt.Errorf("error executing block #%d: %w", i, errProcess)
		}
		if stateAt != nil {
			if root, err := endingState.Commit(false); err != nil {
				return fmt.Errorf("unable to commit state for block '%d': %w", i, err)
			} else if root.Hex() != block.Root().Hex() {
				return fmt.Errorf("block root hashes different after commit commitRoot='%s' blockRoot='%s'", root.Hex(), block.Root().Hex())
			}
			if err := chain.CommitPreimages(); err != nil {
				return fmt.Errorf("error committing preimages for block '%d': %w", i, err)
			}
			// continue from the state stored in the database
			startingState = stateAt
		} else {
			startingState = endingState
		}
	}

	// force any pre-images in memory so far to go to disk, if they haven't already
	fmt.Println("committing images")
	if endingState != nil {
		if _, err := endingState.Commit(false); err != nil {
			return fmt.Errorf("unable to commit state for block: %w", err)
		}
	}
	if err := chain.CommitPreimages(); err != nil {
		return fmt.Errorf("error committing preimages %w", err)
	}
	if _, _, err := rawdb.WritePreImageStartEndBlock(chain.ChainDb(), startingBlock.NumberU64(), end); err != nil {
		return fmt.Errorf("error writing pre-image gen blocks %w", err)
	}
	return nil
}
// FindMissingRange returns the inclusive block range for which preimages still
// have to be generated, or (0, 0) when nothing is missing.
//
// imported is the block number up to which preimages were imported, start and
// end delimit the range that was already generated (both 0 when unset), and
// current is the number of the current block.
//
// When a gap exists below the generated range (imported+1 .. start-1), that gap
// is returned. Otherwise coverage is contiguous up to the larger of imported
// and end, and the range from there to current is returned. The returned range
// is never inverted: an empty range is always reported as (0, 0).
//
// It panics when start > end or end > current.
func FindMissingRange(
	imported, start, end, current uint64,
) (uint64, uint64) {
	// both are unset
	if start == 0 && end == 0 {
		if imported < current {
			return imported + 1, current
		}
		return 0, 0
	}
	// constraints: start <= end <= current
	// in regular usage, we should have end == current
	// however, with the GenerateFlag usage, we can have end < current
	if start > end {
		panic("Start > End")
	}
	if end > current {
		panic("End > Current")
	}
	// non-empty gap between the imported data and the generated range,
	// both inclusive; imported == start-1 means there is no gap at all
	if start > 0 && imported < start-1 {
		return imported + 1, start - 1
	}
	// no gap below start, so everything up to max(imported, end) is covered
	covered := end
	if imported > covered {
		covered = imported
	}
	if covered < current {
		return covered + 1, current
	}
	// already covered up to current, or future data imported
	return 0, 0
}
// VerifyPreimages walks the account trie at the state root of header's parent
// block and checks that a preimage is stored for every account leaf.
//
// It returns the number of leaves whose preimage was found and maps to a
// non-blank address. On the first leaf without a preimage it logs a warning
// and returns the count reached so far together with an error. An error is
// also returned when the parent block is not found, the state or trie cannot
// be opened, or pending preimages cannot be committed.
func VerifyPreimages(header *block.Header, chain BlockChain) (uint64, error) {
	var existingPreimages uint64
	parent := chain.GetBlockByHash(header.ParentHash())
	if parent == nil {
		// avoid a nil dereference when the parent is not available locally
		return 0, fmt.Errorf("parent block %x not found", header.ParentHash())
	}
	parentRoot := parent.Root() // for examining MPT at this root, should exist

	db, err := chain.StateAt(parentRoot)
	if err != nil {
		return 0, err
	}

	trie, err := db.Database().OpenTrie(parentRoot)
	if err != nil {
		return 0, err
	}

	// flush pending preimages before they are read back below
	if err := chain.CommitPreimages(); err != nil {
		return 0, fmt.Errorf("unable to commit preimages: %w", err)
	}

	diskDB := db.Database().DiskDB()
	// start the iteration
	accountIterator := trie.NodeIterator(nil)
	for accountIterator.Next(true) {
		// leaf means leaf node of the MPT, which is an account
		// the leaf key is the hashed address
		if accountIterator.Leaf() {
			key := accountIterator.LeafKey()
			preimage := rawdb.ReadPreimage(diskDB, common.BytesToHash(key))
			if len(preimage) == 0 {
				err := errors.New(
					fmt.Sprintf(
						"cannot find preimage for %x after '%d' accounts", key, existingPreimages,
					),
				)
				utils.Logger().Warn().Msg(err.Error())
				return existingPreimages, err
			}
			address := common.BytesToAddress(preimage)
			// skip blank address
			if address == (common.Address{}) {
				continue
			}
			existingPreimages++
		}
	}

	return existingPreimages, nil
}
// WritePreimagesMetricsIntoPrometheus reports preimage metrics for chain
// through sendMetrics every five minutes.
//
// It returns immediately when the chain's shard ID is below 2; otherwise it
// blocks for as long as the ticker keeps firing, so callers are expected to
// run it in its own goroutine. On every tick it reads the last import block
// and the generated start/end blocks from the chain database, runs
// VerifyPreimages against the current block header, and passes the results
// together with the shard ID to sendMetrics.
func WritePreimagesMetricsIntoPrometheus(chain BlockChain, sendMetrics func(preimageStart, preimageEnd, lastPreimageImport, verifiedAddresses uint64, shard uint32)) {
	shardID := chain.ShardID()
	if shardID < 2 {
		return
	}

	ticker := time.NewTicker(time.Minute * 5)
	defer ticker.Stop()
	dbReader := chain.ChainDb()

	for range ticker.C {
		// Metrics are best-effort: errors are deliberately ignored and the
		// values returned alongside them are reported as-is.
		lastImport, _ := rawdb.ReadPreimageImportBlock(dbReader)
		startBlock, _ := rawdb.ReadPreImageStartBlock(dbReader)
		endBlock, _ := rawdb.ReadPreImageEndBlock(dbReader)

		verify, _ := VerifyPreimages(chain.CurrentBlock().Header(), chain)

		sendMetrics(startBlock, endBlock, lastImport, verify, shardID)
	}
}