Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/goroutine manage #501

Merged
merged 19 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 77 additions & 48 deletions daemons/reporter/client/broadcast_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,54 +42,13 @@ func (c *Client) GenerateDepositMessages(ctx context.Context) error {
return nil
}

msg := oracletypes.MsgSubmitValue{
msg := &oracletypes.MsgSubmitValue{
Creator: c.accAddr.String(),
QueryData: depositQuerydata,
Value: value,
}

// Add retry logic for transaction sending
maxRetries := 5
for attempt := 1; attempt <= maxRetries; attempt++ {
resp, err := c.sendTx(ctx, &msg)
if err != nil {
c.logger.Error("submitting deposit report transaction",
"error", err,
"attempt", attempt,
"queryId", queryId)

if attempt == maxRetries {
// Don't mark as reported if all retries failed
return fmt.Errorf("failed to submit deposit after %d attempts: %w", maxRetries, err)
}

// Wait before retry with exponential backoff
time.Sleep(time.Second * time.Duration(2^attempt))
continue
}

// Check transaction success
if resp.TxResult.Code != 0 {
c.logger.Error("deposit report transaction failed",
"code", resp.TxResult.Code,
"queryId", queryId)
// remove oldest deposit report and move on to next one
c.TokenDepositsCache.RemoveOldestReport()
return fmt.Errorf("transaction failed with code %d", resp.TxResult.Code)
}

// Remove oldest deposit report from cache
c.TokenDepositsCache.RemoveOldestReport()

// Only mark as reported if transaction was successful
mutex.Lock()
depositReportMap[queryId] = true
mutex.Unlock()

c.logger.Info(fmt.Sprintf("Response from bridge tx report: %v", resp.TxResult))

return nil
}
c.txChan <- TxChannelInfo{Msg: msg, isBridge: true, NumRetries: 5}

return nil
}
Expand Down Expand Up @@ -133,11 +92,8 @@ func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []b
Value: value,
}

resp, err := c.sendTx(ctx, msg)
if err != nil {
return fmt.Errorf("error sending tx: %w", err)
}
fmt.Println("response after submit message", resp.TxResult.Code)
c.txChan <- TxChannelInfo{Msg: msg, isBridge: false, NumRetries: 0}

mutex.Lock()
commitedIds[querymeta.Id] = true
mutex.Unlock()
Expand All @@ -146,3 +102,76 @@ func (c *Client) GenerateAndBroadcastSpotPriceReport(ctx context.Context, qd []b

return nil
}

// HandleBridgeDepositTxInChannel submits a queued bridge deposit report and,
// when the transaction succeeds, records the deposit as reported.
//
// On a send error the attempt is logged. If data.NumRetries is zero the
// message is dropped; otherwise NumRetries is decremented and the message is
// put back on c.txChan for another attempt. In both cases the function returns
// without touching the (nil) response.
//
// On a successful send with a non-zero result code the failure is logged and
// nothing is marked as reported. On result code zero the oldest report is
// removed from c.TokenDepositsCache and the hex-encoded query id is set in
// depositReportMap under the package mutex.
func (c *Client) HandleBridgeDepositTxInChannel(ctx context.Context, data TxChannelInfo) {
	resp, err := c.sendTx(ctx, data.Msg)
	if err != nil {
		c.logger.Error("submitting deposit report transaction",
			"error", err,
			"attemptsLeft", data.NumRetries)

		if data.NumRetries == 0 {
			// Don't mark as reported if all retries failed
			c.logger.Error(fmt.Sprintf("failed to submit deposit after all allotted attempts: %v", err))
			return
		}

		data.NumRetries--
		// c.txChan is unbuffered and its reader (BroadcastTxMsgToChain) is
		// waiting for this handler to finish, so a direct send here would
		// stall until that reader's timeout fires. Hand the retry back from a
		// separate goroutine so this call can return immediately.
		go func() { c.txChan <- data }()

		// resp is not usable after a failed send; stop here instead of
		// falling through to the result-code check below.
		return
	}

	bridgeDepositMsg, ok := data.Msg.(*oracletypes.MsgSubmitValue)
	if !ok {
		c.logger.Error("Could not go from sdk.Msg to types.MsgSubmitValue")
		return
	}

	queryId := utils.QueryIDFromData(bridgeDepositMsg.GetQueryData())

	// Check transaction success
	if resp.TxResult.Code != 0 {
		c.logger.Error("deposit report transaction failed",
			"code", resp.TxResult.Code,
			"queryId", queryId)
		return
	}

	// Remove oldest deposit report from cache
	c.TokenDepositsCache.RemoveOldestReport()

	// Only mark as reported if transaction was successful
	mutex.Lock()
	depositReportMap[hex.EncodeToString(queryId)] = true
	mutex.Unlock()

	c.logger.Info(fmt.Sprintf("Response from bridge tx report: %v", resp.TxResult))
}

