Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into release/v0.5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
omritoptix committed Aug 23, 2023
2 parents 3c2f35c + 2e9bb1d commit 18d1966
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 74 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
go-version: '1.19'
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/
- name: golangci-lint
uses: golangci/golangci-lint-action@v3.5.0
uses: golangci/golangci-lint-action@v3.7.0
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.49
Expand All @@ -48,7 +48,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: markdownlint-cli
uses: nosborn/github-action-markdown-cli@v3.2.0
uses: nosborn/github-action-markdown-cli@v3.3.0
with:
files: .
config-file: .markdownlint.yaml
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v4
with:
go-version: 1.19
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/
Expand Down
46 changes: 31 additions & 15 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
m.logger.Info("Starting in aggregator mode")
// TODO(omritoptix): change to private methods
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
}
// TODO(omritoptix): change to private methods
go m.RetriveLoop(ctx)
Expand Down Expand Up @@ -282,6 +283,36 @@ func (m *Manager) waitForSync(ctx context.Context) error {
return nil
}

func (m *Manager) SubmitLoop(ctx context.Context) {
ticker := time.NewTicker(m.conf.BatchSubmitMaxTime)
defer ticker.Stop()

for {
select {
//Context canceled
case <-ctx.Done():
return
case <-ticker.C:
// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
height := m.store.Height()
//no new blocks produced yet
if (height - syncTarget) == 0 {
continue
}

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.
if m.batchInProcess.Load() == false {
m.batchInProcess.Store(true)
go m.submitNextBatch(ctx)
}

//TODO: add the case of batch size (should be signaled from the the block production)
// case <- requiredByNumOfBlocks
}
}
}

// ProduceBlockLoop is calling publishBlock in a loop as long as wer'e synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().Unix())
Expand Down Expand Up @@ -708,21 +739,6 @@ func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error {

m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs))
rollappHeightGauge.Set(float64(newHeight))

//TODO: move to separate function
lastSubmissionTime := atomic.LoadInt64(&m.lastSubmissionTime)
requiredByTime := time.Since(time.Unix(0, lastSubmissionTime)) > m.conf.BatchSubmitMaxTime

