Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support HTTP based hperf #8

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/vulncheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [ 1.21.4 ]
go-version: [ 1.21.9, 1.22.2 ]
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v3
Expand Down
33 changes: 33 additions & 0 deletions CREDITS
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,36 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

================================================================

golang.org/x/sys
https://golang.org/x/sys
----------------------------------------------------------------
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

================================================================

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ go 1.19
require (
github.com/dustin/go-humanize v1.0.1
github.com/google/uuid v1.4.0
golang.org/x/sys v0.19.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
223 changes: 164 additions & 59 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2015-2023 MinIO, Inc.
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
Expand All @@ -17,29 +17,30 @@
package main

import (
"context"
"crypto/rand"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"runtime"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"

humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize"
"github.com/google/uuid"
"golang.org/x/sys/unix"
)

var port = func() string {
p := os.Getenv("NPERF_PORT")
p := os.Getenv("HPERF_PORT")
if p == "" {
p = os.Getenv("HPERF_PORT")
if p == "" {
p = "9999"
}
p = "9999"
}
return p
}()
Expand All @@ -58,8 +59,6 @@ var selfDetectPort = func() string {

var uniqueStr = uuid.New().String()

var oneMB = 1024 * 1024

var (
dataIn uint64
dataOut uint64
Expand All @@ -76,80 +75,182 @@ func printDataOut() {
}
}

func handleTX(conn net.Conn, b []byte) error {
defer conn.Close()
for {
n, err := conn.Write(b)
if err != nil {
log.Println("TX-Error", conn, err)
return err
}
atomic.AddUint64(&dataOut, uint64(n))
// Discard is just like io.Discard without the io.ReaderFrom compatible
// implementation which is buggy on NUMA systems, we have to use a simpler
// io.Writer implementation alone avoids also unnecessary buffer copies,
// and as such incurred latencies.
var Discard io.Writer = discard{}

// discard is /dev/null for Golang.
type discard struct{}

func (discard) Write(p []byte) (int, error) {
atomic.AddUint64(&dataIn, uint64(len(p)))
return len(p), nil
}

func runServer(host string) {
http.HandleFunc("/devnull", func(w http.ResponseWriter, r *http.Request) {
buf := make([]byte, 1*humanize.MiByte)
io.CopyBuffer(Discard, r.Body, buf)
})
s := &http.Server{
Addr: net.JoinHostPort(host, port),
MaxHeaderBytes: 1 << 20,
}
s.ListenAndServe()
}

func handleRX(conn net.Conn) {
defer conn.Close()
b := make([]byte, oneMB)
for {
n, err := conn.Read(b)
if err != nil {
log.Println("RX-Error", conn, err)
return
}
atomic.AddUint64(&dataIn, uint64(n))
// DialContext is a function to make custom Dial for internode communications
type DialContext func(ctx context.Context, network, address string) (net.Conn, error)

func setTCPParametersFn() func(network, address string, c syscall.RawConn) error {
return func(network, address string, c syscall.RawConn) error {
c.Control(func(fdPtr uintptr) {
// got socket file descriptor to set parameters.
fd := int(fdPtr)

_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)

_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)

{
// Enable big buffers
_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, 8*humanize.MiByte)

_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, 8*humanize.MiByte)
}

// Enable TCP open
// https://lwn.net/Articles/508865/ - 32k queue size.
_ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 32*1024)

// Enable TCP fast connect
// TCPFastOpenConnect sets the underlying socket to use
// the TCP fast open connect. This feature is supported
// since Linux 4.11.
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_FASTOPEN_CONNECT, 1)

// Enable TCP quick ACK, John Nagle says
// "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know."
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1)

/// Enable keep-alive
{
_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)

// The time (in seconds) the connection needs to remain idle before
// TCP starts sending keepalive probes
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 15)

// Number of probes.
// ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5)
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5)

// Wait time after successful probe in seconds.
// ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 15 secs)
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 15)
}
})
return nil
}
}

func runServer() {
l, err := net.Listen("tcp", net.JoinHostPort("", port))
if err != nil {
log.Fatal(err)
// NewInternodeDialContext setups a custom dialer for internode communication
func NewInternodeDialContext(dialTimeout time.Duration) DialContext {
d := &net.Dialer{
Timeout: dialTimeout,
Control: setTCPParametersFn(),
}
defer l.Close()
for {
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
// Handle connections in a new goroutine.
go handleRX(conn)
return func(ctx context.Context, network, addr string) (net.Conn, error) {
return d.DialContext(ctx, network, addr)
}
}

// Reader to read random data.
type netperfReader struct {
doneCh <-chan struct{}
buf []byte
}

func (m *netperfReader) Read(b []byte) (int, error) {
select {
case <-m.doneCh:
return 0, io.EOF
default:
}
n := copy(b, m.buf)
atomic.AddUint64(&dataOut, uint64(n))
return n, nil
}

func runClient(host string) {
host = host + ":" + port
b := make([]byte, oneMB)
proc := runtime.GOMAXPROCS(0)
if proc < 16 {
proc = 16 // 16 TCP connections is more than enough to saturate a 100G link.
host = net.JoinHostPort(host, port)
proc := 32

// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: NewInternodeDialContext(10 * time.Second),
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 64 << 10, // 64KiB moving up from 4KiB default
ReadBufferSize: 64 << 10, // 64KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 15 * time.Minute, // Conservative timeout is the default (for MinIO internode)
TLSHandshakeTimeout: 10 * time.Second,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}

clnt := &http.Client{
Transport: tr,
}

ctx, cancel := context.WithCancel(context.Background())
r := &netperfReader{doneCh: ctx.Done()}
r.buf = make([]byte, 1*humanize.MiByte)
rand.Read(r.buf)

var wg sync.WaitGroup
for i := 0; i < proc; i++ {
wg.Add(1)
go func() {
defer wg.Done()
var conn net.Conn
var err error

// Establish the connection.
for {
conn, err = net.Dial("tcp", host)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, "http://"+host+"/devnull", nil)
if err != nil {
log.Println("Dial-Error", conn, err)
log.Println("Client-Error-New", err)
time.Sleep(dialTimeout)
continue
}
req.Body = io.NopCloser(r)
req.ContentLength = -1

resp, err := clnt.Do(req)
if err != nil {
log.Println("Client-Error-Do", err)
time.Sleep(dialTimeout)
continue
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Println("Client-Error-Response", resp.Status)
time.Sleep(dialTimeout)
continue
} else {
break
}
}
// Use the connection.
if err := handleTX(conn, b); err != nil {
panic(err)
}
}()
}

wg.Wait()
cancel()
}

func main() {
Expand Down Expand Up @@ -178,17 +279,21 @@ func main() {
log.Println("Starting HTTP service to skip self.. waiting for 10secs for services to be ready")
time.Sleep(time.Second * 10)

go runServer()
go printDataOut()
var serverRunningOnce sync.Once
for host := range hostMap {
resp, err := http.Get("http://" + host + ":" + selfDetectPort + "/" + uniqueStr)
if err == nil && resp.StatusCode == http.StatusOK {
resp.Body.Close() // close the connection.
s.Close() // close the server as we are done.
log.Println("HTTP service closed after successful skip...")
serverRunningOnce.Do(func() {
go runServer(host)
})
continue
}
go runClient(host)
}

go printDataOut()
time.Sleep(time.Hour * 72)
}
Loading