Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throughput1 client library #26

Merged
merged 59 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
61325e2
Add handler
robertodauria Mar 23, 2023
d3016f7
Save archival data from deferred func
robertodauria Mar 23, 2023
49058f2
Add main, fix handler
robertodauria Mar 24, 2023
1fa18bd
Add script to generate local test certs
robertodauria Mar 24, 2023
7017664
Add build script and Dockerfile.
robertodauria Mar 24, 2023
d4e2a97
Update go.mod/go.sum
robertodauria Mar 24, 2023
7518b48
Add basic client for testing
robertodauria Mar 24, 2023
ce51f3d
Swap logger
robertodauria Mar 24, 2023
21c2780
Swap logger in measurer, too
robertodauria Mar 24, 2023
053476d
Remove files that don't belong to this PR
robertodauria Mar 24, 2023
83b937b
Remove more files that don't belong here
robertodauria Mar 24, 2023
d3fb5d6
go mod tidy
robertodauria Mar 24, 2023
b160cd9
Change metadata length limits
robertodauria Mar 24, 2023
2116117
Add access_token to the knownOptions map.
robertodauria Mar 27, 2023
d2d87a9
Add partial tests and prom metrics
robertodauria Apr 11, 2023
af21b15
Duration should be milliseconds
robertodauria Apr 11, 2023
93227ad
Increase test coverage
robertodauria Apr 12, 2023
824fb95
Update go.mod
robertodauria Apr 12, 2023
c169da4
Update go.mod go.sum
robertodauria Apr 12, 2023
f0c6635
Add client code
robertodauria Apr 12, 2023
5f64a9e
Merge branch 'main' into sandbox-roberto-client
robertodauria Jun 23, 2023
1a75668
Rename ndt8 -> throughput1
robertodauria Jun 23, 2023
6654601
Separate network-level and application-level measurements.
robertodauria Aug 7, 2023
aa112c7
Fix client
robertodauria Aug 7, 2023
60cdacd
Merge branch 'main' into sandbox-roberto-client
robertodauria Aug 7, 2023
053fc14
Merge branch 'add-application-byte-counters' into sandbox-roberto-client
robertodauria Aug 7, 2023
711dd07
Merge branch 'temp-test' into sandbox-roberto-client
robertodauria Aug 7, 2023
3a531ad
Make output more detailed, add server-side messages
robertodauria Aug 7, 2023
da29c0a
Make application-level byte counters atomic.
robertodauria Aug 7, 2023
748e449
Merge branch 'add-application-byte-counters' into sandbox-roberto-client
robertodauria Aug 7, 2023
23391ba
Merge branch 'main' into sandbox-roberto-client
robertodauria Aug 30, 2023
efb1b85
Add debug flag and simplify code
robertodauria Sep 25, 2023
6ff44a3
Add -debug flag, docstrings, and call Upload() in main
robertodauria Sep 25, 2023
58fdf67
Remove unused fields
robertodauria Sep 25, 2023
4565876
Complete rename from NDT8 to Throughput1
robertodauria Sep 25, 2023
1eff6e0
Use the same version for libraryVersion and clientVersion
robertodauria Sep 25, 2023
ccc1869
Rename client to msak-client-go
robertodauria Sep 25, 2023
5e8b90e
Add docstrings and use the right timeout in connect() call.
robertodauria Sep 27, 2023
706f888
Refactor code to extract runStream().
robertodauria Sep 27, 2023
7e5f23c
Add docstrings.
robertodauria Sep 27, 2023
61c3820
Fix race condition.
robertodauria Sep 27, 2023
b9d430c
Refactor code, only output every 100ms
robertodauria Sep 28, 2023
0f90997
Remove extra \n from OnDebug messages
robertodauria Sep 28, 2023
708874e
Address code review comments
robertodauria Sep 28, 2023
09a3a76
Update defaults, add comments
robertodauria Sep 28, 2023
e663ea9
Include
robertodauria Sep 28, 2023
bfef85d
Add DefaultScheme const and more docstrings
robertodauria Sep 28, 2023
93f6c41
Replace fmt.Print with Emitter.OnDebug
robertodauria Sep 28, 2023
8ff66c4
Panic on invalid subtests
robertodauria Sep 28, 2023
303308f
Remove ServiceURL
robertodauria Sep 28, 2023
bfe00cd
Move client config to a dedicated struct
robertodauria Sep 28, 2023
298b354
Rename client.ClientConfig to client.Config
robertodauria Sep 28, 2023
24be2a2
More refactoring and more comments/docstrings.
robertodauria Sep 28, 2023
6ecdd3c
Add TLSClientConfig to defaultDialer.
robertodauria Sep 28, 2023
c991115
Add docstrings in Emitter interface.
robertodauria Sep 28, 2023
ec3744b
Merge branch 'main' into sandbox-roberto-client
robertodauria Sep 29, 2023
fc287cc
FromTCPConn -> FromTCPLikeConn
robertodauria Sep 29, 2023
a9e343b
Panic on empty clientName/Version in client.New()
robertodauria Sep 29, 2023
26694cb
Add comments in nextURLFromLocate and remove extra printf
robertodauria Sep 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions cmd/msak-client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"crypto/tls"
"flag"
"log"
"os"
"time"

