Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add history.alignMembershipChange setting #6510

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,12 @@ This feature is still under development and should NOT be enabled.`,
0*time.Second,
`HistoryStartupMembershipJoinDelay is the duration a history instance waits
before joining membership after starting.`,
)
HistoryAlignMembershipChange = NewGlobalDurationSetting(
"history.alignMembershipChange",
0*time.Second,
`HistoryAlignMembershipChange is a duration to align history's membership changes to.
This can help reduce effects of shard movement.`,
)
HistoryShutdownDrainDuration = NewGlobalDurationSetting(
"history.shutdownDrainDuration",
Expand Down
2 changes: 2 additions & 0 deletions common/membership/ringpop/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func (factory *factory) getJoinTime(maxPropagationTime time.Duration) time.Time
switch factory.ServiceName {
case primitives.MatchingService:
alignTime = dynamicconfig.MatchingAlignMembershipChange.Get(factory.DC)()
case primitives.HistoryService:
alignTime = dynamicconfig.HistoryAlignMembershipChange.Get(factory.DC)()
}
if alignTime == 0 {
return time.Time{}
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Config struct {
MaxAutoResetPoints dynamicconfig.IntPropertyFnWithNamespaceFilter
ThrottledLogRPS dynamicconfig.IntPropertyFn
EnableStickyQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter
AlignMembershipChange dynamicconfig.DurationPropertyFn
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
StartupMembershipJoinDelay dynamicconfig.DurationPropertyFn

Expand Down Expand Up @@ -393,6 +394,7 @@ func NewConfig(
PersistencePerShardNamespaceMaxQPS: dynamicconfig.HistoryPersistencePerShardNamespaceMaxQPS.Get(dc),
PersistenceDynamicRateLimitingParams: dynamicconfig.HistoryPersistenceDynamicRateLimitingParams.Get(dc),
PersistenceQPSBurstRatio: dynamicconfig.PersistenceQPSBurstRatio.Get(dc),
AlignMembershipChange: dynamicconfig.HistoryAlignMembershipChange.Get(dc),
ShutdownDrainDuration: dynamicconfig.HistoryShutdownDrainDuration.Get(dc),
StartupMembershipJoinDelay: dynamicconfig.HistoryStartupMembershipJoinDelay.Get(dc),
MaxAutoResetPoints: dynamicconfig.HistoryMaxAutoResetPoints.Get(dc),
Expand Down
35 changes: 25 additions & 10 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/configs"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
Expand Down Expand Up @@ -119,22 +120,36 @@ func (s *Service) Start() {

// Stop stops the service
func (s *Service) Stop() {
s.logger.Info("ShutdownHandler: Evicting self from membership ring")
_ = s.membershipMonitor.EvictSelf()

if delay := s.config.ShutdownDrainDuration(); delay > 0 {
s.logger.Info("ShutdownHandler: delaying for shutdown drain",
tag.NewDurationTag("shutdownDrainDuration", delay))
time.Sleep(delay)
// remove self from membership ring and wait for traffic to drain
var err error
var waitTime time.Duration
if align := s.config.AlignMembershipChange(); align > 0 {
propagation := s.membershipMonitor.ApproximateMaxPropagationTime()
asOf := util.NextAlignedTime(time.Now().Add(propagation), align)
s.logger.Info("ShutdownHandler: Evicting self from membership ring as of", tag.Timestamp(asOf))
waitTime, err = s.membershipMonitor.EvictSelfAt(asOf)
} else {
s.logger.Info("ShutdownHandler: Evicting self from membership ring immediately")
err = s.membershipMonitor.EvictSelf()
}
if err != nil {
s.logger.Error("ShutdownHandler: Failed to evict self from membership ring", tag.Error(err))
}

s.healthServer.SetServingStatus(serviceName, healthpb.HealthCheckResponse_NOT_SERVING)

s.logger.Info("ShutdownHandler: Waiting for drain")
time.Sleep(max(s.config.ShutdownDrainDuration(), waitTime))

s.logger.Info("ShutdownHandler: Initiating shardController shutdown")
s.handler.controller.Stop()

// TODO: Change this to GracefulStop when integration tests are refactored.
s.server.Stop()
// All grpc handlers should be cancelled now. Give them a little time to return.
t := time.AfterFunc(2*time.Second, func() {
s.logger.Info("ShutdownHandler: Drain time expired, stopping all traffic")
s.server.Stop()
})
s.server.GracefulStop()
t.Stop()

s.handler.Stop()
s.visibilityManager.Close()
Expand Down
Loading