Skip to content

Commit

Permalink
feat(rpc): add grpc benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
pancsta committed Sep 11, 2024
1 parent 5c8702c commit 9d161d3
Show file tree
Hide file tree
Showing 10 changed files with 1,269 additions and 0 deletions.
137 changes: 137 additions & 0 deletions examples/benchmark_grpc/client_arpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package benchmark_grpc

import (
"context"
"encoding/gob"
"os"
"testing"
"time"

"github.com/pancsta/asyncmachine-go/internal/testing/utils"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ams "github.com/pancsta/asyncmachine-go/pkg/states"
"github.com/stretchr/testify/assert"
"golang.org/x/text/language"
"golang.org/x/text/message"

ss "github.com/pancsta/asyncmachine-go/examples/benchmark_grpc/worker_states"
)

func BenchmarkClientArpc(b *testing.B) {
// init
ctx := context.Background()
worker := &Worker{}
i := 0
limit := b.N
end := make(chan struct{})

// read env
amDbgAddr := os.Getenv("AM_DBG_ADDR")
logLvl := am.EnvLogLevel("")

// register gob types
gob.Register(Value(0))
gob.Register(Op(0))

// init server
s, err := NewWorkerArpcServer(ctx, "localhost:50551", worker)
if err != nil {
b.Fatal(err)
}
serverAddr := s.RPC.Listener.Addr().String()

// monitor traffic
counterListener := utils.RandListener("localhost")
connAddr := counterListener.Addr().String()
counter := make(chan int64, 1)
go arpc.TrafficMeter(counterListener, serverAddr, counter, end)

// init client
c, err := arpc.NewClient(ctx, connAddr, "worker", ss.States, ss.Names)
if err != nil {
b.Fatal(err)
}
c.Mach.SetLoggerSimple(func(msg string, args ...any) {
l("arpc-client", msg, args...)
}, logLvl)
utils.MachDebug(c.Mach, amDbgAddr, logLvl, false)

// tear down
b.Cleanup(func() {
c.Stop(ctx, true)
s.RPC.Stop(true)

<-c.Mach.WhenDisposed()
<-s.RPC.Mach.WhenDisposed()

// cool off am-dbg and free the ports
if amDbgAddr != "" {
time.Sleep(100 * time.Millisecond)
}
})

// start client
c.Start()
<-c.Mach.When1(ams.Ready, nil)
<-s.RPC.Mach.When1(ams.Ready, nil)

// test subscribe-get-process
//
// 1. subscription: wait for notifications
// 2. getter: get a value from the worker
// 3. processing: call an operation based on the value
ticks := c.Worker.Tick(ss.Event)
go func() {
for {
<-c.Worker.WhenTicksEq(ss.Event, ticks+2, nil)
ticks += 2

// loop
i++
if i > limit {
l("test", "limit done")
close(end)
return
}

// value (getter)
value := c.Worker.Switch(ss.GroupValues...)

// call op from value (processing)

var res am.Result
switch value {
case ss.Value1:
res = c.Worker.Add1(ss.CallOp, am.A{"Op": Op1})
case ss.Value2:
res = c.Worker.Add1(ss.CallOp, am.A{"Op": Op2})
case ss.Value3:
res = c.Worker.Add1(ss.CallOp, am.A{"Op": Op3})
default:
// err
b.Fatalf("Unknown value: %v", value)
}
if res != am.Executed {
b.Fatalf("CallOp failed: %v", c.Worker.Err())
}
}
}()

// reset the timer to exclude setup time
b.ResetTimer()

// start, wait and report
c.Worker.Add1(ss.Start, nil)
<-end

b.ReportAllocs()
p := message.NewPrinter(language.English)
b.Log(p.Sprintf("Transferred: %d bytes", <-counter))
b.Log(p.Sprintf("Calls: %d", s.RPC.CallCount+c.CallCount))
b.Log(p.Sprintf("Errors: %d", worker.ErrCount))
b.Log(p.Sprintf("Completions: %d", worker.SuccessCount))

assert.Equal(b, 0, worker.ErrCount)
assert.Greater(b, worker.SuccessCount, 0)
}
125 changes: 125 additions & 0 deletions examples/benchmark_grpc/client_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package benchmark_grpc

