Skip to content

Commit

Permalink
channel fix
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed May 13, 2024
1 parent 6f92104 commit 9b4192f
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 7 deletions.
4 changes: 4 additions & 0 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (m *Manager) Start(ctx context.Context) error {
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
fmt.Println("waiting")
<-m.DAClient.Started()
fmt.Println("started")

go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
Expand Down
3 changes: 2 additions & 1 deletion da/avail/avail.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func WithBatchRetryAttempts(attempts uint) da.Option {
// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger
c.started = make(chan struct{})
c.started = make(chan struct{}, 1)

if len(config) > 0 {
err := json.Unmarshal(config, &c.config)
Expand Down Expand Up @@ -149,6 +149,7 @@ func (c *DataAvailabilityLayerClient) Start() error {
// Stop stops DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Stop() error {
c.cancel()
close(c.started)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func WithSubmitBackoff(c uretry.BackoffConfig) da.Option {
// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger
c.started = make(chan struct{})
c.started = make(chan struct{}, 1)
var err error
c.config, err = createConfig(config)
if err != nil {
Expand Down Expand Up @@ -177,7 +177,6 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {

c.rpc = NewOpenRPC(rpc)
c.started <- struct{}{}

c.logger.Info("celestia-node is synced", "height", state.ToHeight)

return nil
Expand All @@ -199,6 +198,7 @@ func (c *DataAvailabilityLayerClient) Stop() error {
return err
}
c.cancel()
close(c.started)
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions da/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ func (d *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, _ st
d.config = DefaultConfig
return nil
}
d.started = make(chan struct{})
d.started = make(chan struct{}, 1)
return json.Unmarshal(config, &d.config)
}

// Start creates connection to gRPC server and instantiates gRPC client.
func (d *DataAvailabilityLayerClient) Start() error {
d.logger.Info("starting GRPC DALC", "host", d.config.Host, "port", d.config.Port)
d.started <- struct{}{}

var err error
var opts []grpc.DialOption
// TODO(tzdybal): add more options
Expand All @@ -67,7 +69,6 @@ func (d *DataAvailabilityLayerClient) Start() error {
}

d.client = dalc.NewDALCServiceClient(d.conn)
d.started <- struct{}{}
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions da/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,28 @@ func (m *DataAvailabilityLayerClient) Init(config []byte, _ *pubsub.Server, dalc
} else {
m.config.BlockTime = defaultBlockTime
}
m.started = make(chan struct{})
m.started = make(chan struct{}, 1)
return nil
}

// Start implements DataAvailabilityLayerClient interface.
func (m *DataAvailabilityLayerClient) Start() error {
m.logger.Debug("Mock Data Availability Layer Client starting")
m.started <- struct{}{}
go func() {
for {
time.Sleep(m.config.BlockTime)
m.updateDAHeight()
}
}()
m.started <- struct{}{}

return nil
}

// Stop implements DataAvailabilityLayerClient interface.
func (m *DataAvailabilityLayerClient) Stop() error {
m.logger.Debug("Mock Data Availability Layer Client stopped")
close(m.started)
return nil
}

Expand Down

0 comments on commit 9b4192f

Please sign in to comment.