Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Nov 25, 2024
2 parents a6732eb + 526924f commit 7ac43e6
Show file tree
Hide file tree
Showing 46 changed files with 587 additions and 144 deletions.
3 changes: 0 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ issues:
- mempool
- state/indexer
- state/txindex
exclude-rules:
- text: "G115: integer overflow conversion"
linters: [ gosec ]


linters-settings:
Expand Down
93 changes: 93 additions & 0 deletions block/balance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package block

import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/types"
)

const CheckBalancesInterval = 3 * time.Minute

// MonitorBalances checks the balances of the node and updates the gauges for prometheus
func (m *Manager) MonitorBalances(ctx context.Context) error {
ticker := time.NewTicker(CheckBalancesInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
m.logger.Info("Checking balances.")
balances, err := m.checkBalances()

if balances.DA != nil {
if amountFloat, errDA := strconv.ParseFloat(balances.DA.Amount.String(), 64); errDA == nil {
types.DaLayerBalanceGauge.Set(amountFloat)
} else {
m.logger.Error("Parsing DA balance amount", "error", errDA)
}
}

if balances.SL != nil {
if amountFloat, errSL := strconv.ParseFloat(balances.SL.Amount.String(), 64); errSL == nil {
types.HubLayerBalanceGauge.Set(amountFloat)
} else {
m.logger.Error("Parsing SL balance amount", "error", errSL)
}
}

if err != nil {
m.logger.Error("Checking balances", "error", err)
}
}
}
}

type Balances struct {
DA *da.Balance
SL *types.Balance
}

func (m *Manager) checkBalances() (*Balances, error) {
balances := &Balances{}
var wg sync.WaitGroup
wg.Add(2)

var errDA, errSL error

go func() {
defer wg.Done()
balance, err := m.DAClient.GetSignerBalance()
if err != nil {
errDA = fmt.Errorf("get DA signer balance: %w", err)
return
}
balances.DA = &balance
}()

go func() {
defer wg.Done()
balance, err := m.SLClient.GetSignerBalance()
if err != nil {
errSL = fmt.Errorf("get SL signer balance: %w", err)
return
}
balances.SL = &balance
}()

wg.Wait()

errs := errors.Join(errDA, errSL)
if errs != nil {
return balances, fmt.Errorf("errors checking balances: %w", errs)
}

return balances, nil
}
2 changes: 1 addition & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
return false, errorsmod.Wrap(err, "get app info")
}

isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight
isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight //nolint:gosec // LastBlockHeight is always positive

// TODO: add switch case to validate better the current app state

Expand Down
14 changes: 7 additions & 7 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (e *Executor) CreateBlock(
state *types.State,
maxBlockDataSizeBytes uint64,
) *types.Block {
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes)))
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas)
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes))) //nolint:gosec // MaxBytes is always positive
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas) //nolint:gosec // size is always positive and falls in int64