import (
"context"
"log"
"net"
"testing"

"github.com/pancsta/asyncmachine-go/internal/testing/utils"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
"github.com/stretchr/testify/assert"
"golang.org/x/text/language"
"golang.org/x/text/message"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

pb "github.com/pancsta/asyncmachine-go/examples/benchmark_grpc/worker_proto"
)

func BenchmarkClientGrpc(b *testing.B) {
// init
ctx := context.Background()
worker := &Worker{}
limit := b.N
end := make(chan struct{})
calls := 0

// init grpc server
lis, err := net.Listen("tcp", ":50051")
if err != nil {
b.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
service := NewWorkerServiceServer(worker)
pb.RegisterWorkerServiceServer(s, service)
reflection.Register(s)
go s.Serve(lis)
defer lis.Close()
l("test", "grpc server started")
serverAddr := lis.Addr().String()

// monitor traffic
counterListener := utils.RandListener("localhost")
connAddr := counterListener.Addr().String()
counter := make(chan int64, 1)
go arpc.TrafficMeter(counterListener, serverAddr, counter, end)

// init grpc client
conn, err := grpc.NewClient(connAddr, grpc.WithInsecure())
if err != nil {
b.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewWorkerServiceClient(conn)
l("test", "grpc client started")

// test subscribe-get-process
//
// 1. subscription: wait for notifications
// 2. getter: get the value from the source
// 3. processing: call an operation based on the value
calls++
stream, err := client.Subscribe(ctx, &pb.Empty{})
if err != nil {
b.Fatalf("Subscribe failed: %v", err)
}

go func() {
for i := 0; i <= limit; i++ {

// wait for notification (subscription)
_, err := stream.Recv()
if err != nil {
log.Fatalf("Failed to receive a notification: %v", err)
}

// value (getter)
calls++
respValue, err := client.GetValue(ctx, &pb.Empty{})
if err != nil {
log.Fatalf("GetValue failed: %v", err)
}

// call op from value (processing)
calls++
switch Value(respValue.Value) {
case Value1:
_, err = client.CallOp(ctx, &pb.CallOpRequest{Op: int32(Op1)})
case Value2:
_, err = client.CallOp(ctx, &pb.CallOpRequest{Op: int32(Op2)})
case Value3:
_, err = client.CallOp(ctx, &pb.CallOpRequest{Op: int32(Op3)})
default:
// err
b.Fatalf("Unknown value: %v", respValue.Value)
}
if err != nil {
b.Fatalf("CallOp failed: %v", err)
}
}

// exit
close(end)
}()

// reset the timer to exclude setup time
b.ResetTimer()

// start, wait and report
calls++
_, err = client.Start(ctx, &pb.Empty{})
if err != nil {
log.Fatalf("Start failed: %v", err)
}
<-end
b.ReportAllocs()
p := message.NewPrinter(language.English)
b.Log(p.Sprintf("Transferred: %d bytes", <-counter))
b.Log(p.Sprintf("Calls: %d", calls+service.calls))
b.Log(p.Sprintf("Errors: %d", worker.ErrCount))
b.Log(p.Sprintf("Completions: %d", worker.SuccessCount))

assert.Equal(b, 0, worker.ErrCount)
assert.Greater(b, worker.SuccessCount, 0)
}
50 changes: 50 additions & 0 deletions examples/benchmark_grpc/client_local_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package benchmark_grpc

import "testing"

func BenchmarkClientLocal(b *testing.B) {
// init
worker := &Worker{}
i := 0
limit := b.N
end := make(chan struct{})

// test sub-get-process
//
// 1. subscription: wait for notifications
// 2. getter: get a value from the worker
// 3. processing: call an operation based on the value
worker.Subscribe(func() {

// loop
i++
if i > limit {
close(end)
return
}

// value (getter)
value := worker.GetValue()

// call op from value (processing)
switch value {
case Value1:
go worker.CallOp(Op1)
case Value2:
go worker.CallOp(Op2)
case Value3:
go worker.CallOp(Op3)
default:
// err
b.Fatalf("Unknown value: %v", value)
}
})

// reset the timer to exclude setup time
b.ResetTimer()

// start, wait and report
worker.Start()
<-end
b.ReportAllocs()
}
Loading

0 comments on commit 9d161d3

Please sign in to comment.