fix: Add server keep alive enforcement policy (#6016)
* add server keep alive policy constants

* fix spelling mistakes in constants

* remove keep alive from agent->server

* scheduler server keep alive enforcement policy

* use the same constants for client and server keep alive settings

* add server (agent) keep alive enforcement policy

* add dataflow server keep alive enforcement policy

* set keepalive permits to false

* add kotlin client keep alive settings

* set operator client keep alive settings

* add note

* adjust path to file

* tidy up keepalive parameters

* make constants private

* add controller client exp backoff

* tidy up client exp backoff

* only output log when cleaning resources to reduce log spam

* tidy up imports

* extract parameters as constants

* remove specific grpc operator

* set grpc level retries to 100

* set grpc level retries to 100

* add envoy keepalive settings

* extract backoff parameters as constants

* fix lint
sakoush authored Oct 30, 2024
1 parent 294b5f8 commit ca025b3
Showing 21 changed files with 141 additions and 89 deletions.
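
For context on why each server now sets an explicit policy: a grpc-go server's default keep-alive enforcement only permits client pings every five minutes, so a client pinging every 60 seconds would eventually be answered with GOAWAY ("too_many_pings") and dropped. The sketch below is illustrative only — a standalone program with a made-up address, not repository code; the 60s time, 2s timeout, and PermitWithoutStream=false mirror the constants used throughout this diff.

```go
package main

import (
	"log"
	"net"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
)

func main() {
	// Server side: accept pings as frequent as once per minute, but only on
	// connections that have active streams. Without this option the default
	// policy would treat the client below as misbehaving.
	srv := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		MinTime:             60 * time.Second,
		PermitWithoutStream: false,
	}))
	lis, err := net.Listen("tcp", "localhost:9005") // illustrative address
	if err != nil {
		log.Fatal(err)
	}
	go func() { _ = srv.Serve(lis) }()
	defer srv.GracefulStop()

	// Client side: probe an idle connection every 60s, drop it if the ping is
	// not acknowledged within 2s, and never ping when no stream is open.
	conn, err := grpc.NewClient(
		"localhost:9005",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                60 * time.Second,
			Timeout:             2 * time.Second,
			PermitWithoutStream: false,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```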
33 changes: 23 additions & 10 deletions operator/scheduler/client.go
@@ -33,11 +33,16 @@ import (

const (
// these 2 constants in combination with the backoff exponential function will give us a max backoff of 13.5 minutes
SchedulerConnectMaxRetries = 12
SchedulerConnectBackoffScalar = 200 * time.Millisecond
ClientKeapAliveTime = 60 * time.Second
ClientKeapAliveTimeout = 2 * time.Second
ClientKeapAlivePermit = true
schedulerConnectMaxRetries = 100
schedulerConnectBackoffScalar = 200 * time.Millisecond
// these keep alive settings need to match the scheduler counterpart in scheduler/pkg/util/constants.go
clientKeepAliveTime = 60 * time.Second
clientKeepAliveTimeout = 2 * time.Second
clientKeepAlivePermit = false
// backoff
backoffMaxElapsedTime = 0 // Never stop due to large time between calls
backOffMaxInterval = time.Second * 15
backOffInitialInterval = time.Second
)

type SchedulerClient struct {
@@ -229,9 +234,9 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai
}
}
kacp := keepalive.ClientParameters{
Time: ClientKeapAliveTime,
Timeout: ClientKeapAliveTimeout,
PermitWithoutStream: ClientKeapAlivePermit,
Time: clientKeepAliveTime,
Timeout: clientKeepAliveTimeout,
PermitWithoutStream: clientKeepAlivePermit,
}

retryOpts := []grpc_retry.CallOption{
@@ -249,7 +254,7 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
s.logger.Info("Running scheduler client in plain text mode", "port", port)
}
opts = append(opts, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)))
// we dont have backoff retry on the grpc streams as we handle this in the event handlers
opts = append(opts, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)))
opts = append(opts, grpc.WithKeepaliveParams(kacp))
s.logger.Info("Dialing scheduler", "host", host, "port", port)
@@ -313,7 +318,7 @@ func retryFn(
logFailure := func(err error, delay time.Duration) {
logger.Error(err, "Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
backOffExp := getClientExponentialBackoff()
fnWithArgs := func() error {
grpcClient := scheduler.NewSchedulerClient(conn)
return fn(context.Background(), grpcClient, namespace)
@@ -325,3 +330,11 @@ }
}
return nil
}

func getClientExponentialBackoff() *backoff.ExponentialBackOff {
backOffExp := backoff.NewExponentialBackOff()
backOffExp.MaxElapsedTime = backoffMaxElapsedTime
backOffExp.MaxInterval = backOffMaxInterval
backOffExp.InitialInterval = backOffInitialInterval
return backOffExp
}
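
Two retry layers are at work in the operator after this change: unary scheduler RPCs are retried at the gRPC level (up to 100 attempts, exponential backoff from a 200 ms scalar), while the long-lived stream subscriptions are re-established by an outer cenkalti/backoff loop built from getClientExponentialBackoff above, which never gives up. A minimal, self-contained sketch of how that outer loop behaves with these constants (the failing operation and attempt counts are invented for illustration; the multiplier and jitter come from the library's defaults):

```go
package main

import (
	"errors"
	"log"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = time.Second  // first wait ~1s, then ~1.5s, ~2.25s, ... (with jitter)
	b.MaxInterval = 15 * time.Second // individual waits are capped at 15s
	b.MaxElapsedTime = 0             // 0 means never stop retrying

	attempts := 0
	op := func() error { // stands in for e.g. re-subscribing to a scheduler event stream
		attempts++
		if attempts < 5 {
			return errors.New("scheduler not ready")
		}
		return nil
	}
	notify := func(err error, next time.Duration) {
		log.Printf("retrying in %s after error: %v", next, err)
	}

	if err := backoff.RetryNotify(op, b, notify); err != nil {
		log.Fatal(err)
	}
	log.Printf("succeeded after %d attempts", attempts)
}
```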
4 changes: 2 additions & 2 deletions operator/scheduler/control_plane.go
@@ -31,8 +31,8 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC
stream, err := grpcClient.SubscribeControlPlane(
ctx,
&scheduler.ControlPlaneSubscriptionRequest{SubscriberName: "seldon manager"},
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
12 changes: 6 additions & 6 deletions operator/scheduler/experiment.go
@@ -43,8 +43,8 @@ func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alp
_, err = grpcClient.StartExperiment(
ctx,
req,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
}
@@ -66,8 +66,8 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph
_, err = grpcClient.StopExperiment(
ctx,
req,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err
}
@@ -79,8 +79,8 @@ func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcCli
stream, err := grpcClient.SubscribeExperimentStatus(
ctx,
&scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"},
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
12 changes: 6 additions & 6 deletions operator/scheduler/model.go
@@ -60,8 +60,8 @@ func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model,
_, err = grpcClient.LoadModel(
ctx,
&loadModelRequest,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return s.checkErrorRetryable(model.Kind, model.Name, err), err
@@ -102,8 +102,8 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model
_, err = grpcClient.UnloadModel(
ctx,
modelRef,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return s.checkErrorRetryable(model.Kind, model.Name, err), err
@@ -117,8 +117,8 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s
stream, err := grpcClient.SubscribeModelStatus(
ctx,
&scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"},
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
12 changes: 6 additions & 6 deletions operator/scheduler/pipeline.go
@@ -42,8 +42,8 @@ func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.P
_, err = grpcClient.LoadPipeline(
ctx,
&req,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
return s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err), err
}
@@ -62,8 +62,8 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1
_, err = grpcClient.UnloadPipeline(
ctx,
&req,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err)
@@ -85,8 +85,8 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, grpcClien
stream, err := grpcClient.SubscribePipelineStatus(
ctx,
&scheduler.PipelineSubscriptionRequest{SubscriberName: "seldon manager"},
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
8 changes: 4 additions & 4 deletions operator/scheduler/server.go
@@ -64,8 +64,8 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler
_, err := grpcClient.ServerNotify(
ctx,
request,
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
logger.Error(err, "Failed to send notify server to scheduler")
@@ -82,8 +82,8 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient
stream, err := grpcClient.SubscribeServerStatus(
ctx,
&scheduler.ServerSubscriptionRequest{SubscriberName: "seldon manager"},
grpc_retry.WithMax(SchedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)),
grpc_retry.WithMax(schedulerConnectMaxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)),
)
if err != nil {
return err
7 changes: 6 additions & 1 deletion scheduler/config/envoy.yaml
@@ -11,7 +11,12 @@ static_resources:
socket_address:
address: seldon-scheduler
port_value: 9002
http2_protocol_options: {}
http2_protocol_options: {
connection_keepalive: {
interval: 60s,
timeout: 2s,
}
}
name: xds_cluster
- connect_timeout: 0.250s
type: LOGICAL_DNS
@@ -37,6 +37,7 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import io.klogging.logger as coLogger

@OptIn(FlowPreview::class)
@@ -60,6 +61,9 @@ class PipelineSubscriber(
.defaultServiceConfig(grpcServiceConfig)
.usePlaintext() // Use TLS
.enableRetry()
// these keep alive settings need to match the go counterpart in scheduler/pkg/util/constants.go
.keepAliveTime(60L, TimeUnit.SECONDS)
.keepAliveTimeout(2L, TimeUnit.SECONDS)
.build()
private val client = ChainerGrpcKt.ChainerCoroutineStub(channel)

12 changes: 3 additions & 9 deletions scheduler/pkg/agent/client.go
@@ -24,7 +24,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/protobuf/encoding/protojson"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/agent"
@@ -237,8 +236,7 @@ func (c *Client) Start() error {
logFailure := func(err error, delay time.Duration) {
c.logger.WithError(err).Errorf("Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
backOffExp := util.GetClientExponentialBackoff()
err := backoff.RetryNotify(c.StartService, backOffExp, logFailure)
if err != nil {
c.logger.WithError(err).Fatal("Failed to start client")
@@ -417,11 +415,7 @@ func (c *Client) getConnection(host string, plainTxtPort int, tlsPort int) (*grp

logger.Infof("Connecting (non-blocking) to scheduler at %s:%d", host, port)

kacp := keepalive.ClientParameters{
Time: util.ClientKeapAliveTime,
Timeout: util.ClientKeapAliveTimeout,
PermitWithoutStream: util.ClientKeapAlivePermit,
}
kacp := util.GetClientKeepAliveParameters()

opts := []grpc.DialOption{
grpc.WithTransportCredentials(transCreds),
@@ -453,7 +447,7 @@ func (c *Client) StartService() error {
Shared: true,
AvailableMemoryBytes: c.stateManager.GetAvailableMemoryBytesWithOverCommit(),
},
grpc_retry.WithMax(1),
grpc_retry.WithMax(util.MaxGRPCRetriesOnStream),
) // TODO make configurable
if err != nil {
return err
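
From here on, the scheduler-side clients and servers stop building keep-alive and backoff settings inline and call shared helpers in scheduler/pkg/util instead (GetClientKeepAliveParameters, GetClientExponentialBackoff, GetServerKeepAliveEnforcementPolicy, MaxGRPCRetriesOnStream). The util file itself is not among the diffs rendered on this page, so the following is only an assumed sketch of those helpers, mirroring the operator constants shown earlier rather than the committed source.

```go
package util

import (
	"time"

	"github.com/cenkalti/backoff/v4"
	"google.golang.org/grpc/keepalive"
)

const (
	// Assumed values, copied from the operator-side constants in this diff.
	clientKeepAliveTime    = 60 * time.Second
	clientKeepAliveTimeout = 2 * time.Second
	clientKeepAlivePermit  = false

	// MaxGRPCRetriesOnStream caps grpc_retry attempts when (re)creating streams.
	MaxGRPCRetriesOnStream = 100
)

// GetClientKeepAliveParameters returns the keep-alive settings shared by all gRPC clients.
func GetClientKeepAliveParameters() keepalive.ClientParameters {
	return keepalive.ClientParameters{
		Time:                clientKeepAliveTime,
		Timeout:             clientKeepAliveTimeout,
		PermitWithoutStream: clientKeepAlivePermit,
	}
}

// GetServerKeepAliveEnforcementPolicy returns the matching server-side policy so
// that client pings at the interval above are not rejected.
func GetServerKeepAliveEnforcementPolicy() keepalive.EnforcementPolicy {
	return keepalive.EnforcementPolicy{
		MinTime:             clientKeepAliveTime,
		PermitWithoutStream: clientKeepAlivePermit,
	}
}

// GetClientExponentialBackoff returns the never-ending retry schedule used by the
// event-handler reconnect loops.
func GetClientExponentialBackoff() *backoff.ExponentialBackOff {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // never stop
	b.MaxInterval = 15 * time.Second
	b.InitialInterval = time.Second
	return b
}
```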
8 changes: 0 additions & 8 deletions scheduler/pkg/agent/modelserver_controlplane/oip/v2.go
@@ -19,7 +19,6 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"
@@ -51,19 +50,12 @@ func CreateV2GrpcConnection(v2Config V2Config) (*grpc.ClientConn, error) {
grpc_retry.WithMax(v2Config.GRPRetryMaxCount),
}

kacp := keepalive.ClientParameters{
Time: util.ClientKeapAliveTime,
Timeout: util.ClientKeapAliveTimeout,
PermitWithoutStream: util.ClientKeapAlivePermit,
}

opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(v2Config.GRPCMaxMsgSizeBytes), grpc.MaxCallSendMsgSize(v2Config.GRPCMaxMsgSizeBytes)),
grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)),
grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
grpc.WithKeepaliveParams(kacp),
}
conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", v2Config.Host, v2Config.GRPCPort), opts...)
if err != nil {
4 changes: 4 additions & 0 deletions scheduler/pkg/agent/server.go
@@ -165,12 +165,16 @@ func (s *Server) startServer(port uint, secure bool) error {
if err != nil {
return err
}

kaep := util.GetServerKeepAliveEnforcementPolicy()

opts := []grpc.ServerOption{}
if secure {
opts = append(opts, grpc.Creds(s.certificateStore.CreateServerTransportCredentials()))
}
opts = append(opts, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep))
grpcServer := grpc.NewServer(opts...)
pb.RegisterAgentServiceServer(grpcServer, s)
s.logger.Printf("Agent server running on %d mtls:%v", port, secure)
4 changes: 4 additions & 0 deletions scheduler/pkg/envoy/server/server.go
@@ -24,6 +24,8 @@ import (
"google.golang.org/grpc"

seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"

"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
)

const (
@@ -66,12 +68,14 @@ func (x *XDSServer) StartXDSServer(port uint) error {
return err
}
}
kaep := util.GetServerKeepAliveEnforcementPolicy()
secure := x.certificateStore != nil
var grpcOptions []grpc.ServerOption
if secure {
grpcOptions = append(grpcOptions, grpc.Creds(x.certificateStore.CreateServerTransportCredentials()))
}
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
grpcOptions = append(grpcOptions, grpc.KeepaliveEnforcementPolicy(kaep))
grpcServer := grpc.NewServer(grpcOptions...)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
4 changes: 4 additions & 0 deletions scheduler/pkg/kafka/dataflow/server.go
@@ -82,8 +82,12 @@ func (c *ChainerServer) StartGrpcServer(agentPort uint) error {
if err != nil {
log.Fatalf("failed to create listener: %v", err)
}

kaep := util.GetServerKeepAliveEnforcementPolicy()

var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
grpcOptions = append(grpcOptions, grpc.KeepaliveEnforcementPolicy(kaep))
grpcServer := grpc.NewServer(grpcOptions...)
chainer.RegisterChainerServer(grpcServer, c)
c.logger.Printf("Chainer server running on %d", agentPort)
19 changes: 7 additions & 12 deletions scheduler/pkg/kafka/gateway/client.go
@@ -22,7 +22,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

"github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls"
@@ -81,11 +80,7 @@ func (kc *KafkaSchedulerClient) ConnectToScheduler(host string, plainTxtPort int
port = tlsPort
}

kacp := keepalive.ClientParameters{
Time: util.ClientKeapAliveTime,
Timeout: util.ClientKeapAliveTimeout,
PermitWithoutStream: util.ClientKeapAlivePermit,
}
kacp := util.GetClientKeepAliveParameters()

// note: retry is done in the caller
opts := []grpc.DialOption{
@@ -123,11 +118,7 @@ func (kc *KafkaSchedulerClient) Start() error {
logFailure := func(err error, delay time.Duration) {
kc.logger.WithError(err).Errorf("Scheduler not ready")
}
backOffExp := backoff.NewExponentialBackOff()
// Set some reasonable settings for trying to reconnect to scheduler
backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls
backOffExp.MaxInterval = time.Second * 15
backOffExp.InitialInterval = time.Second
backOffExp := util.GetClientExponentialBackoff()
err := backoff.RetryNotify(kc.SubscribeModelEvents, backOffExp, logFailure)
if err != nil {
kc.logger.WithError(err).Fatal("Failed to start modelgateway client")
@@ -141,7 +132,11 @@ func (kc *KafkaSchedulerClient) SubscribeModelEvents() error {
logger := kc.logger.WithField("func", "SubscribeModelEvents")
grpcClient := scheduler.NewSchedulerClient(kc.conn)
logger.Info("Subscribing to model status events")
stream, errSub := grpcClient.SubscribeModelStatus(context.Background(), &scheduler.ModelSubscriptionRequest{SubscriberName: SubscriberName}, grpc_retry.WithMax(100))
stream, errSub := grpcClient.SubscribeModelStatus(
context.Background(),
&scheduler.ModelSubscriptionRequest{SubscriberName: SubscriberName},
grpc_retry.WithMax(util.MaxGRPCRetriesOnStream),
)
if errSub != nil {
return errSub
}