Skip to content

Commit

Permalink
Fix the problem of repeatedly replicating the same collection
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jan 26, 2024
1 parent 79e7a71 commit 5883ed1
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 2 deletions.
5 changes: 5 additions & 0 deletions core/api/meta_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MetaOp interface {
// WatchCollection its implementation should make sure it's only called once. The WatchPartition is same
WatchCollection(ctx context.Context, filter CollectionFilter)
WatchPartition(ctx context.Context, filter PartitionFilter)
StartWatch()

// SubscribeCollectionEvent an event only is consumed once. The SubscribePartitionEvent is same
// TODO need to consider the many target, maybe try the method a meta op corresponds to a target
Expand Down Expand Up @@ -73,6 +74,10 @@ func (d *DefaultMetaOp) WatchPartition(ctx context.Context, filter PartitionFilt
log.Warn("WatchPartition is not implemented, please check it")
}

func (d *DefaultMetaOp) StartWatch() {
log.Warn("StartWatch is not implemented, please check it")
}

func (d *DefaultMetaOp) SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer) {
log.Warn("SubscribeCollectionEvent is not implemented, please check it")
}
Expand Down
5 changes: 5 additions & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
// ChannelManager a target must promise a manager
type ChannelManager interface {
SetCtx(ctx context.Context)
AddDroppedCollection(ids []int64)

StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error
StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error
Expand Down Expand Up @@ -91,6 +92,10 @@ func (d *DefaultChannelManager) SetCtx(ctx context.Context) {
log.Warn("SetCtx is not implemented, please check it")
}

func (d *DefaultChannelManager) AddDroppedCollection(ids []int64) {
log.Warn("AddDroppedCollection is not implemented, please check it")
}

func (d *DefaultChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
log.Warn("StartReadCollection is not implemented, please check it")
return nil
Expand Down
33 changes: 33 additions & 0 deletions core/mocks/channel_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions core/mocks/meta_op.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 40 additions & 1 deletion core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,54 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
reader.sendError(err)
return
}

recordCreateCollectionTime := make(map[string]*pb.CollectionInfo)
repeatedCollectionID := make(map[int64]struct{})
for _, info := range existedCollectionInfos {
collectionName := info.Schema.GetName()
createTime := info.CreateTime
lastCollectionInfo, recordOK := recordCreateCollectionTime[collectionName]
if recordOK {
if createTime > lastCollectionInfo.CreateTime {
repeatedCollectionID[lastCollectionInfo.ID] = struct{}{}
recordCreateCollectionTime[collectionName] = info
} else {
repeatedCollectionID[info.ID] = struct{}{}
}
} else {
recordCreateCollectionTime[collectionName] = info
}
}

// for the dropped collection when cdc is down, like:
// 1. create collection and cdc server is healthy
// 2. cdc server is down
// 3. drop collection, or drop and create the same collection
// 4. cdc server restart
// 5. create the same collection again
reader.channelManager.AddDroppedCollection(lo.Keys(repeatedCollectionID))

seekPositions := lo.Values(reader.channelSeekPositions)
for _, info := range existedCollectionInfos {
log.Info("exist collection", zap.String("name", info.Schema.Name))
if _, ok := repeatedCollectionID[info.ID]; ok {
log.Info("skip to start to read collection", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
continue
}
log.Info("exist collection", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
if err := reader.channelManager.StartReadCollection(ctx, info, seekPositions); err != nil {
log.Warn("fail to start to replicate the collection data", zap.Any("collection", info), zap.Error(err))
reader.sendError(err)
}
reader.replicateCollectionMap.Store(info.ID, info)
}
_, err = reader.metaOp.GetAllPartition(ctx, func(info *pb.PartitionInfo) bool {
if _, ok := repeatedCollectionID[info.CollectionID]; ok {
log.Info("skip to start to add partition",
zap.String("name", info.PartitionName),
zap.Int64("partition_id", info.PartitionID),
zap.Int64("collection_id", info.CollectionID))
return true
}
var collectionName string
retryErr := retry.Do(ctx, func() error {
collectionName = reader.metaOp.GetCollectionNameByID(ctx, info.CollectionID)
Expand Down Expand Up @@ -198,6 +236,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
log.Warn("get all partition failed", zap.Error(err))
reader.sendError(err)
}
reader.metaOp.StartWatch()
})
}

Expand Down
19 changes: 19 additions & 0 deletions core/reader/etcd_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type EtcdOp struct {
subscribePartitionEvent util.Map[string, api.PartitionEventConsumer]

handlerWatchEventPool *conc.Pool[struct{}]

startWatch chan struct{}
}

func NewEtcdOp(endpoints []string,
Expand All @@ -90,6 +92,7 @@ func NewEtcdOp(endpoints []string,
defaultPartitionName: defaultPartitionName,
retryOptions: util.GetRetryOptions(etcdConfig.Retry),
handlerWatchEventPool: conc.NewPool[struct{}](16, conc.WithExpiryDuration(time.Minute)),
startWatch: make(chan struct{}),
}

// set default value
Expand Down Expand Up @@ -129,6 +132,10 @@ func NewEtcdOp(endpoints []string,
return etcdOp, nil
}

func (e *EtcdOp) StartWatch() {
close(e.startWatch)
}

func (e *EtcdOp) collectionPrefix() string {
return fmt.Sprintf("%s/%s/%s", e.rootPath, e.metaSubPath, collectionPrefix)
}
Expand All @@ -153,6 +160,12 @@ func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilte
e.watchCollectionOnce.Do(func() {
watchChan := e.etcdClient.Watch(ctx, e.collectionPrefix()+"/", clientv3.WithPrefix())
go func() {
select {
case <-e.startWatch:
case <-ctx.Done():
log.Warn("watch collection context done")
return
}
for {
select {
case watchResp, ok := <-watchChan:
Expand Down Expand Up @@ -235,6 +248,12 @@ func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)
e.watchPartitionOnce.Do(func() {
watchChan := e.etcdClient.Watch(ctx, e.partitionPrefix()+"/", clientv3.WithPrefix())
go func() {
select {
case <-e.startWatch:
case <-ctx.Done():
log.Warn("watch partition context done")
return
}
for {
select {
case watchResp, ok := <-watchChan:
Expand Down
15 changes: 14 additions & 1 deletion core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type replicateChannelManager struct {
forwardLock sync.RWMutex
channelForwardMap map[string]struct{}

collectionLock sync.Mutex
collectionLock sync.RWMutex
replicateCollections map[int64]chan struct{}

partitionLock sync.Mutex
Expand Down Expand Up @@ -134,6 +134,13 @@ func IsCollectionNotFoundError(err error) bool {
return strings.Contains(err.Error(), "collection not found")
}

func (r *replicateChannelManager) AddDroppedCollection(ids []int64) {
for _, id := range ids {
r.droppedCollections.Store(id, struct{}{})
}
log.Info("has added dropped collection", zap.Int64s("ids", ids))
}

func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error {
sourceDBInfo := r.metaOp.GetDatabaseInfoForCollection(ctx, info.ID)

Expand All @@ -143,6 +150,12 @@ func (r *replicateChannelManager) StartReadCollection(ctx context.Context, info
if err != nil && !IsCollectionNotFoundError(err) {
return err
}
r.collectionLock.RLock()
_, ok := r.replicateCollections[info.ID]
r.collectionLock.Unlock()
if ok {
return errors.Newf("the collection has been replicated, wait it [collection name: %s] to drop...", info.Schema.Name)
}
// collection not found will exit the retry
return nil
}, r.startReadRetryOptions...)
Expand Down

0 comments on commit 5883ed1

Please sign in to comment.