Skip to content

Commit

Permalink
Added profiling to the relay and client.
Browse files Browse the repository at this point in the history
Signed-off-by: L Lakshmanan <[email protected]>
  • Loading branch information
Lakshman authored and dhschall committed Jul 30, 2024
1 parent d6a771a commit 07c700e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 4 deletions.
49 changes: 47 additions & 2 deletions tools/invoker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"flag"
"fmt"
"os"
"strings"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -53,6 +54,8 @@ const TimeseriesDBAddr = "10.96.0.84:90"
var (
completed int64
latSlice LatencySlice
profSlice LatencySlice
funcDurEnableFlag *bool
portFlag *int
grpcTimeout time.Duration
withTracing *bool
Expand All @@ -64,6 +67,8 @@ func main() {
rps := flag.Float64("rps", 1.0, "Target requests per second")
runDuration := flag.Int("time", 5, "Run the experiment for X seconds")
latencyOutputFile := flag.String("latf", "lat.csv", "CSV file for the latency measurements in microseconds")
funcDurationOutputFile := flag.String("durf", "dur.csv", "CSV file for the function duration measurements in microseconds")
funcDurEnableFlag = flag.Bool("profile", false, "Enable function duration profiling")
portFlag = flag.Int("port", 80, "The port that functions listen to")
withTracing = flag.Bool("trace", false, "Enable tracing in the client")
zipkin := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url")
Expand Down Expand Up @@ -107,6 +112,9 @@ func main() {
realRPS := runExperiment(endpoints, *runDuration, *rps)

writeLatencies(realRPS, *latencyOutputFile)
if *funcDurEnableFlag {
writeFunctionDurations(*funcDurationOutputFile)
}
}

func readEndpoints(path string) (endpoints []*endpoint.Endpoint, _ error) {
Expand Down Expand Up @@ -176,7 +184,7 @@ func SayHello(address, workflowID string) {
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()

_, err = c.SayHello(ctx, &pb.HelloRequest{
response, err := c.SayHello(ctx, &pb.HelloRequest{
Name: "Invoke relay",
VHiveMetadata: vhivemetadata.MakeVHiveMetadata(
workflowID,
Expand All @@ -187,6 +195,17 @@ func SayHello(address, workflowID string) {
if err != nil {
log.Warnf("Failed to invoke %v, err=%v", address, err)
} else {
if *funcDurEnableFlag {
log.Debugf("Inside if\n")
words := strings.Fields(response.Message)
lastWord := words[len(words)-1]
duration, err := strconv.ParseInt(lastWord, 10, 64)
if err == nil {
profSlice.Lock()
profSlice.slice = append(profSlice.slice, duration)
profSlice.Unlock()
}
}
atomic.AddInt64(&completed, 1)
}
}
Expand Down Expand Up @@ -249,10 +268,36 @@ func writeLatencies(rps float64, latencyOutputFile string) {
for _, lat := range latSlice.slice {
_, err := datawriter.WriteString(strconv.FormatInt(lat, 10) + "\n")
if err != nil {
log.Fatal("Failed to write the URLs to a file ", err)
log.Fatal("Failed to write the latencies to a file ", err)
}
}

datawriter.Flush()
file.Close()
}

func writeFunctionDurations(funcDurationOutputFile string) {
profSlice.Lock()
defer profSlice.Unlock()

fileName := funcDurationOutputFile
log.Info("The measured function durations are saved in ", fileName)

file, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)

if err != nil {
log.Fatal("Failed creating file: ", err)
}

datawriter := bufio.NewWriter(file)

for _, dur := range profSlice.slice {
_, err := datawriter.WriteString(strconv.FormatInt(dur, 10) + "\n")
if err != nil {
log.Fatal("Failed to write the function durations to a file ", err)
}
}

datawriter.Flush()
file.Close()
}
21 changes: 19 additions & 2 deletions tools/relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"fmt"
"net"
"os"
"strconv"
"time"

pb "github.com/vhive-serverless/vSwarm-proto/proto/helloworld"

Expand Down Expand Up @@ -61,6 +63,7 @@ var (
value = flag.String("value", "helloWorld", "String input to pass to benchmark")
functionMethod = flag.String("function-method", "default", "Which method of benchmark to invoke")
verbose = flag.Bool("verbose", false, "Enable verbose log printing")
profileFunction = flag.Bool("profile-function", false, "Enable function profiling")

// Client
grpcClient grpcClients.GrpcClient
Expand Down Expand Up @@ -155,8 +158,22 @@ func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloRe
// Create new packet
pkt := inputGenerator.Next()
log.Debugf("Send to func: %s\n", pkt)

startTime := time.Now()
reply, err := grpcClient.Request(ctx, pkt)
log.Debugf("Recv from func: %s\n", reply)
endTime := time.Now()
elapsedTime := int64(endTime.Sub(startTime).Microseconds())

var finalReply string

if *profileFunction {
log.Debugf("Invoked in %d usec\n. Recv from func: %s\n", elapsedTime, reply)
elapsedTimeStr := strconv.FormatInt(elapsedTime, 10)
finalReply = reply + "|" + elapsedTimeStr
} else {
log.Debugf("Recv from func: %s\n", reply)
finalReply = reply
}

return &pb.HelloReply{Message: reply}, err
return &pb.HelloReply{Message: finalReply}, err
}

0 comments on commit 07c700e

Please sign in to comment.