From 9a24d7f35887adb399b0b0b1723179912806a2d8 Mon Sep 17 00:00:00 2001 From: Thomas Jay Rush Date: Mon, 4 Sep 2023 00:33:35 -0400 Subject: [PATCH] Not working... --- .../chifra/internal/scrape/handle_scrape.go | 22 +- .../chifra/internal/scrape/save_timestamps.go | 33 +-- .../chifra/internal/scrape/scrape_batch.go | 38 ++-- .../chifra/internal/scrape/scrape_blaze.go | 11 +- .../internal/scrape/scrape_consolidate.go | 197 +++++++++--------- .../chifra/internal/scrape/scrape_manager.go | 5 + 6 files changed, 157 insertions(+), 149 deletions(-) diff --git a/src/apps/chifra/internal/scrape/handle_scrape.go b/src/apps/chifra/internal/scrape/handle_scrape.go index b1b54010cd..3dcfe5e8e4 100644 --- a/src/apps/chifra/internal/scrape/handle_scrape.go +++ b/src/apps/chifra/internal/scrape/handle_scrape.go @@ -21,7 +21,10 @@ import ( // (or less if close to the head). The forever loop pauses each round for // --sleep seconds (or, if not close to the head, for .25 seconds). func (opts *ScrapeOptions) HandleScrape() error { + var blocks = make([]base.Blknum, 0, opts.BlockCnt) var err error + var ok bool + chain := opts.Globals.Chain testMode := opts.Globals.TestMode @@ -42,17 +45,15 @@ func (opts *ScrapeOptions) HandleScrape() error { // Fetch the meta data which tells us how far along the index is. if bm.meta, err = opts.Conn.GetMetaData(testMode); err != nil { var ErrFetchingMeta = fmt.Errorf("error fetching meta data: %s", err) - logger.Error(ErrFetchingMeta) + logger.Error(colors.BrightRed, ErrFetchingMeta, colors.Off) goto PAUSE } - // logger.Info(colors.Green+"meta data fetched"+colors.Off, bm.meta) // We're may be too close to the start of the chain to have ripe blocks. // Report no error but try again soon. if bm.meta.ChainHeight() < opts.Settings.Unripe_dist { goto PAUSE } - // logger.Info(colors.Green+"ripe block found"+colors.Off, bm.meta.ChainHeight()-opts.Settings.Unripe_dist) // The user may have restarted his node's sync (that is, started over). // In this case, the index may be ahead of the chain, if so we go to @@ -63,10 +64,9 @@ func (opts *ScrapeOptions) HandleScrape() error { bm.meta.NextIndexHeight(), bm.meta.ChainHeight(), ) - logger.Error(ErrIndexAhead) + logger.Error(colors.BrightRed, ErrIndexAhead, colors.Off) goto PAUSE } - // logger.Info(colors.Green+"index is not ahead of chain"+colors.Off, bm.meta.NextIndexHeight()) // Let's start a new round... bm = BlazeManager{ @@ -86,27 +86,29 @@ func (opts *ScrapeOptions) HandleScrape() error { bm.blockCount = utils.Min(opts.BlockCnt, bm.meta.ChainHeight()-bm.StartBlock()+1) // Unripe_dist behind the chain tip. bm.ripeBlock = bm.meta.ChainHeight() - opts.Settings.Unripe_dist - // logger.Info(colors.Green, bm.startBlock, bm.blockCount, bm.ripeBlock) + + // These are the blocks we're going to process this round + for block := bm.StartBlock(); block < bm.EndBlock(); block++ { + blocks = append(blocks, block) + } // Scrape this round. Only quit on catostrophic errors. Report and sleep otherwise. - if err, ok := bm.ScrapeBatch(); !ok || err != nil { + if err, ok = bm.ScrapeBatch(blocks); !ok || err != nil { logger.Error(colors.BrightRed, err, colors.Off) if !ok { break } goto PAUSE } - // logger.Info(colors.Green+"scrape batch complete"+colors.Off, bm.nProcessed) // Consilidate a chunk (if possible). Only quit on catostrophic errors. Report and sleep otherwise. - if ok, err := bm.Consolidate(); !ok || err != nil { + if err, ok = bm.Consolidate(blocks); !ok || err != nil { logger.Error(colors.BrightRed, err, colors.Off) if !ok { break } goto PAUSE } - // logger.Info(colors.Green+"consolidate complete"+colors.Off, bm.nProcessed) PAUSE: // If we've gotten this far, we want to clean up the unripe files (we no longer need them). diff --git a/src/apps/chifra/internal/scrape/save_timestamps.go b/src/apps/chifra/internal/scrape/save_timestamps.go index bd6c6c5e04..952e85ee70 100644 --- a/src/apps/chifra/internal/scrape/save_timestamps.go +++ b/src/apps/chifra/internal/scrape/save_timestamps.go @@ -10,23 +10,23 @@ import ( "os" "sort" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib" ) // TODO: Protect against overwriting files on disc -func (bm *BlazeManager) SaveTimestamps(endPoint uint64) error { +func (bm *BlazeManager) WriteTimestamps(blocks []base.Blknum) error { chain := bm.chain - conn := rpc.TempConnection(chain) sort.Slice(bm.timestamps, func(i, j int) bool { return bm.timestamps[i].Bn < bm.timestamps[j].Bn }) - // Assume that the existing timestamps file always contains valid timestamps in a valid order so we can only append + // Assume that the existing timestamps file always contains valid timestamps in + // a valid order so we can only append as we go (which is very fast) tsPath := config.GetPathToIndex(chain) + "ts.bin" fp, err := os.OpenFile(tsPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) if err != nil { @@ -36,39 +36,42 @@ func (bm *BlazeManager) SaveTimestamps(endPoint uint64) error { defer func() { tslib.ClearCache(chain) fp.Close() - // sigintTrap.Disable(trapCh) - // writeMutex.Unlock() }() - nTs, _ := tslib.NTimestamps(chain) - cnt := 0 - for bn := nTs; bn < endPoint; bn++ { + for _, block := range blocks { // Append to the timestamps file all the new timestamps but as we do that make sure we're // not skipping anything at the front, in the middle, or at the end of the list ts := tslib.TimestampRecord{} if cnt >= len(bm.timestamps) { ts = tslib.TimestampRecord{ - Bn: uint32(bn), - Ts: uint32(conn.GetBlockTimestamp(bn)), + Bn: uint32(block), + Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)), } } else { ts = bm.timestamps[cnt] - if bm.timestamps[cnt].Bn != uint32(bn) { + if bm.timestamps[cnt].Bn != uint32(block) { ts = tslib.TimestampRecord{ - Bn: uint32(bn), - Ts: uint32(conn.GetBlockTimestamp(bn)), + Bn: uint32(block), + Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)), } cnt-- // set it back } } - logger.Progress((bn%13) == 0, fmt.Sprintf("Checking or updating timestamps %-04d of %-04d (%d remaining)%s", bn, endPoint, endPoint-bn, spaces)) + msg := fmt.Sprintf("Updating timestamps %-04d of %-04d (%-04d remaining)", + block, + blocks[len(blocks)-1], + blocks[len(blocks)-1]-block, + ) + logger.Progress((block%13) == 0, msg) + if err = binary.Write(fp, binary.LittleEndian, &ts); err != nil { return err } cnt++ } + return nil } diff --git a/src/apps/chifra/internal/scrape/scrape_batch.go b/src/apps/chifra/internal/scrape/scrape_batch.go index 4c21037136..1859f3e137 100644 --- a/src/apps/chifra/internal/scrape/scrape_batch.go +++ b/src/apps/chifra/internal/scrape/scrape_batch.go @@ -5,37 +5,33 @@ package scrapePkg // be found in the LICENSE file. import ( - "errors" "fmt" - "strings" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index" ) -// ScrapeBatch is called each time around the forever loop prior to calling into -// Blaze to actually scrape the blocks. -func (bm *BlazeManager) ScrapeBatch() (error, bool) { - chain := bm.chain +// ScrapeBatch is called each time around the forever loop. It calls into +// HandleBlaze and writes the timestamps if there's no error. +func (bm *BlazeManager) ScrapeBatch(blocks []base.Blknum) (error, bool) { + indexPath := config.GetPathToIndex(bm.chain) - // Do the actual scrape, wait until it finishes, clean up and return on failure - if _, err := bm.HandleBlaze(); err != nil { - _ = index.CleanTemporaryFolders(config.GetPathToIndex(chain), false) - return err, true + if err, ok := bm.HandleBlaze(blocks); !ok || err != nil { + _ = index.CleanTemporaryFolders(indexPath, false) + return err, ok } - start := bm.StartBlock() - end := bm.StartBlock() + bm.BlockCount() - - for bn := start; bn < end; bn++ { - if !bm.processedMap[bn] { - // At least one block was not processed. This would only happen in the event of an - // error, so clean up, report the error and return. The loop will repeat. - _ = index.CleanTemporaryFolders(config.GetPathToIndex(chain), false) - msg := fmt.Sprintf("A block %d was not processed%s", bn, strings.Repeat(" ", 50)) - return errors.New(msg), true + // Check to see if we missed any blocks... + for _, block := range blocks { + if !bm.processedMap[block] { + // We missed a block. We need to clean up and continue + // next time around the loop. This may happen if the + // node returns an error for example. + _ = index.CleanTemporaryFolders(indexPath, false) + return fmt.Errorf("a block (%d) was not processed", block), true } } - return bm.SaveTimestamps(end), true + return bm.WriteTimestamps(blocks), true } diff --git a/src/apps/chifra/internal/scrape/scrape_blaze.go b/src/apps/chifra/internal/scrape/scrape_blaze.go index 950f0fecb2..c02d5b7272 100644 --- a/src/apps/chifra/internal/scrape/scrape_blaze.go +++ b/src/apps/chifra/internal/scrape/scrape_blaze.go @@ -20,14 +20,7 @@ import ( // HandleBlaze does the actual scraping, walking through block_cnt blocks and querying traces and logs // and then extracting addresses and timestamps from those data structures. -func (bm *BlazeManager) HandleBlaze() (ok bool, err error) { - blocks := []base.Blknum{} - - start := bm.StartBlock() - end := bm.StartBlock() + bm.BlockCount() - for block := start; block < end; block++ { - blocks = append(blocks, block) - } +func (bm *BlazeManager) HandleBlaze(blocks []base.Blknum) (err error, ok bool) { // We need three pipelines...we shove into blocks, blocks shoves into appearances and timestamps blockChannel := make(chan base.Blknum) @@ -75,7 +68,7 @@ func (bm *BlazeManager) HandleBlaze() (ok bool, err error) { close(tsChannel) tsWg.Wait() - return true, nil + return nil, true } // ProcessBlocks processes the block channel and for each block query the node for both diff --git a/src/apps/chifra/internal/scrape/scrape_consolidate.go b/src/apps/chifra/internal/scrape/scrape_consolidate.go index a0dd4f941e..ebe1afd325 100644 --- a/src/apps/chifra/internal/scrape/scrape_consolidate.go +++ b/src/apps/chifra/internal/scrape/scrape_consolidate.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" "strings" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" @@ -13,118 +12,129 @@ import ( "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/file" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils" ) const asciiAppearanceSize = 59 // Consolidate calls into the block scraper to (a) call Blaze and (b) consolidate if applicable -func (bm *BlazeManager) Consolidate() (bool, error) { +func (bm *BlazeManager) Consolidate(blocks []base.Blknum) (error, bool) { chain := bm.chain + settings := bm.opts.Settings + testMode := bm.opts.Globals.TestMode - // Get a sorted list of files in the ripe folder ripeFolder := filepath.Join(config.GetPathToIndex(chain), "ripe") - ripeFileList, err := os.ReadDir(ripeFolder) + stageFolder := filepath.Join(config.GetPathToIndex(chain), "staging") + + ripeFiles, err := os.ReadDir(ripeFolder) if err != nil { - return true, err + return err, true } - stageFolder := filepath.Join(config.GetPathToIndex(chain), "staging") - if len(ripeFileList) == 0 { - // On active chains, this most likely never happens, but on some less used or private chains, this is a frequent occurrence. - // return a message, but don't do anything about it. - msg := fmt.Sprintf("No new blocks at block %d (%d away from head)%s", bm.meta.Latest, (bm.meta.Latest - bm.meta.Ripe), spaces) - logger.Info(msg) - - // we need to move the file to the end of the scraped range so we show progress - stageFn, _ := file.LatestFileInFolder(stageFolder) // it may not exist... - stageRange := base.RangeFromFilename(stageFn) - end := bm.StartBlock() + bm.BlockCount() - newRangeLast := utils.Min(bm.ripeBlock, end-1) - if stageRange.Last < newRangeLast { - newRange := base.FileRange{First: stageRange.First, Last: newRangeLast} - newFilename := filepath.Join(stageFolder, newRange.String()+".txt") - _ = os.Rename(stageFn, newFilename) - _ = os.Remove(stageFn) // seems redundant, but may not be on some operating systems + if len(ripeFiles) == 0 { + // There are no new ripeFiles, but we've processed all blocks in the block + // list. We want to rename the stage file (if it exists) to reflect the + // new stage head. If the file doesn't exist, just touch it. The scraper + // uses the stage file's name to know where to start the next loop. + curStage, _ := file.LatestFileInFolder(stageFolder) // it may not exist... + curRange := base.RangeFromFilename(curStage) + + // All blocks in the list have been processed, so we know where the stage ends. + newRange := base.FileRange{First: curRange.First, Last: blocks[len(blocks)-1]} + newStage := filepath.Join(stageFolder, newRange.String()+".txt") + + if file.FileExists(curStage) { + _ = os.Rename(curStage, newStage) + _ = os.Remove(curStage) // seems redundant, but may not be on some operating systems + } else { + file.Touch(newStage) } - return true, nil + + // Let the user know something happened... + return fmt.Errorf("no new ripe blocks found at %d (%d away from head)", + bm.meta.ChainHeight(), + (bm.meta.ChainHeight() - bm.meta.Ripe), + ), true } - // Check to see if we got as many ripe files as we were expecting. In the case when AllowMissing is true, we - // can't really know, but if AllowMissing is false, then the number of files should be the same as the range width - ripeCnt := len(ripeFileList) - unripeDist := bm.opts.Settings.Unripe_dist - if uint64(ripeCnt) < (bm.BlockCount() - unripeDist) { - // Then, if they are not at least sequential, clean up and try again... - allowMissing := bm.opts.Settings.Allow_missing - if err := isListSequential(chain, ripeFileList, allowMissing); err != nil { + // If the number of ripe files is less than we expected... + if uint64(len(ripeFiles)) < (bm.BlockCount() - settings.Unripe_dist) { + // ...at least make sure they're sequential (if the chain so desires). + if err := bm.isListSequential(chain, ripeFiles); err != nil { _ = index.CleanTemporaryFolders(config.GetPathToCache(chain), false) - return true, err + return err, true } } stageFn, _ := file.LatestFileInFolder(stageFolder) // it may not exist... - nAppsThen := int(file.FileSize(stageFn) / asciiAppearanceSize) - - // ripeRange := rangeFromFileList(ripeFileList) stageRange := base.RangeFromFilename(stageFn) + nAppsThen := int(file.FileSize(stageFn) / asciiAppearanceSize) - curRange := base.FileRange{First: bm.StartBlock(), Last: bm.StartBlock() + bm.BlockCount() - 1} + var curRange base.FileRange if file.FileExists(stageFn) { curRange = stageRange + } else { + curRange = base.FileRange{First: blocks[0], Last: blocks[len(blocks)-1]} } - // Note, this file may be empty or non-existant - tmpPath := filepath.Join(config.GetPathToCache(chain) + "tmp") - backupFn, err := file.MakeBackup(tmpPath, stageFn) + // Note, the stage may be empty or non-existant + backupFn, err := file.MakeBackup(filepath.Join(config.GetPathToCache(chain)+"tmp"), stageFn) if err != nil { - return true, errors.New("Could not create backup file: " + err.Error()) + return errors.New("Could not create backup file: " + err.Error()), true } - - // logger.Info("Created backup file for stage") defer func() { + // If the backup file exists, the function did not complete. In that case, we replace the original file. if backupFn != "" && file.FileExists(backupFn) { - // If the backup file exists, something failed, so we replace the original file. - // logger.Info("Replacing backed up staging file") _ = os.Rename(backupFn, stageFn) _ = os.Remove(backupFn) // seems redundant, but may not be on some operating systems } }() - appearances := file.AsciiFileToLines(stageFn) - os.Remove(stageFn) // we have a backup copy, so it's not so bad to delete it here - for _, ripeFile := range ripeFileList { - ripePath := filepath.Join(ripeFolder, ripeFile.Name()) - appearances = append(appearances, file.AsciiFileToLines(ripePath)...) - os.Remove(ripePath) // if we fail halfway through, this will get noticed next time around and cleaned up - curCount := uint64(len(appearances)) + // Note: + // if we fail at any point past this, the backup file will replace the original file - ripeRange := base.RangeFromFilename(ripePath) + // Start by copying in the existing appearances on the stage... + appearances := file.AsciiFileToLines(stageFn) + os.Remove(stageFn) // We have a backup copy, so it's okay to remove this here... + + // Spin through each ripe file and add it to the appearances array until we have either (a) a snap + // point or (b) enough appearances to overtop. In either case, we consolidate the appearances into + // a chunk, create its bloom, and write it to disk. Note that each time we copy one of the ripe + // files, we remove it. + for _, ripeFile := range ripeFiles { + thisFile := filepath.Join(ripeFolder, ripeFile.Name()) + ripeRange := base.RangeFromFilename(thisFile) curRange.Last = ripeRange.Last - isSnap := (curRange.Last >= bm.opts.Settings.First_snap && (curRange.Last%bm.opts.Settings.Snap_to_grid) == 0) - isOvertop := (curCount >= uint64(bm.opts.Settings.Apps_per_chunk)) + // append the appearances from this file + appearances = append(appearances, file.AsciiFileToLines(thisFile)...) + os.Remove(thisFile) // if we fail halfway through, this will get noticed next time around and cleaned up + + curCount := uint64(len(appearances)) + isSnap := (curRange.Last >= settings.First_snap && (curRange.Last%settings.Snap_to_grid) == 0) + isOvertop := curCount >= settings.Apps_per_chunk if isSnap || isOvertop { - // we're consolidating... + // We've found a chunk. We can consolidate it. A chunk is a relational table relating + // a table of addresses to where they appear in a table of appearances. We store, for + // each address, its appearances. appMap := make(index.AddressAppearanceMap, len(appearances)) for _, line := range appearances { parts := strings.Split(line, "\t") - if len(parts) == 3 { - addr := strings.ToLower(parts[0]) - bn, _ := strconv.ParseUint(parts[1], 10, 32) - txid, _ := strconv.ParseUint(parts[2], 10, 32) - appMap[addr] = append(appMap[addr], index.AppearanceRecord{ - BlockNumber: uint32(bn), - TransactionId: uint32(txid), - }) + if len(parts) != 3 { + // protect ourselves -- TODO: should we report this? + continue } + addr := strings.ToLower(parts[0]) + appMap[addr] = append(appMap[addr], index.AppearanceRecord{ + BlockNumber: uint32(utils.MustParseUint(parts[1])), + TransactionId: uint32(utils.MustParseUint(parts[2])), + }) } indexPath := config.GetPathToIndex(chain) + "finalized/" + curRange.String() + ".bin" if report, err := index.WriteChunk(chain, indexPath, appMap, len(appearances), bm.opts.Pin, bm.opts.Remote); err != nil { - return false, err + return err, false } else if report == nil { logger.Fatal("Should not happen, write chunk returned empty report") } else { @@ -132,46 +142,49 @@ func (bm *BlazeManager) Consolidate() (bool, error) { report.Report() } - curRange.First = curRange.Last + 1 - appearances = []string{} + // There may be more than one chunk in the current stage, so continue until we're finished. + appearances = []string{} // clear } } + // There are almost certainly appearances left over that were not consolidated. Write these back to the stage. if len(appearances) > 0 { - lineLast := appearances[len(appearances)-1] - parts := strings.Split(lineLast, "\t") - Last := uint64(0) - if len(parts) > 1 { - Last, _ = strconv.ParseUint(parts[1], 10, 32) - Last = utils.Max(utils.Min(bm.ripeBlock, bm.StartBlock()+bm.BlockCount()-1), Last) - } else { - return true, errors.New("Cannot find last block number at lineLast in consolidate: " + lineLast) - } + lastLine := appearances[len(appearances)-1] + parts := strings.Split(lastLine, "\t") + if len(parts) < 2 { + // note that we don't remove the backup file + return fmt.Errorf("cannot find block number at last line (%s)", lastLine), true - conn := rpc.TempConnection(chain) - m, _ := conn.GetMetaData(bm.opts.Globals.TestMode) - rng := base.FileRange{First: m.Finalized + 1, Last: Last} - f := fmt.Sprintf("%s.txt", rng) - fileName := filepath.Join(config.GetPathToIndex(chain), "staging", f) - err = file.LinesToAsciiFile(fileName, appearances) - if err != nil { - os.Remove(fileName) // cleans up by replacing the previous stage - return true, err + } else { + last := blocks[len(blocks)-1] + meta, _ := bm.opts.Conn.GetMetaData(testMode) // the meta may have changed when we wrote the chunks + newStageRng := base.FileRange{First: meta.Finalized + 1, Last: last} + newStageFn := filepath.Join(config.GetPathToIndex(chain), "staging", fmt.Sprintf("%s.txt", newStageRng)) + err = file.LinesToAsciiFile(newStageFn, appearances) + if err != nil { + os.Remove(newStageFn) // remove it if it was created, note that we don't remove the backup file + return err, true + } } } + // Let the user know what happened... stageFn, _ = file.LatestFileInFolder(stageFolder) // it may not exist... nAppsNow := int(file.FileSize(stageFn) / asciiAppearanceSize) - bm.report(int(bm.opts.Settings.Apps_per_chunk), nAppsThen, nAppsNow) + bm.report(int(settings.Apps_per_chunk), nAppsThen, nAppsNow) - os.Remove(backupFn) // commits the change + // Commit the change by deleting the backup file (if the backup file exists + // when this function exits, it will replace the original file - thereby + // starting over). + os.Remove(backupFn) - return true, err + return err, true } -func isListSequential(chain string, ripeFileList []os.DirEntry, allowMissing bool) error { +func (bm *BlazeManager) isListSequential(chain string, ripeFiles []os.DirEntry) error { prev := base.NotARange - for _, file := range ripeFileList { + allowMissing := bm.opts.Settings.Allow_missing + for _, file := range ripeFiles { fileRange := base.RangeFromFilename(file.Name()) if prev != base.NotARange && prev != fileRange { if !prev.Preceeds(fileRange, !allowMissing) { @@ -182,7 +195,3 @@ func isListSequential(chain string, ripeFileList []os.DirEntry, allowMissing boo } return nil } - -var spaces = strings.Repeat(" ", 40) - -// TODO: chifra scrape misreports appearances per block when consolidating #2291 (closed, but copied here as a TODO) diff --git a/src/apps/chifra/internal/scrape/scrape_manager.go b/src/apps/chifra/internal/scrape/scrape_manager.go index 92f1885c45..57f581b7dc 100644 --- a/src/apps/chifra/internal/scrape/scrape_manager.go +++ b/src/apps/chifra/internal/scrape/scrape_manager.go @@ -39,6 +39,11 @@ func (bm *BlazeManager) BlockCount() base.Blknum { return bm.blockCount } +// EndBlock returns the last block to process for this pass of the scraper. +func (bm *BlazeManager) EndBlock() base.Blknum { + return bm.startBlock + bm.blockCount +} + // Report prints out a report of the progress of the scraper. func (bm *BlazeManager) report(perChunk, nAppsThen, nAppsNow int) { need := perChunk - utils.Min(perChunk, nAppsNow)