Skip to content

Commit

Permalink
Not working...
Browse files Browse the repository at this point in the history
  • Loading branch information
tjayrush committed Sep 4, 2023
1 parent a23a1e9 commit 9a24d7f
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 149 deletions.
22 changes: 12 additions & 10 deletions src/apps/chifra/internal/scrape/handle_scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
// (or less if close to the head). The forever loop pauses each round for
// --sleep seconds (or, if not close to the head, for .25 seconds).
func (opts *ScrapeOptions) HandleScrape() error {
var blocks = make([]base.Blknum, 0, opts.BlockCnt)
var err error
var ok bool

chain := opts.Globals.Chain
testMode := opts.Globals.TestMode

Expand All @@ -42,17 +45,15 @@ func (opts *ScrapeOptions) HandleScrape() error {
// Fetch the meta data which tells us how far along the index is.
if bm.meta, err = opts.Conn.GetMetaData(testMode); err != nil {
var ErrFetchingMeta = fmt.Errorf("error fetching meta data: %s", err)
logger.Error(ErrFetchingMeta)
logger.Error(colors.BrightRed, ErrFetchingMeta, colors.Off)
goto PAUSE
}
// logger.Info(colors.Green+"meta data fetched"+colors.Off, bm.meta)

// We're may be too close to the start of the chain to have ripe blocks.
// Report no error but try again soon.
if bm.meta.ChainHeight() < opts.Settings.Unripe_dist {
goto PAUSE
}
// logger.Info(colors.Green+"ripe block found"+colors.Off, bm.meta.ChainHeight()-opts.Settings.Unripe_dist)

// The user may have restarted his node's sync (that is, started over).
// In this case, the index may be ahead of the chain, if so we go to
Expand All @@ -63,10 +64,9 @@ func (opts *ScrapeOptions) HandleScrape() error {
bm.meta.NextIndexHeight(),
bm.meta.ChainHeight(),
)
logger.Error(ErrIndexAhead)
logger.Error(colors.BrightRed, ErrIndexAhead, colors.Off)
goto PAUSE
}
// logger.Info(colors.Green+"index is not ahead of chain"+colors.Off, bm.meta.NextIndexHeight())

// Let's start a new round...
bm = BlazeManager{
Expand All @@ -86,27 +86,29 @@ func (opts *ScrapeOptions) HandleScrape() error {
bm.blockCount = utils.Min(opts.BlockCnt, bm.meta.ChainHeight()-bm.StartBlock()+1)
// Unripe_dist behind the chain tip.
bm.ripeBlock = bm.meta.ChainHeight() - opts.Settings.Unripe_dist
// logger.Info(colors.Green, bm.startBlock, bm.blockCount, bm.ripeBlock)

// These are the blocks we're going to process this round
for block := bm.StartBlock(); block < bm.EndBlock(); block++ {
blocks = append(blocks, block)
}

// Scrape this round. Only quit on catostrophic errors. Report and sleep otherwise.
if err, ok := bm.ScrapeBatch(); !ok || err != nil {
if err, ok = bm.ScrapeBatch(blocks); !ok || err != nil {
logger.Error(colors.BrightRed, err, colors.Off)
if !ok {
break
}
goto PAUSE
}
// logger.Info(colors.Green+"scrape batch complete"+colors.Off, bm.nProcessed)

// Consilidate a chunk (if possible). Only quit on catostrophic errors. Report and sleep otherwise.
if ok, err := bm.Consolidate(); !ok || err != nil {
if err, ok = bm.Consolidate(blocks); !ok || err != nil {
logger.Error(colors.BrightRed, err, colors.Off)
if !ok {
break
}
goto PAUSE
}
// logger.Info(colors.Green+"consolidate complete"+colors.Off, bm.nProcessed)

PAUSE:
// If we've gotten this far, we want to clean up the unripe files (we no longer need them).
Expand Down
33 changes: 18 additions & 15 deletions src/apps/chifra/internal/scrape/save_timestamps.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ import (
"os"
"sort"

"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib"
)

// TODO: Protect against overwriting files on disc

func (bm *BlazeManager) SaveTimestamps(endPoint uint64) error {
func (bm *BlazeManager) WriteTimestamps(blocks []base.Blknum) error {
chain := bm.chain
conn := rpc.TempConnection(chain)

sort.Slice(bm.timestamps, func(i, j int) bool {
return bm.timestamps[i].Bn < bm.timestamps[j].Bn
})

// Assume that the existing timestamps file always contains valid timestamps in a valid order so we can only append
// Assume that the existing timestamps file always contains valid timestamps in
// a valid order so we can only append as we go (which is very fast)
tsPath := config.GetPathToIndex(chain) + "ts.bin"
fp, err := os.OpenFile(tsPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
Expand All @@ -36,39 +36,42 @@ func (bm *BlazeManager) SaveTimestamps(endPoint uint64) error {
defer func() {
tslib.ClearCache(chain)
fp.Close()
// sigintTrap.Disable(trapCh)
// writeMutex.Unlock()
}()

nTs, _ := tslib.NTimestamps(chain)

cnt := 0
for bn := nTs; bn < endPoint; bn++ {
for _, block := range blocks {
// Append to the timestamps file all the new timestamps but as we do that make sure we're
// not skipping anything at the front, in the middle, or at the end of the list
ts := tslib.TimestampRecord{}
if cnt >= len(bm.timestamps) {
ts = tslib.TimestampRecord{
Bn: uint32(bn),
Ts: uint32(conn.GetBlockTimestamp(bn)),
Bn: uint32(block),
Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)),
}
} else {
ts = bm.timestamps[cnt]
if bm.timestamps[cnt].Bn != uint32(bn) {
if bm.timestamps[cnt].Bn != uint32(block) {
ts = tslib.TimestampRecord{
Bn: uint32(bn),
Ts: uint32(conn.GetBlockTimestamp(bn)),
Bn: uint32(block),
Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)),
}
cnt-- // set it back
}
}

logger.Progress((bn%13) == 0, fmt.Sprintf("Checking or updating timestamps %-04d of %-04d (%d remaining)%s", bn, endPoint, endPoint-bn, spaces))
msg := fmt.Sprintf("Updating timestamps %-04d of %-04d (%-04d remaining)",
block,
blocks[len(blocks)-1],
blocks[len(blocks)-1]-block,
)
logger.Progress((block%13) == 0, msg)

if err = binary.Write(fp, binary.LittleEndian, &ts); err != nil {
return err
}

cnt++
}

return nil
}
38 changes: 17 additions & 21 deletions src/apps/chifra/internal/scrape/scrape_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,33 @@ package scrapePkg
// be found in the LICENSE file.

import (
"errors"
"fmt"
"strings"

"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config"
"github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index"
)

// ScrapeBatch is called each time around the forever loop prior to calling into
// Blaze to actually scrape the blocks.
func (bm *BlazeManager) ScrapeBatch() (error, bool) {
chain := bm.chain
// ScrapeBatch is called each time around the forever loop. It calls into
// HandleBlaze and writes the timestamps if there's no error.
func (bm *BlazeManager) ScrapeBatch(blocks []base.Blknum) (error, bool) {
indexPath := config.GetPathToIndex(bm.chain)

// Do the actual scrape, wait until it finishes, clean up and return on failure
if _, err := bm.HandleBlaze(); err != nil {
_ = index.CleanTemporaryFolders(config.GetPathToIndex(chain), false)
return err, true
if err, ok := bm.HandleBlaze(blocks); !ok || err != nil {
_ = index.CleanTemporaryFolders(indexPath, false)
return err, ok
}

start := bm.StartBlock()
end := bm.StartBlock() + bm.BlockCount()

for bn := start; bn < end; bn++ {
if !bm.processedMap[bn] {
// At least one block was not processed. This would only happen in the event of an
// error, so clean up, report the error and return. The loop will repeat.
_ = index.CleanTemporaryFolders(config.GetPathToIndex(chain), false)
msg := fmt.Sprintf("A block %d was not processed%s", bn, strings.Repeat(" ", 50))
return errors.New(msg), true
// Check to see if we missed any blocks...
for _, block := range blocks {
if !bm.processedMap[block] {
// We missed a block. We need to clean up and continue
// next time around the loop. This may happen if the
// node returns an error for example.
_ = index.CleanTemporaryFolders(indexPath, false)
return fmt.Errorf("a block (%d) was not processed", block), true
}
}

return bm.SaveTimestamps(end), true
return bm.WriteTimestamps(blocks), true
}
11 changes: 2 additions & 9 deletions src/apps/chifra/internal/scrape/scrape_blaze.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,7 @@ import (

// HandleBlaze does the actual scraping, walking through block_cnt blocks and querying traces and logs
// and then extracting addresses and timestamps from those data structures.
func (bm *BlazeManager) HandleBlaze() (ok bool, err error) {
blocks := []base.Blknum{}

start := bm.StartBlock()
end := bm.StartBlock() + bm.BlockCount()
for block := start; block < end; block++ {
blocks = append(blocks, block)
}
func (bm *BlazeManager) HandleBlaze(blocks []base.Blknum) (err error, ok bool) {

// We need three pipelines...we shove into blocks, blocks shoves into appearances and timestamps
blockChannel := make(chan base.Blknum)
Expand Down Expand Up @@ -75,7 +68,7 @@ func (bm *BlazeManager) HandleBlaze() (ok bool, err error) {
close(tsChannel)
tsWg.Wait()

return true, nil
return nil, true
}

// ProcessBlocks processes the block channel and for each block query the node for both
Expand Down
Loading

0 comments on commit 9a24d7f

Please sign in to comment.