From 2b5eb07147142b91e13c2c483238e356e6c5871c Mon Sep 17 00:00:00 2001
From: Sergi Rene
Date: Sat, 18 May 2024 11:04:00 +0200
Subject: [PATCH] fix(da): full-nodes can sync from p2p while DA light client
 is down or out of sync (#857)

Co-authored-by: Daniel T <30197399+danwt@users.noreply.github.com>
---
 block/manager.go        | 10 +++--
 block/retriever.go      |  3 +-
 da/avail/avail.go       |  9 +++-
 da/celestia/celestia.go | 93 ++++++++++++++++++++++++-----------
 da/da.go                |  2 +
 da/grpc/grpc.go         | 11 ++++-
 da/local/local.go       | 10 +++++
 7 files changed, 91 insertions(+), 47 deletions(-)

diff --git a/block/manager.go b/block/manager.go
index ea67eea4b..603182a26 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -11,9 +11,8 @@ import (
     "github.com/dymensionxyz/dymint/gerr"
     "github.com/dymensionxyz/dymint/store"
-    uevent "github.com/dymensionxyz/dymint/utils/event"
-
     "code.cloudfoundry.org/go-diodes"
+    uevent "github.com/dymensionxyz/dymint/utils/event"
 
     "github.com/dymensionxyz/dymint/p2p"
     "github.com/libp2p/go-libp2p/core/crypto"
@@ -150,23 +149,26 @@ func (m *Manager) Start(ctx context.Context) error {
     }
 
     if !isSequencer {
+        // Fullnode loop can start before syncing from DA
         go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
     }
 
+    // TODO: populate the accumulatedSize on startup
+
     err = m.syncBlockManager()
     if err != nil {
         return fmt.Errorf("sync block manager: %w", err)
     }
 
     if isSequencer {
-        // TODO: populate the accumulatedSize on startup
+        // Sequencer must wait till DA is synced to start submitting blobs
+        <-m.DAClient.Synced()
         go m.ProduceBlockLoop(ctx)
         go m.SubmitLoop(ctx)
     } else {
         go m.RetrieveLoop(ctx)
         go m.SyncToTargetHeightLoop(ctx)
     }
-
     return nil
 }
diff --git a/block/retriever.go b/block/retriever.go
index 30bbb09df..9763224b4 100644
--- a/block/retriever.go
+++ b/block/retriever.go
@@ -57,11 +57,10 @@ func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
         if err != nil {
             return fmt.Errorf("process next DA batch: %w", err)
         }
+        m.logger.Info("Synced from DA", "store height", currH, "target height", targetHeight)
     }
 
-    m.logger.Info("Synced", "store height", m.State.Height(), "target height", targetHeight)
-
     err := m.attemptApplyCachedBlocks()
     if err != nil {
         m.logger.Error("Attempt apply cached blocks.", "err", err)
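The two block/ hunks above change startup ordering: a full node now subscribes to gossiped P2P blocks before syncing from DA, while a sequencer blocks on <-m.DAClient.Synced() before launching its produce and submit loops, since it must not post blobs through a light client that is still catching up. Below is a minimal, self-contained sketch of that gating pattern; the daClient type and every name in it are illustrative stand-ins, not dymint's real types.

package main

import (
	"context"
	"fmt"
	"time"
)

// daClient is a toy stand-in exposing only the Synced signal.
type daClient struct{ synced chan struct{} }

func (c *daClient) Synced() <-chan struct{} { return c.synced }

func main() {
	c := &daClient{synced: make(chan struct{}, 1)}

	// Consumer path (the gossip handler) may run before DA is synced.
	go fmt.Println("gossip loop running before DA sync")

	// Something signals sync completion later, as Celestia's background
	// goroutine does in this patch.
	go func() {
		time.Sleep(100 * time.Millisecond)
		c.synced <- struct{}{}
	}()

	// Producer path waits, with a context guard so the wait cannot hang forever.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	select {
	case <-c.Synced():
		fmt.Println("DA synced; safe to start producing and submitting")
	case <-ctx.Done():
		fmt.Println("gave up waiting:", ctx.Err())
	}
}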
diff --git a/da/avail/avail.go b/da/avail/avail.go
index e5b20eabd..236b91236 100644
--- a/da/avail/avail.go
+++ b/da/avail/avail.go
@@ -64,6 +64,7 @@ type DataAvailabilityLayerClient struct {
     txInclusionTimeout time.Duration
     batchRetryDelay    time.Duration
     batchRetryAttempts uint
+    synced             chan struct{}
 }
 
 var (
@@ -102,6 +103,7 @@ func WithBatchRetryAttempts(attempts uint) da.Option {
 // Init initializes DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
     c.logger = logger
+    c.synced = make(chan struct{}, 1)
 
     if len(config) > 0 {
         err := json.Unmarshal(config, &c.config)
@@ -135,21 +137,26 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
     }
 
     c.ctx, c.cancel = context.WithCancel(context.Background())
-
     return nil
 }
 
 // Start starts DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) Start() error {
+    c.synced <- struct{}{}
     return nil
 }
 
 // Stop stops DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) Stop() error {
     c.cancel()
+    close(c.synced)
     return nil
 }
 
+func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
+    return c.synced
+}
+
 // GetClientType returns client type.
 func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
     return da.Avail
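Note that every client allocates the channel with make(chan struct{}, 1). The single-slot buffer matters: Avail's Start() sends the sync signal immediately, possibly before any goroutine is receiving, and the buffer lets that send complete instead of blocking Start(). A short sketch of the Go semantics this relies on (plain Go, no dymint types):

package main

import "fmt"

func main() {
	// Capacity 1: the send parks the signal in the buffer and returns
	// immediately; a later receiver still observes it.
	synced := make(chan struct{}, 1)
	synced <- struct{}{} // does not block, no receiver needed yet
	<-synced
	fmt.Println("picked up the buffered sync signal")

	// An unbuffered channel would deadlock at the send instead:
	//   blocked := make(chan struct{})
	//   blocked <- struct{}{} // blocks forever without a concurrent receiver
}

One consequence of the single token: only one waiter can consume the signal before the channel is closed, which suffices here since the block manager's sequencer path appears to be the only consumer of Synced().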
diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go
index 19e631b19..7deeccd05 100644
--- a/da/celestia/celestia.go
+++ b/da/celestia/celestia.go
@@ -37,6 +37,7 @@ type DataAvailabilityLayerClient struct {
     logger types.Logger
     ctx    context.Context
     cancel context.CancelFunc
+    synced chan struct{}
 }
 
 var (
@@ -75,7 +76,7 @@ func WithSubmitBackoff(c uretry.BackoffConfig) da.Option {
 // Init initializes DataAvailabilityLayerClient instance.
 func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
     c.logger = logger
-
+    c.synced = make(chan struct{}, 1)
     var err error
     c.config, err = createConfig(config)
     if err != nil {
@@ -133,48 +134,16 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
         return nil
     }
 
-    rpc, err := openrpc.NewClient(c.ctx, c.config.BaseURL, c.config.AuthToken)
-    if err != nil {
-        return err
-    }
-
-    state, err := rpc.Header.SyncState(c.ctx)
+    var rpc *openrpc.Client
+    rpc, err = openrpc.NewClient(c.ctx, c.config.BaseURL, c.config.AuthToken)
     if err != nil {
         return err
     }
+    c.rpc = NewOpenRPC(rpc)
 
-    if !state.Finished() {
-        c.logger.Info("Waiting for celestia-node to finish syncing.", "height", state.Height, "target", state.ToHeight)
-
-        done := make(chan error, 1)
-        go func() {
-            done <- rpc.Header.SyncWait(c.ctx)
-        }()
-
-        ticker := time.NewTicker(1 * time.Minute)
-        defer ticker.Stop()
-
-        for {
-            select {
-            case err := <-done:
-                if err != nil {
-                    return err
-                }
-                return nil
-            case <-ticker.C:
-                state, err := rpc.Header.SyncState(c.ctx)
-                if err != nil {
-                    return err
-                }
-                c.logger.Info("Celestia-node still syncing.", "height", state.Height, "target", state.ToHeight)
-            }
-        }
-    }
-
-    c.logger.Info("Celestia-node is synced.", "height", state.ToHeight)
+    go c.sync(rpc)
 
-    c.rpc = NewOpenRPC(rpc)
-    return nil
+    return
 }
 
 // Stop stops DataAvailabilityLayerClient.
@@ -185,9 +154,15 @@ func (c *DataAvailabilityLayerClient) Stop() error {
         return err
     }
     c.cancel()
+    close(c.synced)
     return nil
 }
 
+// Synced returns channel for the sync event
+func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
+    return c.synced
+}
+
 // GetClientType returns client type.
 func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
     return da.Celestia
@@ -629,3 +604,45 @@ func (c *DataAvailabilityLayerClient) getDataAvailabilityHeaders(height uint64)
 
     return headers.DAH, nil
 }
+
+// sync waits in the background for the Celestia light node to catch up, then signals the synced channel
+func (c *DataAvailabilityLayerClient) sync(rpc *openrpc.Client) {
+    sync := func() error {
+        done := make(chan error, 1)
+        go func() {
+            done <- rpc.Header.SyncWait(c.ctx)
+        }()
+        ticker := time.NewTicker(30 * time.Second)
+        defer ticker.Stop()
+        for {
+            select {
+            case err := <-done:
+                return err
+            case <-ticker.C:
+                state, err := rpc.Header.SyncState(c.ctx)
+                if err != nil {
+                    return err
+                }
+                c.logger.Info("Celestia-node syncing", "height", state.Height, "target", state.ToHeight)
+
+            }
+        }
+    }
+
+    err := retry.Do(sync,
+        retry.Attempts(0), // try forever
+        retry.Delay(10*time.Second),
+        retry.LastErrorOnly(true),
+        retry.DelayType(retry.FixedDelay),
+        retry.OnRetry(func(n uint, err error) {
+            c.logger.Error("Failed to sync Celestia DA", "attempt", n, "error", err)
+        }),
+    )
+
+    if err != nil {
+        c.logger.Error("Waiting for Celestia data availability client to sync", "err", err)
+        return
+    }
+    c.logger.Info("Celestia-node is synced.")
+    c.synced <- struct{}{}
+}
diff --git a/da/da.go b/da/da.go
index 62f2502cc..2cb7a52bd 100644
--- a/da/da.go
+++ b/da/da.go
@@ -214,6 +214,8 @@ type DataAvailabilityLayerClient interface {
     // CheckBatchAvailability checks the availability of the blob submitted getting proofs and validating them
     CheckBatchAvailability(daMetaData *DASubmitMetaData) ResultCheckBatch
+
+    Synced() <-chan struct{}
 }
 
 // BatchRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
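With Synced() <-chan struct{} added to the DataAvailabilityLayerClient interface, every implementation must expose the signal, and Celestia's Start() no longer blocks: it launches sync(rpc) in the background, and the retry loop signals the channel once the light node catches up. manager.go consumes the signal with a bare blocking receive; a caller that cannot afford to wait indefinitely could wrap the receive with a context, as in this illustrative helper (waitSynced is not part of the patch):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitSynced blocks until the DA client signals sync or the context expires.
func waitSynced(ctx context.Context, synced <-chan struct{}) error {
	select {
	case <-synced:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("DA client not synced: %w", ctx.Err())
	}
}

func main() {
	synced := make(chan struct{}, 1)
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate the light node catching up
		synced <- struct{}{}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := waitSynced(ctx, synced); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("DA ready")
}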
diff --git a/da/grpc/grpc.go b/da/grpc/grpc.go
index 85c87d6ff..ca8a979c2 100644
--- a/da/grpc/grpc.go
+++ b/da/grpc/grpc.go
@@ -21,7 +21,7 @@ type DataAvailabilityLayerClient struct {
     conn   *grpc.ClientConn
     client dalc.DALCServiceClient
-
+    synced chan struct{}
     logger types.Logger
 }
@@ -50,12 +50,15 @@ func (d *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, _ st
         d.config = DefaultConfig
         return nil
     }
+    d.synced = make(chan struct{}, 1)
     return json.Unmarshal(config, &d.config)
 }
 
 // Start creates connection to gRPC server and instantiates gRPC client.
 func (d *DataAvailabilityLayerClient) Start() error {
     d.logger.Info("starting GRPC DALC", "host", d.config.Host, "port", d.config.Port)
+    d.synced <- struct{}{}
+
     var err error
     var opts []grpc.DialOption
     // TODO(tzdybal): add more options
@@ -66,7 +69,6 @@ func (d *DataAvailabilityLayerClient) Start() error {
     }
 
     d.client = dalc.NewDALCServiceClient(d.conn)
-
     return nil
 }
@@ -76,6 +78,11 @@ func (d *DataAvailabilityLayerClient) Stop() error {
     return d.conn.Close()
 }
 
+// Synced returns channel for the sync event
+func (d *DataAvailabilityLayerClient) Synced() <-chan struct{} {
+    return d.synced
+}
+
 // GetClientType returns client type.
 func (d *DataAvailabilityLayerClient) GetClientType() da.Client {
     return da.Celestia
diff --git a/da/local/local.go b/da/local/local.go
index f3e074bb2..5dfcfe419 100644
--- a/da/local/local.go
+++ b/da/local/local.go
@@ -20,6 +20,7 @@ type DataAvailabilityLayerClient struct {
     dalcKV   store.KVStore
     daHeight atomic.Uint64
     config   config
+    synced   chan struct{}
 }
 
 const defaultBlockTime = 3 * time.Second
@@ -47,27 +48,36 @@ func (m *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, dalc
     } else {
         m.config.BlockTime = defaultBlockTime
     }
+    m.synced = make(chan struct{}, 1)
     return nil
 }
 
 // Start implements DataAvailabilityLayerClient interface.
 func (m *DataAvailabilityLayerClient) Start() error {
     m.logger.Debug("Mock Data Availability Layer Client starting")
+    m.synced <- struct{}{}
     go func() {
         for {
             time.Sleep(m.config.BlockTime)
             m.updateDAHeight()
         }
     }()
+
     return nil
 }
 
 // Stop implements DataAvailabilityLayerClient interface.
 func (m *DataAvailabilityLayerClient) Stop() error {
     m.logger.Debug("Mock Data Availability Layer Client stopped")
+    close(m.synced)
     return nil
 }
 
+// Synced returns channel for the sync event
+func (m *DataAvailabilityLayerClient) Synced() <-chan struct{} {
+    return m.synced
+}
+
 // GetClientType returns client type.
 func (m *DataAvailabilityLayerClient) GetClientType() da.Client {
     return da.Mock
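Taken together, all four clients now share one lifecycle: Init allocates the one-slot channel, Start (or, for Celestia, the background sync goroutine) eventually sends the signal, and Stop closes the channel so that any late or still-blocked waiter unblocks instead of hanging on shutdown. A rough end-to-end sketch of that lifecycle with a toy client (mockDA is illustrative, not dymint's mock type):

package main

import (
	"fmt"
	"time"
)

// mockDA mirrors the lifecycle this patch gives every DA client:
// Init allocates a 1-slot channel, Start sends the sync signal,
// Stop closes the channel so late waiters unblock immediately.
type mockDA struct{ synced chan struct{} }

func (m *mockDA) Init() error             { m.synced = make(chan struct{}, 1); return nil }
func (m *mockDA) Start() error            { m.synced <- struct{}{}; return nil }
func (m *mockDA) Stop() error             { close(m.synced); return nil }
func (m *mockDA) Synced() <-chan struct{} { return m.synced }

func main() {
	da := &mockDA{}
	_ = da.Init()
	_ = da.Start()

	<-da.Synced() // consume the buffered signal
	fmt.Println("synced signal received")

	_ = da.Stop()

	// After Stop, the channel is closed: receives return immediately,
	// so goroutines still waiting on Synced() are released on shutdown.
	select {
	case <-da.Synced():
		fmt.Println("closed channel unblocks immediately")
	case <-time.After(time.Second):
		fmt.Println("unreachable")
	}
}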