From 575e4b58bbf810f74de4279614b1ce7a294681e3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 29 Apr 2024 12:22:25 -0700 Subject: [PATCH] support HTTP based hperf --- CREDITS | 33 +++++++++ go.mod | 1 + go.sum | 2 + main.go | 223 +++++++++++++++++++++++++++++++++++++++++--------------- 4 files changed, 200 insertions(+), 59 deletions(-) diff --git a/CREDITS b/CREDITS index 5aacb9e..8a969f5 100644 --- a/CREDITS +++ b/CREDITS @@ -91,3 +91,36 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ================================================================ +golang.org/x/sys +https://golang.org/x/sys +---------------------------------------------------------------- +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +================================================================ + diff --git a/go.mod b/go.mod index 71a452a..d64896f 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.19 require ( github.com/dustin/go-humanize v1.0.1 github.com/google/uuid v1.4.0 + golang.org/x/sys v0.19.0 ) diff --git a/go.sum b/go.sum index 51512f1..f2964e9 100644 --- a/go.sum +++ b/go.sum @@ -2,3 +2,5 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/main.go b/main.go index 06c9386..8742a4d 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2023 MinIO, Inc. +// Copyright (c) 2015-2024 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -17,29 +17,30 @@ package main import ( + "context" + "crypto/rand" "flag" "fmt" + "io" "log" "net" "net/http" "os" - "runtime" "strconv" "sync" "sync/atomic" + "syscall" "time" - humanize "github.com/dustin/go-humanize" + "github.com/dustin/go-humanize" "github.com/google/uuid" + "golang.org/x/sys/unix" ) var port = func() string { - p := os.Getenv("NPERF_PORT") + p := os.Getenv("HPERF_PORT") if p == "" { - p = os.Getenv("HPERF_PORT") - if p == "" { - p = "9999" - } + p = "9999" } return p }() @@ -58,8 +59,6 @@ var selfDetectPort = func() string { var uniqueStr = uuid.New().String() -var oneMB = 1024 * 1024 - var ( dataIn uint64 dataOut uint64 @@ -76,80 +75,182 @@ func printDataOut() { } } -func handleTX(conn net.Conn, b []byte) error { - defer conn.Close() - for { - n, err := conn.Write(b) - if err != nil { - log.Println("TX-Error", conn, err) - return err - } - atomic.AddUint64(&dataOut, uint64(n)) +// Discard is just like io.Discard without the io.ReaderFrom compatible +// implementation which is buggy on NUMA systems, we have to use a simpler +// io.Writer implementation alone avoids also unnecessary buffer copies, +// and as such incurred latencies. +var Discard io.Writer = discard{} + +// discard is /dev/null for Golang. +type discard struct{} + +func (discard) Write(p []byte) (int, error) { + atomic.AddUint64(&dataIn, uint64(len(p))) + return len(p), nil +} + +func runServer(host string) { + http.HandleFunc("/devnull", func(w http.ResponseWriter, r *http.Request) { + buf := make([]byte, 1*humanize.MiByte) + io.CopyBuffer(Discard, r.Body, buf) + }) + s := &http.Server{ + Addr: net.JoinHostPort(host, port), + MaxHeaderBytes: 1 << 20, } + s.ListenAndServe() } -func handleRX(conn net.Conn) { - defer conn.Close() - b := make([]byte, oneMB) - for { - n, err := conn.Read(b) - if err != nil { - log.Println("RX-Error", conn, err) - return - } - atomic.AddUint64(&dataIn, uint64(n)) +// DialContext is a function to make custom Dial for internode communications +type DialContext func(ctx context.Context, network, address string) (net.Conn, error) + +func setTCPParametersFn() func(network, address string, c syscall.RawConn) error { + return func(network, address string, c syscall.RawConn) error { + c.Control(func(fdPtr uintptr) { + // got socket file descriptor to set parameters. + fd := int(fdPtr) + + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + + { + // Enable big buffers + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, 8*humanize.MiByte) + + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, 8*humanize.MiByte) + } + + // Enable TCP open + // https://lwn.net/Articles/508865/ - 32k queue size. + _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 32*1024) + + // Enable TCP fast connect + // TCPFastOpenConnect sets the underlying socket to use + // the TCP fast open connect. This feature is supported + // since Linux 4.11. + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_FASTOPEN_CONNECT, 1) + + // Enable TCP quick ACK, John Nagle says + // "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know." + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1) + + /// Enable keep-alive + { + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) + + // The time (in seconds) the connection needs to remain idle before + // TCP starts sending keepalive probes + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 15) + + // Number of probes. + // ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5) + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5) + + // Wait time after successful probe in seconds. + // ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 15 secs) + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 15) + } + }) + return nil } } -func runServer() { - l, err := net.Listen("tcp", net.JoinHostPort("", port)) - if err != nil { - log.Fatal(err) +// NewInternodeDialContext setups a custom dialer for internode communication +func NewInternodeDialContext(dialTimeout time.Duration) DialContext { + d := &net.Dialer{ + Timeout: dialTimeout, + Control: setTCPParametersFn(), } - defer l.Close() - for { - // Listen for an incoming connection. - conn, err := l.Accept() - if err != nil { - log.Fatal(err) - } - // Handle connections in a new goroutine. - go handleRX(conn) + return func(ctx context.Context, network, addr string) (net.Conn, error) { + return d.DialContext(ctx, network, addr) + } +} + +// Reader to read random data. +type netperfReader struct { + doneCh <-chan struct{} + buf []byte +} + +func (m *netperfReader) Read(b []byte) (int, error) { + select { + case <-m.doneCh: + return 0, io.EOF + default: } + n := copy(b, m.buf) + atomic.AddUint64(&dataOut, uint64(n)) + return n, nil } func runClient(host string) { - host = host + ":" + port - b := make([]byte, oneMB) - proc := runtime.GOMAXPROCS(0) - if proc < 16 { - proc = 16 // 16 TCP connections is more than enough to saturate a 100G link. + host = net.JoinHostPort(host, port) + proc := 32 + + // For more details about various values used here refer + // https://golang.org/pkg/net/http/#Transport documentation + tr := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: NewInternodeDialContext(10 * time.Second), + MaxIdleConnsPerHost: 1024, + WriteBufferSize: 64 << 10, // 64KiB moving up from 4KiB default + ReadBufferSize: 64 << 10, // 64KiB moving up from 4KiB default + IdleConnTimeout: 15 * time.Second, + ResponseHeaderTimeout: 15 * time.Minute, // Conservative timeout is the default (for MinIO internode) + TLSHandshakeTimeout: 10 * time.Second, + // Go net/http automatically unzip if content-type is + // gzip disable this feature, as we are always interested + // in raw stream. + DisableCompression: true, + } + + clnt := &http.Client{ + Transport: tr, } + + ctx, cancel := context.WithCancel(context.Background()) + r := &netperfReader{doneCh: ctx.Done()} + r.buf = make([]byte, 1*humanize.MiByte) + rand.Read(r.buf) + var wg sync.WaitGroup for i := 0; i < proc; i++ { wg.Add(1) go func() { defer wg.Done() - var conn net.Conn - var err error + // Establish the connection. for { - conn, err = net.Dial("tcp", host) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://"+host+"/devnull", nil) if err != nil { - log.Println("Dial-Error", conn, err) + log.Println("Client-Error-New", err) + time.Sleep(dialTimeout) + continue + } + req.Body = io.NopCloser(r) + req.ContentLength = -1 + + resp, err := clnt.Do(req) + if err != nil { + log.Println("Client-Error-Do", err) + time.Sleep(dialTimeout) + continue + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Println("Client-Error-Response", resp.Status) time.Sleep(dialTimeout) continue - } else { - break } - } - // Use the connection. - if err := handleTX(conn, b); err != nil { - panic(err) } }() } + wg.Wait() + cancel() } func main() { @@ -178,17 +279,21 @@ func main() { log.Println("Starting HTTP service to skip self.. waiting for 10secs for services to be ready") time.Sleep(time.Second * 10) - go runServer() - go printDataOut() + var serverRunningOnce sync.Once for host := range hostMap { resp, err := http.Get("http://" + host + ":" + selfDetectPort + "/" + uniqueStr) if err == nil && resp.StatusCode == http.StatusOK { resp.Body.Close() // close the connection. s.Close() // close the server as we are done. log.Println("HTTP service closed after successful skip...") + serverRunningOnce.Do(func() { + go runServer(host) + }) continue } go runClient(host) } + + go printDataOut() time.Sleep(time.Hour * 72) }