Skip to content

Commit

Permalink
AQFWrapper lastSync is not modified from AfterSyncHook
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Feb 13, 2024
1 parent 599ebae commit 9e155ec
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 51 deletions.
6 changes: 5 additions & 1 deletion models/aggregated_block_feed/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (mdl *AQFWrapper) AddYearnFeed(adapter ds.SyncAdapterI) {
log.Fatal("Failed in parsing yearn feed for aggregated yearn feed")
}
mdl.LastSync = utils.Min(adapter.GetLastSync(), mdl.LastSync)
// log.Info(adapter.GetAddress(), "added to aggregatedpricefeed has last_sync", adapter.GetLastSync())
mdl.QueryFeeds[adapter.GetAddress()] = yearnFeed
}

Expand Down Expand Up @@ -153,3 +152,8 @@ func (mdl AQFWrapper) getFeeds(blockNum int64, neededTokens map[string]bool) (re
func (mdl AQFWrapper) ChainlinkPriceUpdatedAt(token string, blockNums []int64) {
mdl.queryPFdeps.chainlinkPriceUpdatedAt(token, blockNums)
}

func (mdl AQFWrapper) AfterSyncHook(blockNum int64) {
// don't do any thing as the lastSync should not be updated from outside.
// It is the min of the lastsync of all the interally managed queryPriceFeeds
}
82 changes: 44 additions & 38 deletions models/aggregated_block_feed/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,62 @@ import (
// "fmt"
)

// update all queryAdapter only if for the lastBlockNumber as we have fetched prices for that block.
// not update for toSinceTill. if the interval is more than the syncCycle in engin/index.go, the queryAdpater lastsync will be updated but the prices will not be fetched
func (mdl *AQFWrapper) Query(toSinceTill int64) {
if len(mdl.QueryFeeds) == 0 {
return
func (mdl *AQFWrapper) fetchAllPrices(toSinceTill int64) int64 {
queryFrom := mdl.GetLastSync() + mdl.Interval
if queryFrom > toSinceTill {
return mdl.GetLastSync()
}
log.Infof("Sync %s from %d to %d", mdl.GetName(), queryFrom, toSinceTill)
// for concurrency
concurrentThreads := 6
ch := make(chan int, concurrentThreads)
// msg
queryFrom := mdl.GetLastSync() + mdl.Interval
log.Infof("Sync %s from %d to %d", mdl.GetName(), queryFrom, toSinceTill)
wg := &sync.WaitGroup{}
//
// timer with query of block
lastblockNum := queryFrom
{
rounds := 0
loopStartTime := time.Now()
roundStartTime := time.Now()
wg := &sync.WaitGroup{}
for blockNum := lastblockNum; blockNum <= toSinceTill; blockNum += mdl.Interval {
mdl.queryPFdeps.aggregatedFetchedBlocks =
append(mdl.queryPFdeps.aggregatedFetchedBlocks, blockNum)
ch <- 1
wg.Add(1)
go mdl.queryAsync(blockNum, ch, wg)
if rounds%100 == 0 {
timeLeft := (time.Since(loopStartTime).Seconds() * float64(toSinceTill-blockNum)) /
float64(blockNum-mdl.GetLastSync())
timeLeft /= 60
log.Infof("Synced %d in %d rounds(%fs): TimeLeft %f mins", blockNum, rounds, time.Since(roundStartTime).Seconds(), timeLeft)
roundStartTime = time.Now()
}
rounds++
lastblockNum = blockNum
rounds := 0
loopStartTime := time.Now()
roundStartTime := time.Now()

blockNum := mdl.GetLastSync() + mdl.Interval
for ; blockNum <= toSinceTill; blockNum += mdl.Interval {
mdl.queryPFdeps.aggregatedFetchedBlocks =
append(mdl.queryPFdeps.aggregatedFetchedBlocks, blockNum)
ch <- 1
wg.Add(1)
go mdl.queryAsync(blockNum, ch, wg)
if rounds%100 == 0 {
timeLeft := (time.Since(loopStartTime).Seconds() * float64(toSinceTill-blockNum)) /
float64(blockNum-mdl.GetLastSync())
timeLeft /= 60
log.Infof("Synced %d in %d rounds(%fs): TimeLeft %f mins", blockNum, rounds, time.Since(roundStartTime).Seconds(), timeLeft)
roundStartTime = time.Now()
}
wg.Wait()
rounds++
}
wg.Wait()
return blockNum - mdl.Interval
}

// update all queryAdapter only if for the lastBlockNumber as we have fetched prices for that block.
// not update for toSinceTill. if the interval is more than the syncCycle in engin/index.go, the queryAdpater lastsync will be updated but the prices will not be fetched
func (mdl *AQFWrapper) Query(toSinceTill int64) {
if len(mdl.QueryFeeds) == 0 {
return
}

syncedTill := mdl.fetchAllPrices(toSinceTill)
//
// set last_sync on querypricefeed
for _, adapter := range mdl.QueryFeeds {
// yearn price feed can't be disabled from v2
if lastblockNum <= adapter.GetLastSync() || adapter.IsDisabled() {
if syncedTill <= adapter.GetLastSync() || adapter.IsDisabled() {
continue
}
adapter.AfterSyncHook(lastblockNum)
}
// db has saved prices till mdl.GetLastSync()
// queryFrom starts
if mdl.GetLastSync()+mdl.Interval != queryFrom {
log.Fatal("failed reduntant check, to make sure lastSync of AQFWrapper is not updated before addingQueryPrices for extra")
adapter.AfterSyncHook(syncedTill)
}
mdl.addQueryPrices(mdl.GetLastSync())
mdl.addQueryPrices(mdl.GetLastSync()) // use previous lastSync for getting extra prices
//
mdl.LastSync = syncedTill
}

func (mdl *AQFWrapper) addQueryPrices(clearExtraBefore int64) {
Expand Down
11 changes: 0 additions & 11 deletions models/credit_manager/cm_common/session_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,6 @@ func (mdl CommonCMAdapter) UpdateClosedSessionStatus(sessionId string, status in
mdl.closedSessions[sessionId].Status = status
}

// func main() {
// var f map[string]string
// var s *map[string]string = &f
// if s != nil {
// fmt.Println("s is not nil")
// }
// if *s == nil {
// fmt.Println("val s is pointing to is nil")
// }
// }
//
// collateral
func (mdl CommonCMAdapter) AddCollateralToSession(blockNum int64, sessionId, token string, amount *big.Int) {
if !mdl.Repo.IsDieselToken(token) && mdl.Repo.GetGearTokenAddr() != token {
Expand Down
2 changes: 1 addition & 1 deletion services/trace_service/parity_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (app ParityFetcher) getDataOnRPC(rpc, txHash string) ([]RPCTrace, error) {
return nil, fmt.Errorf(traceObj.Error.Message)
}
if len(traceObj.Result) == 0 {
fmt.Println(string(data))
log.Info(string(data))
}
return traceObj.Result, nil
}
Expand Down

0 comments on commit 9e155ec

Please sign in to comment.