Skip to content

Commit

Permalink
Tweaked invoker and added tests
Browse files Browse the repository at this point in the history
Signed-off-by: aryans1204 <[email protected]>

Add VHIVE_REPO and LOADER_REPO to setup script config

Signed-off-by: Mohsen Ghasemi <[email protected]>

Added tests and created new vSwarm invoker

Fixed linting

Fixed VSwarm Invoker for Knative mode

Committer: aryans1204 [email protected]

Fixed VSwarm Invoker for Knative mode

Added new invoker for vSwarm functions and added tests

Signed-off-by: aryans1204 <[email protected]>

Fixed formatting under grpc_client.go

Signed-off-by: aryans1204 <[email protected]>

Fixed driver tests

Signed-off-by: aryans1204 <[email protected]>

Fixed linting and slimmed VSwarm tests

Signed-off-by: aryans1204 <[email protected]>
  • Loading branch information
aryans1204 committed Dec 13, 2024
1 parent c9ee3d7 commit c447b94
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 21 deletions.
1 change: 1 addition & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
[
config,
driver,
driver/clients,
generator,
trace,
]
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ require (
require (
github.com/aws/aws-lambda-go v1.47.0
github.com/stretchr/testify v1.10.0
github.com/containerd/log v0.1.0
github.com/google/uuid v1.6.0
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a
go.mongodb.org/mongo-driver v1.17.1
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
Expand All @@ -36,7 +39,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-pdf/fpdf v0.9.0 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/pkg/sftp v1.13.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/aws/aws-lambda-go v1.47.0 h1:0H8s0vumYx/YKs4sE7YM0ktwL2eWse+kfopsRI1s
github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7RfgJv23DymV8A=
github.com/campoy/embedmd v1.0.0 h1:V4kI2qTJJLf4J29RzI/MAt2c3Bl4dQSYPuflzwFH2hY=
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -64,6 +66,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a h1:uT20mQeIhHlzRGgUznT7El03WbWfPt6J9xLPflEmx4E=
github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld v0.0.0-20240827121957-11be651eb39a/go.mod h1:e19QDifxTHn1xeHS7ZDFZzUW1EWeVmfaiqm0/jEEyUk=
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a h1:Wq/7eNz96WxQWPMEnhg3ai5sZQufCyplAUotEC+j5Kc=
github.com/vhive-serverless/vSwarm/utils/tracing/go v0.0.0-20240827121957-11be651eb39a/go.mod h1:7PjQe6bDZ5W5cWHTpNeKRobMy9NK0odj6ROXrfa/CLQ=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
1 change: 1 addition & 0 deletions pkg/config/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type LoaderConfiguration struct {
GRPCConnectionTimeoutSeconds int `json:"GRPCConnectionTimeoutSeconds"`
GRPCFunctionTimeoutSeconds int `json:"GRPCFunctionTimeoutSeconds"`
DAGMode bool `json:"DAGMode"`
VSwarm bool `json:"VSwarm"`
}

func ReadConfigurationFile(path string) LoaderConfiguration {
Expand Down
25 changes: 11 additions & 14 deletions pkg/driver/clients/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ package clients

import (
"context"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"strings"
"time"

"github.com/sirupsen/logrus"
"github.com/vhive-serverless/loader/pkg/common"
"github.com/vhive-serverless/loader/pkg/config"
"github.com/vhive-serverless/loader/pkg/workload/proto"

"github.com/sirupsen/logrus"
protoExec "github.com/vhive-serverless/loader/pkg/workload/proto"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strings"
"time"

mc "github.com/vhive-serverless/loader/pkg/metric"
)
Expand Down Expand Up @@ -68,9 +66,11 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt

var dialOptions []grpc.DialOption
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))

if strings.Contains(strings.ToLower(i.cfg.Platform), "dirigent") {
dialOptions = append(dialOptions, grpc.WithAuthority(function.Name)) // Dirigent specific
}

if i.cfg.EnableZipkinTracing {
dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
}
Expand All @@ -90,26 +90,23 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt

record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()

grpcClient := proto.NewExecutorClient(conn)
executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)
defer cancelExecution()

