-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(rpc): add grpc benchmark (#120)
- Loading branch information
Showing
10 changed files
with
1,269 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package benchmark_grpc | ||
|
||
import ( | ||
"context" | ||
"encoding/gob" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
"github.com/pancsta/asyncmachine-go/internal/testing/utils" | ||
am "github.com/pancsta/asyncmachine-go/pkg/machine" | ||
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc" | ||
ams "github.com/pancsta/asyncmachine-go/pkg/states" | ||
"github.com/stretchr/testify/assert" | ||
"golang.org/x/text/language" | ||
"golang.org/x/text/message" | ||
|
||
ss "github.com/pancsta/asyncmachine-go/examples/benchmark_grpc/worker_states" | ||
) | ||
|
||
func BenchmarkClientArpc(b *testing.B) { | ||
// init | ||
ctx := context.Background() | ||
worker := &Worker{} | ||
i := 0 | ||
limit := b.N | ||
end := make(chan struct{}) | ||
|
||
// read env | ||
amDbgAddr := os.Getenv("AM_DBG_ADDR") | ||
logLvl := am.EnvLogLevel("") | ||
|
||
// register gob types | ||
gob.Register(Value(0)) | ||
gob.Register(Op(0)) | ||
|
||
// init server | ||
s, err := NewWorkerArpcServer(ctx, "localhost:50551", worker) | ||
if err != nil { | ||
b.Fatal(err) | ||
} | ||
serverAddr := s.RPC.Listener.Addr().String() | ||
|
||
// monitor traffic | ||
counterListener := utils.RandListener("localhost") | ||
connAddr := counterListener.Addr().String() | ||
counter := make(chan int64, 1) | ||
go arpc.TrafficMeter(counterListener, serverAddr, counter, end) | ||
|
||
// init client | ||
c, err := arpc.NewClient(ctx, connAddr, "worker", ss.States, ss.Names) | ||
if err != nil { | ||
b.Fatal(err) | ||
} | ||
c.Mach.SetLoggerSimple(func(msg string, args ...any) { | ||
l("arpc-client", msg, args...) | ||
}, logLvl) | ||
utils.MachDebug(c.Mach, amDbgAddr, logLvl, false) | ||
|
||
// tear down | ||
b.Cleanup(func() { | ||
c.Stop(ctx, true) | ||
s.RPC.Stop(true) | ||
|
||
<-c.Mach.WhenDisposed() | ||
<-s.RPC.Mach.WhenDisposed() | ||
|
||
// cool off am-dbg and free the ports | ||
if amDbgAddr != "" { | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
}) | ||
|
||
// start client | ||
c.Start() | ||
<-c.Mach.When1(ams.Ready, nil) | ||
<-s.RPC.Mach.When1(ams.Ready, nil) | ||
|
||
// test subscribe-get-process | ||
// | ||
// 1. subscription: wait for notifications | ||
// 2. getter: get a value from the worker | ||
// 3. processing: call an operation based on the value | ||
ticks := c.Worker.Tick(ss.Event) | ||
go func() { | ||
for { | ||
<-c.Worker.WhenTicksEq(ss.Event, ticks+2, nil) | ||
ticks += 2 | ||
|
||
// loop | ||
i++ | ||
if i > limit { | ||
l("test", "limit done") | ||
close(end) | ||
return | ||
} | ||
|
||
// value (getter) | ||
value := c.Worker.Switch(ss.GroupValues...) | ||
|
||
// call op from value (processing) | ||
|
||
var res am.Result | ||
switch value { | ||
case ss.Value1: | ||
res = c.Worker.Add1(ss.CallOp, am.A{"Op": Op1}) | ||
case ss.Value2: | ||
res = c.Worker.Add1(ss.CallOp, am.A{"Op": Op2}) | ||
case ss.Value3: | ||
res = c.Worker.Add1(ss.CallOp, am.A{"Op": Op3}) | ||
default: | ||
// err | ||
b.Fatalf("Unknown value: %v", value) | ||
} | ||
if res != am.Executed { | ||
b.Fatalf("CallOp failed: %v", c.Worker.Err()) | ||
} | ||
} | ||
}() | ||
|
||
// reset the timer to exclude setup time | ||
b.ResetTimer() | ||
|
||
// start, wait and report | ||
c.Worker.Add1(ss.Start, nil) | ||
<-end | ||
|
||
b.ReportAllocs() | ||
p := message.NewPrinter(language.English) | ||
b.Log(p.Sprintf("Transferred: %d bytes", <-counter)) | ||
b.Log(p.Sprintf("Calls: %d", s.RPC.CallCount+c.CallCount)) | ||
b.Log(p.Sprintf("Errors: %d", worker.ErrCount)) | ||
b.Log(p.Sprintf("Completions: %d", worker.SuccessCount)) | ||
|
||
assert.Equal(b, 0, worker.ErrCount) | ||
assert.Greater(b, worker.SuccessCount, 0) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
package benchmark_grpc | ||
|
||
import ( | ||
"context" | ||
"log" | ||
"net" | ||
"testing" | ||
|
||
"github.com/pancsta/asyncmachine-go/internal/testing/utils" | ||
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc" | ||
"github.com/stretchr/testify/assert" | ||
"golang.org/x/text/language" | ||
"golang.org/x/text/message" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/reflection" | ||
|
||
pb "github.com/pancsta/asyncmachine-go/examples/benchmark_grpc/worker_proto" | ||
) | ||
|
||
func BenchmarkClientGrpc(b *testing.B) { | ||
// init | ||
ctx := context.Background() | ||
worker := &Worker{} | ||
limit := b.N | ||
end := make(chan struct{}) | ||
calls := 0 | ||
|
||
// init grpc server | ||
lis, err := net.Listen("tcp", ":50051") | ||
if err != nil { | ||
b.Fatalf("failed to listen: %v", err) | ||
} | ||
s := grpc.NewServer() | ||
service := NewWorkerServiceServer(worker) | ||
pb.RegisterWorkerServiceServer(s, service) | ||
reflection.Register(s) | ||
go s.Serve(lis) | ||
defer lis.Close() | ||
l("test", "grpc server started") | ||
serverAddr := lis.Addr().String() | ||
|
||
// monitor traffic | ||
counterListener := utils.RandListener("localhost") | ||
connAddr := counterListener.Addr().String() | ||
counter := make(chan int64, 1) | ||
go arpc.TrafficMeter(counterListener, serverAddr, counter, end) | ||
|
||
// init grpc client | ||
conn, err := grpc.NewClient(connAddr, grpc.WithInsecure()) | ||
if err != nil { | ||
b.Fatalf("did not connect: %v", err) | ||
} | ||
defer conn.Close() | ||
client := pb.NewWorkerServiceClient(conn) | ||
l("test", "grpc client started") | ||
|
||
// test subscribe-get-process | ||
// | ||
// 1. subscription: wait for notifications | ||
// 2. getter: get the value from the source | ||
// 3. processing: call an operation based on the value | ||
calls++ | ||
stream, err := client.Subscribe(ctx, &pb.Empty{}) | ||
if err != nil { | ||
b.Fatalf("Subscribe failed: %v", err) | ||
} | ||
|
||
go func() { | ||
for i := 0; i <= limit; i++ { | ||
|
||
// wait for notification (subscription) | ||
_, err := stream.Recv() | ||
if err != nil { | ||
log.Fatalf("Failed to receive a notification: %v", err) | ||
} | ||
|
||
// value (getter) | ||
calls++ | ||
respValue, err := client.GetValue(ctx, &pb.Empty{}) | ||
if err != nil { | ||
log.Fatalf("GetValue failed: %v", err) | ||
} | ||
|
||
// call op from value (processing) | ||
calls++ | ||
switch Value(respValue.Value) { | ||
case Value1: | ||
_, err = client.CallOp(ctx, &pb.CallOpRequest{Op: int32(Op1)}) | ||
case Value2: | ||
_, err = client.CallOp(ctx, &pb.CallOpRequest{Op: int32(Op2)}) | ||
case Value3: | ||
_, err = client.CallOp(ctx, &pb.CallOpRequest{Op: int32(Op3)}) | ||
default: | ||
// err | ||
b.Fatalf("Unknown value: %v", respValue.Value) | ||
} | ||
if err != nil { | ||
b.Fatalf("CallOp failed: %v", err) | ||
} | ||
} | ||
|
||
// exit | ||
close(end) | ||
}() | ||
|
||
// reset the timer to exclude setup time | ||
b.ResetTimer() | ||
|
||
// start, wait and report | ||
calls++ | ||
_, err = client.Start(ctx, &pb.Empty{}) | ||
if err != nil { | ||
log.Fatalf("Start failed: %v", err) | ||
} | ||
<-end | ||
b.ReportAllocs() | ||
p := message.NewPrinter(language.English) | ||
b.Log(p.Sprintf("Transferred: %d bytes", <-counter)) | ||
b.Log(p.Sprintf("Calls: %d", calls+service.calls)) | ||
b.Log(p.Sprintf("Errors: %d", worker.ErrCount)) | ||
b.Log(p.Sprintf("Completions: %d", worker.SuccessCount)) | ||
|
||
assert.Equal(b, 0, worker.ErrCount) | ||
assert.Greater(b, worker.SuccessCount, 0) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package benchmark_grpc | ||
|
||
import "testing" | ||
|
||
func BenchmarkClientLocal(b *testing.B) { | ||
// init | ||
worker := &Worker{} | ||
i := 0 | ||
limit := b.N | ||
end := make(chan struct{}) | ||
|
||
// test sub-get-process | ||
// | ||
// 1. subscription: wait for notifications | ||
// 2. getter: get a value from the worker | ||
// 3. processing: call an operation based on the value | ||
worker.Subscribe(func() { | ||
|
||
// loop | ||
i++ | ||
if i > limit { | ||
close(end) | ||
return | ||
} | ||
|
||
// value (getter) | ||
value := worker.GetValue() | ||
|
||
// call op from value (processing) | ||
switch value { | ||
case Value1: | ||
go worker.CallOp(Op1) | ||
case Value2: | ||
go worker.CallOp(Op2) | ||
case Value3: | ||
go worker.CallOp(Op3) | ||
default: | ||
// err | ||
b.Fatalf("Unknown value: %v", value) | ||
} | ||
}) | ||
|
||
// reset the timer to exclude setup time | ||
b.ResetTimer() | ||
|
||
// start, wait and report | ||
worker.Start() | ||
<-end | ||
b.ReportAllocs() | ||
} |
Oops, something went wrong.