Skip to content

Commit

Permalink
gert rid of publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
Aratz M. Lasa committed May 10, 2023
1 parent 3126582 commit 969cf2c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 54 deletions.
2 changes: 0 additions & 2 deletions cmd/dreamboat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,6 @@ func initStreamer(c *cli.Context, redisClient *redis.Client, ds stream.Datastore
redisStreamer := stream.NewClient(pubsub, streamConfig)
redisStreamer.AttachMetrics(m)

redisStreamer.RunPublisherParallel(c.Context, c.Uint("relay-distribution-stream-workers"))

if err := redisStreamer.RunSubscriberParallel(c.Context, ds, c.Uint("relay-distribution-stream-workers")); err != nil {
return nil, fmt.Errorf("fail to start stream subscriber: %w", err)
}
Expand Down
60 changes: 8 additions & 52 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,83 +164,39 @@ func (s *Client) RunSlotDeliveredSubscriber(ctx context.Context, slots chan []by
return ctx.Err()
}

func (s *Client) RunPublisherParallel(ctx context.Context, num uint) {
for i := uint(0); i < num; i++ {
go s.RunStorePublisher(ctx)
go s.RunCachePublisher(ctx)
}
}

func (s *Client) RunCachePublisher(ctx context.Context) error {
for {
select {
case req := <-s.cacheRequests:
s.encodeAndPublish(ctx, req, true)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (s *Client) RunStorePublisher(ctx context.Context) error {
for {
select {
case req := <-s.storeRequests:
s.encodeAndPublish(ctx, req, false)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (s *Client) PublishBlockSubmission(ctx context.Context, block structs.BlockBidAndTrace) error {
select {
case s.storeRequests <- block:
return nil
case <-ctx.Done():
return ctx.Err()
}
return s.encodeAndPublish(ctx, block, false)
}

func (s *Client) PublishCacheBlock(ctx context.Context, block structs.BlockBidAndTrace) error {
select {
case s.cacheRequests <- block:
return nil
case <-ctx.Done():
return ctx.Err()
}
return s.encodeAndPublish(ctx, block, true)
}

func (s *Client) PublishSlotDelivered(ctx context.Context, slot structs.Slot) error {
select {
case s.slotDeliveredRequests <- slot:
return nil
case <-ctx.Done():
return ctx.Err()
}
return nil // TODO
}

func (s *Client) SlotDeliveredChan() <-chan structs.Slot {
return s.slotDelivered
}

func (s *Client) encodeAndPublish(ctx context.Context, block structs.BlockBidAndTrace, isCache bool) {
func (s *Client) encodeAndPublish(ctx context.Context, block structs.BlockBidAndTrace, isCache bool) error {
timer1 := prometheus.NewTimer(s.m.Timing.WithLabelValues("encodeAndPublish", "encode"))
b, err := s.encode(block, isCache)
if err != nil {
timer1.ObserveDuration()
s.Logger.Warnf("fail to encode encode and stream block: %s", err.Error())
return
return fmt.Errorf("fail to encode encode and stream block: %w", err)
}
timer1.ObserveDuration()

timer2 := prometheus.NewTimer(s.m.Timing.WithLabelValues("encodeAndPublish", "publish"))
defer timer2.ObserveDuration()

if err := s.Pubsub.Publish(ctx, s.Config.PubsubTopic, b); err != nil {
s.Logger.Warnf("fail to encode encode and stream block: %s", err.Error())
return
return fmt.Errorf("fail to encode encode and stream block: %w", err)
}

return nil
}

func (s *Client) cachePayload(ctx context.Context, ds Datastore, sBlock StreamBlock) error {
Expand Down

0 comments on commit 969cf2c

Please sign in to comment.