"github.com/google/uuid"
"github.com/m-lab/msak/pkg/client"
"github.com/m-lab/msak/pkg/version"
)

var (
clientName = "msak-client-go"
clientVersion = version.Version
)

var (
flagServer = flag.String("server", "", "Server address")
flagStreams = flag.Int("streams", 2, "Number of streams")
flagCC = flag.String("cc", "bbr", "Congestion control algorithm to use")
flagDelay = flag.Duration("delay", 0, "Delay between each stream")
flagDuration = flag.Duration("duration", 3*time.Second, "Length of the last stream")
flagScheme = flag.String("scheme", "ws", "Websocket scheme (wss or ws)")
flagMID = flag.String("mid", uuid.NewString(), "Measurement ID to use")
flagNoVerify = flag.Bool("no-verify", false, "Skip TLS certificate verification")
flagDebug = flag.Bool("debug", false, "Enable debug logging")
)

func main() {
flag.Parse()

if float64(*flagStreams-1)*flagDelay.Seconds() >= flagDuration.Seconds() {
log.Print("Invalid configuration: please check streams, delay and duration and make sure they make sense.")
os.Exit(1)
}

cl := client.New(clientName, clientVersion)
cl.Dialer.TLSClientConfig = &tls.Config{
InsecureSkipVerify: *flagNoVerify,
}
cl.Server = *flagServer
cl.CongestionControl = *flagCC
cl.NumStreams = *flagStreams
cl.Scheme = *flagScheme
cl.MeasurementID = *flagMID
cl.Length = *flagDuration
cl.Delay = *flagDelay

cl.Emitter = &client.HumanReadable{
Debug: *flagDebug,
}

cl.Download(context.Background())
cl.Upload(context.Background())
}
314 changes: 314 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
package client

import (
"context"
"errors"
"fmt"
"log"
"net"
"net/http"
"net/url"
"runtime"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/m-lab/locate/api/locate"
v2 "github.com/m-lab/locate/api/v2"
"github.com/m-lab/msak/internal/netx"
"github.com/m-lab/msak/pkg/throughput1"
"github.com/m-lab/msak/pkg/throughput1/model"
"github.com/m-lab/msak/pkg/throughput1/spec"
"github.com/m-lab/msak/pkg/version"
)

const (
// DefaultWebSocketHandshakeTimeout is the default timeout used by the client
// for the WebSocket handshake.
DefaultWebSocketHandshakeTimeout = 5 * time.Second

// DefaultStreams is the default number of streams for a new client.
DefaultStreams = 5

libraryName = "msak-client"
)

var (
// ErrNoTargets is returned if all Locate targets have been tried.
ErrNoTargets = errors.New("no targets available")

libraryVersion = version.Version
)

