From 969cf2ccefdf6cd94eff266a8094e3bd76399ea3 Mon Sep 17 00:00:00 2001 From: "Aratz M. Lasa" Date: Wed, 10 May 2023 15:03:58 -0600 Subject: [PATCH] gert rid of publishers --- cmd/dreamboat/main.go | 2 -- stream/stream.go | 60 ++++++------------------------------------- 2 files changed, 8 insertions(+), 54 deletions(-) diff --git a/cmd/dreamboat/main.go b/cmd/dreamboat/main.go index 87294e5c..53477633 100644 --- a/cmd/dreamboat/main.go +++ b/cmd/dreamboat/main.go @@ -880,8 +880,6 @@ func initStreamer(c *cli.Context, redisClient *redis.Client, ds stream.Datastore redisStreamer := stream.NewClient(pubsub, streamConfig) redisStreamer.AttachMetrics(m) - redisStreamer.RunPublisherParallel(c.Context, c.Uint("relay-distribution-stream-workers")) - if err := redisStreamer.RunSubscriberParallel(c.Context, ds, c.Uint("relay-distribution-stream-workers")); err != nil { return nil, fmt.Errorf("fail to start stream subscriber: %w", err) } diff --git a/stream/stream.go b/stream/stream.go index fb185b77..cad11bc4 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -164,73 +164,28 @@ func (s *Client) RunSlotDeliveredSubscriber(ctx context.Context, slots chan []by return ctx.Err() } -func (s *Client) RunPublisherParallel(ctx context.Context, num uint) { - for i := uint(0); i < num; i++ { - go s.RunStorePublisher(ctx) - go s.RunCachePublisher(ctx) - } -} - -func (s *Client) RunCachePublisher(ctx context.Context) error { - for { - select { - case req := <-s.cacheRequests: - s.encodeAndPublish(ctx, req, true) - case <-ctx.Done(): - return ctx.Err() - } - } -} - -func (s *Client) RunStorePublisher(ctx context.Context) error { - for { - select { - case req := <-s.storeRequests: - s.encodeAndPublish(ctx, req, false) - case <-ctx.Done(): - return ctx.Err() - } - } -} - func (s *Client) PublishBlockSubmission(ctx context.Context, block structs.BlockBidAndTrace) error { - select { - case s.storeRequests <- block: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return s.encodeAndPublish(ctx, block, false) } func (s *Client) PublishCacheBlock(ctx context.Context, block structs.BlockBidAndTrace) error { - select { - case s.cacheRequests <- block: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return s.encodeAndPublish(ctx, block, true) } func (s *Client) PublishSlotDelivered(ctx context.Context, slot structs.Slot) error { - select { - case s.slotDeliveredRequests <- slot: - return nil - case <-ctx.Done(): - return ctx.Err() - } + return nil // TODO } func (s *Client) SlotDeliveredChan() <-chan structs.Slot { return s.slotDelivered } -func (s *Client) encodeAndPublish(ctx context.Context, block structs.BlockBidAndTrace, isCache bool) { +func (s *Client) encodeAndPublish(ctx context.Context, block structs.BlockBidAndTrace, isCache bool) error { timer1 := prometheus.NewTimer(s.m.Timing.WithLabelValues("encodeAndPublish", "encode")) b, err := s.encode(block, isCache) if err != nil { timer1.ObserveDuration() - s.Logger.Warnf("fail to encode encode and stream block: %s", err.Error()) - return + return fmt.Errorf("fail to encode encode and stream block: %w", err) } timer1.ObserveDuration() @@ -238,9 +193,10 @@ func (s *Client) encodeAndPublish(ctx context.Context, block structs.BlockBidAnd defer timer2.ObserveDuration() if err := s.Pubsub.Publish(ctx, s.Config.PubsubTopic, b); err != nil { - s.Logger.Warnf("fail to encode encode and stream block: %s", err.Error()) - return + return fmt.Errorf("fail to encode encode and stream block: %w", err) } + + return nil } func (s *Client) cachePayload(ctx context.Context, ds Datastore, sBlock StreamBlock) error {