// BroadcastTxMsgToChain receives queued messages from c.txChan one at a time
// and submits each of them in its own goroutine under a 3 second timeout.
//
// Messages flagged isBridge are handed to HandleBridgeDepositTxInChannel; all
// other messages go straight to sendTx, with any error logged. The loop waits
// for either the worker goroutine to finish or the timeout to expire before
// taking the next message, and runs until c.txChan is closed.
func (c *Client) BroadcastTxMsgToChain() {
	for txInfo := range c.txChan {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		finished := make(chan struct{})

		go func() {
			defer close(finished)

			if txInfo.isBridge {
				c.HandleBridgeDepositTxInChannel(ctx, txInfo)
				return
			}

			if _, err := c.sendTx(ctx, txInfo.Msg); err != nil {
				c.logger.Error(fmt.Sprintf("Error sending tx: %v", err))
			}
		}()

		// Whichever happens first ends this iteration; the context is
		// released in both cases.
		select {
		case <-finished:
		case <-ctx.Done():
			c.logger.Error("broadcasting tx timed out")
		}
		cancel()
	}
}
18 changes: 16 additions & 2 deletions daemons/reporter/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ var (

var mutex = &sync.RWMutex{}

// TxChannelInfo is the unit of work passed over Client.txChan to
// BroadcastTxMsgToChain.
type TxChannelInfo struct {
// Msg is the message that gets handed to sendTx.
Msg sdk.Msg
// isBridge selects the handling path in BroadcastTxMsgToChain: true routes
// the message through HandleBridgeDepositTxInChannel, false sends it
// directly with sendTx.
isBridge bool
// NumRetries is the number of further attempts allowed after a failed send.
// HandleBridgeDepositTxInChannel decrements it on each failure and gives up
// once it reaches zero; the non-bridge path does not read it.
NumRetries uint8
}

type Client struct {
// reporter account name
AccountName string
Expand All @@ -55,17 +61,19 @@ type Client struct {
accAddr sdk.AccAddress
minGasFee string
// logger is the logger for the daemon.
logger log.Logger
txMutex sync.Mutex
logger log.Logger
txChan chan TxChannelInfo
}

// NewClient builds a reporter Client for the given account.
//
// The supplied logger is tagged with module "reporter-client", valGasMin is
// stored as the client's minimum gas fee, and an unbuffered TxChannelInfo
// channel is created for queuing outgoing transactions.
func NewClient(clctx client.Context, logger log.Logger, accountName, valGasMin string) *Client {
	c := &Client{
		AccountName: accountName,
		cosmosCtx:   clctx,
		minGasFee:   valGasMin,
		txChan:      make(chan TxChannelInfo),
	}
	c.logger = logger.With("module", "reporter-client")
	return c
}

Expand Down Expand Up @@ -178,6 +186,9 @@ func StartReporterDaemonTaskLoop(

var wg sync.WaitGroup

wg.Add(1)
go client.BroadcastTxMsgToChain()

wg.Add(1)
go client.MonitorCyclelistQuery(ctx, &wg)

Expand All @@ -187,6 +198,9 @@ func StartReporterDaemonTaskLoop(
wg.Add(1)
go client.MonitorForTippedQueries(ctx, &wg)

wg.Add(1)
go client.WithdrawAndStakeEarnedRewardsPeriodically(ctx, &wg)

wg.Wait()
}

Expand Down
34 changes: 33 additions & 1 deletion daemons/reporter/client/reporter_monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/shirou/gopsutil/v3/process"
tokenbridgetipstypes "github.com/tellor-io/layer/daemons/server/types/token_bridge_tips"
oracletypes "github.com/tellor-io/layer/x/oracle/types"
reportertypes "github.com/tellor-io/layer/x/reporter/types"

"github.com/cosmos/cosmos-sdk/types/query"
)
Expand Down Expand Up @@ -52,6 +54,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup)
txCtx, cancel := context.WithTimeout(ctx, defaultTxTimeout)
done := make(chan struct{})

c.logger.Info(fmt.Sprintf("starting to generate spot price report at %d", time.Now().Unix()))
go func() {
defer close(done)
err := c.GenerateAndBroadcastSpotPriceReport(txCtx, querydata, querymeta)
Expand All @@ -64,7 +67,7 @@ func (c *Client) MonitorCyclelistQuery(ctx context.Context, wg *sync.WaitGroup)
case <-done:
cancel()
case <-txCtx.Done():
c.logger.Error("report generation timed out")
c.logger.Error(fmt.Sprintf("report generation timed out at %d", time.Now().Unix()))
cancel()
}

Expand Down Expand Up @@ -177,6 +180,35 @@ func (c *Client) MonitorForTippedQueries(ctx context.Context, wg *sync.WaitGroup
}
}