type Locator interface {
Nearest(ctx context.Context, service string) ([]v2.Target, error)
}

type Throughput1Client struct {
ClientName string
ClientVersion string

Dialer *websocket.Dialer

Server string
ServiceURL *url.URL

Locate Locator

Scheme string

NumStreams int
Length time.Duration
Delay time.Duration
CongestionControl string
MeasurementID string

OutputPath string

Emitter Emitter

// targets and tIndex cache the results from the Locate API.
targets []v2.Target
tIndex map[string]int
}

type Result struct {
Goodput float64
Throughput float64
Elapsed time.Duration
MinRTT uint32
}

type StreamResult struct {
Result
StreamID int
}

// makeUserAgent creates the user agent string
func makeUserAgent(clientName, clientVersion string) string {
return clientName + "/" + clientVersion + " " + libraryName + "/" + libraryVersion
}

func New(clientName, clientVersion string) *Throughput1Client {
return &Throughput1Client{
ClientName: clientName,
ClientVersion: clientVersion,
Dialer: &websocket.Dialer{
HandshakeTimeout: DefaultWebSocketHandshakeTimeout,
NetDial: func(network, addr string) (net.Conn, error) {
conn, err := net.Dial(network, addr)
if err != nil {
return nil, err
}
return netx.FromTCPConn(conn.(*net.TCPConn))
},
},
Scheme: "wss",
Locate: locate.NewClient(
makeUserAgent(clientName, clientVersion),
),
Emitter: &HumanReadable{Debug: false},
tIndex: map[string]int{},
}
}

func (c *Throughput1Client) connect(ctx context.Context, serviceURL *url.URL) (*websocket.Conn, error) {
q := serviceURL.Query()
q.Set("streams", fmt.Sprint(c.NumStreams))
q.Set("cc", c.CongestionControl)
q.Set("duration", fmt.Sprintf("%d", c.Length.Milliseconds()))
q.Set("client_arch", runtime.GOARCH)
q.Set("client_library_name", libraryName)
q.Set("client_library_version", libraryVersion)
q.Set("client_os", runtime.GOOS)
q.Set("client_name", c.ClientName)
q.Set("client_version", c.ClientVersion)
serviceURL.RawQuery = q.Encode()
headers := http.Header{}
headers.Add("Sec-WebSocket-Protocol", spec.SecWebSocketProtocol)
headers.Add("User-Agent", makeUserAgent(c.ClientName, c.ClientVersion))
conn, _, err := c.Dialer.DialContext(ctx, serviceURL.String(), headers)
return conn, err
}

// nextURLFromLocate returns the next URL to try from the Locate API.
// If it's the first time we're calling this function, it contacts the Locate
// API. Subsequently, it returns the next URL from the cache.
// If there are no more URLs to try, it returns an error.
func (c *Throughput1Client) nextURLFromLocate(ctx context.Context, p string) (string, error) {
if len(c.targets) == 0 {
targets, err := c.Locate.Nearest(ctx, "msak/throughput1")
if err != nil {
return "", err
}
// cache targets on success.
c.targets = targets
}
k := c.Scheme + "://" + p
if c.tIndex[k] < len(c.targets) {
fmt.Println(c.targets[c.tIndex[k]].URLs)
r := c.targets[c.tIndex[k]].URLs[k]
c.tIndex[k]++
return r, nil
}
return "", ErrNoTargets
}

