Skip to content

Commit

Permalink
chore: migrate to slog
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Feb 10, 2024
1 parent 53aad25 commit f2d93da
Show file tree
Hide file tree
Showing 44 changed files with 467 additions and 576 deletions.
12 changes: 6 additions & 6 deletions broadcast/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"

"github.com/anycable/anycable-go/server"
"github.com/apex/log"
)

const (
Expand Down Expand Up @@ -41,7 +41,7 @@ type HTTPBroadcaster struct {
authHeader string
server *server.HTTPServer
node Handler
log *log.Entry
log *slog.Logger
}

var _ Broadcaster = (*HTTPBroadcaster)(nil)
Expand All @@ -56,7 +56,7 @@ func NewHTTPBroadcaster(node Handler, config *HTTPConfig) *HTTPBroadcaster {

return &HTTPBroadcaster{
node: node,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "http"}),
log: slog.With("context", "broadcast").With("provider", "http"),
port: config.Port,
path: config.Path,
authHeader: authHeader,
Expand All @@ -78,7 +78,7 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error {
s.server = server
s.server.SetupHandler(s.path, http.HandlerFunc(s.Handler))

s.log.Infof("Accept broadcast requests at %s%s", s.server.Address(), s.path)
s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s", s.server.Address(), s.path))

go func() {
if err := s.server.StartAndAnnounce("broadcasting HTTP server"); err != nil {
Expand All @@ -103,7 +103,7 @@ func (s *HTTPBroadcaster) Shutdown(ctx context.Context) error {
// Handler processes HTTP requests
func (s *HTTPBroadcaster) Handler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
s.log.Debugf("Invalid request method: %s", r.Method)
s.log.Debug("invalid request method", "method", r.Method)
w.WriteHeader(422)
return
}
Expand All @@ -118,7 +118,7 @@ func (s *HTTPBroadcaster) Handler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)

if err != nil {
s.log.Error("Failed to read request body")
s.log.Error("failed to read request body")
w.WriteHeader(422)
return
}
Expand Down
14 changes: 7 additions & 7 deletions broadcast/legacy_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package broadcast

import (
"context"
"log/slog"

"github.com/apex/log"
"github.com/nats-io/nats.go"

nconfig "github.com/anycable/anycable-go/nats"
Expand All @@ -14,7 +14,7 @@ type LegacyNATSBroadcaster struct {
handler Handler
config *nconfig.NATSConfig

log *log.Entry
log *slog.Logger
}

var _ Broadcaster = (*LegacyNATSBroadcaster)(nil)
Expand All @@ -23,7 +23,7 @@ func NewLegacyNATSBroadcaster(node Handler, c *nconfig.NATSConfig) *LegacyNATSBr
return &LegacyNATSBroadcaster{
config: c,
handler: node,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "nats"}),
log: slog.With("context", "broadcast").With("provider", "nats"),
}
}

Expand All @@ -37,11 +37,11 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
nats.MaxReconnects(s.config.MaxReconnectAttempts),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
if err != nil {
log.Warnf("Connection failed: %v", err)
slog.Warn("connection failed", "error", err.Error())
}
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Infof("Connection restored: %s", nc.ConnectedUrl())
slog.Info("connection restored", "url", nc.ConnectedUrl())
}),
}

Expand All @@ -56,7 +56,7 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
}

_, err = nc.Subscribe(s.config.Channel, func(m *nats.Msg) {
s.log.Debugf("Incoming pubsub message: %s", m.Data)
s.log.Debug("incoming pubsub message", "data", m.Data)
s.handler.HandlePubSub(m.Data)
})

Expand All @@ -65,7 +65,7 @@ func (s *LegacyNATSBroadcaster) Start(done chan (error)) error {
return err
}

s.log.Infof("Subscribing for broadcasts to channel: %s", s.config.Channel)
s.log.Info("subscribing for broadcasts", "channel", s.config.Channel)

s.conn = nc

Expand Down
33 changes: 16 additions & 17 deletions broadcast/legacy_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"net/url"
"strings"
"time"
Expand All @@ -12,7 +13,6 @@ import (
rconfig "github.com/anycable/anycable-go/redis"
"github.com/anycable/anycable-go/utils"

"github.com/apex/log"
"github.com/gomodule/redigo/redis"
)

Expand All @@ -28,7 +28,7 @@ type LegacyRedisBroadcaster struct {
reconnectAttempt int
maxReconnectAttempts int
uri *url.URL
log *log.Entry
log *slog.Logger
tlsVerify bool
}