block := &types.Block{
Header: types.Header{
Expand Down Expand Up @@ -214,7 +214,7 @@ func (e *Executor) commit(state *types.State, block *types.Block, deliverTxs []*

maxBytes := state.ConsensusParams.Block.MaxBytes
maxGas := state.ConsensusParams.Block.MaxGas
err = e.mempool.Update(int64(block.Header.Height), fromDymintTxs(block.Data.Txs), deliverTxs)
err = e.mempool.Update(int64(block.Header.Height), fromDymintTxs(block.Data.Txs), deliverTxs) //nolint:gosec // height is non-negative and falls in int64
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (e *Executor) ExecuteBlock(block *types.Block) (*tmstate.ABCIResponses, err
}
}

abciResponses.EndBlock, err = e.proxyAppConsensusConn.EndBlockSync(abci.RequestEndBlock{Height: int64(block.Header.Height)})
abciResponses.EndBlock, err = e.proxyAppConsensusConn.EndBlockSync(abci.RequestEndBlock{Height: int64(block.Header.Height)}) //nolint:gosec // height is non-negative and falls in int64
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -305,14 +305,14 @@ func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block
for _, ev := range abciBlock.Evidence.Evidence {
err = multierr.Append(err, e.eventBus.PublishEventNewEvidence(tmtypes.EventDataNewEvidence{
Evidence: ev,
Height: int64(block.Header.Height),
Height: int64(block.Header.Height), //nolint:gosec // height is non-negative and falls in int64
}))
}
for i, dtx := range resp.DeliverTxs {
err = multierr.Append(err, e.eventBus.PublishEventTx(tmtypes.EventDataTx{
TxResult: abci.TxResult{
Height: int64(block.Header.Height),
Index: uint32(i),
Height: int64(block.Header.Height), //nolint:gosec // block height is within int64 range
Index: uint32(i), //nolint:gosec // num of deliver txs is less than 2^32
Tx: abciBlock.Data.Txs[i],
Result: *dtx,
},
Expand Down
2 changes: 1 addition & 1 deletion block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func (m *Manager) RunInitChain(ctx context.Context) error {
// Get the proposer at the initial height. If we're at genesis the height will be 0.
proposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.Height()) + 1)
proposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.Height()) + 1) //nolint:gosec // height is non-negative and falls in int64
if err != nil {
return fmt.Errorf("get proposer at height: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ func (m *Manager) Start(ctx context.Context) error {
return m.MonitorForkUpdateLoop(ctx)
})

uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorBalances(ctx)
})

// run based on the node role
if !amIProposer {
return m.runAsFullNode(ctx, eg)
Expand All @@ -308,7 +312,7 @@ func (m *Manager) updateFromLastSettlementState() error {
if errors.Is(err, gerrc.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1))
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1)) //nolint:gosec // height is non-negative and falls in int64
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time %s: %w", m.Conf.MaxSkewTime, gerrc.ErrResourceExhausted)}
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load()))
select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -274,7 +274,7 @@ func (m *Manager) createTMSignature(block *types.Block, proposerAddress []byte,
headerHash := block.Header.Hash()
vote := tmtypes.Vote{
Type: cmtproto.PrecommitType,
Height: int64(block.Header.Height),
Height: int64(block.Header.Height), //nolint:gosec // height is non-negative and falls in int64
Round: 0,
Timestamp: voteTimestamp,
BlockID: tmtypes.BlockID{Hash: headerHash[:], PartSetHeader: tmtypes.PartSetHeader{
Expand Down
1 change: 1 addition & 0 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (m *Manager) Prune(retainHeight uint64) {
logResult(err, "dymint store", retainHeight, pruned)
}

//nolint:gosec // height is non-negative and falls in int64
func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion block/sequencers.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (m *Manager) UpdateSequencerSetFromSL() error {

// UpdateProposerFromSL queries the hub and updates the local dymint state proposer at the current height
func (m *Manager) UpdateProposerFromSL() error {
SLProposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.NextHeight()))
SLProposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.NextHeight())) //nolint:gosec // height is non-negative and falls in int64
if err != nil {
return fmt.Errorf("get proposer at height: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) {
s := types.State{
Version: InitStateVersion,
ChainID: genDoc.ChainID,
InitialHeight: uint64(genDoc.InitialHeight),
InitialHeight: uint64(genDoc.InitialHeight), //nolint:gosec // height is non-negative and falls in int64
ConsensusParams: *genDoc.ConsensusParams,
}
s.SetHeight(0)
Expand All @@ -80,7 +80,7 @@ func (m *Manager) UpdateStateFromApp(blockHeaderHash [32]byte) error {
return errorsmod.Wrap(err, "get app info")
}

appHeight := uint64(proxyAppInfo.LastBlockHeight)
appHeight := uint64(proxyAppInfo.LastBlockHeight) //nolint:gosec // height is non-negative and falls in int64
resp, err := m.Store.LoadBlockResponses(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
Expand Down
33 changes: 16 additions & 17 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxProduceSubmitSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksBytes func() int,
batchSkewTime func() time.Duration,
maxSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64, // func that returns the amount of non-submitted blocks
unsubmittedBlocksBytes func() int, // func that returns bytes from non-submitted blocks
batchSkewTime func() time.Duration, // func that returns measured time between last submitted block and last produced block
maxBatchSubmitTime time.Duration, // max time to allow between batches
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
Expand All @@ -67,15 +67,14 @@ func SubmitLoopInner(
case <-ctx.Done():
return nil
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
pendingBytes.Add(uint64(n)) //nolint:gosec // bytes size is always positive
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}

submitter.Nudge()

if maxProduceSubmitSkewTime < batchSkewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
// if the time between the last produced block and last submitted is greater than maxSkewTime we block here until we get a progress nudge from the submitter thread
if maxSkewTime < batchSkewTime() {
select {
case <-ctx.Done():
return nil
Expand All @@ -87,8 +86,8 @@ func SubmitLoopInner(
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
ticker := time.NewTicker(maxBatchSubmitTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
// 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time (in addition to every block produced) to check if there is anything to submit even if no new blocks have been produced
ticker := time.NewTicker(maxBatchSubmitTime)
for {
select {
case <-ctx.Done():
Expand All @@ -110,8 +109,6 @@ func SubmitLoopInner(
UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime())

if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
pendingBytes.Store(pending)
ticker.Reset(maxBatchSubmitTime)
break
}

Expand All @@ -130,12 +127,14 @@ func SubmitLoopInner(
}
return err
}
pending = uint64(unsubmittedBlocksBytes())
if batchSkewTime() < maxProduceSubmitSkewTime {
pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive
if batchSkewTime() < maxSkewTime {
trigger.Nudge()
}
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime()) // TODO: debug level
logger.Debug("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime())
}
// update pendingBytes with non submitted block bytes after all pending batches have been submitted
pendingBytes.Store(pending)
}
})

Expand All @@ -150,7 +149,7 @@ func (m *Manager) CreateAndSubmitBatchGetSizeBlocksCommits(maxSize uint64) (uint
if b == nil {
return 0, err
}
return uint64(b.SizeBlockAndCommitBytes()), err
return uint64(b.SizeBlockAndCommitBytes()), err //nolint:gosec // size is always positive and falls in uint64
}

// CreateAndSubmitBatch creates and submits a batch to the DA and SL.
Expand Down Expand Up @@ -215,7 +214,7 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
batch.DRSVersion = append(batch.DRSVersion, drsVersion)

totalSize := batch.SizeBytes()
if maxBatchSize < uint64(totalSize) {
if maxBatchSize < uint64(totalSize) { //nolint:gosec // size is always positive and falls in uint64

// Remove the last block and commit from the batch
batch.Blocks = batch.Blocks[:len(batch.Blocks)-1]
Expand Down
2 changes: 1 addition & 1 deletion config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func AddNodeFlags(cmd *cobra.Command) {
cmd.Flags().String(FlagP2PListenAddress, def.P2PConfig.ListenAddress, "P2P listen address")
cmd.Flags().String(FlagP2PBootstrapNodes, def.P2PConfig.BootstrapNodes, "P2P bootstrap nodes")
cmd.Flags().Duration(FlagP2PBootstrapRetryTime, def.P2PConfig.BootstrapRetryTime, "P2P bootstrap time")
cmd.Flags().Uint64(FlagP2PGossipCacheSize, uint64(def.P2PConfig.GossipSubCacheSize), "P2P Gossiped blocks cache size")
cmd.Flags().Uint64(FlagP2PGossipCacheSize, uint64(def.P2PConfig.GossipSubCacheSize), "P2P Gossiped blocks cache size") //nolint:gosec // GossipSubCacheSize should be always positive
}

func BindDymintFlags(cmd *cobra.Command, v *viper.Viper) error {
Expand Down
2 changes: 2 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ const defaultConfigTemplate = `
block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}"
# block production interval after block with no transactions
max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}"
# maximum time the node will produce blocks without submitting to SL before stopping block production
max_skew_time = "{{ .BlockManagerConfig.MaxSkewTime }}"
Expand Down
12 changes: 9 additions & 3 deletions da/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"github.com/centrifuge/go-substrate-rpc-client/v4/rpc/state"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"
availtypes "github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/store"
pb "github.com/dymensionxyz/dymint/types/pb/dymint"
"github.com/tendermint/tendermint/libs/pubsub"
)

const (
Expand Down Expand Up @@ -102,7 +103,7 @@ func WithBatchRetryAttempts(attempts uint) da.Option {
}

// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KV, logger types.Logger, options ...da.Option) error {
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, _ store.KV, logger types.Logger, options ...da.Option) error {
c.logger = logger
c.synced = make(chan struct{}, 1)

Expand Down Expand Up @@ -363,7 +364,7 @@ func (c *DataAvailabilityLayerClient) broadcastTx(tx []byte) (uint64, error) {
SpecVersion: rv.SpecVersion,
Tip: availtypes.NewUCompactFromUInt(c.config.Tip),
TransactionVersion: rv.TransactionVersion,
AppID: availtypes.NewUCompactFromUInt(uint64(c.config.AppID)),
AppID: availtypes.NewUCompactFromUInt(uint64(c.config.AppID)), //nolint:gosec // AppID should be always positive
}

// Sign the transaction using Alice's default account
Expand Down Expand Up @@ -442,3 +443,8 @@ func (c *DataAvailabilityLayerClient) getHeightFromHash(hash availtypes.Hash) (u
func (d *DataAvailabilityLayerClient) GetMaxBlobSizeBytes() uint32 {
return maxBlobSize
}

// GetBalance returns the balance for a specific address
func (c *DataAvailabilityLayerClient) GetSignerBalance() (da.Balance, error) {
return da.Balance{}, nil
}
Loading

0 comments on commit 7ac43e6

Please sign in to comment.