func (c *Throughput1Client) start(ctx context.Context, subtest spec.SubtestKind) error {
// Find the URL to use for this measurement.
var mURL *url.URL
// If the server has been provided, use it and use default paths based on
// the subtest kind (download/upload).
if c.Server != "" {
c.Emitter.OnDebug(fmt.Sprintf("using server provided via flags %s", c.Server))
path := getPathForSubtest(subtest)
mURL = &url.URL{
Scheme: c.Scheme,
Host: c.Server,
Path: path,
}
q := mURL.Query()
q.Set("mid", c.MeasurementID)
mURL.RawQuery = q.Encode()
}

// If a service URL was provided, use it as-is.
if c.ServiceURL != nil {
c.Emitter.OnDebug(fmt.Sprintf("using service url provided via flags %s", c.ServiceURL.String()))
// Override scheme to match the provided service url.
c.Scheme = c.ServiceURL.Scheme
mURL = c.ServiceURL
}

// If no service URL nor server was provided, use the Locate API.
if mURL == nil {
log.Print("using locate")
urlStr, err := c.nextURLFromLocate(ctx, getPathForSubtest(subtest))
if err != nil {
return err
}
mURL, err = url.Parse(urlStr)
if err != nil {
return err
}
log.Print("URL: ", mURL.String())
}

wg := &sync.WaitGroup{}
globalTimeout, cancel := context.WithTimeout(ctx, c.Length)
defer cancel()

globalStartTime := time.Now()
applicationBytes := map[int][]int64{}

// Main client loop. Spawns one goroutine per stream requested.
for i := 0; i < c.NumStreams; i++ {
streamID := i
wg.Add(1)
measurements := make(chan model.WireMeasurement)

go func() {
defer wg.Done()

// Connect to mURL.
c.Emitter.OnStart(mURL.Host, subtest)
conn, err := c.connect(ctx, mURL)
if err != nil {
c.Emitter.OnError(err)
close(measurements)
return
}
c.Emitter.OnConnect(mURL.String())

proto := throughput1.New(conn)

var clientCh, serverCh <-chan model.WireMeasurement
var errCh <-chan error
switch subtest {
case spec.SubtestDownload:
clientCh, serverCh, errCh = proto.ReceiverLoop(globalTimeout)
case spec.SubtestUpload:
clientCh, serverCh, errCh = proto.SenderLoop(globalTimeout)
}

for {
select {
case <-globalTimeout.Done():
c.Emitter.OnComplete(streamID, mURL.Host)
return
case m := <-clientCh:
if subtest != spec.SubtestDownload {
continue
}
c.emitResults(streamID, m, globalStartTime, applicationBytes)
case m := <-serverCh:
if subtest != spec.SubtestUpload {
continue
}
c.emitResults(streamID, m, globalStartTime, applicationBytes)
case err := <-errCh:
c.Emitter.OnError(err)
}
}
}()

time.Sleep(c.Delay)
}

wg.Wait()

return nil
}

func (c *Throughput1Client) emitResults(streamID int, m model.WireMeasurement,
globalStartTime time.Time, applicationBytes map[int][]int64) {
c.Emitter.OnMeasurement(streamID, m)
elapsed := time.Since(globalStartTime)
streamResult := StreamResult{
StreamID: streamID,
Result: Result{
Elapsed: elapsed,
Goodput: float64(m.Application.BytesReceived) / float64(m.ElapsedTime) * 8,
Throughput: float64(m.Network.BytesReceived) / float64(m.ElapsedTime) * 8,
MinRTT: m.TCPInfo.MinRTT,
},
}
c.Emitter.OnStreamResult(streamResult)

applicationBytes[streamID] = append(applicationBytes[streamID], m.Application.BytesReceived)

var sum int64
for _, bytes := range applicationBytes {
sum += bytes[len(bytes)-1]
}
result := Result{
Elapsed: elapsed,
Goodput: float64(sum) / float64(elapsed.Microseconds()) * 8,
}
c.Emitter.OnResult(result)
}

func (c *Throughput1Client) Download(ctx context.Context) {
err := c.start(ctx, spec.SubtestDownload)
if err != nil {
log.Println(err)
}
}

func (c *Throughput1Client) Upload(ctx context.Context) {
err := c.start(ctx, spec.SubtestUpload)
if err != nil {
log.Println(err)
}
}

func getPathForSubtest(subtest spec.SubtestKind) string {
switch subtest {
case spec.SubtestDownload:
return spec.DownloadPath
case spec.SubtestUpload:
return spec.UploadPath
default:
return "invalid"
}
}
Loading