diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index c4938c453a85f..5844577cb743f 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -175,13 +175,19 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
 
 		case <-ticker.C:
 			ob.clean()
-			loaded := lo.FilterMap(ob.meta.GetAllCollections(ctx), func(collection *meta.Collection, _ int) (int64, bool) {
-				if collection.GetStatus() == querypb.LoadStatus_Loaded {
-					return collection.GetCollectionID(), true
+
+			collections := ob.meta.GetAllCollections(ctx)
+			var loadedIDs, loadingIDs []int64
+			for _, c := range collections {
+				if c.GetStatus() == querypb.LoadStatus_Loaded {
+					loadedIDs = append(loadedIDs, c.GetCollectionID())
+				} else {
+					loadingIDs = append(loadingIDs, c.GetCollectionID())
 				}
-				return 0, false
-			})
-			ob.loadedDispatcher.AddTask(loaded...)
+			}
+
+			ob.loadedDispatcher.AddTask(loadedIDs...)
+			ob.loadingDispatcher.AddTask(loadingIDs...)
 
 		case req := <-ob.updateChan:
 			log.Info("manually trigger update target",
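The observer hunk above replaces a lo.FilterMap that collected only loaded collections with a single pass that also gathers collections still loading, so the new loadingDispatcher receives them as well. A minimal, self-contained sketch of that partitioning pattern, using a stand-in Collection type rather than the real meta and querypb definitions:

package main

import "fmt"

type LoadStatus int

const (
	Loading LoadStatus = iota
	Loaded
)

type Collection struct {
	ID     int64
	Status LoadStatus
}

// partitionByStatus splits collection IDs in one pass, mirroring the
// loaded/loading split the observer now hands to its two dispatchers.
func partitionByStatus(collections []Collection) (loadedIDs, loadingIDs []int64) {
	for _, c := range collections {
		if c.Status == Loaded {
			loadedIDs = append(loadedIDs, c.ID)
		} else {
			loadingIDs = append(loadingIDs, c.ID)
		}
	}
	return loadedIDs, loadingIDs
}

func main() {
	loaded, loading := partitionByStatus([]Collection{
		{ID: 1, Status: Loaded},
		{ID: 2, Status: Loading},
		{ID: 3, Status: Loaded},
	})
	fmt.Println(loaded, loading) // [1 3] [2]
}

One pass over the collection list replaces the filter-and-discard approach, and no collection is dropped on the floor while it is still loading.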
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index a997d95c157ba..60c997c2bc0c1 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -946,7 +946,8 @@ func (scheduler *taskScheduler) remove(task Task) {
 	if errors.Is(task.Err(), merr.ErrSegmentNotFound) {
 		log.Info("segment in target has been cleaned, trigger force update next target",
 			zap.Int64("collectionID", task.CollectionID()))
-		scheduler.targetMgr.UpdateCollectionNextTarget(task.Context(), task.CollectionID())
+		// Avoid using task.Context(), as it may already be canceled before remove is called.
+		scheduler.targetMgr.UpdateCollectionNextTarget(scheduler.ctx, task.CollectionID())
 	}
 
 	task.Cancel(nil)
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index be5fc021bc768..1f923dc066598 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1914,6 +1914,39 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
 	suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
 }
 
+func (suite *TaskSuite) TestRemoveTaskWithError() {
+	ctx := context.Background()
+	scheduler := suite.newScheduler()
+
+	mockTarget := meta.NewMockTargetManager(suite.T())
+	mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{
+		NumOfRows: 100,
+	})
+
+	mockTarget.EXPECT().UpdateCollectionNextTarget(mock.Anything, mock.Anything).Return(nil)
+	scheduler.targetMgr = mockTarget
+
+	coll := int64(1001)
+	nodeID := int64(1)
+	// add a segment task for the collection
+	task1, err := NewSegmentTask(
+		ctx,
+		10*time.Second,
+		WrapIDSource(0),
+		coll,
+		suite.replica,
+		NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", 1, querypb.DataScope_Historical),
+	)
+	suite.NoError(err)
+	err = scheduler.Add(task1)
+	suite.NoError(err)
+
+	task1.Fail(merr.ErrSegmentNotFound)
+	// removing a task that failed with ErrSegmentNotFound should trigger UpdateCollectionNextTarget
+	scheduler.remove(task1)
+	mockTarget.AssertExpectations(suite.T())
+}
+
 func TestTask(t *testing.T) {
	suite.Run(t, new(TaskSuite))
 }
diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go
index 7bd7ba22ad6c4..564a6dcc6b8f5 100644
--- a/tests/integration/replicas/load/load_test.go
+++ b/tests/integration/replicas/load/load_test.go
@@ -971,6 +971,61 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_OnLoadingCollection() {
 	s.releaseCollection(dbName, collectionName)
 }
 
+func (s *LoadTestSuite) TestLoadWithCompact() {
+	ctx := context.Background()
+	collName := "test_load_with_compact"
+
+	// Create collection with configuration
+	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
+		DBName:           dbName,
+		Dim:              dim,
+		CollectionName:   collName,
+		ChannelNum:       1,
+		SegmentNum:       3,
+		RowNumPerSegment: 2000,
+	})
+
+	s.releaseCollection(dbName, collName)
+
+	stopInsertCh := make(chan struct{}, 1)
+	// Start a goroutine to continuously insert data and trigger compaction
+	go func() {
+		for {
+			select {
+			case <-stopInsertCh:
+				return
+			default:
+				s.InsertAndFlush(ctx, dbName, collName, 2000, dim)
+				_, err := s.Cluster.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
+					CollectionName: collName,
+				})
+				s.NoError(err)
+				time.Sleep(time.Second)
+			}
+		}
+	}()
+
+	time.Sleep(10 * time.Second)
+
+	// Load the collection while data is being inserted and compacted
+	s.loadCollection(collName, dbName, 1, nil)
+
+	// Verify the collection is loaded
+	s.Eventually(func() bool {
+		resp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
+			CollectionNames: []string{collName},
+			Type:            milvuspb.ShowType_InMemory,
+		})
+		s.NoError(err)
+
+		return len(resp.InMemoryPercentages) == 1 && resp.InMemoryPercentages[0] == 100
+	}, 30*time.Second, 1*time.Second)
+
+	// Clean up
+	close(stopInsertCh)
+	s.releaseCollection(dbName, collName)
+}
+
 func TestReplicas(t *testing.T) {
	suite.Run(t, new(LoadTestSuite))
 }
diff --git a/tests/integration/util_collection.go b/tests/integration/util_collection.go
index a327c2de9c8d2..1caf7258cf144 100644
--- a/tests/integration/util_collection.go
+++ b/tests/integration/util_collection.go
@@ -28,6 +28,49 @@ type CreateCollectionConfig struct {
 	ResourceGroups []string
 }
 
+func (s *MiniClusterSuite) InsertAndFlush(ctx context.Context, dbName, collectionName string, rowNum, dim int) error {
+	fVecColumn := NewFloatVectorFieldData(FloatVecField, rowNum, dim)
+	hashKeys := GenerateHashKeys(rowNum)
+	insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		FieldsData:     []*schemapb.FieldData{fVecColumn},
+		HashKeys:       hashKeys,
+		NumRows:        uint32(rowNum),
+	})
+	if err != nil {
+		return err
+	}
+	if !merr.Ok(insertResult.Status) {
+		return merr.Error(insertResult.Status)
+	}
+
+	flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+		DbName:          dbName,
+		CollectionNames: []string{collectionName},
+	})
+	if err != nil {
+		return err
+	}
+	segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+	if !has || segmentIDs == nil {
+		return merr.Error(&commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_IllegalArgument,
+			Reason:    "failed to get segment IDs",
+		})
+	}
+	ids := segmentIDs.GetData()
+	flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+	if !has {
+		return merr.Error(&commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_IllegalArgument,
+			Reason:    "failed to get flush timestamp",
+		})
+	}
+	s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+	return nil
+}
+
 func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
 	schema := ConstructSchema(cfg.CollectionName, cfg.Dim, true)
 	marshaledSchema, err := proto.Marshal(schema)
@@ -60,30 +103,8 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
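A short sketch of the failure mode behind the one-line scheduler fix and the test above, assuming (as the comment in the hunk says) that a finished task's context is canceled before remove() runs: any context-aware call made with the task's context then fails immediately, while the scheduler's longer-lived context does not. updateNextTarget below is a hypothetical stand-in for targetMgr.UpdateCollectionNextTarget:

package main

import (
	"context"
	"fmt"
)

// updateNextTarget stands in for a context-aware call such as
// targetMgr.UpdateCollectionNextTarget: it fails fast once ctx is done.
func updateNextTarget(ctx context.Context, collectionID int64) error {
	if err := ctx.Err(); err != nil {
		return err // context.Canceled: the target update silently never happens
	}
	fmt.Println("updated next target for collection", collectionID)
	return nil
}

func main() {
	schedulerCtx := context.Background()

	// A finished task's context is assumed to be canceled before remove() runs.
	taskCtx, cancel := context.WithCancel(schedulerCtx)
	cancel()

	fmt.Println("with task context:", updateNextTarget(taskCtx, 1001))            // context canceled
	fmt.Println("with scheduler context:", updateNextTarget(schedulerCtx, 1001)) // <nil>
}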
 	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
 
 	for i := 0; i < cfg.SegmentNum; i++ {
-		fVecColumn := NewFloatVectorFieldData(FloatVecField, cfg.RowNumPerSegment, cfg.Dim)
-		hashKeys := GenerateHashKeys(cfg.RowNumPerSegment)
-		insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
-			DbName:         cfg.DBName,
-			CollectionName: cfg.CollectionName,
-			FieldsData:     []*schemapb.FieldData{fVecColumn},
-			HashKeys:       hashKeys,
-			NumRows:        uint32(cfg.RowNumPerSegment),
-		})
-		s.NoError(err)
-		s.True(merr.Ok(insertResult.Status))
-
-		flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
-			DbName:          cfg.DBName,
-			CollectionNames: []string{cfg.CollectionName},
-		})
+		err = s.InsertAndFlush(ctx, cfg.DBName, cfg.CollectionName, cfg.RowNumPerSegment, cfg.Dim)
 		s.NoError(err)
-		segmentIDs, has := flushResp.GetCollSegIDs()[cfg.CollectionName]
-		ids := segmentIDs.GetData()
-		s.Require().NotEmpty(segmentIDs)
-		s.Require().True(has)
-		flushTs, has := flushResp.GetCollFlushTs()[cfg.CollectionName]
-		s.True(has)
-		s.WaitForFlush(ctx, ids, flushTs, cfg.DBName, cfg.CollectionName)
 	}
 
 	// create index
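For reference, a sketch of calling the extracted InsertAndFlush helper directly; the method name insertOneSegment is illustrative, while dbName and dim refer to the constants already used by the suite above. Having the helper return an error instead of asserting internally is what allows TestLoadWithCompact to invoke it from a background goroutine, where testify suite assertions are not reliable:

// insertOneSegment writes one segment's worth of rows and waits for the
// flush to finish, failing the test on any error.
func (s *LoadTestSuite) insertOneSegment(ctx context.Context) {
	err := s.InsertAndFlush(ctx, dbName, "test_load_with_compact", 2000, dim)
	s.NoError(err)
}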