Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add client for the latency1 protocol #35

Merged
merged 3 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions cmd/msak-latency/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"time"

"github.com/m-lab/go/flagx"
"github.com/m-lab/go/rtx"
"github.com/m-lab/locate/api/locate"
v2 "github.com/m-lab/locate/api/v2"
"github.com/m-lab/msak/pkg/latency1/model"
"github.com/m-lab/msak/pkg/latency1/spec"
)

var (
flagServer = flagx.URL{}
flagScheme = flag.String("scheme", "http", "Server scheme (http|https)")
flagMID = flag.String("mid", "", "MID to use")
)

func init() {
flag.Var(&flagServer, "server", "Server address. If a scheme is provided, it overrides -scheme.")
}

func getTargetsFromLocate() []v2.Target {
locateV2 := locate.NewClient("msak-latency")
targets, err := locateV2.Nearest(context.Background(), spec.ServiceName)
rtx.Must(err, "cannot get server list from locate")
return targets
}

func tryConnect(authorizeURL *url.URL) ([]byte, error) {
resp, err := http.Get(authorizeURL.String())
if err != nil {
return nil, err
}
return io.ReadAll(resp.Body)
}

func stats(result model.Summary) (int, float64, int, float64) {
if len(result.RoundTrips) == 0 {
return 0, 0, 0, 0
}
var min, max, sum int
min = result.RoundTrips[0].RTT
for _, v := range result.RoundTrips {
if v.RTT < min {
min = v.RTT
}
if v.RTT > max {
max = v.RTT
}
sum += v.RTT
}
return min, float64(sum) / float64(len(result.RoundTrips)),
max, 1 - float64(result.PacketsReceived)/float64(result.PacketsSent)
}

func runMeasurement(authorizeURL, resultURL *url.URL, kickoff []byte) {
udpServer, err := net.ResolveUDPAddr("udp", authorizeURL.Hostname()+":1053")
rtx.Must(err, "ResolveUDPAddr failed")

conn, err := net.DialUDP("udp", nil, udpServer)
rtx.Must(err, "DialUDP failed")
defer conn.Close()

// Set a time limit of 6s for the test.
conn.SetDeadline(time.Now().Add(6 * time.Second))

_, err = conn.Write(kickoff)
rtx.Must(err, "failed to send kickoff message")

recvBuf := make([]byte, 512)
for {
n, err := conn.Read(recvBuf)
if err != nil {
fmt.Printf("read error: %v\n", err)
break
}
_, err = conn.Write(recvBuf[:n])
if err != nil {
fmt.Printf("write error: %v\n", err)
break
}
fmt.Printf(".")
}
fmt.Println()

// Get results.
resp, err := http.Get(resultURL.String())
if err != nil {
fmt.Printf("failed to read test results: %v\n", err)
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("failed to read response body: %v\n", err)
return
}

var result model.Summary
err = json.Unmarshal(body, &result)
if err != nil {
fmt.Printf("error parsing result as JSON: %v\n", err)
return
}
min, avg, max, loss := stats(result)
fmt.Printf("rtt min/avg/max: %.3f/%.3f/%.3f ms, loss: %.1f\n",
float64(min)/1000, avg/1000, float64(max)/1000, loss)
}

func main() {
flag.Parse()
flagx.ArgsFromEnv(flag.CommandLine)

var (
authorizeURL *url.URL
resultURL *url.URL

kickoffMsg []byte
)

if flagServer.URL != nil {
// If a server was provided, use it.
var scheme string
// Use the scheme included in the server URL, if present.
if flagServer.Scheme != "" {
scheme = flagServer.Scheme
} else {
scheme = *flagScheme
}
var err error
authorizeURL = &url.URL{
Scheme: scheme,
Host: flagServer.Host,
Path: spec.AuthorizeV1,
RawQuery: "mid=" + *flagMID,
}
resultURL = &url.URL{
Scheme: scheme,
Host: flagServer.Host,
Path: spec.ResultV1,
RawQuery: "mid=" + *flagMID,
}
fmt.Printf("Attempting to connect to: %s\n", authorizeURL)
kickoffMsg, err = tryConnect(authorizeURL)
rtx.Must(err, "connection failed")
} else {
targets := getTargetsFromLocate()

for _, t := range targets {
var err error
authorizeURL, err = url.Parse(t.URLs[*flagScheme+"://"+spec.AuthorizeV1])
rtx.Must(err, "Locate returned an invalid authorization URL")

resultURL, err = url.Parse(t.URLs[*flagScheme+"://"+spec.ResultV1])
rtx.Must(err, "Locate returned an invalid result URL")

fmt.Printf("Attempting to connect to: %s\n", authorizeURL)
kickoffMsg, err = tryConnect(authorizeURL)
if err == nil {
break
}
fmt.Printf("failed to connect to %s\n", authorizeURL)
}

if len(kickoffMsg) == 0 {
fmt.Printf("no server found")
os.Exit(1)
}
}

// Now we have a server and a kickoff message, start the measurement.
runMeasurement(authorizeURL, resultURL, kickoffMsg)

}
3 changes: 3 additions & 0 deletions pkg/latency1/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package spec
import "time"

const (
// ServiceName is the service name for the Locate V2 API.
ServiceName = "msak/latency1"

// AuthorizeV1 is the v1 /authorize endpoint.
AuthorizeV1 = "/latency/v1/authorize"
// ResultV1 is the v1 /result endpoint.
Expand Down