Expand All @@ -43,7 +43,7 @@ func NewLegacyRedisBroadcaster(node Handler, config *rconfig.RedisConfig) *Legac
pingInterval: time.Duration(config.KeepalivePingInterval),
reconnectAttempt: 0,
maxReconnectAttempts: config.MaxReconnectAttempts,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "redis"}),
log: slog.With("context", "broadcast").With("provider", "redis"),
tlsVerify: config.TLSVerify,
}
}
Expand All @@ -67,8 +67,7 @@ func (s *LegacyRedisBroadcaster) Start(done chan (error)) error {
if s.sentinels != "" {
masterName := redisURL.Hostname()

s.log.Debug("Redis sentinel enabled")
s.log.Debugf("Redis sentinel parameters: sentinels: %s, masterName: %s", s.sentinels, masterName)
s.log.Debug("Redis sentinel enabled", "sentinels", s.sentinels, "master", masterName)
sentinels := strings.Split(s.sentinels, ",")
s.sentinelClient = &sentinel.Sentinel{
Addrs: sentinels,
Expand Down Expand Up @@ -100,10 +99,10 @@ func (s *LegacyRedisBroadcaster) Start(done chan (error)) error {
dialOptions...,
)
if err != nil {
s.log.Debugf("Failed to connect to sentinel %s", addr)
s.log.Debug("failed to connect to sentinel", "addr", addr)
return nil, err
}
s.log.Debugf("Successfully connected to sentinel %s", addr)
s.log.Debug("successfully connected to sentinel", "addr", addr)
return c, nil
},
}
Expand Down Expand Up @@ -149,18 +148,18 @@ func (s *LegacyRedisBroadcaster) keepalive(done chan (error)) {
masterAddress, err := s.sentinelClient.MasterAddr()

if err != nil {
s.log.Warn("Failed to get master address from sentinel.")
s.log.Warn("failed to get master address from sentinel")
done <- err
return
}
s.log.Debugf("Got master address from sentinel: %s", masterAddress)
s.log.Debug("obtained master address from sentinel", "addr", masterAddress)

s.uri.Host = masterAddress
s.url = s.uri.String()
}

if err := s.listen(); err != nil {
s.log.Warnf("Redis connection failed: %v", err)
s.log.Warn("Redis connection failed", "error", err.Error())
}

s.reconnectAttempt++
Expand All @@ -172,10 +171,10 @@ func (s *LegacyRedisBroadcaster) keepalive(done chan (error)) {

delay := utils.NextRetry(s.reconnectAttempt)

s.log.Infof("Next Redis reconnect attempt in %s", delay)
s.log.Info(fmt.Sprintf("next Redis reconnect attempt in %s", delay))
time.Sleep(delay)

s.log.Infof("Reconnecting to Redis...")
s.log.Info("reconnecting to Redis...")
}
}

Expand All @@ -198,13 +197,13 @@ func (s *LegacyRedisBroadcaster) listen() error {

if s.sentinels != "" {
if !sentinel.TestRole(c, "master") {
return errors.New("Failed master role check")
return errors.New("failed master role check")
}
}

psc := redis.PubSubConn{Conn: c}
if err = psc.Subscribe(s.channel); err != nil {
s.log.Errorf("Failed to subscribe to Redis channel: %v", err)
s.log.Error("failed to subscribe to Redis channel", "error", err.Error())
return err
}

Expand All @@ -216,12 +215,12 @@ func (s *LegacyRedisBroadcaster) listen() error {
for {
switch v := psc.Receive().(type) {
case redis.Message:
s.log.Debugf("Incoming pubsub message from Redis: %s", v.Data)
s.log.Debug("incoming pubsub message", "data", v.Data)
s.node.HandlePubSub(v.Data)
case redis.Subscription:
s.log.Infof("Subscribed to Redis channel: %s\n", v.Channel)
s.log.Info("subscribed to Redis channel", "channel", v.Channel)
case error:
s.log.Errorf("Redis subscription error: %v", v)
s.log.Error("Redis subscription error", "error", v.Error())
done <- v
}
}
Expand Down
38 changes: 19 additions & 19 deletions broadcast/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"time"

rconfig "github.com/anycable/anycable-go/redis"
"github.com/anycable/anycable-go/utils"

"github.com/apex/log"
nanoid "github.com/matoous/go-nanoid"
"github.com/redis/rueidis"
)
Expand All @@ -33,7 +33,7 @@ type RedisBroadcaster struct {
shutdownCh chan struct{}
finishedCh chan struct{}

log *log.Entry
log *slog.Logger
}

var _ Broadcaster = (*RedisBroadcaster)(nil)
Expand All @@ -46,7 +46,7 @@ func NewRedisBroadcaster(node Handler, config *rconfig.RedisConfig) *RedisBroadc
node: node,
config: config,
consumerName: name,
log: log.WithFields(log.Fields{"context": "broadcast", "provider": "redisx", "id": name}),
log: slog.With("context", "broadcast").With("provider", "redisx").With("id", name),
shutdownCh: make(chan struct{}),
finishedCh: make(chan struct{}),
}
Expand All @@ -64,11 +64,11 @@ func (s *RedisBroadcaster) Start(done chan error) error {
}

