Skip to content

Commit

Permalink
Updating scraper
Browse files Browse the repository at this point in the history
  • Loading branch information
tjayrush committed Sep 14, 2023
1 parent cdb8473 commit c592069
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 366 deletions.
182 changes: 121 additions & 61 deletions src/apps/chifra/internal/scrape/handle_scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,111 +7,171 @@ package scrapePkg
import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/colors"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/file"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils"
)

// HandleScrape enters a forever loop and continually scrapes either BlockCnt blocks
// or until the chain is caught up. It pauses for Sleep --sleep seconds between each scrape.
// HandleScrape enters a forever loop and continually scrapes --block_cnt blocks
// (or less if close to the head). The forever loop pauses each round for
// --sleep seconds (or, if not close to the head, for .25 seconds).
func (opts *ScrapeOptions) HandleScrape() error {
var blocks = make([]base.Blknum, 0, opts.BlockCnt)
var err error
var ok bool

chain := opts.Globals.Chain
testMode := opts.Globals.TestMode
origBlockCnt := opts.BlockCnt

blazeMan := BlazeManager{
chain: chain,
timestamps: make([]tslib.TimestampRecord, 0, origBlockCnt),
processedMap: make(map[base.Blknum]bool, origBlockCnt),
nProcessed: 0,
opts: opts,
}
blazeMan.meta, err = opts.Conn.GetMetaData(testMode)
if err != nil {
return err
}

// Clean the temporary files and makes sure block zero has been processed
if ok, err := opts.Prepare(); !ok || err != nil {
return err
}

ripeBlock := base.Blknum(0)
unripePath := filepath.Join(config.GetPathToIndex(chain), "unripe")

// The forever loop. Loop until the user hits Cntl+C or the server tells us to stop.
runCount := uint64(0)
// Loop until the user hits Cntl+C, until runCount runs out, or until
// the server tells us to stop.
for {
if blazeMan.meta, err = opts.Conn.GetMetaData(testMode); err != nil {
logger.Error(fmt.Sprintf("Error fetching meta data: %s. Sleeping...", err))
goto PAUSE
// We create a new manager for each loop...we will populate it in a minute...
bm := BlazeManager{
chain: chain,
}

if blazeMan.meta.NextIndexHeight() > blazeMan.meta.ChainHeight() {
// If the user is re-syncing the chain, the index may be ahead of the chain,
// so we go to sleep and try again later.
msg := fmt.Sprintf("The index (%d) is ahead of the chain (%d).",
blazeMan.meta.NextIndexHeight(),
blazeMan.meta.ChainHeight(),
)
logger.Error(msg)
// Fetch the meta data which tells us how far along the index is.
if bm.meta, err = opts.Conn.GetMetaData(testMode); err != nil {
var ErrFetchingMeta = fmt.Errorf("error fetching meta data: %s", err)
logger.Error(colors.BrightRed+ErrFetchingMeta.Error(), colors.Off)
goto PAUSE
}

opts.StartBlock = blazeMan.meta.NextIndexHeight()
opts.BlockCnt = origBlockCnt
if (blazeMan.StartBlock() + blazeMan.BlockCount()) > blazeMan.meta.ChainHeight() {
opts.BlockCnt = (blazeMan.meta.Latest - blazeMan.StartBlock())
// This only happens if the chain and the index scraper are both started at the
// same time (rarely). This protects against the case where the chain has no ripe blocks.
// Report no error and sleep for a while.
if bm.meta.ChainHeight() < opts.Settings.Unripe_dist {
goto PAUSE
}

// The 'ripeBlock' is the head of the chain unless the chain is further along
// than 'UnripeDist.' If it is, the `ripeBlock` is 'UnripeDist' behind the
// head (i.e., 28 blocks usually - six minutes)
ripeBlock = blazeMan.meta.Latest
if ripeBlock > opts.Settings.Unripe_dist {
ripeBlock = blazeMan.meta.Latest - opts.Settings.Unripe_dist
// Another rare case, but here the user has reset his/her node but not removed
// the index. In this case, the index is ahead of the chain. We go to sleep and
// try again later in the hopes that the chain catches up.
if bm.meta.NextIndexHeight() > bm.meta.ChainHeight()+1 {
var ErrIndexAhead = fmt.Errorf(
"index (%d) is ahead of chain (%d)",
bm.meta.NextIndexHeight(),
bm.meta.ChainHeight(),
)
logger.Error(colors.BrightRed+ErrIndexAhead.Error(), colors.Off)
goto PAUSE
}

blazeMan = BlazeManager{
// Let's start a new round...
bm = BlazeManager{
chain: chain,
opts: opts,
nProcessed: 0,
ripeBlock: ripeBlock,
timestamps: make([]tslib.TimestampRecord, 0, origBlockCnt),
processedMap: make(map[base.Blknum]bool, origBlockCnt),
meta: blazeMan.meta,
nRipe: 0,
nUnripe: 0,
timestamps: make([]tslib.TimestampRecord, 0, opts.BlockCnt),
processedMap: make(map[base.Blknum]bool, opts.BlockCnt),
meta: bm.meta,
nChannels: int(opts.Settings.Channel_count),
}

// Here we do the actual scrape for this round. If anything goes wrong, the
// function will have cleaned up (i.e. remove the unstaged ripe blocks). Note
// that we don't quit, instead we sleep and we retry continually.
if err := blazeMan.HandleScrapeBlaze(); err != nil {
logger.Error(colors.BrightRed, err, colors.Off)
// Order dependant, be careful!
// first block to scrape (one past end of previous round).
bm.startBlock = bm.meta.NextIndexHeight()
// user supplied, but not so many to pass the chain tip.
bm.blockCount = utils.Min(opts.BlockCnt, bm.meta.ChainHeight()-bm.StartBlock()+1)
// Unripe_dist behind the chain tip.
bm.ripeBlock = bm.meta.ChainHeight() - opts.Settings.Unripe_dist

// These are the blocks we're going to process this round
blocks = make([]base.Blknum, 0, bm.BlockCount())
for block := bm.StartBlock(); block < bm.EndBlock(); block++ {
blocks = append(blocks, block)
}

if len(blocks) == 0 {
logger.Info("no blocks to scrape")
goto PAUSE
}

// Try to create chunks...
if ok, err := blazeMan.Consolidate(); !ok || err != nil {
logger.Error(err)
if opts.Globals.Verbose {
logger.Info("chain head: ", bm.meta.ChainHeight())
logger.Info("opts.BlockCnt: ", opts.BlockCnt)
logger.Info("ripe block: ", bm.ripeBlock)
logger.Info("perChunk: ", bm.PerChunk())
logger.Info("start block: ", bm.StartBlock())
logger.Info("block count: ", bm.BlockCount())
logger.Info("len(blocks): ", len(blocks))
if len(blocks) > 0 {
logger.Info("blocks[0]: ", blocks[0])
logger.Info("blocks[len(blocks)-1]:", blocks[len(blocks)-1])
}
}

// Scrape this round. Only quit on catostrophic errors. Report and sleep otherwise.
if err, ok = bm.ScrapeBatch(blocks); !ok || err != nil {
if err != nil {
logger.Error(colors.BrightRed+err.Error(), colors.Off)
}
if !ok {
break
}
goto PAUSE
}

if bm.nRipe == 0 {
logger.Info(colors.Green+"no ripe files to consolidate", spaces, colors.Off)
goto PAUSE

} else {
// Consilidate a chunk (if possible). Only quit on catostrophic errors. Report and sleep otherwise.
if err, ok = bm.Consolidate(blocks); !ok || err != nil {
if err != nil {
logger.Error(colors.BrightRed+err.Error(), colors.Off)
}
if !ok {
break
}
goto PAUSE
}
}

PAUSE:
// The chain frequently re-orgs. Before sleeping, we remove any unripe files so they
// are re-queried in the next round. This is the reason for the unripePath.
if err = os.RemoveAll(unripePath); err != nil {
return err
runCount++
if opts.RunCount != 0 && runCount >= opts.RunCount {
// No reason to clean up here. Next round will do so and user can use these files in the meantime.
logger.Info("run count reached")
break
}

blazeMan.Pause()
// sleep for a bit (there's no new blocks anyway if we're caught up).
opts.pause(bm.meta.ChainHeight() - bm.meta.StageHeight())

// defensive programming - just double checking our own understanding...
count := file.NFilesInFolder(bm.RipeFolder())
if count != 0 {
_ = index.CleanEphemeralIndexFolders(chain)
err := fmt.Errorf("%d unexpected ripe files in %s", count, bm.RipeFolder())
logger.Error(colors.BrightRed+err.Error(), colors.Off)
}

// We want to clean up the unripe files. The chain may have (it frequently does)
// re-orged. We want to re-qeury these next round. This is why we have an unripePath.
if err = os.RemoveAll(bm.UnripeFolder()); err != nil {
logger.Error(colors.BrightRed, err, colors.Off)
return err
}
}

// We've left the loop and we're done.
return nil
}

var spaces = strings.Repeat(" ", 50)
77 changes: 77 additions & 0 deletions src/apps/chifra/internal/scrape/save_timestamps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package scrapePkg

// Copyright 2021 The TrueBlocks Authors. All rights reserved.
// Use of this source code is governed by a license that can
// be found in the LICENSE file.

import (
"encoding/binary"
"fmt"
"os"
"sort"

"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib"
)

// TODO: Protect against overwriting files on disc

func (bm *BlazeManager) WriteTimestamps(blocks []base.Blknum) error {
chain := bm.chain

sort.Slice(bm.timestamps, func(i, j int) bool {
return bm.timestamps[i].Bn < bm.timestamps[j].Bn
})

// Assume that the existing timestamps file always contains valid timestamps in
// a valid order so we can only append as we go (which is very fast)
tsPath := config.GetPathToIndex(chain) + "ts.bin"
fp, err := os.OpenFile(tsPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return err
}

defer func() {
tslib.ClearCache(chain)
fp.Close()
}()

cnt := 0
for _, block := range blocks {
// Append to the timestamps file all the new timestamps but as we do that make sure we're
// not skipping anything at the front, in the middle, or at the end of the list
ts := tslib.TimestampRecord{}
if cnt >= len(bm.timestamps) {
ts = tslib.TimestampRecord{
Bn: uint32(block),
Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)),
}
} else {
ts = bm.timestamps[cnt]
if bm.timestamps[cnt].Bn != uint32(block) {
ts = tslib.TimestampRecord{
Bn: uint32(block),
Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)),
}
cnt-- // set it back
}
}

msg := fmt.Sprintf("Updating timestamps %-04d of %-04d (%-04d remaining)"+spaces,
block,
blocks[len(blocks)-1],
blocks[len(blocks)-1]-block,
)
logger.Progress((block%13) == 0, msg)

if err = binary.Write(fp, binary.LittleEndian, &ts); err != nil {
return err
}

cnt++
}

return nil
}
Loading

0 comments on commit c592069

Please sign in to comment.