From 8f254f2a8120052364d83a6ce1b71a029b08a9bc Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Sat, 8 Feb 2025 12:04:14 +0800 Subject: [PATCH] fix: uneven distribution caused by executing task delta cache leak This PR maintains the workload effect in the action instead of computing it from the target; computing it from the target may cause a leak if the target changes. Signed-off-by: Wei Liu --- internal/querycoordv2/balance/utils.go | 4 +- .../querycoordv2/checkers/index_checker.go | 2 +- .../querycoordv2/checkers/segment_checker.go | 2 +- internal/querycoordv2/handlers.go | 4 +- internal/querycoordv2/task/action.go | 62 ++++++++++++++----- internal/querycoordv2/task/scheduler.go | 28 +-------- internal/querycoordv2/task/task_test.go | 12 +--- 7 files changed, 56 insertions(+), 58 deletions(-) diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index f248728e1099c..ebec9ced37dab 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -39,11 +39,11 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou for _, p := range plans { actions := make([]task.Action, 0) if p.To != -1 { - action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical, int(p.Segment.GetNumOfRows())) actions = append(actions, action) } if p.From != -1 { - action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical, int(p.Segment.GetNumOfRows())) actions = append(actions, action) } t, err := task.NewSegmentTask( diff --git 
a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 99fc36abf395b..a7f49447d8bf5 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -177,7 +177,7 @@ func (c *IndexChecker) checkSegment(segment *meta.Segment, indexInfos []*indexpb } func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) { - action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical, int(segment.GetNumOfRows())) t, err := task.NewSegmentTask( ctx, params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index ca6d108c5c953..5a9a499a606e8 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -449,7 +449,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica, scope querypb.DataScope) []task.Task { ret := make([]task.Task, 0, len(segments)) for _, s := range segments { - action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope) + action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope, int(s.GetNumOfRows())) task, err := task.NewSegmentTask( ctx, Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index 
b6d79c3026c03..894893fada873 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -119,11 +119,11 @@ func (s *Server) balanceSegments(ctx context.Context, zap.Int64("segmentID", plan.Segment.GetID()), ) actions := make([]task.Action, 0) - loadAction := task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical) + loadAction := task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical, int(plan.Segment.GetNumOfRows())) actions = append(actions, loadAction) if !copyMode { // if in copy mode, the release action will be skip - releaseAction := task.NewSegmentActionWithScope(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical) + releaseAction := task.NewSegmentActionWithScope(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical, int(plan.Segment.GetNumOfRows())) actions = append(actions, releaseAction) } diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index d8499fd491d1f..a9c3d76615528 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -51,19 +51,26 @@ type Action interface { IsFinished(distMgr *meta.DistributionManager) bool Desc() string String() string + + // return current action's workload effect on target query node + // which only works for `Grow` and `Reduce`, cause `Update` won't change query node's workload + WorkLoadEffect() int } type BaseAction struct { NodeID typeutil.UniqueID Typ ActionType Shard string + + workloadEffect int } -func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string) *BaseAction { +func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string, workLoadEffect int) *BaseAction { return 
&BaseAction{ - NodeID: nodeID, - Typ: typ, - Shard: shard, + NodeID: nodeID, + Typ: typ, + Shard: shard, + workloadEffect: workLoadEffect, } } @@ -83,6 +90,10 @@ func (action *BaseAction) String() string { return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard) } +func (action *BaseAction) WorkLoadEffect() int { + return action.workloadEffect +} + type SegmentAction struct { *BaseAction @@ -92,14 +103,23 @@ type SegmentAction struct { rpcReturned atomic.Bool } +// Deprecate, only for existing unit test func NewSegmentAction(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *SegmentAction { - return NewSegmentActionWithScope(nodeID, typ, shard, segmentID, querypb.DataScope_All) -} - -func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope) *SegmentAction { - base := NewBaseAction(nodeID, typ, shard) + return NewSegmentActionWithScope(nodeID, typ, shard, segmentID, querypb.DataScope_All, 0) +} + +func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope, rowCount int) *SegmentAction { + workloadEffect := 0 + switch typ { + case ActionTypeGrow: + workloadEffect = rowCount + case ActionTypeReduce: + workloadEffect = -rowCount + default: + workloadEffect = 0 + } return &SegmentAction{ - BaseAction: base, + BaseAction: NewBaseAction(nodeID, typ, shard, workloadEffect), SegmentID: segmentID, Scope: scope, rpcReturned: *atomic.NewBool(false), @@ -131,8 +151,17 @@ type ChannelAction struct { } func NewChannelAction(nodeID typeutil.UniqueID, typ ActionType, channelName string) *ChannelAction { + workloadEffect := 0 + switch typ { + case ActionTypeGrow: + workloadEffect = 1 + case ActionTypeReduce: + workloadEffect = -1 + default: + workloadEffect = 0 + } return &ChannelAction{ - BaseAction: NewBaseAction(nodeID, typ, channelName), + 
BaseAction: NewBaseAction(nodeID, typ, channelName, workloadEffect), } } @@ -167,11 +196,10 @@ type LeaderAction struct { func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction { action := &LeaderAction{ - BaseAction: NewBaseAction(workerID, typ, shard), - - leaderID: leaderID, - segmentID: segmentID, - version: version, + BaseAction: NewBaseAction(workerID, typ, shard, 0), + leaderID: leaderID, + segmentID: segmentID, + version: version, } action.rpcReturned.Store(false) return action @@ -179,7 +207,7 @@ func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard func NewLeaderUpdatePartStatsAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, partStatsVersions map[int64]int64) *LeaderAction { action := &LeaderAction{ - BaseAction: NewBaseAction(workerID, typ, shard), + BaseAction: NewBaseAction(workerID, typ, shard, 0), leaderID: leaderID, partStatsVersions: partStatsVersions, } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index a997d95c157ba..770cb0bd51905 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -596,7 +596,7 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) { for _, action := range task.Actions() { - delta := scheduler.computeActionDelta(task.CollectionID(), action) + delta := action.WorkLoadEffect() switch action.(type) { case *SegmentAction: scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), delta) @@ -608,7 +608,7 @@ func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) { func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) { for _, action := range task.Actions() { - delta := scheduler.computeActionDelta(task.CollectionID(), action) + delta := action.WorkLoadEffect() 
switch action.(type) { case *SegmentAction: scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), delta) @@ -618,30 +618,6 @@ func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) { } } -func (scheduler *taskScheduler) computeActionDelta(collectionID int64, action Action) int { - delta := 0 - if action.Type() == ActionTypeGrow { - delta = 1 - } else if action.Type() == ActionTypeReduce { - delta = -1 - } - - switch action := action.(type) { - case *SegmentAction: - // skip growing segment's count, cause doesn't know realtime row number of growing segment - if action.Scope == querypb.DataScope_Historical { - segment := scheduler.targetMgr.GetSealedSegment(scheduler.ctx, collectionID, action.SegmentID, meta.NextTargetFirst) - if segment != nil { - return int(segment.GetNumOfRows()) * delta - } - } - case *ChannelAction: - return delta - } - - return 0 -} - func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} { executor, ok := scheduler.executors.Get(nodeID) if !ok { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index be5fc021bc768..da495a2e36270 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -763,7 +763,7 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { WrapIDSource(0), suite.collection, suite.replica, - NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming), + NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming, 0), ) suite.NoError(err) tasks = append(tasks, task) @@ -1819,12 +1819,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { ctx := context.Background() scheduler := suite.newScheduler() - mockTarget := meta.NewMockTargetManager(suite.T()) - mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{ - NumOfRows: 100, - }) - scheduler.targetMgr 
= mockTarget - coll := int64(1001) nodeID := int64(1) channelName := "channel-1" @@ -1836,7 +1830,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { WrapIDSource(0), coll, suite.replica, - NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical), + NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical, 100), ) suite.NoError(err) err = scheduler.Add(task1) @@ -1863,7 +1857,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { WrapIDSource(0), coll2, suite.replica, - NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical), + NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical, 100), ) suite.NoError(err) err = scheduler.Add(task3)