if s.config.IsSentinel() { //nolint:gocritic
s.log.WithField("stream", s.config.Channel).WithField("consumer", s.consumerName).Infof("Starting Redis broadcaster at %v (sentinels)", s.config.Hostnames())
s.log.With("stream", s.config.Channel).With("consumer", s.consumerName).Info(fmt.Sprintf("Starting Redis broadcaster at %v (sentinels)", s.config.Hostnames()))
} else if s.config.IsCluster() {
s.log.WithField("stream", s.config.Channel).WithField("consumer", s.consumerName).Infof("Starting Redis broadcaster at %v (cluster)", s.config.Hostnames())
s.log.With("stream", s.config.Channel).With("consumer", s.consumerName).Info(fmt.Sprintf("Starting Redis broadcaster at %v (cluster)", s.config.Hostnames()))
} else {
s.log.WithField("stream", s.config.Channel).WithField("consumer", s.consumerName).Infof("Starting Redis broadcaster at %s", s.config.Hostname())
s.log.With("stream", s.config.Channel).With("consumer", s.consumerName).Info(fmt.Sprintf("Starting Redis broadcaster at %s", s.config.Hostname()))
}

s.clientOptions = options
Expand All @@ -86,7 +86,7 @@ func (s *RedisBroadcaster) Shutdown(ctx context.Context) error {
return nil
}

s.log.Debugf("Shutting down Redis broadcaster")
s.log.Debug("shutting down Redis broadcaster")

close(s.shutdownCh)

Expand All @@ -100,7 +100,7 @@ func (s *RedisBroadcaster) Shutdown(ctx context.Context) error {
err := res.Error()

if err != nil {
s.log.Errorf("Failed to remove Redis stream consumer: %v", err)
s.log.Error("failed to remove Redis stream consumer", "error", err.Error())
}

s.client.Close()
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
err := s.initClient()

if err != nil {
s.log.Errorf("Failed to connect to Redis: %v", err)
s.log.Error("failed to connect to Redis", "error", err)
s.maybeReconnect(done)
return
}
Expand All @@ -144,9 +144,9 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
if err != nil {
if redisErr, ok := rueidis.IsRedisErr(err); ok {
if strings.HasPrefix(redisErr.Error(), "BUSYGROUP") {
s.log.Debugf("Redis consumer group already exists")
s.log.Debug("Redis consumer group already exists")
} else {
s.log.Errorf("Failed to create consumer group: %v", err)
s.log.Error("failed to create consumer group", "error", err.Error())
s.maybeReconnect(done)
return
}
Expand All @@ -161,23 +161,23 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
for {
select {
case <-s.shutdownCh:
s.log.Debugf("Stop consuming stream")
s.log.Debug("stop consuming stream")
close(s.finishedCh)
return
default:
if lastClaimedAt+readBlockMilliseconds < time.Now().UnixMilli() {
reclaimed, err := s.autoclaimMessages(readBlockMilliseconds)

if err != nil {
s.log.Errorf("Failed to claim from Redis stream: %v", err)
s.log.Error("failed to claim from Redis stream", "error", err)
s.maybeReconnect(done)
return
}

lastClaimedAt = time.Now().UnixMilli()

if len(reclaimed) > 0 {
s.log.Debugf("Reclaimed messages: %d", len(reclaimed))
s.log.Debug("reclaimed messages", "size", len(reclaimed))

s.broadcastXrange(reclaimed)
}
Expand All @@ -186,7 +186,7 @@ func (s *RedisBroadcaster) runReader(done chan (error)) {
messages, err := s.readFromStream(readBlockMilliseconds)

if err != nil {
s.log.Errorf("Failed to read from Redis stream: %v", err)
s.log.Error("failed to read from Redis stream", "error", err)
s.maybeReconnect(done)
return
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *RedisBroadcaster) autoclaimMessages(blockTime int64) ([]rueidis.XRangeE
func (s *RedisBroadcaster) broadcastXrange(messages []rueidis.XRangeEntry) {
for _, message := range messages {
if payload, pok := message.FieldValues["payload"]; pok {
s.log.Debugf("Incoming broadcast: %v", payload)
s.log.Debug("incoming broadcast", "data", payload)

s.node.HandleBroadcast([]byte(payload))

Expand All @@ -264,7 +264,7 @@ func (s *RedisBroadcaster) broadcastXrange(messages []rueidis.XRangeEntry) {
err := ackRes[0].Error()

if err != nil {
s.log.Errorf("Failed to ack message: %v", err)
s.log.Error("failed to ack message", "error", err)
}
}
}
Expand All @@ -281,10 +281,10 @@ func (s *RedisBroadcaster) maybeReconnect(done chan (error)) {

delay := utils.NextRetry(s.reconnectAttempt - 1)

s.log.Infof("Next Redis reconnect attempt in %s", delay)
s.log.Info(fmt.Sprintf("next Redis reconnect attempt in %s", delay))
time.Sleep(delay)

s.log.Infof("Reconnecting to Redis...")
s.log.Info("reconnecting to Redis...")

s.clientMu.Lock()

Expand Down
Loading

0 comments on commit f2d93da

Please sign in to comment.