Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #479]fix: duplicate pulls due to concurrency issues in rebalance. #480

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

humkum
Copy link
Contributor

@humkum humkum commented Dec 4, 2024

fix #479

根因分析

根因:AsyncPullCallback 构建存在并发问题,导致多个 pull 线程消费同一个队列。

  • 每个 queue 对应一个 PullRequest,Rebalance 负责添加和删除队列,并对每个队列对应的 PullRequest 进行初始化,或配置状态为 Drop。
  • 消息拉取依赖 PullRequest 的状态,如果 PullRequest 状态不为 Drop,会重复利用这个 PullRequest 进行循环拉取。
  • C++ sdk 中每个队列对应的 PullRequest 是 AsyncPullCallback 的局部变量,AsyncPullCallback 会存储在一个本地缓存中(<mq, AsyncPullCallback>)
  • Rebalance 添加队列时,会生成一个新的 pullRequest 替换缓存中 AsyncPullCallback 的 pullRequest。
  • Pull 是否结束是依赖 AsyncPullCallback 中的 pullRequest 是否 Drop。
    通过以上 5 个条件:如果两次 Rebalance 相隔时间很短,mq 对应的之前的 PullRequest 还来不及因设置为 drop 而结束当前的拉取。第二次 rebalance 就替换掉了 AsyncPullCallback 中的局部变量 PullRequest,导致上一次的拉取请求又拿到了非 drop 的 pullRequest。此时就会出现 2 个 pullRequest 同时拉取同一个队列的情况。依次类推,如果重复多次短时间 rebalance,单个分区可能存在多个 pullRequest 同时拉取。

新的 pullRequest 替换掉旧的 pullRequest, drop 状态被配置成了 false
image
上次还没处理结束的拉取线程因为状态变化不能正常退出
image

@humkum
Copy link
Contributor Author

humkum commented Dec 19, 2024

@ShannonDing @ifplusor Could you please help to review this pr?

Copy link
Contributor

@ifplusor ifplusor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修改后,AsyncPullCallback不由DefaultMQPushConsumerImpl::m_PullCallback跟踪,生命周期管理不完整,有内存泄漏风险。
如:请求超时,执行ResponseFuture::invokeExceptionCallback->PullCallbackWrap::onException,因为AsyncPullCallback仅在PullCallbackWrap::operationComplete中delete,就泄漏了。

@ifplusor
Copy link
Contributor

PullRequest是由Rebalance::m_requestQueueTable持有的,AsyncPullCallback中只是弱指针引用。
我认为问题在于sdk中在复用AsyncPullCallback时,跨越了PullRequest的生命周期。理想的方案应该是PullRequest持有一个AsyncPullCallback用于复用。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] PushConsumer may cause consumption duplication in some scenarios due to concurrency issues.
2 participants