// WithdrawAndStakeEarnedRewardsPeriodically queues a MsgWithdrawTip on
// c.txChan at a fixed interval until ctx is cancelled.
//
// The interval in seconds is read once from the WITHDRAW_FREQUENCY environment
// variable and defaults to 43200 (12 hours) when unset. A value that is not a
// positive integer is logged as an error and the function returns without
// starting the loop.
//
// REPORTERS_VALIDATOR_ADDRESS is re-read on every cycle; while it is empty the
// cycle is skipped and the function simply waits for the next interval.
//
// wg.Done is called when the function returns, matching the wg.Add(1) the
// caller performs before starting this goroutine.
func (c *Client) WithdrawAndStakeEarnedRewardsPeriodically(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	freqVar := os.Getenv("WITHDRAW_FREQUENCY")
	if freqVar == "" {
		freqVar = "43200" // default to being 12 hours or 43200 seconds
	}
	frequency, err := strconv.Atoi(freqVar)
	// A zero or negative interval would turn the loop below into a busy loop
	// that floods the tx channel, so reject it together with parse errors.
	if err != nil || frequency <= 0 {
		c.logger.Error("Could not start auto rewards withdrawal process due to incorrect parameter. Please enter the number of seconds to wait in between claiming rewards")
		return
	}
	interval := time.Duration(frequency) * time.Second

	for {
		valAddr := os.Getenv("REPORTERS_VALIDATOR_ADDRESS")
		if valAddr == "" {
			c.logger.Info("Skipping rewards withdrawal because no validator address env variable was found")
		} else {
			withdrawMsg := &reportertypes.MsgWithdrawTip{
				SelectorAddress:  c.accAddr.String(),
				ValidatorAddress: valAddr,
			}
			// The channel is unbuffered; do not stay blocked on the send if
			// the daemon is shutting down.
			select {
			case c.txChan <- TxChannelInfo{Msg: withdrawMsg, isBridge: false, NumRetries: 0}:
			case <-ctx.Done():
				return
			}
		}

		// Wait for the next cycle, waking up early on cancellation.
		timer := time.NewTimer(interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

func (c *Client) LogProcessStats() {
count := runtime.NumGoroutine()
c.logger.Info(fmt.Sprintf("Number of Goroutines: %d\n", count))
Expand Down
10 changes: 7 additions & 3 deletions daemons/reporter/client/tx_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ func (c *Client) WaitForTx(ctx context.Context, hash string) (*cmttypes.ResultTx
if err != nil {
return nil, fmt.Errorf("unable to decode tx hash '%s'; err: %w", hash, err)
}

startTimestamp := time.Now().UnixMilli()
for waiting {
resp, err := c.cosmosCtx.Client.Tx(ctx, bz, false)
if err != nil {
if strings.Contains(err.Error(), "not found") {
if time.Now().UnixMilli()-startTimestamp > 2500 {
return nil, fmt.Errorf("fetching tx '%s'; err: No transaction found within the allotted time", hash)
}
continue

// Tx not found, wait for next block and try again
Expand Down Expand Up @@ -115,8 +120,6 @@ func (c *Client) WaitForBlockHeight(ctx context.Context, h int64) error {
}

func (c *Client) sendTx(ctx context.Context, msg ...sdk.Msg) (*cmttypes.ResultTx, error) {
c.txMutex.Lock()
defer c.txMutex.Unlock()
block, err := c.cosmosCtx.Client.Block(ctx, nil)
if err != nil {
return nil, fmt.Errorf("error getting block: %w", err)
Expand Down Expand Up @@ -158,7 +161,8 @@ func (c *Client) sendTx(ctx context.Context, msg ...sdk.Msg) (*cmttypes.ResultTx
return nil, fmt.Errorf("error waiting for transaction: %w", err)
}
c.logger.Info("TxResult", "result", txnResponse.TxResult)
fmt.Println("transaction hash ", res.TxHash)
c.logger.Info(fmt.Sprintf("transaction hash: %s", res.TxHash))
c.logger.Info(fmt.Sprintf("response after submit message: %d", txnResponse.TxResult.Code))

return txnResponse, nil
}
Expand Down
Loading