fix: load collection gets stuck if compaction/gc happens
If compaction/GC happens, load collection may get stuck due to SegmentNotFound; we should trigger
UpdateNextTarget to get a new data view for the loading operation to execute against.

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 committed Feb 10, 2025
1 parent d3e32bb commit e1616a9
Showing 5 changed files with 146 additions and 30 deletions.
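
Before the per-file diffs, here is a minimal, self-contained sketch of the recovery pattern this commit introduces: when a load task fails with a segment-not-found error (because compaction/GC removed the segment from the current data view), the scheduler forces a next-target update using its own long-lived context rather than the task's. The names below (targetManager, loadTask, taskScheduler, fakeTargetManager, errSegmentNotFound) are simplified stand-ins for illustration, not the actual Milvus interfaces; the real change of this shape is in the scheduler.go hunk further down.

package main

import (
    "context"
    "errors"
    "fmt"
)

// errSegmentNotFound stands in for merr.ErrSegmentNotFound.
var errSegmentNotFound = errors.New("segment not found")

// targetManager is a simplified stand-in for the query coordinator's target manager.
type targetManager interface {
    UpdateCollectionNextTarget(ctx context.Context, collectionID int64) error
}

// loadTask is a simplified stand-in for a segment load task.
type loadTask struct {
    ctx          context.Context
    collectionID int64
    err          error
}

// taskScheduler keeps its own long-lived context so that recovery work does not
// depend on the (possibly already canceled) context of a failed task.
type taskScheduler struct {
    ctx       context.Context
    targetMgr targetManager
}

// remove drops a finished task; if it failed because its segment no longer exists
// (e.g. compaction/GC cleaned it), force a next-target update so the next attempt
// loads against a fresh data view instead of retrying a stale one forever.
func (s *taskScheduler) remove(t *loadTask) {
    if errors.Is(t.err, errSegmentNotFound) {
        // Use the scheduler's context, not the task's: the task context may
        // already be canceled by the time remove runs.
        if err := s.targetMgr.UpdateCollectionNextTarget(s.ctx, t.collectionID); err != nil {
            fmt.Println("failed to update next target:", err)
        }
    }
}

// fakeTargetManager records update calls so the example runs standalone.
type fakeTargetManager struct{ updated []int64 }

func (f *fakeTargetManager) UpdateCollectionNextTarget(_ context.Context, collectionID int64) error {
    f.updated = append(f.updated, collectionID)
    return nil
}

func main() {
    mgr := &fakeTargetManager{}
    s := &taskScheduler{ctx: context.Background(), targetMgr: mgr}

    taskCtx, cancel := context.WithCancel(context.Background())
    cancel() // the failed task's own context may already be canceled here
    s.remove(&loadTask{ctx: taskCtx, collectionID: 1001, err: errSegmentNotFound})

    fmt.Println("next-target updates triggered for collections:", mgr.updated)
}
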
18 changes: 12 additions & 6 deletions internal/querycoordv2/observers/target_observer.go
@@ -175,13 +175,19 @@ func (ob *TargetObserver) schedule(ctx context.Context) {

         case <-ticker.C:
             ob.clean()
-            loaded := lo.FilterMap(ob.meta.GetAllCollections(ctx), func(collection *meta.Collection, _ int) (int64, bool) {
-                if collection.GetStatus() == querypb.LoadStatus_Loaded {
-                    return collection.GetCollectionID(), true
+
+            collections := ob.meta.GetAllCollections(ctx)
+            var loadedIDs, loadingIDs []int64
+            for _, c := range collections {
+                if c.GetStatus() == querypb.LoadStatus_Loaded {
+                    loadedIDs = append(loadedIDs, c.GetCollectionID())
+                } else {
+                    loadingIDs = append(loadingIDs, c.GetCollectionID())
                 }
-                return 0, false
-            })
-            ob.loadedDispatcher.AddTask(loaded...)
+            }
+
+            ob.loadedDispatcher.AddTask(loadedIDs...)
+            ob.loadingDispatcher.AddTask(loadingIDs...)

         case req := <-ob.updateChan:
             log.Info("manually trigger update target",
3 changes: 2 additions & 1 deletion internal/querycoordv2/task/scheduler.go
@@ -946,7 +946,8 @@ func (scheduler *taskScheduler) remove(task Task) {

     if errors.Is(task.Err(), merr.ErrSegmentNotFound) {
         log.Info("segment in target has been cleaned, trigger force update next target", zap.Int64("collectionID", task.CollectionID()))
-        scheduler.targetMgr.UpdateCollectionNextTarget(task.Context(), task.CollectionID())
+        // Avoid using task.Ctx as it may be canceled before remove is called.
+        scheduler.targetMgr.UpdateCollectionNextTarget(scheduler.ctx, task.CollectionID())
     }

     task.Cancel(nil)
33 changes: 33 additions & 0 deletions internal/querycoordv2/task/task_test.go
@@ -1914,6 +1914,39 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
     suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
 }

+func (suite *TaskSuite) TestRemoveTaskWithError() {
+    ctx := context.Background()
+    scheduler := suite.newScheduler()
+
+    mockTarget := meta.NewMockTargetManager(suite.T())
+    mockTarget.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datapb.SegmentInfo{
+        NumOfRows: 100,
+    })
+
+    mockTarget.EXPECT().UpdateCollectionNextTarget(mock.Anything, mock.Anything).Return(nil)
+    scheduler.targetMgr = mockTarget
+
+    coll := int64(1001)
+    nodeID := int64(1)
+    // add segment task for collection
+    task1, err := NewSegmentTask(
+        ctx,
+        10*time.Second,
+        WrapIDSource(0),
+        coll,
+        suite.replica,
+        NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", 1, querypb.DataScope_Historical),
+    )
+    suite.NoError(err)
+    err = scheduler.Add(task1)
+    suite.NoError(err)
+
+    task1.Fail(merr.ErrSegmentNotFound)
+    // when try to remove task with ErrSegmentNotFound, should trigger UpdateNextTarget
+    scheduler.remove(task1)
+    mockTarget.AssertExpectations(suite.T())
+}
+
 func TestTask(t *testing.T) {
     suite.Run(t, new(TaskSuite))
 }
55 changes: 55 additions & 0 deletions tests/integration/replicas/load/load_test.go
@@ -971,6 +971,61 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_OnLoadingCollection() {
     s.releaseCollection(dbName, collectionName)
 }

+func (s *LoadTestSuite) TestLoadWithCompact() {
+    ctx := context.Background()
+    collName := "test_load_with_compact"
+
+    // Create collection with configuration
+    s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
+        DBName:           dbName,
+        Dim:              dim,
+        CollectionName:   collName,
+        ChannelNum:       1,
+        SegmentNum:       3,
+        RowNumPerSegment: 2000,
+    })
+
+    s.releaseCollection(dbName, collName)
+
+    stopInsertCh := make(chan struct{}, 1)
+    // Start a goroutine to continuously insert data and trigger compaction
+    go func() {
+        for {
+            select {
+            case <-stopInsertCh:
+                return
+            default:
+                s.InsertAndFlush(ctx, dbName, collName, 2000, dim)
+                _, err := s.Cluster.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
+                    CollectionName: collName,
+                })
+                s.NoError(err)
+                time.Sleep(time.Second)
+            }
+        }
+    }()
+
+    time.Sleep(10 * time.Second)
+
+    // Load the collection while data is being inserted and compacted
+    s.loadCollection(collName, dbName, 1, nil)
+
+    // Verify the collection is loaded
+    s.Eventually(func() bool {
+        resp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
+            CollectionNames: []string{collName},
+            Type:            milvuspb.ShowType_InMemory,
+        })
+        s.NoError(err)
+
+        return len(resp.InMemoryPercentages) == 1 && resp.InMemoryPercentages[0] == 100
+    }, 30*time.Second, 1*time.Second)
+
+    // Clean up
+    close(stopInsertCh)
+    s.releaseCollection(dbName, collName)
+}
+
 func TestReplicas(t *testing.T) {
     suite.Run(t, new(LoadTestSuite))
 }
67 changes: 44 additions & 23 deletions tests/integration/util_collection.go
@@ -28,6 +28,49 @@ type CreateCollectionConfig struct {
     ResourceGroups []string
 }

+func (s *MiniClusterSuite) InsertAndFlush(ctx context.Context, dbName, collectionName string, rowNum, dim int) error {
+    fVecColumn := NewFloatVectorFieldData(FloatVecField, rowNum, dim)
+    hashKeys := GenerateHashKeys(rowNum)
+    insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        FieldsData:     []*schemapb.FieldData{fVecColumn},
+        HashKeys:       hashKeys,
+        NumRows:        uint32(rowNum),
+    })
+    if err != nil {
+        return err
+    }
+    if !merr.Ok(insertResult.Status) {
+        return merr.Error(insertResult.Status)
+    }
+
+    flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+        DbName:          dbName,
+        CollectionNames: []string{collectionName},
+    })
+    if err != nil {
+        return err
+    }
+    segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+    if !has || segmentIDs == nil {
+        return merr.Error(&commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_IllegalArgument,
+            Reason:    "failed to get segment IDs",
+        })
+    }
+    ids := segmentIDs.GetData()
+    flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+    if !has {
+        return merr.Error(&commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_IllegalArgument,
+            Reason:    "failed to get flush timestamp",
+        })
+    }
+    s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+    return nil
+}
+
 func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
     schema := ConstructSchema(cfg.CollectionName, cfg.Dim, true)
     marshaledSchema, err := proto.Marshal(schema)
@@ -60,30 +103,8 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
     log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))

     for i := 0; i < cfg.SegmentNum; i++ {
-        fVecColumn := NewFloatVectorFieldData(FloatVecField, cfg.RowNumPerSegment, cfg.Dim)
-        hashKeys := GenerateHashKeys(cfg.RowNumPerSegment)
-        insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
-            DbName:         cfg.DBName,
-            CollectionName: cfg.CollectionName,
-            FieldsData:     []*schemapb.FieldData{fVecColumn},
-            HashKeys:       hashKeys,
-            NumRows:        uint32(cfg.RowNumPerSegment),
-        })
-        s.NoError(err)
-        s.True(merr.Ok(insertResult.Status))
-
-        flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
-            DbName:          cfg.DBName,
-            CollectionNames: []string{cfg.CollectionName},
-        })
+        err = s.InsertAndFlush(ctx, cfg.DBName, cfg.CollectionName, cfg.RowNumPerSegment, cfg.Dim)
         s.NoError(err)
-        segmentIDs, has := flushResp.GetCollSegIDs()[cfg.CollectionName]
-        ids := segmentIDs.GetData()
-        s.Require().NotEmpty(segmentIDs)
-        s.Require().True(has)
-        flushTs, has := flushResp.GetCollFlushTs()[cfg.CollectionName]
-        s.True(has)
-        s.WaitForFlush(ctx, ids, flushTs, cfg.DBName, cfg.CollectionName)
     }

     // create index

0 comments on commit e1616a9
