diff --git a/internal/querycoordv2/balance/utils.go b/internal/querycoordv2/balance/utils.go index f248728e1099c..ebec9ced37dab 100644 --- a/internal/querycoordv2/balance/utils.go +++ b/internal/querycoordv2/balance/utils.go @@ -39,11 +39,11 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou for _, p := range plans { actions := make([]task.Action, 0) if p.To != -1 { - action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical, int(p.Segment.GetNumOfRows())) actions = append(actions, action) } if p.From != -1 { - action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical, int(p.Segment.GetNumOfRows())) actions = append(actions, action) } t, err := task.NewSegmentTask( diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 99fc36abf395b..a7f49447d8bf5 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -177,7 +177,7 @@ func (c *IndexChecker) checkSegment(segment *meta.Segment, indexInfos []*indexpb } func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) { - action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical) + action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical, int(segment.GetNumOfRows())) t, err := task.NewSegmentTask( ctx, params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index ca6d108c5c953..5a9a499a606e8 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -449,7 +449,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments [] func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica, scope querypb.DataScope) []task.Task { ret := make([]task.Task, 0, len(segments)) for _, s := range segments { - action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope) + action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope, int(s.GetNumOfRows())) task, err := task.NewSegmentTask( ctx, Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index b6d79c3026c03..894893fada873 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -119,11 +119,11 @@ func (s *Server) balanceSegments(ctx context.Context, zap.Int64("segmentID", plan.Segment.GetID()), ) actions := make([]task.Action, 0) - loadAction := task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical) + loadAction := task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical, int(plan.Segment.GetNumOfRows())) actions = append(actions, loadAction) if !copyMode { // if in copy mode, the release action will be skip - releaseAction := task.NewSegmentActionWithScope(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical) + releaseAction := task.NewSegmentActionWithScope(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical, int(plan.Segment.GetNumOfRows())) actions = append(actions, releaseAction) } diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index d8499fd491d1f..a9c3d76615528 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -51,19 +51,26 @@ type Action interface { IsFinished(distMgr *meta.DistributionManager) bool Desc() string String() string + + // return current action's workload effect on target query node + // which only works for `Grow` and `Reduce`, cause `Update` won't change query node's workload + WorkLoadEffect() int } type BaseAction struct { NodeID typeutil.UniqueID Typ ActionType Shard string + + workloadEffect int } -func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string) *BaseAction { +func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string, workLoadEffect int) *BaseAction { return &BaseAction{ - NodeID: nodeID, - Typ: typ, - Shard: shard, + NodeID: nodeID, + Typ: typ, + Shard: shard, + workloadEffect: workLoadEffect, } } @@ -83,6 +90,10 @@ func (action *BaseAction) String() string { return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard) } +func (action *BaseAction) WorkLoadEffect() int { + return action.workloadEffect +} + type SegmentAction struct { *BaseAction @@ -92,14 +103,23 @@ type SegmentAction struct { rpcReturned atomic.Bool } +// Deprecate, only for existing unit test func NewSegmentAction(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *SegmentAction { - return NewSegmentActionWithScope(nodeID, typ, shard, segmentID, querypb.DataScope_All) -} - -func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope) *SegmentAction { - base := NewBaseAction(nodeID, typ, shard) + return NewSegmentActionWithScope(nodeID, typ, shard, segmentID, querypb.DataScope_All, 0) +} + +func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope, rowCount int) *SegmentAction { + workloadEffect := 0 + switch typ { + case ActionTypeGrow: + workloadEffect = rowCount + case ActionTypeReduce: + workloadEffect = -rowCount + default: + workloadEffect = 0 + } return &SegmentAction{ - BaseAction: base, + BaseAction: NewBaseAction(nodeID, typ, shard, workloadEffect), SegmentID: segmentID, Scope: scope, rpcReturned: *atomic.NewBool(false), @@ -131,8 +151,17 @@ type ChannelAction struct { } func NewChannelAction(nodeID typeutil.UniqueID, typ ActionType, channelName string) *ChannelAction { + workloadEffect := 0 + switch typ { + case ActionTypeGrow: + workloadEffect = 1 + case ActionTypeReduce: + workloadEffect = -1 + default: + workloadEffect = 0 + } return &ChannelAction{ - BaseAction: NewBaseAction(nodeID, typ, channelName), + BaseAction: NewBaseAction(nodeID, typ, channelName, workloadEffect), } } @@ -167,11 +196,10 @@ type LeaderAction struct { func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction { action := &LeaderAction{ - BaseAction: NewBaseAction(workerID, typ, shard), - - leaderID: leaderID, - segmentID: segmentID, - version: version, + BaseAction: NewBaseAction(workerID, typ, shard, 0), + leaderID: leaderID, + segmentID: segmentID, + version: version, } action.rpcReturned.Store(false) return action @@ -179,7 +207,7 @@ func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard func NewLeaderUpdatePartStatsAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, partStatsVersions map[int64]int64) *LeaderAction { action := &LeaderAction{ - BaseAction: NewBaseAction(workerID, typ, shard), + BaseAction: NewBaseAction(workerID, typ, shard, 0), leaderID: leaderID, partStatsVersions: partStatsVersions, } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index a997d95c157ba..770cb0bd51905 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -596,7 +596,7 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) { for _, action := range task.Actions() { - delta := scheduler.computeActionDelta(task.CollectionID(), action) + delta := action.WorkLoadEffect() switch action.(type) { case *SegmentAction: scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), delta) @@ -608,7 +608,7 @@ func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) { func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) { for _, action := range task.Actions() { - delta := scheduler.computeActionDelta(task.CollectionID(), action) + delta := action.WorkLoadEffect() switch action.(type) { case *SegmentAction: scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), delta) @@ -618,30 +618,6 @@ func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) { } } -func (scheduler *taskScheduler) computeActionDelta(collectionID int64, action Action) int { - delta := 0 - if action.Type() == ActionTypeGrow { - delta = 1 - } else if action.Type() == ActionTypeReduce { - delta = -1 - } - - switch action := action.(type) { - case *SegmentAction: - // skip growing segment's count, cause doesn't know realtime row number of growing segment - if action.Scope == querypb.DataScope_Historical { - segment := scheduler.targetMgr.GetSealedSegment(scheduler.ctx, collectionID, action.SegmentID, meta.NextTargetFirst) - if segment != nil { - return int(segment.GetNumOfRows()) * delta - } - } - case *ChannelAction: - return delta - } - - return 0 -} - func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} { executor, ok := scheduler.executors.Get(nodeID) if !ok { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index be5fc021bc768..da495a2e36270 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -763,7 +763,7 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { WrapIDSource(0), suite.collection, suite.replica, - NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming), + NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming, 0), ) suite.NoError(err) tasks = append(tasks, task) @@ -1819,12 +1819,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { ctx := context.Background() scheduler := suite.newScheduler() - mockTarget := meta.NewMockTargetManager(suite.T()) - mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{ - NumOfRows: 100, - }) - scheduler.targetMgr = mockTarget - coll := int64(1001) nodeID := int64(1) channelName := "channel-1" @@ -1836,7 +1830,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { WrapIDSource(0), coll, suite.replica, - NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical), + NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical, 100), ) suite.NoError(err) err = scheduler.Add(task1) @@ -1863,7 +1857,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() { WrapIDSource(0), coll2, suite.replica, - NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical), + NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical, 100), ) suite.NoError(err) err = scheduler.Add(task3)