Skip to content

Commit

Permalink
Improve the reader consumer
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jan 30, 2024
1 parent 19177ef commit 3d41351
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,14 @@ func (r *replicateChannelHandler) Close() {

func (r *replicateChannelHandler) startReadChannel() {
go func() {
innerHandlePack := func(msgPack *msgstream.MsgPack) {
p := r.handlePack(false, msgPack)
if p == util.EmptyMsgPack {
return
}
r.msgPackChan <- p
}

for {
select {
case <-r.replicateCtx.Done():
Expand All @@ -922,29 +930,34 @@ func (r *replicateChannelHandler) startReadChannel() {
case msgPack := <-r.forwardPackChan:
r.msgPackChan <- r.handlePack(true, msgPack)
case msgPack := <-r.generatePackChan:
p := r.handlePack(false, msgPack)
if p == util.EmptyMsgPack {
continue
}
r.msgPackChan <- p
innerHandlePack(msgPack)
case msgPack, ok := <-r.stream.Chan():
if !ok {
log.Warn("replicate channel closed", zap.String("channel_name", r.pChannelName))
continue
// close(r.msgPackChan)
// close(r.forwardPackChan)
// return
}
p := r.handlePack(false, msgPack)
if p == util.EmptyMsgPack {
continue
}
r.msgPackChan <- p
GreedyConsumeChan(r.generatePackChan, innerHandlePack)
GreedyConsumeChan(r.forwardPackChan, func(pack *msgstream.MsgPack) {
r.msgPackChan <- r.handlePack(true, pack)
})

innerHandlePack(msgPack)
}
}
}()
}

func GreedyConsumeChan(packChan chan *msgstream.MsgPack, f func(*msgstream.MsgPack)) {
for i := 0; i < 10; i++ {
select {
case pack := <-packChan:
f(pack)
default:
break
}
}
}

func (r *replicateChannelHandler) getCollectionTargetInfo(collectionID int64) (*model.TargetCollectionInfo, error) {
if r.isDroppedCollection != nil && r.isDroppedCollection(collectionID) {
return nil, nil
Expand Down Expand Up @@ -1305,7 +1318,7 @@ func newReplicateChannelHandler(ctx context.Context,
msgPackChan: make(chan *msgstream.MsgPack, opts.MessageBufferSize),
apiEventChan: apiEventChan,
forwardPackChan: make(chan *msgstream.MsgPack, opts.MessageBufferSize),
generatePackChan: make(chan *msgstream.MsgPack, 10),
generatePackChan: make(chan *msgstream.MsgPack, 30),
retryOptions: opts.RetryOptions,
lastSendTTTime: time.Now(),
ttPeriod: 5 * time.Second,
Expand Down

0 comments on commit 3d41351

Please sign in to comment.