fix(da): full-nodes can sync from p2p while DA light client is down or out of sync (#857)

Co-authored-by: Daniel T <[email protected]>
srene and danwt authored May 18, 2024
1 parent 55dd894 commit 2b5eb07
Showing 7 changed files with 91 additions and 47 deletions.
10 changes: 6 additions & 4 deletions block/manager.go
@@ -11,9 +11,8 @@ import (
"github.com/dymensionxyz/dymint/gerr"
"github.com/dymensionxyz/dymint/store"

uevent "github.com/dymensionxyz/dymint/utils/event"

"code.cloudfoundry.org/go-diodes"
uevent "github.com/dymensionxyz/dymint/utils/event"

"github.com/dymensionxyz/dymint/p2p"
"github.com/libp2p/go-libp2p/core/crypto"
@@ -150,23 +149,26 @@ func (m *Manager) Start(ctx context.Context) error {
}

if !isSequencer {
// Fullnode loop can start before syncing from DA
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

// TODO: populate the accumulatedSize on startup

err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

if isSequencer {
// TODO: populate the accumulatedSize on startup
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go m.RetrieveLoop(ctx)
go m.SyncToTargetHeightLoop(ctx)
}

return nil
}
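
For readers skimming the diff, the important change in Manager.Start is the ordering: the gossip subscription now starts before syncBlockManager, and only the sequencer path blocks on DAClient.Synced(), so a full node keeps applying p2p blocks even when the DA light client is down or still syncing. A minimal, self-contained sketch of that ordering (startup, daSynced and the print statements are illustrative, not part of this commit):

package main

import (
    "fmt"
    "time"
)

func startup(isSequencer bool, daSynced <-chan struct{}) {
    if !isSequencer {
        // Full node: apply gossiped p2p blocks immediately, regardless of DA state.
        go fmt.Println("gossip loop: applying p2p blocks")
    }
    if isSequencer {
        // Sequencer: must not produce or submit until the DA client reports it is synced.
        <-daSynced
        fmt.Println("sequencer: DA synced, starting produce and submit loops")
    } else {
        // Full node: DA retrieval catches up in the background.
        go fmt.Println("retrieve loop: syncing from DA in the background")
    }
}

func main() {
    daSynced := make(chan struct{}) // never signalled: simulates a DA light client that is down
    startup(false, daSynced)        // a full node still starts all of its loops
    time.Sleep(50 * time.Millisecond)
}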

3 changes: 1 addition & 2 deletions block/retriever.go
@@ -57,11 +57,10 @@ func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
if err != nil {
return fmt.Errorf("process next DA batch: %w", err)
}
m.logger.Info("Synced from DA", "store height", currH, "target height", targetHeight)

}

m.logger.Info("Synced", "store height", m.State.Height(), "target height", targetHeight)

err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("Attempt apply cached blocks.", "err", err)
9 changes: 8 additions & 1 deletion da/avail/avail.go
@@ -64,6 +64,7 @@ type DataAvailabilityLayerClient struct {
txInclusionTimeout time.Duration
batchRetryDelay time.Duration
batchRetryAttempts uint
synced chan struct{}
}

var (
@@ -102,6 +103,7 @@ func WithBatchRetryAttempts(attempts uint) da.Option {
// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger
c.synced = make(chan struct{}, 1)

if len(config) > 0 {
err := json.Unmarshal(config, &c.config)
@@ -135,21 +137,26 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
}

c.ctx, c.cancel = context.WithCancel(context.Background())

return nil
}

// Start starts DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Start() error {
c.synced <- struct{}{}
return nil
}

// Stop stops DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Stop() error {
c.cancel()
close(c.synced)
return nil
}

func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return c.synced
}

// GetClientType returns client type.
func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
return da.Avail
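
The Avail client shows the signalling convention every DA client adopts in this commit: a buffered channel of capacity 1 that Start() sends into and Stop() closes. A hedged, generic sketch of that convention (the syncSignal type and its method names are illustrative only):

package dasignal

// syncSignal is an illustrative stand-in for the synced channel carried by each DA client.
type syncSignal struct {
    ch chan struct{}
}

func newSyncSignal() syncSignal {
    // Capacity 1 so the signalling side never blocks, mirroring make(chan struct{}, 1) above.
    return syncSignal{ch: make(chan struct{}, 1)}
}

// markSynced delivers the signal at most once without blocking the caller.
func (s syncSignal) markSynced() {
    select {
    case s.ch <- struct{}{}:
    default: // already signalled
    }
}

// Synced exposes a receive-only view, matching the Synced() accessor added above.
func (s syncSignal) Synced() <-chan struct{} { return s.ch }

// shutdown releases any remaining waiter, as Stop() does by closing the channel.
func (s syncSignal) shutdown() { close(s.ch) }

The clients above send unconditionally, which is safe because Start() runs once; the select/default guard here only makes the sketch safe to call repeatedly.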
93 changes: 55 additions & 38 deletions da/celestia/celestia.go
@@ -37,6 +37,7 @@ type DataAvailabilityLayerClient struct {
logger types.Logger
ctx context.Context
cancel context.CancelFunc
synced chan struct{}
}

var (
@@ -75,7 +76,7 @@ func WithSubmitBackoff(c uretry.BackoffConfig) da.Option {
// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger

c.synced = make(chan struct{}, 1)
var err error
c.config, err = createConfig(config)
if err != nil {
@@ -133,48 +134,16 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
return nil
}

rpc, err := openrpc.NewClient(c.ctx, c.config.BaseURL, c.config.AuthToken)
if err != nil {
return err
}

state, err := rpc.Header.SyncState(c.ctx)
var rpc *openrpc.Client
rpc, err = openrpc.NewClient(c.ctx, c.config.BaseURL, c.config.AuthToken)
if err != nil {
return err
}
c.rpc = NewOpenRPC(rpc)

if !state.Finished() {
c.logger.Info("Waiting for celestia-node to finish syncing.", "height", state.Height, "target", state.ToHeight)

done := make(chan error, 1)
go func() {
done <- rpc.Header.SyncWait(c.ctx)
}()

ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
select {
case err := <-done:
if err != nil {
return err
}
return nil
case <-ticker.C:
state, err := rpc.Header.SyncState(c.ctx)
if err != nil {
return err
}
c.logger.Info("Celestia-node still syncing.", "height", state.Height, "target", state.ToHeight)
}
}
}

c.logger.Info("Celestia-node is synced.", "height", state.ToHeight)
go c.sync(rpc)

c.rpc = NewOpenRPC(rpc)
return nil
return
}

// Stop stops DataAvailabilityLayerClient.
@@ -185,9 +154,15 @@ func (c *DataAvailabilityLayerClient) Stop() error {
return err
}
c.cancel()
close(c.synced)
return nil
}

// Synced returns channel for on sync event
func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return c.synced
}

// GetClientType returns client type.
func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
return da.Celestia
@@ -629,3 +604,45 @@ func (c *DataAvailabilityLayerClient) getDataAvailabilityHeaders(height uint64)

return headers.DAH, nil
}

// Celestia syncing in background
func (c *DataAvailabilityLayerClient) sync(rpc *openrpc.Client) {
sync := func() error {
done := make(chan error, 1)
go func() {
done <- rpc.Header.SyncWait(c.ctx)
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case err := <-done:
return err
case <-ticker.C:
state, err := rpc.Header.SyncState(c.ctx)
if err != nil {
return err
}
c.logger.Info("Celestia-node syncing", "height", state.Height, "target", state.ToHeight)

}
}
}

err := retry.Do(sync,
retry.Attempts(0), // try forever
retry.Delay(10*time.Second),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
retry.OnRetry(func(n uint, err error) {
c.logger.Error("Failed to sync Celestia DA", "attempt", n, "error", err)
}),
)

c.logger.Info("Celestia-node is synced.")
c.synced <- struct{}{}

if err != nil {
c.logger.Error("Waiting for Celestia data availability client to sync", "err", err)
}
}
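
The sync goroutine leans on retry.Do with Attempts(0) to keep waiting for the light node indefinitely while logging each failure. A standalone sketch of that retry-forever pattern (the import path github.com/avast/retry-go/v4 is an assumption; the diff only shows the retry package in use):

package main

import (
    "errors"
    "fmt"
    "time"

    retry "github.com/avast/retry-go/v4" // assumed import path
)

func main() {
    remaining := 3
    stillSyncing := func() error {
        if remaining > 0 {
            remaining--
            return errors.New("light client still syncing")
        }
        return nil
    }

    err := retry.Do(stillSyncing,
        retry.Attempts(0), // 0 = retry until the function succeeds
        retry.Delay(10*time.Millisecond),
        retry.DelayType(retry.FixedDelay),
        retry.LastErrorOnly(true),
        retry.OnRetry(func(n uint, err error) {
            fmt.Println("retry", n, "after:", err)
        }),
    )
    fmt.Println("synced, err =", err)
}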
2 changes: 2 additions & 0 deletions da/da.go
@@ -214,6 +214,8 @@ type DataAvailabilityLayerClient interface {

// CheckBatchAvailability checks the availability of the blob submitted getting proofs and validating them
CheckBatchAvailability(daMetaData *DASubmitMetaData) ResultCheckBatch

Synced() <-chan struct{}
}

// BatchRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
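
Callers of the new Synced() method can simply block on the channel, as the sequencer path in block/manager.go does above; a caller that prefers not to wait forever can wrap the receive in a select. A hedged usage sketch (waitSynced is a hypothetical helper, not part of this commit):

package main

import (
    "context"
    "fmt"
    "time"
)

// waitSynced blocks until the DA client signals sync, the timeout expires, or ctx is cancelled.
func waitSynced(ctx context.Context, synced <-chan struct{}, timeout time.Duration) error {
    select {
    case <-synced:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("DA client not synced after %s", timeout)
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    synced := make(chan struct{}, 1)
    synced <- struct{}{} // simulate a DA client that is already synced

    if err := waitSynced(context.Background(), synced, time.Second); err != nil {
        fmt.Println("not starting submit loop:", err)
        return
    }
    fmt.Println("DA synced; safe to start submitting batches")
}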
11 changes: 9 additions & 2 deletions da/grpc/grpc.go
@@ -21,7 +21,7 @@ type DataAvailabilityLayerClient struct {

conn *grpc.ClientConn
client dalc.DALCServiceClient

synced chan struct{}
logger types.Logger
}

@@ -50,12 +50,15 @@ func (d *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, _ st
d.config = DefaultConfig
return nil
}
d.synced = make(chan struct{}, 1)
return json.Unmarshal(config, &d.config)
}

// Start creates connection to gRPC server and instantiates gRPC client.
func (d *DataAvailabilityLayerClient) Start() error {
d.logger.Info("starting GRPC DALC", "host", d.config.Host, "port", d.config.Port)
d.synced <- struct{}{}

var err error
var opts []grpc.DialOption
// TODO(tzdybal): add more options
@@ -66,7 +69,6 @@
}

d.client = dalc.NewDALCServiceClient(d.conn)

return nil
}

Expand All @@ -76,6 +78,11 @@ func (d *DataAvailabilityLayerClient) Stop() error {
return d.conn.Close()
}

// Synced returns channel for on sync event
func (m *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return m.synced
}

// GetClientType returns client type.
func (d *DataAvailabilityLayerClient) GetClientType() da.Client {
return da.Celestia
10 changes: 10 additions & 0 deletions da/local/local.go
@@ -20,6 +20,7 @@ type DataAvailabilityLayerClient struct {
dalcKV store.KVStore
daHeight atomic.Uint64
config config
synced chan struct{}
}

const defaultBlockTime = 3 * time.Second
@@ -47,27 +48,36 @@ func (m *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, dalc
} else {
m.config.BlockTime = defaultBlockTime
}
m.synced = make(chan struct{}, 1)
return nil
}

// Start implements DataAvailabilityLayerClient interface.
func (m *DataAvailabilityLayerClient) Start() error {
m.logger.Debug("Mock Data Availability Layer Client starting")
m.synced <- struct{}{}
go func() {
for {
time.Sleep(m.config.BlockTime)
m.updateDAHeight()
}
}()

return nil
}

// Stop implements DataAvailabilityLayerClient interface.
func (m *DataAvailabilityLayerClient) Stop() error {
m.logger.Debug("Mock Data Availability Layer Client stopped")
close(m.synced)
return nil
}

// Synced returns channel for on sync event
func (m *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return m.synced
}

// GetClientType returns client type.
func (m *DataAvailabilityLayerClient) GetClientType() da.Client {
return da.Mock