response, err := grpcClient.Execute(executionCxt, &proto.FaasRequest{
defer cancelExecution()
grpcClient := protoExec.NewExecutorClient(conn)
response, err := grpcClient.Execute(executionCxt, &protoExec.FaasRequest{
Message: "nothing",
RuntimeInMilliSec: uint32(runtimeSpec.Runtime),
MemoryInMebiBytes: uint32(runtimeSpec.Memory),
})

if err != nil {
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)

record.ResponseTime = time.Since(start).Microseconds()
record.ConnectionTimeout = true // WithBlock deprecated in new gRPC interface
record.FunctionTimeout = true

return false, record
}

record.Instance = extractInstanceName(response.GetMessage())
record.ResponseTime = time.Since(start).Microseconds()
record.ActualDuration = response.DurationInMicroSec
Expand All @@ -119,9 +116,9 @@ func (i *grpcInvoker) Invoke(function *common.Function, runtimeSpec *common.Runt
} else {
record.ActualMemoryUsage = common.Kib2Mib(response.MemoryUsageInKb)
}

logrus.Tracef("(Replied)\t %s: %s, %.2f[ms], %d[MiB]", function.Name, response.Message,
float64(response.DurationInMicroSec)/1e3, common.Kib2Mib(response.MemoryUsageInKb))

logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)

return true, record
Expand Down
64 changes: 59 additions & 5 deletions pkg/driver/clients/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func createFakeLoaderConfiguration() *config.LoaderConfiguration {
GRPCFunctionTimeoutSeconds: 15,
}
}
func createFakeVSwarmLoaderConfiguration() *config.LoaderConfiguration {
return &config.LoaderConfiguration{
Platform: "Knative",
InvokeProtocol: "grpc",
OutputPathPrefix: "test",
EnableZipkinTracing: true,
GRPCConnectionTimeoutSeconds: 5,
GRPCFunctionTimeoutSeconds: 15,
VSwarm: true,
}
}

var testFunction = common.Function{
Name: "test-function",
Expand All @@ -68,9 +79,26 @@ func TestGRPCClientWithServerUnreachable(t *testing.T) {
record.StartTime == 0 ||
record.ResponseTime == 0 ||
success != false ||
record.ConnectionTimeout != true {
record.FunctionTimeout != true {

t.Error("Error while testing an unreachable server for trace function.")
}

}
func TestVSwarmClientUnreachable(t *testing.T) {
cfgSwarm := createFakeVSwarmLoaderConfiguration()

vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)

if record.Instance != "" ||
record.RequestedDuration != uint32(testRuntimeSpecs.Runtime*1000) ||
record.StartTime == 0 ||
record.ResponseTime == 0 ||
success != false ||
record.FunctionTimeout != true {

t.Error("Error while testing an unreachable server.")
t.Error("Error while testing an unreachable server for vSwarm function.")
}
}

Expand Down Expand Up @@ -98,7 +126,32 @@ func TestGRPCClientWithServerReachable(t *testing.T) {
record.ActualDuration == 0 ||
record.ActualMemoryUsage == 0 {

t.Error("Failed gRPC invocations.")
t.Error("Failed gRPC invocations for trace function.")
}

}
func TestVSwarmClientWithServerReachable(t *testing.T) {
address, port := "localhost", 18081
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)

go standard.StartVSwarmGRPCServer(address, port)
time.Sleep(2 * time.Second)

cfgSwarm := createFakeVSwarmLoaderConfiguration()
vSwarmInvoker := CreateInvoker(cfgSwarm, nil, nil)

start := time.Now()
success, record := vSwarmInvoker.Invoke(&testFunction, &testRuntimeSpecs)
logrus.Info("Elapsed: ", time.Since(start).Milliseconds(), " ms")

if !success ||
record.MemoryAllocationTimeout != false ||
record.ConnectionTimeout != false ||
record.FunctionTimeout != false ||
record.ResponseTime == 0 ||
record.ActualDuration == 0 {

t.Error("Failed gRPC invocations for vSwarm function.")
}
}

Expand All @@ -109,7 +162,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
t.Error(err)
}

address, port := "localhost", 18081
address, port := "localhost", 18082
testFunction.Endpoint = fmt.Sprintf("%s:%d", address, port)

go standard.StartGRPCServer(address, port, standard.TraceFunction, "")
Expand All @@ -118,6 +171,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
time.Sleep(2 * time.Second)

cfg := createFakeLoaderConfiguration()

