Skip to content

Commit

Permalink
Fix tip listener (#439) (#447)
Browse files Browse the repository at this point in the history
* add more timestamps for tracking

* stablize tips

* clean

* reverted to older reporter monitor

* fix tip listener

* handle concurrent map access

* lint

* remove extra sleep

(cherry picked from commit 5059edf)

Co-authored-by: tkernell <[email protected]>
  • Loading branch information
github-actions[bot] and tkernell authored Nov 22, 2024
1 parent 61c2d0f commit e5e6e17
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 19 deletions.
9 changes: 8 additions & 1 deletion daemons/reporter/client/broadcast_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func (c *Client) generateDepositmessages(ctx context.Context) error {
}

queryId := hex.EncodeToString(utils.QueryIDFromData(depositQuerydata))
if depositReportMap[queryId] {
mutex.RLock()
depositReported := depositReportMap[queryId]
mutex.RUnlock()
if depositReported {
return fmt.Errorf("already reported for this bridge deposit tx")
}
msg := oracletypes.MsgSubmitValue{
Expand All @@ -45,7 +48,9 @@ func (c *Client) generateDepositmessages(ctx context.Context) error {
c.logger.Error("sending submit deposit transaction", "error", err)
}
c.logger.Info(fmt.Sprintf("Response from bridge tx report: %v", resp.TxResult))
mutex.Lock()
depositReportMap[queryId] = true
mutex.Unlock()

return nil
}
Expand Down Expand Up @@ -94,7 +99,9 @@ func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []b
return fmt.Errorf("error sending tx: %w", err)
}
fmt.Println("response after submit message", resp.TxResult.Code)
mutex.Lock()
commitedIds[querymeta.Id] = true
mutex.Unlock()

return nil
}
2 changes: 2 additions & 0 deletions daemons/reporter/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
depositReportMap = make(map[string]bool)
)

var mutex = &sync.RWMutex{}

type Client struct {
// reporter account name
AccountName string
Expand Down
45 changes: 27 additions & 18 deletions daemons/reporter/client/reporter_monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup)
if err != nil {
// log error
c.logger.Error("getting current query", "error", err)
time.Sleep(100 * time.Millisecond)
continue
}
if bytes.Equal(querydata, prevQueryData) || commitedIds[querymeta.Id] {
mutex.RLock()
committed := commitedIds[querymeta.Id]
mutex.RUnlock()
if bytes.Equal(querydata, prevQueryData) || committed {
time.Sleep(100 * time.Millisecond)
continue
}
Expand All @@ -35,6 +36,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup)
c.logger.Error("Generating CycleList message", "error", err)
}
}(ctx, querydata, querymeta)

err = c.WaitForBlockHeight(ctx, int64(querymeta.Expiration))
if err != nil {
c.logger.Error("Error waiting for block height", "error", err)
Expand Down Expand Up @@ -62,7 +64,7 @@ func (c *Client) MonitorTokenBridgeReports(ctx context.Context, wg *sync.WaitGro

func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
var localWG sync.WaitGroup

for {
res, err := c.OracleQueryClient.TippedQueries(ctx, &oracletypes.QueryTippedQueriesRequest{
Pagination: &query.PageRequest{
Expand All @@ -78,36 +80,43 @@ func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup
time.Sleep(200 * time.Millisecond)
continue
}

status, err := c.cosmosCtx.Client.Status(ctx)
if err != nil {
c.logger.Info("Error getting status from client: ", err)
time.Sleep(200 * time.Millisecond)
continue
}

height := uint64(status.SyncInfo.LatestBlockHeight)

// Create a new WaitGroup for this batch of tips
var batchWG sync.WaitGroup

for i := 0; i < len(res.Queries); i++ {
if height > res.Queries[i].Expiration || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") {
if len(res.Queries) == 1 || i == (len(res.Queries)-1) {
time.Sleep(200 * time.Millisecond)
}
continue
}
if commitedIds[res.Queries[i].Id] {
if len(res.Queries) == 1 || i == (len(res.Queries)-1) {
time.Sleep(200 * time.Millisecond)
}
mutex.RLock()
committed := commitedIds[res.Queries[i].Id]
mutex.RUnlock()
if height > res.Queries[i].Expiration || committed || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") {
continue
}

localWG.Add(1)
batchWG.Add(1)
go func(query *oracletypes.QueryMeta) {
defer localWG.Done()
err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.QueryData, query)
defer batchWG.Done()
err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.GetQueryData(), query)
if err != nil {
c.logger.Error("Error generating report for tipped query: ", err)
} else {
c.logger.Info("Broadcasted report for tipped query")
}
}(res.Queries[i])
}
localWG.Wait()

// Wait for all reports in this batch to complete
batchWG.Wait()

// Add a small delay between batches to prevent overwhelming the system
time.Sleep(500 * time.Millisecond)
}
}

0 comments on commit e5e6e17

Please sign in to comment.