Skip to content

Commit

Permalink
fix: [2.5] uneven distribution caused by executing task delta cache leak (#39759)
Browse files Browse the repository at this point in the history

issue: #39681
pr: #39702
This PR stores the workload effect in the action itself instead of computing
it from the target, which may cause a leak if the target changes.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Feb 11, 2025
1 parent c0cc8a5 commit 969e34d
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 58 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
for _, p := range plans {
actions := make([]task.Action, 0)
if p.To != -1 {
action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical)
action := task.NewSegmentActionWithScope(p.To, task.ActionTypeGrow, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical, int(p.Segment.GetNumOfRows()))
actions = append(actions, action)
}
if p.From != -1 {
action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical)
action := task.NewSegmentActionWithScope(p.From, task.ActionTypeReduce, p.Segment.GetInsertChannel(), p.Segment.GetID(), querypb.DataScope_Historical, int(p.Segment.GetNumOfRows()))
actions = append(actions, action)
}
t, err := task.NewSegmentTask(
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/index_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *IndexChecker) checkSegment(segment *meta.Segment, indexInfos []*indexpb
}

func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) {
action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical)
action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical, int(segment.GetNumOfRows()))
t, err := task.NewSegmentTask(
ctx,
params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replica *meta.Replica, scope querypb.DataScope) []task.Task {
ret := make([]task.Task, 0, len(segments))
for _, s := range segments {
action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope)
action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope, int(s.GetNumOfRows()))
task, err := task.NewSegmentTask(
ctx,
Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ func (s *Server) balanceSegments(ctx context.Context,
zap.Int64("segmentID", plan.Segment.GetID()),
)
actions := make([]task.Action, 0)
loadAction := task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical)
loadAction := task.NewSegmentActionWithScope(plan.To, task.ActionTypeGrow, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical, int(plan.Segment.GetNumOfRows()))
actions = append(actions, loadAction)
if !copyMode {
// if in copy mode, the release action will be skip
releaseAction := task.NewSegmentActionWithScope(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical)
releaseAction := task.NewSegmentActionWithScope(plan.From, task.ActionTypeReduce, plan.Segment.GetInsertChannel(), plan.Segment.GetID(), querypb.DataScope_Historical, int(plan.Segment.GetNumOfRows()))
actions = append(actions, releaseAction)
}

Expand Down
62 changes: 45 additions & 17 deletions internal/querycoordv2/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,26 @@ type Action interface {
IsFinished(distMgr *meta.DistributionManager) bool
Desc() string
String() string

// WorkLoadEffect returns this action's workload effect on the target query node.
// It can be non-zero only for `Grow` and `Reduce` actions, because `Update` does not change a query node's workload.
WorkLoadEffect() int
}

// BaseAction holds the fields shared by the concrete action kinds: the node
// the action applies to, the action type, the shard name, and the workload
// effect fixed when the action is constructed.
type BaseAction struct {
// NodeID is the node this action applies to.
NodeID typeutil.UniqueID
// Typ is the kind of action (e.g. ActionTypeGrow, ActionTypeReduce, ActionTypeUpdate).
Typ ActionType
// Shard is the shard (channel) name associated with the action.
Shard string

// workloadEffect is the value handed to NewBaseAction and reported by
// WorkLoadEffect; it is stored on the action rather than recomputed later.
workloadEffect int
}

func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string) *BaseAction {
func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string, workLoadEffect int) *BaseAction {
return &BaseAction{
NodeID: nodeID,
Typ: typ,
Shard: shard,
NodeID: nodeID,
Typ: typ,
Shard: shard,
workloadEffect: workLoadEffect,
}
}

Expand All @@ -83,6 +90,10 @@ func (action *BaseAction) String() string {
return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard)
}

// WorkLoadEffect returns the workload effect that was stored on the action
// when it was constructed.
func (action *BaseAction) WorkLoadEffect() int {
return action.workloadEffect
}

