From 1b815a278d144987b6e8c2b776415189a2fb016f Mon Sep 17 00:00:00 2001 From: merlin Date: Sat, 12 Nov 2022 14:10:09 +0300 Subject: [PATCH 1/5] add limited listener --- gemax/server.go | 28 ++++++++++++++++++++++++++++ go.mod | 2 ++ go.sum | 2 ++ 3 files changed, 32 insertions(+) create mode 100644 go.sum diff --git a/gemax/server.go b/gemax/server.go index 688ec56..14b563d 100644 --- a/gemax/server.go +++ b/gemax/server.go @@ -10,8 +10,12 @@ import ( "sync" "github.com/ninedraft/gemax/gemax/status" + "golang.org/x/net/netutil" ) +// DefaultMaxConnections default number of maximum connections. +const DefaultMaxConnections = 256 + // Handler describes a gemini protocol handler. type Handler func(ctx context.Context, rw ResponseWriter, req IncomingRequest) @@ -25,6 +29,11 @@ type Server struct { ConnContext func(ctx context.Context, conn net.Conn) context.Context Logf func(format string, args ...interface{}) + // Maximum number of simultaneous connections served by Server. + // 0 - DefaultMaxConnections + // <0 - no limitation + MaxConnections int + mu sync.RWMutex conns map[*connTrack]struct{} listeners map[net.Listener]struct{} @@ -47,10 +56,18 @@ func (server *Server) ListenAndServe(ctx context.Context, tlsCfg *tls.Config) er ctx, cancel := context.WithCancel(ctx) defer cancel() var lc = net.ListenConfig{} + var tcpListener, errListener = lc.Listen(ctx, "tcp", server.Addr) if errListener != nil { return fmt.Errorf("creating listener: %w", errListener) } + + if n := server.maxConnections(); n >= 0 { + var limited = netutil.LimitListener(tcpListener, n) + server.addListener(limited) + tcpListener = limited + } + var listener = tls.NewListener(tcpListener, tlsCfg) go func() { <-ctx.Done() @@ -78,6 +95,17 @@ func (server *Server) Serve(ctx context.Context, listener net.Listener) error { } } +func (server *Server) maxConnections() int { + switch { + case server.MaxConnections > 0: + return server.MaxConnections + case server.MaxConnections == 0: + return DefaultMaxConnections + default: + return -1 + } +} + // Stop gracefully shuts down the server: closes all connections. func (server *Server) Stop() { server.closeAll() diff --git a/go.mod b/go.mod index f868898..95faf56 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/ninedraft/gemax go 1.19 + +require golang.org/x/net v0.2.0 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2195436 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= From b0f007afc26a7ac8f1a34e1df96e03653f1bcb0f Mon Sep 17 00:00:00 2001 From: merlin Date: Sat, 12 Nov 2022 17:22:06 +0300 Subject: [PATCH 2/5] log error on bad request --- gemax/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gemax/server.go b/gemax/server.go index 14b563d..489c2e6 100644 --- a/gemax/server.go +++ b/gemax/server.go @@ -146,7 +146,7 @@ func (server *Server) handle(ctx context.Context, conn net.Conn) { code = status.BadRequest } if errParseReq != nil { - server.logf("WARN: bad request: remote_ip=%s, code=%s", conn.RemoteAddr(), code) + server.logf("WARN: bad request: remote_ip=%s, code=%s: %v", conn.RemoteAddr(), code, errParseReq) rw.WriteStatus(code, status.Text(code)) return } From 3d02558138e32b38f1f4a8c4cd82c9e7d0335b2a Mon Sep 17 00:00:00 2001 From: merlin Date: Sat, 12 Nov 2022 17:23:28 +0300 Subject: [PATCH 3/5] add tests for limited listener --- gemax/server_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/gemax/server_test.go b/gemax/server_test.go index 2e52f8b..a354ea4 100644 --- a/gemax/server_test.go +++ b/gemax/server_test.go @@ -8,6 +8,8 @@ import ( "io" "net" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -52,6 +54,7 @@ func TestServerBadRequest(test *testing.T) { } func TestServerInvalidHost(test *testing.T) { + test.Parallel() var listener, server = setupEchoServer(test) server.Hosts = []string{"example.com"} defer func() { _ = listener.Close() }() @@ -70,6 +73,7 @@ func TestServerInvalidHost(test *testing.T) { } func TestServerCancelListen(test *testing.T) { + test.Parallel() var server = &gemax.Server{ Addr: testaddr.Addr(), Logf: test.Logf, @@ -99,6 +103,7 @@ func TestServerCancelListen(test *testing.T) { } func TestListenAndServe(test *testing.T) { + test.Parallel() var server = &gemax.Server{ Addr: "localhost:40423", Logf: test.Logf, @@ -139,6 +144,82 @@ func TestListenAndServe(test *testing.T) { test.Logf("%s / %v", data, errRead) } +func TestLimitedListen(test *testing.T) { + test.Parallel() + var trigger = make(chan struct{}) + var counter atomic.Int64 + + var server = &gemax.Server{ + Addr: testaddr.Addr(), + Logf: test.Logf, + MaxConnections: 2, + Handler: func(_ context.Context, rw gemax.ResponseWriter, _ gemax.IncomingRequest) { + counter.Add(1) + <-trigger + _, _ = io.WriteString(rw, "example text") + }, + } + test.Logf("loading test certs") + var cert, errCert = tls.LoadX509KeyPair("testdata/cert.pem", "testdata/key.pem") + if errCert != nil { + test.Fatal(errCert) + } + var cfg = &tls.Config{ + MinVersion: tls.VersionTLS12, + Certificates: []tls.Certificate{cert}, + } + var ctx, cancel = context.WithCancel(context.Background()) + test.Cleanup(cancel) + test.Logf("starting test server") + + var wg = sync.WaitGroup{} + defer wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + test.Logf("test server: listening on %q", server.Addr) + var err = server.ListenAndServe(ctx, cfg) + switch { + case err == nil, errors.Is(err, net.ErrClosed): + return + default: + test.Errorf("test server: listening: %v", err) + } + }() + time.Sleep(time.Second) + + var client = &gemax.Client{} + + wg.Add(2 * server.MaxConnections) + for i := 0; i < 2*server.MaxConnections; i++ { + go func() { + defer wg.Done() + var resp, errFetch = client.Fetch(ctx, "gemini://"+server.Addr) + switch { + case errFetch == nil: + // pass + case errors.Is(errFetch, context.Canceled): + return + default: + test.Error("fetching: ", errFetch) + return + } + defer func() { _ = resp.Close() }() + expectResponse(test, resp, "example text") + var data, errRead = io.ReadAll(resp) + test.Logf("%s / %v", data, errRead) + }() + } + + time.Sleep(time.Second) + if counter.Load() > int64(server.MaxConnections) { + test.Errorf("number of simultaneous connections must not exceed %d", server.MaxConnections) + } + cancel() + close(trigger) +} + // emulates michael-lazar/gemini-diagnostics localhost $PORT --checks='URLDotEscape' func TestURLDotEscape(test *testing.T) { var listener, server = setupEchoServer(test) From a83a2cad72d585b728017b4add50101ee37c9506 Mon Sep 17 00:00:00 2001 From: merlin Date: Sat, 12 Nov 2022 17:24:21 +0300 Subject: [PATCH 4/5] await all handlers to end --- gemax/server.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/gemax/server.go b/gemax/server.go index 489c2e6..5ad8d3b 100644 --- a/gemax/server.go +++ b/gemax/server.go @@ -51,6 +51,9 @@ func (server *Server) init() { } // ListenAndServe starts a TLS gemini server at specified server. +// It will block until context is canceled. +// It respects the MaxConnections setting. +// It will await all running handlers to end. func (server *Server) ListenAndServe(ctx context.Context, tlsCfg *tls.Config) error { server.init() ctx, cancel := context.WithCancel(ctx) @@ -79,16 +82,21 @@ func (server *Server) ListenAndServe(ctx context.Context, tlsCfg *tls.Config) er } // Serve starts server on provided listener. Provided context will be passed to handlers. +// Serve will await all running handlers to end. func (server *Server) Serve(ctx context.Context, listener net.Listener) error { server.init() server.addListener(listener) + var wg sync.WaitGroup for { var conn, errAccept = listener.Accept() if errAccept != nil { + wg.Wait() return fmt.Errorf("gemini server: %w", errAccept) } var track = server.addConn(conn) + wg.Add(1) go func() { + defer wg.Done() defer server.removeTrack(track) server.handle(ctx, conn) }() From 9400f5e6eb7216ada3311afe5af3c5867a8141a8 Mon Sep 17 00:00:00 2001 From: merlin Date: Sat, 12 Nov 2022 17:27:02 +0300 Subject: [PATCH 5/5] drop tests for go 1.18 --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 910f2f4..9d06760 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ jobs: test: strategy: matrix: - go-version: [1.18.x, 1.19.*] + go-version: [1.19.*] os: [ubuntu-latest, macos-latest, windows-latest] runs-on: ${{ matrix.os }} steps: