Skip to content

Commit

Permalink
feat: add tip listener to daemon (#382) (#385)
Browse files Browse the repository at this point in the history
* added function to listen for tipped queries

* added tip listening process to reporter daemon client and updated app.go so that it doesn't generate the config files every time but just reads them. Allowing users to just edit the config files to add pairs or remove sources

* made some small changes to function names and added a check to make sure the query that was tipped is a spot price

* made lint fixes

(cherry picked from commit 06ce0ad)

Co-authored-by: CJPotter10 <[email protected]>
  • Loading branch information
github-actions[bot] and CJPotter10 authored Oct 23, 2024
1 parent f09c43c commit 31a05b4
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
3 changes: 0 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,6 @@ func New(
// Start server for handling gRPC messages from daemons.
go app.Server.Start()

configs.WriteDefaultPricefeedExchangeToml(homePath)
configs.WriteDefaultMarketParamsToml(homePath)

exchangeQueryConfig := configs.ReadExchangeQueryConfigFile(homePath)
marketParamsConfig := configs.ReadMarketParamsConfigFile(homePath)
// Start pricefeed client for sending prices for the pricefeed server to consume. These prices
Expand Down
2 changes: 1 addition & 1 deletion daemons/reporter/client/broadcast_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Client) generateDepositmessages(ctx context.Context) error {
// return nil
// }

func (c *Client) CyclelistMessages(ctx context.Context, qd []byte, querymeta *oracletypes.QueryMeta) error {
func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []byte, querymeta *oracletypes.QueryMeta) error {
value, err := c.median(qd)
if err != nil {
return fmt.Errorf("error getting median from median client': %w", err)
Expand Down
3 changes: 3 additions & 0 deletions daemons/reporter/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func StartReporterDaemonTaskLoop(
wg.Add(1)
go client.MonitorTokenBridgeReports(ctx, &wg)

wg.Add(1)
go client.MonitorForTippedQueries(ctx, &wg)

wg.Wait()
}

Expand Down
49 changes: 48 additions & 1 deletion daemons/reporter/client/reporter_monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package client
import (
"bytes"
"context"
"strings"
"sync"
"time"

oracletypes "github.com/tellor-io/layer/x/oracle/types"

"github.com/cosmos/cosmos-sdk/types/query"
)

func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup) {
Expand All @@ -25,7 +28,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup)
}

go func(ctx context.Context, qd []byte, qm *oracletypes.QueryMeta) {
err := c.CyclelistMessages(ctx, querydata, qm)
err := c.GenerateAndBroadcastSpotPriceReport(ctx, querydata, qm)
if err != nil {
c.logger.Error("Generating CycleList message", "error", err)
}
Expand Down Expand Up @@ -55,3 +58,47 @@ func (c *Client) MonitorTokenBridgeReports(ctx context.Context, wg *sync.WaitGro
time.Sleep(4 * time.Minute)
}
}

func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
var localWG sync.WaitGroup
for {
res, err := c.OracleQueryClient.TippedQueries(ctx, &oracletypes.QueryTippedQueriesRequest{
Pagination: &query.PageRequest{
Offset: 0,
},
})
if err != nil {
c.logger.Error("Error querying for TippedQueries: ", err)
time.Sleep(200 * time.Millisecond)
continue
}
if len(res.Queries) == 0 {
c.logger.Info("No tipped queries returned")
time.Sleep(200 * time.Millisecond)
continue
}
status, err := c.cosmosCtx.Client.Status(ctx)
if err != nil {
c.logger.Info("Error getting status from client: ", err)
}
height := uint64(status.SyncInfo.LatestBlockHeight)
for i := 0; i < len(res.Queries); i++ {
if height > res.Queries[i].Expiration || commitedIds[res.Queries[i].Id] || strings.EqualFold(res.Queries[i].QueryType, "SpotPrice") {
continue
}

localWG.Add(1)
go func(query *oracletypes.QueryMeta) {
defer localWG.Done()
err := c.GenerateAndBroadcastSpotPriceReport(ctx, query.GetQueryData(), query)
if err != nil {
c.logger.Error("Error generating report for tipped query: ", err)
}
}(res.Queries[i])
}

wg.Wait()

}
}

0 comments on commit 31a05b4

Please sign in to comment.