Skip to content

Commit

Permalink
refactor: move configuration settings under proper namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Oct 7, 2024
1 parent 455516a commit 87217bb
Show file tree
Hide file tree
Showing 13 changed files with 179 additions and 148 deletions.
2 changes: 2 additions & 0 deletions broker/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package broker

type Config struct {
// Adapter name
Adapter string
// For how long to keep history in seconds
HistoryTTL int64
// Max size of messages to keep in the history per stream
Expand Down
26 changes: 13 additions & 13 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *Runner) checkAndSetDefaults() error {
}

if r.log == nil {
_, err := logger.InitLogger(r.config.LogFormat, r.config.LogLevel)
_, err := logger.InitLogger(r.config.Log.LogFormat, r.config.Log.LogLevel)
if err != nil {
return errorx.Decorate(err, "failed to initialize default logger")
}
Expand All @@ -113,9 +113,9 @@ func (r *Runner) checkAndSetDefaults() error {
return errorx.Decorate(err, "failed to load configuration presets")
}

server.SSL = &r.config.SSL
server.Host = r.config.Host
server.MaxConn = r.config.MaxConn
server.SSL = &r.config.Server.SSL
server.Host = r.config.Server.Host
server.MaxConn = r.config.Server.MaxConn
server.Logger = r.log

if r.name == "" {
Expand Down Expand Up @@ -172,17 +172,17 @@ func (r *Runner) Run() error {
return err
}

wsServer, err := server.ForPort(strconv.Itoa(r.config.Port))
wsServer, err := server.ForPort(strconv.Itoa(r.config.Server.Port))
if err != nil {
return errorx.Decorate(err, "failed to initialize WebSocket server at %s:%d", r.config.Host, r.config.Port)
return errorx.Decorate(err, "failed to initialize WebSocket server at %s:%d", r.config.Server.Host, r.config.Server.Port)
}

wsHandler, err := r.websocketHandlerFactory(appNode, r.config, r.log)
if err != nil {
return errorx.Decorate(err, "failed to initialize WebSocket handler")
}

for _, path := range r.config.Path {
for _, path := range r.config.WS.Paths {
wsServer.SetupHandler(path, wsHandler)
r.log.Info(fmt.Sprintf("Handle WebSocket connections at %s%s", wsServer.Address(), path))
}
Expand All @@ -195,8 +195,8 @@ func (r *Runner) Run() error {
wsServer.SetupHandler(path, handler)
}

wsServer.SetupHandler(r.config.HealthPath, http.HandlerFunc(server.HealthHandler))
r.log.Info(fmt.Sprintf("Handle health requests at %s%s", wsServer.Address(), r.config.HealthPath))
wsServer.SetupHandler(r.config.Server.HealthPath, http.HandlerFunc(server.HealthHandler))
r.log.Info(fmt.Sprintf("Handle health requests at %s%s", wsServer.Address(), r.config.Server.HealthPath))

if r.config.SSE.Enabled {
r.log.Info(
Expand Down Expand Up @@ -279,7 +279,7 @@ func (r *Runner) runNode() (*node.Node, error) {
go disconnector.Run() // nolint:errcheck
appNode.SetDisconnector(disconnector)

if r.config.EmbedNats {
if r.config.EmbeddedNats.Enabled {
service, enatsErr := r.embedNATS(&r.config.EmbeddedNats)

if enatsErr != nil {
Expand Down Expand Up @@ -354,7 +354,7 @@ func (r *Runner) setMaxProcs() int {
}

func (r *Runner) announceDebugMode() {
if r.config.Debug {
if r.config.Log.Debug {
r.log.Debug("🔧 🔧 🔧 Debug mode is on 🔧 🔧 🔧")
}
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func (r *Runner) defaultDisconnector(n *node.Node, c *config.Config, l *slog.Log
}

func (r *Runner) defaultWebSocketHandler(n *node.Node, c *config.Config, l *slog.Logger) (http.Handler, error) {
extractor := server.DefaultHeadersExtractor{Headers: c.Headers, Cookies: c.Cookies}
extractor := server.DefaultHeadersExtractor{Headers: c.RPC.ProxyHeaders, Cookies: c.RPC.ProxyCookies}
return ws.WebsocketHandler(common.ActionCableProtocols(), &extractor, &c.WS, r.log, func(wsc *websocket.Conn, info *server.RequestInfo, callback func()) error {
wrappedConn := ws.NewConnection(wsc)

Expand All @@ -457,7 +457,7 @@ func (r *Runner) defaultWebSocketHandler(n *node.Node, c *config.Config, l *slog
}

func (r *Runner) defaultSSEHandler(n *node.Node, ctx context.Context, c *config.Config) (http.Handler, error) {
extractor := server.DefaultHeadersExtractor{Headers: c.Headers, Cookies: c.Cookies}
extractor := server.DefaultHeadersExtractor{Headers: c.RPC.ProxyHeaders, Cookies: c.RPC.ProxyCookies}
handler := sse.SSEHandler(n, ctx, &extractor, &c.SSE, r.log)

return handler, nil
Expand Down
87 changes: 46 additions & 41 deletions cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ func WithCLICustomOptions(factory customOptionsFactory) cliOption {
}

// NewConfigFromCLI reads config from os.Args. It returns config, error (if any) and a bool value
// indicating that the usage message or version was shown, no further action required.
// indicating that the execution was interrupted (e.g., usage message or version was shown), no further action required.
func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error, bool) {
c := config.NewConfig()

var path, headers, cookieFilter, mtags string
var helpOrVersionWereShown = true
var broadcastAdapters string
var cliInterrupted = true
var metricsFilter string
var enatsRoutes, enatsGateways string
var presets string
Expand All @@ -64,17 +65,16 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error,
var jwtIdKey, jwtIdParam string
var jwtIdEnforce bool
var noRPC bool
var isPublic bool

// Print raw version without prefix
cli.VersionPrinter = func(cCtx *cli.Context) {
_, _ = fmt.Fprintf(cCtx.App.Writer, "%v\n", cCtx.App.Version)
}

flags := []cli.Flag{}
flags = append(flags, serverCLIFlags(&c, &path, &isPublic)...)
flags = append(flags, serverCLIFlags(&c, &path)...)
flags = append(flags, sslCLIFlags(&c)...)
flags = append(flags, broadcastCLIFlags(&c)...)
flags = append(flags, broadcastCLIFlags(&c, &broadcastAdapters)...)
flags = append(flags, brokerCLIFlags(&c)...)
flags = append(flags, redisCLIFlags(&c)...)
flags = append(flags, httpBroadcastCLIFlags(&c)...)
Expand All @@ -99,7 +99,7 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error,
HideHelpCommand: true,
Flags: flags,
Action: func(nc *cli.Context) error {
helpOrVersionWereShown = false
cliInterrupted = false
return nil
},
}
Expand All @@ -121,26 +121,30 @@ func NewConfigFromCLI(args []string, opts ...cliOption) (*config.Config, error,
//
// Unfortunately, cli module does not support another way of detecting if or which
// command was run.
if helpOrVersionWereShown {
if cliInterrupted {
return &config.Config{}, nil, true
}

if broadcastAdapters != "" {
c.BroadcastAdapters = strings.Split(broadcastAdapters, ",")
}

if path != "" {
c.Path = strings.Split(path, ",")
c.WS.Paths = strings.Split(path, ",")
}

c.Headers = strings.Split(strings.ToLower(headers), ",")
c.RPC.ProxyHeaders = strings.Split(strings.ToLower(headers), ",")

if len(cookieFilter) > 0 {
c.Cookies = strings.Split(cookieFilter, ",")
c.RPC.ProxyCookies = strings.Split(cookieFilter, ",")
}

if c.Debug {
c.LogLevel = "debug"
if c.Log.Debug {
c.Log.LogLevel = "debug"
}

if c.Metrics.Port == 0 {
c.Metrics.Port = c.Port
c.Metrics.Port = c.Server.Port
}

if mtags != "" {
Expand Down Expand Up @@ -174,7 +178,7 @@ Use metrics_rotate_interval instead.`)
}

// Automatically set the URL of the embedded NATS as the pub/sub server URL
if c.EmbedNats && c.NATS.Servers == nats.DefaultURL {
if c.EmbeddedNats.Enabled && c.NATS.Servers == nats.DefaultURL {
c.NATS.Servers = c.EmbeddedNats.ServiceAddr
}

Expand Down Expand Up @@ -310,14 +314,14 @@ Use broadcast_key instead.`)
// Configure default HTTP port
if c.HTTPBroadcast.Port == 0 {
if c.HTTPBroadcast.IsSecured() {
c.HTTPBroadcast.Port = c.Port
c.HTTPBroadcast.Port = c.Server.Port
} else {
c.HTTPBroadcast.Port = 8090
}
}

// Configure public mode and other insecure features
if isPublic {
if c.PublicMode {
c.SkipAuth = true
c.Streams.Public = true
// Ensure broadcasting is also public
Expand Down Expand Up @@ -370,21 +374,21 @@ var (
)

// serverCLIFlags returns base server flags
func serverCLIFlags(c *config.Config, path *string, isPublic *bool) []cli.Flag {
func serverCLIFlags(c *config.Config, path *string) []cli.Flag {
return withDefaults(serverCategoryDescription, []cli.Flag{
&cli.StringFlag{
Name: "host",
Value: c.Host,
Value: c.Server.Host,
Usage: "Server host",
Destination: &c.Host,
Destination: &c.Server.Host,
},

&cli.IntFlag{
Name: "port",
Value: c.Port,
Value: c.Server.Port,
Usage: "Server port",
EnvVars: []string{envPrefix + "PORT", "PORT"},
Destination: &c.Port,
Destination: &c.Server.Port,
},

&cli.StringFlag{
Expand All @@ -404,7 +408,8 @@ func serverCLIFlags(c *config.Config, path *string, isPublic *bool) []cli.Flag {
&cli.BoolFlag{
Name: "public",
Usage: "[DANGER ZONE] Run server in the public mode allowing all connections and stream subscriptions",
Destination: isPublic,
Value: c.PublicMode,
Destination: &c.PublicMode,
},

&cli.BoolFlag{
Expand All @@ -417,21 +422,21 @@ func serverCLIFlags(c *config.Config, path *string, isPublic *bool) []cli.Flag {
&cli.IntFlag{
Name: "max-conn",
Usage: "Limit simultaneous server connections (0 – without limit)",
Destination: &c.MaxConn,
Destination: &c.Server.MaxConn,
},

&cli.StringFlag{
Name: "path",
Value: strings.Join(c.Path, ","),
Value: strings.Join(c.WS.Paths, ","),
Usage: "WebSocket endpoint path (you can specify multiple paths using comma as separator)",
Destination: path,
},

&cli.StringFlag{
Name: "health-path",
Value: c.HealthPath,
Value: c.Server.HealthPath,
Usage: "HTTP health endpoint path",
Destination: &c.HealthPath,
Destination: &c.Server.HealthPath,
},

&cli.IntFlag{
Expand All @@ -457,31 +462,31 @@ func sslCLIFlags(c *config.Config) []cli.Flag {
&cli.PathFlag{
Name: "ssl_cert",
Usage: "SSL certificate path",
Destination: &c.SSL.CertPath,
Destination: &c.Server.SSL.CertPath,
},

&cli.PathFlag{
Name: "ssl_key",
Usage: "SSL private key path",
Destination: &c.SSL.KeyPath,
Destination: &c.Server.SSL.KeyPath,
},
})
}

// broadcastCLIFlags returns broadcast_adapter flag
func broadcastCLIFlags(c *config.Config) []cli.Flag {
func broadcastCLIFlags(c *config.Config, adapters *string) []cli.Flag {
return withDefaults(broadcastCategoryDescription, []cli.Flag{
&cli.StringFlag{
Name: "broadcast_adapter",
Usage: "Broadcasting adapter to use (http, redisx, redis or nats). You can specify multiple at once via a comma-separated list",
Value: c.BroadcastAdapter,
Destination: &c.BroadcastAdapter,
Value: strings.Join(c.BroadcastAdapters, ","),
Destination: adapters,
},
&cli.StringFlag{
Name: "broker",
Usage: "Broker engine to use (memory)",
Value: c.BrokerAdapter,
Destination: &c.BrokerAdapter,
Value: c.Broker.Adapter,
Destination: &c.Broker.Adapter,
},
&cli.StringFlag{
Name: "pubsub",
Expand Down Expand Up @@ -644,8 +649,8 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) []
&cli.BoolFlag{
Name: "embed_nats",
Usage: "Enable embedded NATS server and use it for pub/sub",
Value: c.EmbedNats,
Destination: &c.EmbedNats,
Value: c.EmbeddedNats.Enabled,
Destination: &c.EmbeddedNats.Enabled,
},

&cli.StringFlag{
Expand Down Expand Up @@ -793,7 +798,7 @@ func rpcCLIFlags(c *config.Config, headers, cookieFilter *string, isNone *bool)
&cli.StringFlag{
Name: "headers",
Usage: "List of headers to proxy to RPC",
Value: strings.Join(c.Headers, ","),
Value: strings.Join(c.RPC.ProxyHeaders, ","),
Destination: headers,
},

Expand Down Expand Up @@ -877,21 +882,21 @@ func logCLIFlags(c *config.Config) []cli.Flag {
&cli.StringFlag{
Name: "log_level",
Usage: "Set logging level (debug/info/warn/error)",
Value: c.LogLevel,
Destination: &c.LogLevel,
Value: c.Log.LogLevel,
Destination: &c.Log.LogLevel,
},

&cli.StringFlag{
Name: "log_format",
Usage: "Set logging format (text/json)",
Value: c.LogFormat,
Destination: &c.LogFormat,
Value: c.Log.LogFormat,
Destination: &c.Log.LogFormat,
},

&cli.BoolFlag{
Name: "debug",
Usage: "Enable debug mode (more verbose logging)",
Destination: &c.Debug,
Destination: &c.Log.Debug,
},
})
}
Expand Down
9 changes: 4 additions & 5 deletions cli/runner_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cli
import (
"log/slog"
"os"
"strings"

"github.com/anycable/anycable-go/broadcast"
"github.com/anycable/anycable-go/broker"
Expand Down Expand Up @@ -68,7 +67,7 @@ func WithBroadcasters(fn broadcastersFactory) Option {
func WithDefaultBroadcaster() Option {
return WithBroadcasters(func(h broadcast.Handler, c *config.Config, l *slog.Logger) ([]broadcast.Broadcaster, error) {
broadcasters := []broadcast.Broadcaster{}
adapters := strings.Split(c.BroadcastAdapter, ",")
adapters := c.BroadcastAdapters

for _, adapter := range adapters {
switch adapter {
Expand Down Expand Up @@ -159,11 +158,11 @@ func WithWebSocketEndpoint(path string, fn websocketHandler) Option {
// WithDefaultBroker is an Option to set Runner broker to default broker from config
func WithDefaultBroker() Option {
return WithBroker(func(br broker.Broadcaster, c *config.Config, l *slog.Logger) (broker.Broker, error) {
if c.BrokerAdapter == "" {
if c.Broker.Adapter == "" {
return broker.NewLegacyBroker(br), nil
}

switch c.BrokerAdapter {
switch c.Broker.Adapter {
case "memory":
b := broker.NewMemoryBroker(br, &c.Broker)
return b, nil
Expand All @@ -175,7 +174,7 @@ func WithDefaultBroker() Option {
b := broker.NewNATSBroker(br, &c.Broker, &c.NATS, l)
return b, nil
default:
return nil, errorx.IllegalArgument.New("Unsupported broker adapter: %s", c.BrokerAdapter)
return nil, errorx.IllegalArgument.New("Unsupported broker adapter: %s", c.Broker.Adapter)
}
})
}
Expand Down
Loading

0 comments on commit 87217bb

Please sign in to comment.