type SegmentAction struct {
*BaseAction

Expand All @@ -92,14 +103,23 @@ type SegmentAction struct {
rpcReturned atomic.Bool
}

// Deprecated: only kept for existing unit tests.
func NewSegmentAction(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *SegmentAction {
return NewSegmentActionWithScope(nodeID, typ, shard, segmentID, querypb.DataScope_All)
}

func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope) *SegmentAction {
base := NewBaseAction(nodeID, typ, shard)
return NewSegmentActionWithScope(nodeID, typ, shard, segmentID, querypb.DataScope_All, 0)
}

func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope, rowCount int) *SegmentAction {
workloadEffect := 0
switch typ {
case ActionTypeGrow:
workloadEffect = rowCount
case ActionTypeReduce:
workloadEffect = -rowCount
default:
workloadEffect = 0
}
return &SegmentAction{
BaseAction: base,
BaseAction: NewBaseAction(nodeID, typ, shard, workloadEffect),
SegmentID: segmentID,
Scope: scope,
rpcReturned: *atomic.NewBool(false),
Expand Down Expand Up @@ -131,8 +151,17 @@ type ChannelAction struct {
}

func NewChannelAction(nodeID typeutil.UniqueID, typ ActionType, channelName string) *ChannelAction {
workloadEffect := 0
switch typ {
case ActionTypeGrow:
workloadEffect = 1
case ActionTypeReduce:
workloadEffect = -1
default:
workloadEffect = 0
}
return &ChannelAction{
BaseAction: NewBaseAction(nodeID, typ, channelName),
BaseAction: NewBaseAction(nodeID, typ, channelName, workloadEffect),
}
}

Expand Down Expand Up @@ -167,19 +196,18 @@ type LeaderAction struct {

func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction {
action := &LeaderAction{
BaseAction: NewBaseAction(workerID, typ, shard),

leaderID: leaderID,
segmentID: segmentID,
version: version,
BaseAction: NewBaseAction(workerID, typ, shard, 0),
leaderID: leaderID,
segmentID: segmentID,
version: version,
}
action.rpcReturned.Store(false)
return action
}

func NewLeaderUpdatePartStatsAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, partStatsVersions map[int64]int64) *LeaderAction {
action := &LeaderAction{
BaseAction: NewBaseAction(workerID, typ, shard),
BaseAction: NewBaseAction(workerID, typ, shard, 0),
leaderID: leaderID,
partStatsVersions: partStatsVersions,
}
Expand Down
28 changes: 2 additions & 26 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64)

func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) {
for _, action := range task.Actions() {
delta := scheduler.computeActionDelta(task.CollectionID(), action)
delta := action.WorkLoadEffect()
switch action.(type) {
case *SegmentAction:
scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), delta)
Expand All @@ -608,7 +608,7 @@ func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) {

func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) {
for _, action := range task.Actions() {
delta := scheduler.computeActionDelta(task.CollectionID(), action)
delta := action.WorkLoadEffect()
switch action.(type) {
case *SegmentAction:
scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), delta)
Expand All @@ -618,30 +618,6 @@ func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) {
}
}

// computeActionDelta returns the workload delta contributed by action.
//
// The sign is +1 for ActionTypeGrow, -1 for ActionTypeReduce and 0 otherwise.
// For a SegmentAction with Historical scope whose segment is found in the
// target manager, the result is the segment's row count times that sign.
// For a ChannelAction the result is the sign itself. In every other case the
// result is 0.
//
// NOTE: the segment is looked up on every call, so the returned value depends
// on the target state at call time; two calls for the same action may
// disagree if the target changes in between.
func (scheduler *taskScheduler) computeActionDelta(collectionID int64, action Action) int {
// Direction of the change: grow adds load, reduce removes it, anything else is neutral.
delta := 0
if action.Type() == ActionTypeGrow {
delta = 1
} else if action.Type() == ActionTypeReduce {
delta = -1
}

switch action := action.(type) {
case *SegmentAction:
// Skip growing segments: their real-time row count is not known.
if action.Scope == querypb.DataScope_Historical {
segment := scheduler.targetMgr.GetSealedSegment(scheduler.ctx, collectionID, action.SegmentID, meta.NextTargetFirst)
// A segment missing from the target falls through to the final return 0.
if segment != nil {
return int(segment.GetNumOfRows()) * delta
}
}
case *ChannelAction:
// Each channel counts as one unit of workload.
return delta
}

return 0
}

func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
executor, ok := scheduler.executors.Get(nodeID)
if !ok {
Expand Down
12 changes: 3 additions & 9 deletions internal/querycoordv2/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
WrapIDSource(0),
suite.collection,
suite.replica,
NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming),
NewSegmentActionWithScope(targetNode, ActionTypeReduce, "", segment, querypb.DataScope_Streaming, 0),
)
suite.NoError(err)
tasks = append(tasks, task)
Expand Down Expand Up @@ -1819,12 +1819,6 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
ctx := context.Background()
scheduler := suite.newScheduler()

mockTarget := meta.NewMockTargetManager(suite.T())
mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{
NumOfRows: 100,
})
scheduler.targetMgr = mockTarget

coll := int64(1001)
nodeID := int64(1)
channelName := "channel-1"
Expand All @@ -1836,7 +1830,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
WrapIDSource(0),
coll,
suite.replica,
NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical),
NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical, 100),
)
suite.NoError(err)
err = scheduler.Add(task1)
Expand All @@ -1863,7 +1857,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
WrapIDSource(0),
coll2,
suite.replica,
NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical),
NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical, 100),
)
suite.NoError(err)
err = scheduler.Add(task3)
Expand Down

0 comments on commit 969e34d

Please sign in to comment.