From 0f968ba7a233919798b3b3e85b7bbf0cf84d78ac Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Tue, 8 Aug 2023 12:27:56 -0400 Subject: [PATCH] Log incoming subscription messages. --- stream/stream.go | 8 ++++++-- stream/transport/transport.go | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/stream/stream.go b/stream/stream.go index 793c48bb..f87f3c5c 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -136,6 +136,8 @@ func subscribe(ctx context.Context, log log.Logger, ps PubSub, handle func(conte s := ps.Subscribe(ctx) defer s.Close() + log = log.WithField("topic", ps.String()) + for { msg, err := s.Next(ctx) if err != nil { @@ -147,10 +149,12 @@ func subscribe(ctx context.Context, log log.Logger, ps PubSub, handle func(conte return } - log.WithError(err). - WithField("topic", ps.String()). + log.With(msg). + WithError(err). Warn("failed to handle subscription event") } + + log.With(msg).Debug("handled subscription event") } } diff --git a/stream/transport/transport.go b/stream/transport/transport.go index 876976d9..dbdd8e72 100644 --- a/stream/transport/transport.go +++ b/stream/transport/transport.go @@ -29,12 +29,37 @@ type Subscription interface { type ForkVersionFormat uint64 +func (f ForkVersionFormat) String() string { + switch f { + case Unknown: + return "none" + case AltairJson: + return "altair/json" + case BellatrixJson: + return "bellatrix/json" + case CapellaJson: + return "capella/json" + case CapellaSSZ: + return "capella/ssz" + } + + return fmt.Sprintf("invalid: %d", f) +} + type Message struct { Source uuid.UUID ForkEncoding ForkVersionFormat Payload []byte } +func (m Message) Loggable() map[string]any { + return map[string]any{ + "source": m.Source, + "fork_encoding": m.ForkEncoding, + "n_bytes": len(m.Payload), + } +} + func Encode(m Message) ([]byte, error) { rawItem, err := json.Marshal(m) if err != nil {