// SyncTarget is the height of the last block in the last batch as seen by this node.
syncTarget := atomic.LoadUint64(&m.syncTarget)
requiredByNumOfBlocks := (block.Header.Height - syncTarget) > m.conf.BlockBatchSize

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.
if m.batchInProcess.Load() == false && (requiredByTime || requiredByNumOfBlocks) {
m.batchInProcess.Store(true)
go m.submitNextBatch(ctx)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {
mCtx, cancel := context.WithTimeout(context.Background(), runTime)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
go manager.SubmitLoop(mCtx)
<-mCtx.Done()

require.True(manager.batchInProcess.Load() == true)
Expand Down
12 changes: 6 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,16 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("empty_blocks_max_time must be positive or zero to disable")
}

if c.EmptyBlocksMaxTime <= c.BlockTime {
return fmt.Errorf("empty_blocks_max_time must be greater than block_time")
}

if c.BatchSubmitMaxTime <= 0 {
return fmt.Errorf("batch_submit_max_time must be positive")
}

if c.BatchSubmitMaxTime <= c.EmptyBlocksMaxTime {
return fmt.Errorf("batch_submit_max_time must be greater than empty_blocks_max_time")
if c.EmptyBlocksMaxTime != 0 && c.EmptyBlocksMaxTime <= c.BlockTime {
return fmt.Errorf("empty_blocks_max_time must be greater than block_time")
}

if c.BatchSubmitMaxTime < c.BlockTime {
return fmt.Errorf("batch_submit_max_time must be greater than block_time")
}

if c.BlockBatchSize <= 0 {
Expand Down
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ aggregator = "{{ .Aggregator }}"
# block production interval
block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block time for empty blocks (block time in case of no transactions)
# block production interval in case of no transactions ("0s" produces empty blocks)
empty_blocks_max_time = "{{ .BlockManagerConfig.EmptyBlocksMaxTime }}"
# triggers to submit batch to DA and settlement (both required)
Expand Down
5 changes: 3 additions & 2 deletions da/celestia/celestia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"github.com/celestiaorg/go-cnc"
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/da/celestia"
"github.com/dymensionxyz/dymint/log/test"
mocks "github.com/dymensionxyz/dymint/mocks/da/celestia"
"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/dymint/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"

"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/pubsub"
rpcmock "github.com/tendermint/tendermint/rpc/client/mocks"
Expand Down Expand Up @@ -114,7 +115,7 @@ func TestSubmitBatch(t *testing.T) {
assert.NoError(err)
// Start the DALC
dalc := celestia.DataAvailabilityLayerClient{}
err = dalc.Init(configBytes, pubsubServer, nil, test.NewLogger(t), options...)
err = dalc.Init(configBytes, pubsubServer, nil, log.TestingLogger(), options...)
require.NoError(err)
err = dalc.Start()
require.NoError(err)
Expand Down
4 changes: 2 additions & 2 deletions da/celestia/mock/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func (s *Server) Start(listener net.Listener) error {
if err != nil {
return err
}
s.server = new(http.Server)
s.server.Handler = s.getHandler()
go func() {
s.server = new(http.Server)
s.server.Handler = s.getHandler()
err := s.server.Serve(listener)
s.logger.Debug("http server exited with", "error", err)
}()
Expand Down
23 changes: 12 additions & 11 deletions da/da_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
"github.com/tendermint/tendermint/libs/pubsub"
"google.golang.org/grpc"

"github.com/tendermint/tendermint/libs/log"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/da/celestia"
cmock "github.com/dymensionxyz/dymint/da/celestia/mock"
grpcda "github.com/dymensionxyz/dymint/da/grpc"
"github.com/dymensionxyz/dymint/da/grpc/mockserv"
"github.com/dymensionxyz/dymint/da/mock"
"github.com/dymensionxyz/dymint/da/registry"
"github.com/dymensionxyz/dymint/log/test"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
)
Expand All @@ -35,7 +36,7 @@ func TestLifecycle(t *testing.T) {
for _, dalc := range registry.RegisteredClients() {
//TODO(omritoptix): Possibly add support for avail here.
if dalc == "avail" {
t.Skip("TODO")
continue
}
t.Run(dalc, func(t *testing.T) {
doTestLifecycle(t, dalc)
Expand All @@ -57,7 +58,7 @@ func doTestLifecycle(t *testing.T, daType string) {
require.NoError(err)
}

err = dalc.Init(dacfg, pubsubServer, nil, test.NewLogger(t))
err = dalc.Init(dacfg, pubsubServer, nil, log.TestingLogger())
require.NoError(err)

err = dalc.Start()
Expand All @@ -77,7 +78,7 @@ func TestDALC(t *testing.T) {
for _, dalc := range registry.RegisteredClients() {
//TODO(omritoptix): Possibly add support for avail here.
if dalc == "avail" {
t.Skip("TODO")
continue
}
t.Run(dalc, func(t *testing.T) {
doTestDALC(t, registry.GetClient(dalc))
Expand Down Expand Up @@ -106,7 +107,7 @@ func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) {
}
pubsubServer := pubsub.NewServer()
pubsubServer.Start()
err := dalc.Init(conf, pubsubServer, store.NewDefaultInMemoryKVStore(), test.NewLogger(t))
err := dalc.Init(conf, pubsubServer, store.NewDefaultInMemoryKVStore(), log.TestingLogger())
require.NoError(err)

err = dalc.Start()
Expand Down Expand Up @@ -164,11 +165,11 @@ func TestRetrieve(t *testing.T) {
defer httpServer.Stop()

for _, client := range registry.RegisteredClients() {
//TODO(omritoptix): Possibly add support for avail here.
if client == "avail" {
continue
}
t.Run(client, func(t *testing.T) {
//TODO(omritoptix): Possibly add support for avail here.
if client == "avail" {
t.Skip("TODO")
}
dalc := registry.GetClient(client)
_, ok := dalc.(da.BatchRetriever)
if ok {
Expand All @@ -194,7 +195,7 @@ func startMockGRPCServ(t *testing.T) *grpc.Server {

func startMockCelestiaNodeServer(t *testing.T) *cmock.Server {
t.Helper()
httpSrv := cmock.NewServer(mockDaBlockTime, test.NewLogger(t))
httpSrv := cmock.NewServer(mockDaBlockTime, log.TestingLogger())
l, err := net.Listen("tcp4", ":26658")
if err != nil {
t.Fatal("failed to create listener for mock celestia-node RPC server", "error", err)
Expand Down Expand Up @@ -227,7 +228,7 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {
}
pubsubServer := pubsub.NewServer()
pubsubServer.Start()
err := dalc.Init(conf, pubsubServer, store.NewDefaultInMemoryKVStore(), test.NewLogger(t))
err := dalc.Init(conf, pubsubServer, store.NewDefaultInMemoryKVStore(), log.TestingLogger())
require.NoError(err)

err = dalc.Start()
Expand Down
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ require (
github.com/gorilla/rpc v1.2.0
github.com/gorilla/websocket v1.5.0
github.com/informalsystems/tm-load-test v1.3.0
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-libp2p v0.26.0
github.com/libp2p/go-libp2p-core v0.20.1
github.com/libp2p/go-libp2p-kad-dht v0.21.0
github.com/libp2p/go-libp2p-pubsub v0.9.0
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multiaddr v0.11.0
github.com/prometheus/client_golang v1.14.0
github.com/rs/cors v1.8.3
github.com/spf13/cobra v1.6.1
Expand All @@ -33,7 +32,7 @@ require (
github.com/tendermint/tendermint v0.34.28
go.uber.org/multierr v1.8.0
golang.org/x/net v0.9.0
gonum.org/v1/gonum v0.12.0
gonum.org/v1/gonum v0.14.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
)
Expand Down Expand Up @@ -64,6 +63,7 @@ require (
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
Expand All @@ -84,7 +84,7 @@ require (
github.com/zondax/ledger-go v0.14.1 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/fx v1.18.2 // indirect
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 // indirect
golang.org/x/exp v0.0.0-20230725012225-302865e7556b // indirect
google.golang.org/api v0.114.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
Expand Down Expand Up @@ -160,7 +160,7 @@ require (
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
Expand All @@ -180,7 +180,7 @@ require (
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/mtibben/percent v0.2.1 // indirect
Expand Down Expand Up @@ -223,7 +223,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
Expand All @@ -242,7 +242,7 @@ require (
github.com/golang/glog v1.1.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/ignite/cli v0.25.2
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.42.0 // indirect
github.com/smartystreets/assertions v1.0.1 // indirect
golang.org/x/tools v0.7.0 // indirect
Expand Down
Loading

0 comments on commit 18d1966

Please sign in to comment.