diff --git a/CHANGELOG.md b/CHANGELOG.md index 55a71d83..84346553 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- Added `--max-conn` option to limit simultaneous server connections. ([@skryukov][]) + - Added `data_sent_bytes_total` and `data_rcvd_bytes_total` metrics. ([@palkan][]) - Add `--allowed_origins` option to enable Origin check during the WebSocket upgrade. ([@skryukov][]) diff --git a/cli/cli.go b/cli/cli.go index 23986328..2b79a5be 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -57,6 +57,7 @@ func NewRunner(name string, config *config.Config) *Runner { // Set global HTTP params as early as possible to make sure all servers use them server.SSL = &config.SSL server.Host = config.Host + server.MaxConn = config.MaxConn return &Runner{name: name, config: config, shutdownables: []Shutdownable{}, errChan: make(chan error)} } diff --git a/cli/options.go b/cli/options.go index c35d42de..24e17cca 100644 --- a/cli/options.go +++ b/cli/options.go @@ -46,6 +46,7 @@ func init() { // Config vars fs.StringVar(&defaults.Host, "host", "localhost", "") fs.IntVar(&defaults.Port, "port", portDefault, "") + fs.IntVar(&defaults.MaxConn, "max-conn", 0, "") fs.StringVar(&defaults.Path, "path", "/cable", "") fs.StringVar(&defaults.HealthPath, "health-path", "/health", "") @@ -133,6 +134,7 @@ USAGE OPTIONS --host Server host, default: localhost, env: ANYCABLE_HOST --port Server port, default: 8080, env: ANYCABLE_PORT, PORT + --max-conn Limit simultaneous server connections (0 – without limit), default: 0, env: ANYCABLE_MAX_CONN --path WebSocket endpoint path, default: /cable, env: ANYCABLE_PATH --health-path HTTP health endpoint path, default: /health, env: ANYCABLE_HEALTH_PATH diff --git a/config/config.go b/config/config.go index ef5fea17..ab4faaf9 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ type Config struct { HTTPPubSub pubsub.HTTPConfig Host string Port int + MaxConn int BroadcastAdapter string Path string HealthPath string diff --git a/go.mod b/go.mod index af03aad5..b516364e 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/pkg/errors v0.8.1 // indirect github.com/stretchr/testify v1.7.0 github.com/syossan27/tebata v0.0.0-20180602121909-b283fe4bc5ba - golang.org/x/net v0.0.0-20200506145744-7e3656a0809f // indirect + golang.org/x/net v0.0.0-20200506145744-7e3656a0809f golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988 // indirect google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380 // indirect google.golang.org/grpc v1.29.1 diff --git a/metrics/metrics.go b/metrics/metrics.go index 6505ed25..c3a593b8 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -59,7 +59,7 @@ func FromConfig(config *Config) (*Metrics, error) { if config.HTTPEnabled() { if config.Host != "" && config.Host != server.Host { - srv, err := server.NewServer(config.Host, strconv.Itoa(config.Port), server.SSL) + srv, err := server.NewServer(config.Host, strconv.Itoa(config.Port), server.SSL, 0) if err != nil { return nil, err } diff --git a/server/server.go b/server/server.go index ce91adb4..622baf79 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/apex/log" + "golang.org/x/net/netutil" ) // HTTPServer is wrapper over http.Server @@ -19,6 +20,7 @@ type HTTPServer struct { secured bool shutdown bool started bool + maxConn int mu sync.Mutex log *log.Entry @@ -31,12 +33,14 @@ var ( Host string = "localhost" // SSL is a default configuration for HTTP servers SSL *SSLConfig + // MaxConn is a default configuration for maximum connections + MaxConn int ) // ForPort creates new or returns the existing server for the specified port func ForPort(port string) (*HTTPServer, error) { if _, ok := allServers[port]; !ok { - server, err := NewServer(Host, port, SSL) + server, err := NewServer(Host, port, SSL, MaxConn) if err != nil { return nil, err } @@ -47,7 +51,7 @@ func ForPort(port string) (*HTTPServer, error) { } // NewServer builds HTTPServer from config params -func NewServer(host string, port string, ssl *SSLConfig) (*HTTPServer, error) { +func NewServer(host string, port string, ssl *SSLConfig, maxConn int) (*HTTPServer, error) { mux := http.NewServeMux() addr := net.JoinHostPort(host, port) @@ -72,6 +76,7 @@ func NewServer(host string, port string, ssl *SSLConfig) (*HTTPServer, error) { secured: secured, shutdown: false, started: false, + maxConn: maxConn, log: log.WithField("context", "http"), }, nil } @@ -87,11 +92,21 @@ func (s *HTTPServer) Start() error { s.started = true s.mu.Unlock() + ln, err := net.Listen("tcp", s.addr) + if err != nil { + return err + } + defer ln.Close() + + if s.maxConn > 0 { + ln = netutil.LimitListener(ln, s.maxConn) + } + if s.secured { - return s.server.ListenAndServeTLS("", "") + return s.server.ServeTLS(ln, "", "") } - return s.server.ListenAndServe() + return s.server.Serve(ln) } // StartAndAnnounce prints server info and starts server diff --git a/vendor/golang.org/x/net/netutil/listen.go b/vendor/golang.org/x/net/netutil/listen.go new file mode 100644 index 00000000..cee46e33 --- /dev/null +++ b/vendor/golang.org/x/net/netutil/listen.go @@ -0,0 +1,74 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package netutil provides network utility functions, complementing the more +// common ones in the net package. +package netutil // import "golang.org/x/net/netutil" + +import ( + "net" + "sync" +) + +// LimitListener returns a Listener that accepts at most n simultaneous +// connections from the provided Listener. +func LimitListener(l net.Listener, n int) net.Listener { + return &limitListener{ + Listener: l, + sem: make(chan struct{}, n), + done: make(chan struct{}), + } +} + +type limitListener struct { + net.Listener + sem chan struct{} + closeOnce sync.Once // ensures the done chan is only closed once + done chan struct{} // no values sent; closed when Close is called +} + +// acquire acquires the limiting semaphore. Returns true if successfully +// accquired, false if the listener is closed and the semaphore is not +// acquired. +func (l *limitListener) acquire() bool { + select { + case <-l.done: + return false + case l.sem <- struct{}{}: + return true + } +} +func (l *limitListener) release() { <-l.sem } + +func (l *limitListener) Accept() (net.Conn, error) { + acquired := l.acquire() + // If the semaphore isn't acquired because the listener was closed, expect + // that this call to accept won't block, but immediately return an error. + c, err := l.Listener.Accept() + if err != nil { + if acquired { + l.release() + } + return nil, err + } + return &limitListenerConn{Conn: c, release: l.release}, nil +} + +func (l *limitListener) Close() error { + err := l.Listener.Close() + l.closeOnce.Do(func() { close(l.done) }) + return err +} + +type limitListenerConn struct { + net.Conn + releaseOnce sync.Once + release func() +} + +func (l *limitListenerConn) Close() error { + err := l.Conn.Close() + l.releaseOnce.Do(l.release) + return err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a6c1093b..5998058a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -65,6 +65,7 @@ golang.org/x/net/http2 golang.org/x/net/http2/hpack golang.org/x/net/idna golang.org/x/net/internal/timeseries +golang.org/x/net/netutil golang.org/x/net/trace # golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988 ## explicit