diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index c19ec1d625d..094acd1570e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1333,6 +1333,12 @@ This feature is still under development and should NOT be enabled.`, 0*time.Second, `HistoryStartupMembershipJoinDelay is the duration a history instance waits before joining membership after starting.`, + ) + HistoryAlignMembershipChange = NewGlobalDurationSetting( + "history.alignMembershipChange", + 0*time.Second, + `HistoryAlignMembershipChange is a duration to align history's membership changes to. +This can help reduce effects of shard movement.`, ) HistoryShutdownDrainDuration = NewGlobalDurationSetting( "history.shutdownDrainDuration", diff --git a/common/membership/ringpop/factory.go b/common/membership/ringpop/factory.go index 4b0b4e8c635..2c80c22ae88 100644 --- a/common/membership/ringpop/factory.go +++ b/common/membership/ringpop/factory.go @@ -156,6 +156,8 @@ func (factory *factory) getJoinTime(maxPropagationTime time.Duration) time.Time switch factory.ServiceName { case primitives.MatchingService: alignTime = dynamicconfig.MatchingAlignMembershipChange.Get(factory.DC)() + case primitives.HistoryService: + alignTime = dynamicconfig.HistoryAlignMembershipChange.Get(factory.DC)() } if alignTime == 0 { return time.Time{} diff --git a/service/history/configs/config.go b/service/history/configs/config.go index e42fcc0e6d6..aa29174ef66 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -64,6 +64,7 @@ type Config struct { MaxAutoResetPoints dynamicconfig.IntPropertyFnWithNamespaceFilter ThrottledLogRPS dynamicconfig.IntPropertyFn EnableStickyQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter + AlignMembershipChange dynamicconfig.DurationPropertyFn ShutdownDrainDuration dynamicconfig.DurationPropertyFn StartupMembershipJoinDelay dynamicconfig.DurationPropertyFn @@ -393,6 +394,7 @@ func NewConfig( PersistencePerShardNamespaceMaxQPS: dynamicconfig.HistoryPersistencePerShardNamespaceMaxQPS.Get(dc), PersistenceDynamicRateLimitingParams: dynamicconfig.HistoryPersistenceDynamicRateLimitingParams.Get(dc), PersistenceQPSBurstRatio: dynamicconfig.PersistenceQPSBurstRatio.Get(dc), + AlignMembershipChange: dynamicconfig.HistoryAlignMembershipChange.Get(dc), ShutdownDrainDuration: dynamicconfig.HistoryShutdownDrainDuration.Get(dc), StartupMembershipJoinDelay: dynamicconfig.HistoryStartupMembershipJoinDelay.Get(dc), MaxAutoResetPoints: dynamicconfig.HistoryMaxAutoResetPoints.Get(dc), diff --git a/service/history/service.go b/service/history/service.go index 009b12926d5..c02aa605fef 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/common/membership" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/configs" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -119,22 +120,36 @@ func (s *Service) Start() { // Stop stops the service func (s *Service) Stop() { - s.logger.Info("ShutdownHandler: Evicting self from membership ring") - _ = s.membershipMonitor.EvictSelf() - - if delay := s.config.ShutdownDrainDuration(); delay > 0 { - s.logger.Info("ShutdownHandler: delaying for shutdown drain", - tag.NewDurationTag("shutdownDrainDuration", delay)) - time.Sleep(delay) + // remove self from membership ring and wait for traffic to drain + var err error + var waitTime time.Duration + if align := s.config.AlignMembershipChange(); align > 0 { + propagation := s.membershipMonitor.ApproximateMaxPropagationTime() + asOf := util.NextAlignedTime(time.Now().Add(propagation), align) + s.logger.Info("ShutdownHandler: Evicting self from membership ring as of", tag.Timestamp(asOf)) + waitTime, err = s.membershipMonitor.EvictSelfAt(asOf) + } else { + s.logger.Info("ShutdownHandler: Evicting self from membership ring immediately") + err = s.membershipMonitor.EvictSelf() + } + if err != nil { + s.logger.Error("ShutdownHandler: Failed to evict self from membership ring", tag.Error(err)) } - s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING) + s.logger.Info("ShutdownHandler: Waiting for drain") + time.Sleep(max(s.config.ShutdownDrainDuration(), waitTime)) + s.logger.Info("ShutdownHandler: Initiating shardController shutdown") s.handler.controller.Stop() - // TODO: Change this to GracefulStop when integration tests are refactored. - s.server.Stop() + // All grpc handlers should be cancelled now. Give them a little time to return. + t := time.AfterFunc(2*time.Second, func() { + s.logger.Info("ShutdownHandler: Drain time expired, stopping all traffic") + s.server.Stop() + }) + s.server.GracefulStop() + t.Stop() s.handler.Stop() s.visibilityManager.Close()