From e5e6e170648b4b8759b38e50b49cef4f55b37d80 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 22 Nov 2024 12:28:32 -0600 Subject: [PATCH] Fix tip listener (#439) (#447) * add more timestamps for tracking * stablize tips * clean * reverted to older reporter monitor * fix tip listener * handle concurrent map access * lint * remove extra sleep (cherry picked from commit 5059edff53840a1bec2928d15522486c69c4acee) Co-authored-by: tkernell --- daemons/reporter/client/broadcast_message.go | 9 +++- daemons/reporter/client/client.go | 2 + daemons/reporter/client/reporter_monitors.go | 45 ++++++++++++-------- 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/daemons/reporter/client/broadcast_message.go b/daemons/reporter/client/broadcast_message.go index 72390adc0..15693227f 100644 --- a/daemons/reporter/client/broadcast_message.go +++ b/daemons/reporter/client/broadcast_message.go @@ -32,7 +32,10 @@ func (c *Client) generateDepositmessages(ctx context.Context) error { } queryId := hex.EncodeToString(utils.QueryIDFromData(depositQuerydata)) - if depositReportMap[queryId] { + mutex.RLock() + depositReported := depositReportMap[queryId] + mutex.RUnlock() + if depositReported { return fmt.Errorf("already reported for this bridge deposit tx") } msg := oracletypes.MsgSubmitValue{ @@ -45,7 +48,9 @@ func (c *Client) generateDepositmessages(ctx context.Context) error { c.logger.Error("sending submit deposit transaction", "error", err) } c.logger.Info(fmt.Sprintf("Response from bridge tx report: %v", resp.TxResult)) + mutex.Lock() depositReportMap[queryId] = true + mutex.Unlock() return nil } @@ -94,7 +99,9 @@ func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []b return fmt.Errorf("error sending tx: %w", err) } fmt.Println("response after submit message", resp.TxResult.Code) + mutex.Lock() commitedIds[querymeta.Id] = true + mutex.Unlock() return nil } diff --git a/daemons/reporter/client/client.go b/daemons/reporter/client/client.go index 9b2ce4552..9d122d127 100644 --- a/daemons/reporter/client/client.go +++ b/daemons/reporter/client/client.go @@ -32,6 +32,8 @@ var ( depositReportMap = make(map[string]bool) ) +var mutex = &sync.RWMutex{} + type Client struct { // reporter account name AccountName string diff --git a/daemons/reporter/client/reporter_monitors.go b/daemons/reporter/client/reporter_monitors.go index 1656057a6..05bc798d4 100644 --- a/daemons/reporter/client/reporter_monitors.go +++ b/daemons/reporter/client/reporter_monitors.go @@ -21,10 +21,11 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) if err != nil { // log error c.logger.Error("getting current query", "error", err) - time.Sleep(100 * time.Millisecond) - continue } - if bytes.Equal(querydata, prevQueryData) || commitedIds[querymeta.Id] { + mutex.RLock() + committed := commitedIds[querymeta.Id] + mutex.RUnlock() + if bytes.Equal(querydata, prevQueryData) || committed { time.Sleep(100 * time.Millisecond) continue } @@ -35,6 +36,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) c.logger.Error("Generating CycleList message", "error", err) } }(ctx, querydata, querymeta) + err = c.WaitForBlockHeight(ctx, int64(querymeta.Expiration)) if err != nil { c.logger.Error("Error waiting for block height", "error", err) @@ -62,7 +64,7 @@ func (c *Client) MonitorTokenBridgeReports(ctx context.Context, wg *sync.WaitGro func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - var localWG sync.WaitGroup + for { res, err := c.OracleQueryClient.TippedQueries(ctx, &oracletypes.QueryTippedQueriesRequest{ Pagination: &query.PageRequest{ @@ -78,29 +80,31 @@ func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup time.Sleep(200 * time.Millisecond) continue } + status, err := c.cosmosCtx.Client.Status(ctx) if err != nil { c.logger.Info("Error getting status from client: ", err) + time.Sleep(200 * time.Millisecond) + continue } + height := uint64(status.SyncInfo.LatestBlockHeight) + + // Create a new WaitGroup for this batch of tips + var batchWG sync.WaitGroup + for i := 0; i < len(res.Queries); i++ { - if height > res.Queries[i].Expiration || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") { - if len(res.Queries) == 1 || i == (len(res.Queries)-1) { - time.Sleep(200 * time.Millisecond) - } - continue - } - if commitedIds[res.Queries[i].Id] { - if len(res.Queries) == 1 || i == (len(res.Queries)-1) { - time.Sleep(200 * time.Millisecond) - } + mutex.RLock() + committed := commitedIds[res.Queries[i].Id] + mutex.RUnlock() + if height > res.Queries[i].Expiration || committed || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") { continue } - localWG.Add(1) + batchWG.Add(1) go func(query *oracletypes.QueryMeta) { - defer localWG.Done() - err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.QueryData, query) + defer batchWG.Done() + err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.GetQueryData(), query) if err != nil { c.logger.Error("Error generating report for tipped query: ", err) } else { @@ -108,6 +112,11 @@ func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup } }(res.Queries[i]) } - localWG.Wait() + + // Wait for all reports in this batch to complete + batchWG.Wait() + + // Add a small delay between batches to prevent overwhelming the system + time.Sleep(500 * time.Millisecond) } }