Skip to content

Commit

Permalink
full node does not wait for da to be synced + renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed May 16, 2024
1 parent 0b24dd7 commit a880c7e
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 33 deletions.
10 changes: 6 additions & 4 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,16 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

// Fullnode loop can start before syncing from DA
if !isSequencer {
if isSequencer {
// Sequencer must wait till DA is synced
<-m.DAClient.Synced()
} else {
// Fullnode loop can start before syncing from DA
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

// TODO: populate the accumulatedSize on startup
// Wait till DA is up and running
<-m.DAClient.Started()

// Start syncing from DA
err = m.syncBlockManager()
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions da/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type DataAvailabilityLayerClient struct {
txInclusionTimeout time.Duration
batchRetryDelay time.Duration
batchRetryAttempts uint
started chan struct{}
synced chan struct{}
}

var (
Expand Down Expand Up @@ -103,7 +103,7 @@ func WithBatchRetryAttempts(attempts uint) da.Option {
// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger
c.started = make(chan struct{}, 1)
c.synced = make(chan struct{}, 1)

if len(config) > 0 {
err := json.Unmarshal(config, &c.config)
Expand Down Expand Up @@ -142,19 +142,19 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S

// Start starts DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Start() error {
c.started <- struct{}{}
c.synced <- struct{}{}
return nil
}

// Stop stops DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Stop() error {
c.cancel()
close(c.started)
close(c.synced)
return nil
}

func (c *DataAvailabilityLayerClient) Started() <-chan struct{} {
return c.started
func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return c.synced
}

// GetClientType returns client type.
Expand Down
12 changes: 6 additions & 6 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type DataAvailabilityLayerClient struct {
logger types.Logger
ctx context.Context
cancel context.CancelFunc
started chan struct{}
synced chan struct{}
}

var (
Expand Down Expand Up @@ -76,7 +76,7 @@ func WithSubmitBackoff(c uretry.BackoffConfig) da.Option {
// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger
c.started = make(chan struct{}, 1)
c.synced = make(chan struct{}, 1)
var err error
c.config, err = createConfig(config)
if err != nil {
Expand Down Expand Up @@ -168,7 +168,7 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
}

c.rpc = NewOpenRPC(rpc)
c.started <- struct{}{}
c.synced <- struct{}{}
c.logger.Info("celestia-node is synced")

err := retry.Do(sync,
Expand All @@ -192,13 +192,13 @@ func (c *DataAvailabilityLayerClient) Stop() error {
return err
}
c.cancel()
close(c.started)
close(c.synced)
return nil
}

// Started returns channel for on start event
func (c *DataAvailabilityLayerClient) Started() <-chan struct{} {
return c.started
func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return c.synced
}

// GetClientType returns client type.
Expand Down
2 changes: 1 addition & 1 deletion da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type DataAvailabilityLayerClient interface {
// CheckBatchAvailability checks the availability of the blob submitted getting proofs and validating them
CheckBatchAvailability(daMetaData *DASubmitMetaData) ResultCheckBatch

Started() <-chan struct{}
Synced() <-chan struct{}
}

// BatchRetriever is additional interface that can be implemented by Data Availability Layer Client that is able to retrieve
Expand Down
18 changes: 9 additions & 9 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
type DataAvailabilityLayerClient struct {
config Config

conn *grpc.ClientConn
client dalc.DALCServiceClient
started chan struct{}
logger types.Logger
conn *grpc.ClientConn
client dalc.DALCServiceClient
synced chan struct{}
logger types.Logger
}

// Config contains configuration options for DataAvailabilityLayerClient.
Expand Down Expand Up @@ -50,14 +50,14 @@ func (d *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, _ st
d.config = DefaultConfig
return nil
}
d.started = make(chan struct{}, 1)
d.synced = make(chan struct{}, 1)
return json.Unmarshal(config, &d.config)
}

// Start creates connection to gRPC server and instantiates gRPC client.
func (d *DataAvailabilityLayerClient) Start() error {
d.logger.Info("starting GRPC DALC", "host", d.config.Host, "port", d.config.Port)
d.started <- struct{}{}
d.synced <- struct{}{}

var err error
var opts []grpc.DialOption
Expand All @@ -78,9 +78,9 @@ func (d *DataAvailabilityLayerClient) Stop() error {
return d.conn.Close()
}

// Started returns channel for on start event
func (m *DataAvailabilityLayerClient) Started() <-chan struct{} {
return m.started
// Synced returns channel for on sync event
func (m *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return m.synced
}

// GetClientType returns client type.
Expand Down
14 changes: 7 additions & 7 deletions da/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type DataAvailabilityLayerClient struct {
dalcKV store.KVStore
daHeight atomic.Uint64
config config
started chan struct{}
synced chan struct{}
}

const defaultBlockTime = 3 * time.Second
Expand Down Expand Up @@ -48,14 +48,14 @@ func (m *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, dalc
} else {
m.config.BlockTime = defaultBlockTime
}
m.started = make(chan struct{}, 1)
m.synced = make(chan struct{}, 1)
return nil
}

// Start implements DataAvailabilityLayerClient interface.
func (m *DataAvailabilityLayerClient) Start() error {
m.logger.Debug("Mock Data Availability Layer Client starting")
m.started <- struct{}{}
m.synced <- struct{}{}
go func() {
for {
time.Sleep(m.config.BlockTime)
Expand All @@ -69,13 +69,13 @@ func (m *DataAvailabilityLayerClient) Start() error {
// Stop implements DataAvailabilityLayerClient interface.
func (m *DataAvailabilityLayerClient) Stop() error {
m.logger.Debug("Mock Data Availability Layer Client stopped")
close(m.started)
close(m.synced)
return nil
}

// Started returns channel for on start event
func (m *DataAvailabilityLayerClient) Started() <-chan struct{} {
return m.started
// Synced returns channel for on start event
func (m *DataAvailabilityLayerClient) Synced() <-chan struct{} {
return m.synced
}

// GetClientType returns client type.
Expand Down

0 comments on commit a880c7e

Please sign in to comment.