diff --git a/operator/scheduler/client.go b/operator/scheduler/client.go index a2d0142ade..98492d31c4 100644 --- a/operator/scheduler/client.go +++ b/operator/scheduler/client.go @@ -33,11 +33,16 @@ import ( const ( // these 2 constants in combination with the backoff exponential function will give us a max backoff of 13.5 minutes - SchedulerConnectMaxRetries = 12 - SchedulerConnectBackoffScalar = 200 * time.Millisecond - ClientKeapAliveTime = 60 * time.Second - ClientKeapAliveTimeout = 2 * time.Second - ClientKeapAlivePermit = true + schedulerConnectMaxRetries = 100 + schedulerConnectBackoffScalar = 200 * time.Millisecond + // these keep alive settings need to match the scheduler counterpart in scheduler/pkg/util/constants.go + clientKeepAliveTime = 60 * time.Second + clientKeepAliveTimeout = 2 * time.Second + clientKeepAlivePermit = false + // backoff + backoffMaxElapsedTime = 0 // Never stop due to large time between calls + backOffMaxInterval = time.Second * 15 + backOffInitialInterval = time.Second ) type SchedulerClient struct { @@ -229,9 +234,9 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai } } kacp := keepalive.ClientParameters{ - Time: ClientKeapAliveTime, - Timeout: ClientKeapAliveTimeout, - PermitWithoutStream: ClientKeapAlivePermit, + Time: clientKeepAliveTime, + Timeout: clientKeepAliveTimeout, + PermitWithoutStream: clientKeepAlivePermit, } retryOpts := []grpc_retry.CallOption{ @@ -249,7 +254,7 @@ func (s *SchedulerClient) connectToScheduler(host string, namespace string, plai opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) s.logger.Info("Running scheduler client in plain text mode", "port", port) } - opts = append(opts, grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...))) + // we don't have backoff retry on the grpc streams as we handle this in the event handlers opts = append(opts, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...))) 
opts = append(opts, grpc.WithKeepaliveParams(kacp)) s.logger.Info("Dialing scheduler", "host", host, "port", port) @@ -313,7 +318,7 @@ func retryFn( logFailure := func(err error, delay time.Duration) { logger.Error(err, "Scheduler not ready") } - backOffExp := backoff.NewExponentialBackOff() + backOffExp := getClientExponentialBackoff() fnWithArgs := func() error { grpcClient := scheduler.NewSchedulerClient(conn) return fn(context.Background(), grpcClient, namespace) @@ -325,3 +330,11 @@ func retryFn( } return nil } + +func getClientExponentialBackoff() *backoff.ExponentialBackOff { + backOffExp := backoff.NewExponentialBackOff() + backOffExp.MaxElapsedTime = backoffMaxElapsedTime + backOffExp.MaxInterval = backOffMaxInterval + backOffExp.InitialInterval = backOffInitialInterval + return backOffExp +} diff --git a/operator/scheduler/control_plane.go b/operator/scheduler/control_plane.go index 9dbf18a590..9834cfea98 100644 --- a/operator/scheduler/control_plane.go +++ b/operator/scheduler/control_plane.go @@ -31,8 +31,8 @@ func (s *SchedulerClient) SubscribeControlPlaneEvents(ctx context.Context, grpcC stream, err := grpcClient.SubscribeControlPlane( ctx, &scheduler.ControlPlaneSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return err diff --git a/operator/scheduler/experiment.go b/operator/scheduler/experiment.go index c2b4275800..50be2e6cf8 100644 --- a/operator/scheduler/experiment.go +++ b/operator/scheduler/experiment.go @@ -43,8 +43,8 @@ func (s *SchedulerClient) StartExperiment(ctx context.Context, experiment *v1alp _, err = grpcClient.StartExperiment( ctx, req, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - 
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err } @@ -66,8 +66,8 @@ func (s *SchedulerClient) StopExperiment(ctx context.Context, experiment *v1alph _, err = grpcClient.StopExperiment( ctx, req, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) return s.checkErrorRetryable(experiment.Kind, experiment.Name, err), err } @@ -79,8 +79,8 @@ func (s *SchedulerClient) SubscribeExperimentEvents(ctx context.Context, grpcCli stream, err := grpcClient.SubscribeExperimentStatus( ctx, &scheduler.ExperimentSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return err diff --git a/operator/scheduler/model.go b/operator/scheduler/model.go index 767aabd84f..f49217be38 100644 --- a/operator/scheduler/model.go +++ b/operator/scheduler/model.go @@ -60,8 +60,8 @@ func (s *SchedulerClient) LoadModel(ctx context.Context, model *v1alpha1.Model, _, err = grpcClient.LoadModel( ctx, &loadModelRequest, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return s.checkErrorRetryable(model.Kind, 
model.Name, err), err @@ -102,8 +102,8 @@ func (s *SchedulerClient) UnloadModel(ctx context.Context, model *v1alpha1.Model _, err = grpcClient.UnloadModel( ctx, modelRef, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return s.checkErrorRetryable(model.Kind, model.Name, err), err @@ -117,8 +117,8 @@ func (s *SchedulerClient) SubscribeModelEvents(ctx context.Context, grpcClient s stream, err := grpcClient.SubscribeModelStatus( ctx, &scheduler.ModelSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return err diff --git a/operator/scheduler/pipeline.go b/operator/scheduler/pipeline.go index 586e84103d..fce4af998c 100644 --- a/operator/scheduler/pipeline.go +++ b/operator/scheduler/pipeline.go @@ -42,8 +42,8 @@ func (s *SchedulerClient) LoadPipeline(ctx context.Context, pipeline *v1alpha1.P _, err = grpcClient.LoadPipeline( ctx, &req, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) return s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err), err } @@ -62,8 +62,8 @@ func (s *SchedulerClient) UnloadPipeline(ctx context.Context, pipeline *v1alpha1 _, err = grpcClient.UnloadPipeline( ctx, &req, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - 
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return err, s.checkErrorRetryable(pipeline.Kind, pipeline.Name, err) @@ -85,8 +85,8 @@ func (s *SchedulerClient) SubscribePipelineEvents(ctx context.Context, grpcClien stream, err := grpcClient.SubscribePipelineStatus( ctx, &scheduler.PipelineSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return err diff --git a/operator/scheduler/server.go b/operator/scheduler/server.go index af1f88fc19..8ecf39bf17 100644 --- a/operator/scheduler/server.go +++ b/operator/scheduler/server.go @@ -64,8 +64,8 @@ func (s *SchedulerClient) ServerNotify(ctx context.Context, grpcClient scheduler _, err := grpcClient.ServerNotify( ctx, request, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { logger.Error(err, "Failed to send notify server to scheduler") @@ -82,8 +82,8 @@ func (s *SchedulerClient) SubscribeServerEvents(ctx context.Context, grpcClient stream, err := grpcClient.SubscribeServerStatus( ctx, &scheduler.ServerSubscriptionRequest{SubscriberName: "seldon manager"}, - grpc_retry.WithMax(SchedulerConnectMaxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffExponential(SchedulerConnectBackoffScalar)), + grpc_retry.WithMax(schedulerConnectMaxRetries), + 
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(schedulerConnectBackoffScalar)), ) if err != nil { return err diff --git a/scheduler/config/envoy.yaml b/scheduler/config/envoy.yaml index 5275cc8b15..fba6fa0de1 100644 --- a/scheduler/config/envoy.yaml +++ b/scheduler/config/envoy.yaml @@ -11,7 +11,12 @@ static_resources: socket_address: address: seldon-scheduler port_value: 9002 - http2_protocol_options: {} + http2_protocol_options: { + connection_keepalive: { + interval: 60s, + timeout: 2s, + } + } name: xds_cluster - connect_timeout: 0.250s type: LOGICAL_DNS diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt index f1d3de4066..b9a2c16a30 100644 --- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt +++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt @@ -37,6 +37,7 @@ import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.runBlocking import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import io.klogging.logger as coLogger @OptIn(FlowPreview::class) @@ -60,6 +61,9 @@ class PipelineSubscriber( .defaultServiceConfig(grpcServiceConfig) .usePlaintext() // Use TLS .enableRetry() + // these keep alive settings need to match the go counterpart in scheduler/pkg/util/constants.go + .keepAliveTime(60L, TimeUnit.SECONDS) + .keepAliveTimeout(2L, TimeUnit.SECONDS) .build() private val client = ChainerGrpcKt.ChainerCoroutineStub(channel) diff --git a/scheduler/pkg/agent/client.go b/scheduler/pkg/agent/client.go index 7698df52ab..79a388266d 100644 --- a/scheduler/pkg/agent/client.go +++ b/scheduler/pkg/agent/client.go @@ -24,7 +24,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" 
"google.golang.org/protobuf/encoding/protojson" "github.com/seldonio/seldon-core/apis/go/v2/mlops/agent" @@ -237,8 +236,7 @@ func (c *Client) Start() error { logFailure := func(err error, delay time.Duration) { c.logger.WithError(err).Errorf("Scheduler not ready") } - backOffExp := backoff.NewExponentialBackOff() - backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls + backOffExp := util.GetClientExponentialBackoff() err := backoff.RetryNotify(c.StartService, backOffExp, logFailure) if err != nil { c.logger.WithError(err).Fatal("Failed to start client") @@ -417,11 +415,7 @@ func (c *Client) getConnection(host string, plainTxtPort int, tlsPort int) (*grp logger.Infof("Connecting (non-blocking) to scheduler at %s:%d", host, port) - kacp := keepalive.ClientParameters{ - Time: util.ClientKeapAliveTime, - Timeout: util.ClientKeapAliveTimeout, - PermitWithoutStream: util.ClientKeapAlivePermit, - } + kacp := util.GetClientKeepAliveParameters() opts := []grpc.DialOption{ grpc.WithTransportCredentials(transCreds), @@ -453,7 +447,7 @@ func (c *Client) StartService() error { Shared: true, AvailableMemoryBytes: c.stateManager.GetAvailableMemoryBytesWithOverCommit(), }, - grpc_retry.WithMax(1), + grpc_retry.WithMax(util.MaxGRPCRetriesOnStream), ) // TODO make configurable if err != nil { return err diff --git a/scheduler/pkg/agent/modelserver_controlplane/oip/v2.go b/scheduler/pkg/agent/modelserver_controlplane/oip/v2.go index 136de4e171..39fad22bf0 100644 --- a/scheduler/pkg/agent/modelserver_controlplane/oip/v2.go +++ b/scheduler/pkg/agent/modelserver_controlplane/oip/v2.go @@ -19,7 +19,6 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane" @@ -51,19 +50,12 @@ func CreateV2GrpcConnection(v2Config V2Config) 
(*grpc.ClientConn, error) { grpc_retry.WithMax(v2Config.GRPRetryMaxCount), } - kacp := keepalive.ClientParameters{ - Time: util.ClientKeapAliveTime, - Timeout: util.ClientKeapAliveTimeout, - PermitWithoutStream: util.ClientKeapAlivePermit, - } - opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(v2Config.GRPCMaxMsgSizeBytes), grpc.MaxCallSendMsgSize(v2Config.GRPCMaxMsgSizeBytes)), grpc.WithStreamInterceptor(grpc_retry.StreamClientInterceptor(retryOpts...)), grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(retryOpts...)), grpc.WithStatsHandler(otelgrpc.NewClientHandler()), - grpc.WithKeepaliveParams(kacp), } conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", v2Config.Host, v2Config.GRPCPort), opts...) if err != nil { diff --git a/scheduler/pkg/agent/server.go b/scheduler/pkg/agent/server.go index 5388b8d552..4cdb6ccaec 100644 --- a/scheduler/pkg/agent/server.go +++ b/scheduler/pkg/agent/server.go @@ -165,12 +165,16 @@ func (s *Server) startServer(port uint, secure bool) error { if err != nil { return err } + + kaep := util.GetServerKeepAliveEnforcementPolicy() + opts := []grpc.ServerOption{} if secure { opts = append(opts, grpc.Creds(s.certificateStore.CreateServerTransportCredentials())) } opts = append(opts, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams)) opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler())) + opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep)) grpcServer := grpc.NewServer(opts...) 
pb.RegisterAgentServiceServer(grpcServer, s) s.logger.Printf("Agent server running on %d mtls:%v", port, secure) diff --git a/scheduler/pkg/envoy/server/server.go b/scheduler/pkg/envoy/server/server.go index a46eaff52c..cb9d1bceed 100644 --- a/scheduler/pkg/envoy/server/server.go +++ b/scheduler/pkg/envoy/server/server.go @@ -24,6 +24,8 @@ import ( "google.golang.org/grpc" seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) const ( @@ -66,12 +68,14 @@ func (x *XDSServer) StartXDSServer(port uint) error { return err } } + kaep := util.GetServerKeepAliveEnforcementPolicy() secure := x.certificateStore != nil var grpcOptions []grpc.ServerOption if secure { grpcOptions = append(grpcOptions, grpc.Creds(x.certificateStore.CreateServerTransportCredentials())) } grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams)) + grpcOptions = append(grpcOptions, grpc.KeepaliveEnforcementPolicy(kaep)) grpcServer := grpc.NewServer(grpcOptions...) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) diff --git a/scheduler/pkg/kafka/dataflow/server.go b/scheduler/pkg/kafka/dataflow/server.go index 23ab122364..cdb2ce1969 100644 --- a/scheduler/pkg/kafka/dataflow/server.go +++ b/scheduler/pkg/kafka/dataflow/server.go @@ -82,8 +82,12 @@ func (c *ChainerServer) StartGrpcServer(agentPort uint) error { if err != nil { log.Fatalf("failed to create listener: %v", err) } + + kaep := util.GetServerKeepAliveEnforcementPolicy() + var grpcOptions []grpc.ServerOption grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams)) + grpcOptions = append(grpcOptions, grpc.KeepaliveEnforcementPolicy(kaep)) grpcServer := grpc.NewServer(grpcOptions...) 
chainer.RegisterChainerServer(grpcServer, c) c.logger.Printf("Chainer server running on %d", agentPort) diff --git a/scheduler/pkg/kafka/gateway/client.go b/scheduler/pkg/kafka/gateway/client.go index def18eb33f..314c935b59 100644 --- a/scheduler/pkg/kafka/gateway/client.go +++ b/scheduler/pkg/kafka/gateway/client.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" @@ -81,11 +80,7 @@ func (kc *KafkaSchedulerClient) ConnectToScheduler(host string, plainTxtPort int port = tlsPort } - kacp := keepalive.ClientParameters{ - Time: util.ClientKeapAliveTime, - Timeout: util.ClientKeapAliveTimeout, - PermitWithoutStream: util.ClientKeapAlivePermit, - } + kacp := util.GetClientKeepAliveParameters() // note: retry is done in the caller opts := []grpc.DialOption{ @@ -123,11 +118,7 @@ func (kc *KafkaSchedulerClient) Start() error { logFailure := func(err error, delay time.Duration) { kc.logger.WithError(err).Errorf("Scheduler not ready") } - backOffExp := backoff.NewExponentialBackOff() - // Set some reasonable settings for trying to reconnect to scheduler - backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls - backOffExp.MaxInterval = time.Second * 15 - backOffExp.InitialInterval = time.Second + backOffExp := util.GetClientExponentialBackoff() err := backoff.RetryNotify(kc.SubscribeModelEvents, backOffExp, logFailure) if err != nil { kc.logger.WithError(err).Fatal("Failed to start modelgateway client") @@ -141,7 +132,11 @@ func (kc *KafkaSchedulerClient) SubscribeModelEvents() error { logger := kc.logger.WithField("func", "SubscribeModelEvents") grpcClient := scheduler.NewSchedulerClient(kc.conn) logger.Info("Subscribing to model status events") - stream, errSub := 
grpcClient.SubscribeModelStatus(context.Background(), &scheduler.ModelSubscriptionRequest{SubscriberName: SubscriberName}, grpc_retry.WithMax(100)) + stream, errSub := grpcClient.SubscribeModelStatus( + context.Background(), + &scheduler.ModelSubscriptionRequest{SubscriberName: SubscriberName}, + grpc_retry.WithMax(util.MaxGRPCRetriesOnStream), + ) if errSub != nil { return errSub } diff --git a/scheduler/pkg/kafka/gateway/worker.go b/scheduler/pkg/kafka/gateway/worker.go index 3852556fdd..6b544c01a1 100644 --- a/scheduler/pkg/kafka/gateway/worker.go +++ b/scheduler/pkg/kafka/gateway/worker.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" @@ -124,12 +123,6 @@ func (iw *InferWorker) getGrpcClient(host string, port int) (v2.GRPCInferenceSer creds = insecure.NewCredentials() } - kacp := keepalive.ClientParameters{ - Time: util.ClientKeapAliveTime, - Timeout: util.ClientKeapAliveTimeout, - PermitWithoutStream: util.ClientKeapAlivePermit, - } - opts := []grpc.DialOption{ grpc.WithTransportCredentials(creds), grpc.WithDefaultCallOptions( @@ -142,7 +135,6 @@ func (iw *InferWorker) getGrpcClient(host string, port int) (v2.GRPCInferenceSer grpc.WithUnaryInterceptor( grpc_retry.UnaryClientInterceptor(retryOpts...), ), - grpc.WithKeepaliveParams(kacp), } conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", host, port), opts...) 
diff --git a/scheduler/pkg/kafka/pipeline/status/client.go b/scheduler/pkg/kafka/pipeline/status/client.go index df77b4d91d..e9191bc4e1 100644 --- a/scheduler/pkg/kafka/pipeline/status/client.go +++ b/scheduler/pkg/kafka/pipeline/status/client.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/encoding/protojson" "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" @@ -87,11 +86,7 @@ func (pc *PipelineSchedulerClient) connectToScheduler(host string, plainTxtPort port = tlsPort } - kacp := keepalive.ClientParameters{ - Time: util.ClientKeapAliveTime, - Timeout: util.ClientKeapAliveTimeout, - PermitWithoutStream: util.ClientKeapAlivePermit, - } + kacp := util.GetClientKeepAliveParameters() // note: retry is done in the caller opts := []grpc.DialOption{ @@ -129,11 +124,7 @@ func (pc *PipelineSchedulerClient) Start(host string, plainTxtPort int, tlsPort logFailure := func(err error, delay time.Duration) { logger.WithError(err).Errorf("Scheduler not ready") } - backOffExp := backoff.NewExponentialBackOff() - // Set some reasonable settings for trying to reconnect to scheduler - backOffExp.MaxElapsedTime = 0 // Never stop due to large time between calls - backOffExp.MaxInterval = time.Second * 15 - backOffExp.InitialInterval = time.Second + backOffExp := util.GetClientExponentialBackoff() err = backoff.RetryNotify(pc.SubscribePipelineEvents, backOffExp, logFailure) if err != nil { logger.WithError(err).Fatal("Failed to start pipeline gateway client") @@ -147,7 +138,11 @@ func (pc *PipelineSchedulerClient) SubscribePipelineEvents() error { logger := pc.logger.WithField("func", "SubscribePipelineEvents") grpcClient := scheduler.NewSchedulerClient(pc.conn) logger.Info("Subscribing to pipeline status events") - stream, errSub := grpcClient.SubscribePipelineStatus(context.Background(), 
&scheduler.PipelineSubscriptionRequest{SubscriberName: SubscriberName}, grpc_retry.WithMax(100)) + stream, errSub := grpcClient.SubscribePipelineStatus( + context.Background(), + &scheduler.PipelineSubscriptionRequest{SubscriberName: SubscriberName}, + grpc_retry.WithMax(util.MaxGRPCRetriesOnStream), + ) if errSub != nil { return errSub } diff --git a/scheduler/pkg/server/server.go b/scheduler/pkg/server/server.go index 3f74c40da1..a9a369989c 100644 --- a/scheduler/pkg/server/server.go +++ b/scheduler/pkg/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline" "github.com/seldonio/seldon-core/scheduler/v2/pkg/synchroniser" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) const ( @@ -131,12 +132,16 @@ func (s *SchedulerServer) startServer(port uint, secure bool) error { if err != nil { return err } + + kaep := util.GetServerKeepAliveEnforcementPolicy() + opts := []grpc.ServerOption{} if secure { opts = append(opts, grpc.Creds(s.certificateStore.CreateServerTransportCredentials())) } opts = append(opts, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams)) opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler())) + opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep)) grpcServer := grpc.NewServer(opts...) 
pb.RegisterSchedulerServer(grpcServer, s) s.logger.Printf("Scheduler server running on %d mtls:%v", port, secure) diff --git a/scheduler/pkg/store/experiment/store.go b/scheduler/pkg/store/experiment/store.go index 247946a203..30b9b22593 100644 --- a/scheduler/pkg/store/experiment/store.go +++ b/scheduler/pkg/store/experiment/store.go @@ -474,7 +474,6 @@ func (es *ExperimentStore) GetExperiments() ([]*Experiment, error) { func (es *ExperimentStore) cleanupDeletedExperiments() { es.mu.Lock() defer es.mu.Unlock() - es.logger.Info("cleaning up deleted experiments") for _, experiment := range es.experiments { if experiment.Deleted { if experiment.DeletedAt.IsZero() { @@ -487,6 +486,7 @@ func (es *ExperimentStore) cleanupDeletedExperiments() { } } else if experiment.DeletedAt.Add(utils.DeletedResourceTTL).Before(time.Now()) { delete(es.experiments, experiment.Name) + es.logger.Infof("cleaning up deleted experiment: %s", experiment.Name) } } } diff --git a/scheduler/pkg/store/pipeline/store.go b/scheduler/pkg/store/pipeline/store.go index d9c056b5dd..d6e2c0d979 100644 --- a/scheduler/pkg/store/pipeline/store.go +++ b/scheduler/pkg/store/pipeline/store.go @@ -448,7 +448,6 @@ func (ps *PipelineStore) handleModelEvents(event coordinator.ModelEventMsg) { func (ps *PipelineStore) cleanupDeletedPipelines() { ps.mu.Lock() defer ps.mu.Unlock() - ps.logger.Info("cleaning up deleted pipelines") for _, pipeline := range ps.pipelines { if pipeline.Deleted { if pipeline.DeletedAt.IsZero() { @@ -461,6 +460,7 @@ func (ps *PipelineStore) cleanupDeletedPipelines() { } } else if pipeline.DeletedAt.Add(utils.DeletedResourceTTL).Before(time.Now()) { delete(ps.pipelines, pipeline.Name) + ps.logger.Infof("cleaning up deleted pipeline: %s", pipeline.Name) } } } diff --git a/scheduler/pkg/util/constants.go b/scheduler/pkg/util/constants.go index 971c4a860c..ea837db8ce 100644 --- a/scheduler/pkg/util/constants.go +++ b/scheduler/pkg/util/constants.go @@ -36,7 +36,14 @@ const ( const ( 
EnvoyUpdateDefaultBatchWait = 250 * time.Millisecond - ClientKeapAliveTime = 60 * time.Second - ClientKeapAliveTimeout = 2 * time.Second - ClientKeapAlivePermit = true + // note that we keep client and server keepalive times the same + // they need to match counterparts in controller client: operator/scheduler/client.go + // and dataflow-engine: data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt + gRPCKeepAliveTime = 60 * time.Second + clientKeepAliveTimeout = 2 * time.Second + gRPCKeepAlivePermit = false + MaxGRPCRetriesOnStream = 100 // this is at the grpc library level + backOffExpMaxElapsedTime = 0 // Never stop due to large time between calls + backOffExpMaxInterval = time.Second * 15 + backOffExpInitialInterval = time.Second ) diff --git a/scheduler/pkg/util/grpc.go b/scheduler/pkg/util/grpc.go new file mode 100644 index 0000000000..96aed017da --- /dev/null +++ b/scheduler/pkg/util/grpc.go @@ -0,0 +1,38 @@ +/* +Copyright (c) 2024 Seldon Technologies Ltd. + +Use of this software is governed by +(1) the license included in the LICENSE file or +(2) if the license included in the LICENSE file is the Business Source License 1.1, +the Change License after the Change Date as each is defined in accordance with the LICENSE file. 
+*/ + +package util + +import ( + "github.com/cenkalti/backoff/v4" + "google.golang.org/grpc/keepalive" +) + +func GetClientKeepAliveParameters() keepalive.ClientParameters { + return keepalive.ClientParameters{ + Time: gRPCKeepAliveTime, + Timeout: clientKeepAliveTimeout, + PermitWithoutStream: gRPCKeepAlivePermit, + } +} + +func GetServerKeepAliveEnforcementPolicy() keepalive.EnforcementPolicy { + return keepalive.EnforcementPolicy{ + MinTime: gRPCKeepAliveTime, + PermitWithoutStream: gRPCKeepAlivePermit, + } +} + +func GetClientExponentialBackoff() *backoff.ExponentialBackOff { + backOffExp := backoff.NewExponentialBackOff() + backOffExp.MaxElapsedTime = backOffExpMaxElapsedTime + backOffExp.MaxInterval = backOffExpMaxInterval + backOffExp.InitialInterval = backOffExpInitialInterval + return backOffExp +}