invoker := CreateInvoker(cfg, nil, nil)

for i := 0; i < 50; i++ {
Expand All @@ -131,7 +185,7 @@ func TestGRPCClientWithServerBatchWorkload(t *testing.T) {
record.ActualDuration == 0 ||
record.ActualMemoryUsage == 0 {

t.Error("Failed gRPC invocations.")
t.Error("Failed gRPC invocations for trace function.")
}
}
}
97 changes: 97 additions & 0 deletions pkg/driver/clients/grpc_vswarm_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package clients

import (
"github.com/vhive-serverless/loader/pkg/config"
"github.com/vhive-serverless/loader/pkg/common"
proto "github.com/vhive-serverless/vSwarm/utils/protobuf/helloworld"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
mc "github.com/vhive-serverless/loader/pkg/metric"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/sirupsen/logrus"
"strings"
"time"
"github.com/google/uuid"
"context"
)

type grpcVSwarmInvoker struct {
cfg *config.LoaderConfiguration
}

func newGRPCVSwarmInvoker(cfg *config.LoaderConfiguration) *grpcVSwarmInvoker {
return &grpcVSwarmInvoker{
cfg: cfg,
}
}

func (i *grpcVSwarmInvoker) Invoke(function *common.Function, runtimeSpec *common.RuntimeSpecification) (bool, *mc.ExecutionRecord) {
logrus.Tracef("(Invoke)\t %s: %d[ms], %d[MiB]", function.Name, runtimeSpec.Runtime, runtimeSpec.Memory)

record := &mc.ExecutionRecord{
ExecutionRecordBase: mc.ExecutionRecordBase{
RequestedDuration: uint32(runtimeSpec.Runtime * 1e3),
},
}

////////////////////////////////////
// INVOKE FUNCTION
////////////////////////////////////
start := time.Now()
record.StartTime = start.UnixMicro()

var dialOptions []grpc.DialOption
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
if i.cfg.EnableZipkinTracing {
dialOptions = append(dialOptions, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))
}

grpcStart := time.Now()

conn, err := grpc.NewClient(function.Endpoint, dialOptions...)
if err != nil {
logrus.Debugf("Failed to establish a gRPC connection - %v\n", err)

record.ResponseTime = time.Since(start).Microseconds()
record.ConnectionTimeout = true

return false, record
}
defer gRPCConnectionClose(conn)

record.GRPCConnectionEstablishTime = time.Since(grpcStart).Microseconds()

executionCxt, cancelExecution := context.WithTimeout(context.Background(), time.Duration(i.cfg.GRPCFunctionTimeoutSeconds)*time.Second)

defer cancelExecution()

grpcClient := proto.NewGreeterClient(conn)
response, err := grpcClient.SayHello(executionCxt, &proto.HelloRequest{
Name: "Invoke Relay",
VHiveMetadata: MakeVHiveMetadata(
uuid.New().String(),
uuid.New().String(),
time.Now().UTC(),
),
})
if err != nil {
logrus.Debugf("gRPC timeout exceeded for function %s - %s", function.Name, err)

record.ResponseTime = time.Since(start).Microseconds()
record.FunctionTimeout = true

return false, record
}
record.ResponseTime = time.Since(start).Microseconds()
record.ActualDuration = uint32(record.ResponseTime)
record.Instance = extractInstanceName(response.GetMessage())
if strings.HasPrefix(response.GetMessage(), "FAILURE - mem_alloc") {
record.MemoryAllocationTimeout = true
} else {
record.ActualMemoryUsage = common.Kib2Mib(0)
}

logrus.Tracef("(E2E Latency) %s: %.2f[ms]\n", function.Name, float64(record.ResponseTime)/1e3)

return true, record
}
6 changes: 5 additions & 1 deletion pkg/driver/clients/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func CreateInvoker(cfg *config.LoaderConfiguration, announceDoneExe *sync.WaitGr
return newHTTPInvoker(cfg)
case "Knative", "Knative-RPS":
if cfg.InvokeProtocol == "grpc" {
return newGRPCInvoker(cfg)
if !cfg.VSwarm {
return newGRPCInvoker(cfg)
} else {
return newGRPCVSwarmInvoker(cfg)
}
} else {
return newHTTPInvoker(cfg)
}
Expand Down
Loading

0 comments on commit c447b94

Please sign in to comment.