Skip to content

Commit

Permalink
feat(rabbitmq): concurrent subscribe msg
Browse files Browse the repository at this point in the history
  • Loading branch information
greenhat616 committed Sep 17, 2023
1 parent ff76418 commit 799454a
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 37 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/rabbitmq/amqp091-go v1.8.1
github.com/spf13/viper v1.16.0
go.uber.org/zap v1.26.0
golang.org/x/sync v0.3.0
)

require (
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
6 changes: 4 additions & 2 deletions rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,10 @@ func (c *Consumer) Consume(handler func(ctx Ctx, delivery amqp.Delivery) error)
defer cancel()
u := uuid.NewString()
rCtx := NewCtxFromContext(ctxWithDeadline, c.instance)
logging.NewContext(rCtx, zap.String("traceID", u))
logging.NewContext(rCtx, zap.String("consumerTag", co.Tag))
logging.NewContext(rCtx,
zap.String("trace_id", u),
zap.String("consumer_tag", co.Tag),
)
logger := logging.WithContext(rCtx)
defer logger.Sync()
done := make(chan bool)
Expand Down
140 changes: 105 additions & 35 deletions rabbitmq/exports.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package rabbitmq

import (
"context"
"github.com/cockroachdb/errors"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/errgroup"
"sync"
"time"
)

type Instance struct {
Expand Down Expand Up @@ -45,11 +49,29 @@ func (r *Instance) RegisterConsumerConfig(options ConsumerRegisterOptions) {

// ConsumerSubscribe subscribe consumerOptionsList
func (r *Instance) ConsumerSubscribe() error {
return r.ConsumerSubscribeWithTimeout(5 * time.Minute) // 默认超时时间为 5 分钟
}

func (r *Instance) ConsumerSubscribeWithTimeout(timeout time.Duration) error {
c := context.Background()
if timeout > 0 { // 如果超时时间大于 0,则使用超时上下文
var cancel context.CancelFunc
c, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
}
eg, _ := errgroup.WithContext(c)
for _, v := range r.consumersOptions {
err := r.RegisterConsumer(v)
if err != nil {
return errors.WithMessagef(err, "consumer Tag: %v", v.ConsumerOptions.Tag)
}
co := v // copy
eg.Go(func() error {
err := r.RegisterConsumer(co)
if err != nil {
return errors.WithMessagef(err, "consumer Tag: %v", co.ConsumerOptions.Tag)
}
return nil
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "consumer subscribe error")
}
return nil
}
Expand All @@ -60,8 +82,11 @@ func (r *Instance) RegisterConsumer(options ConsumerRegisterOptions) error {
if err != nil {
return err
}
err = consumer.Consume(options.CallFunc)
if err != nil {
var e = make(chan error)
go func() {
e <- consumer.Consume(options.CallFunc)
}()
if err = <-e; err != nil {
return err
}
r.Consumers.Add(ConsumerUnit{
Expand Down Expand Up @@ -93,40 +118,29 @@ func (r *Instance) RegisterProducer(options ProducerRegisterOptions) (*Producer,

// GetConsumer get an exist consumer by uuid
func (r *Instance) GetConsumer(uuid string) (*Consumer, bool) {
for _, v := range r.Consumers {
if v.UUID == uuid {
return v.Consumer, true
}
}
return nil, false
return r.Consumers.Get(uuid)
}

// GetProducer get an exist producer by uuid
func (r *Instance) GetProducer(uuid string) (*Producer, bool) {
for i, v := range r.Producers {
if v.UUID == uuid {
if !v.Producer.channel.IsClosed() {
return v.Producer, true
}
// Remove from list
r.Producers = append(r.Producers[:i], r.Producers[i+1:]...)
}
}
return nil, false
return r.Producers.Get(uuid)
}

// registerChannelRecover is used to recover channel after channel closed
func (r *Instance) registerChannelRecover() {
go func() {
for _ = range channelShouldUpdateConn { // ignore data because of notification channel(with useless data)
r.Consumers.UpdateInstant(r.RabbitMQ)
r.Producers.UpdateInstant(r.RabbitMQ)
r.Consumers.UpdateInstance(r.RabbitMQ)
r.Producers.UpdateInstance(r.RabbitMQ)
}
}()
}

// ConsumerList defines a ConsumerUnit List
type ConsumerList []ConsumerUnit
type ConsumerList struct {
list []ConsumerUnit
sync.RWMutex
}

// ConsumerUnit defines a Consumer unit
type ConsumerUnit struct {
Expand All @@ -136,33 +150,89 @@ type ConsumerUnit struct {

// Add a unit to the list
func (p *ConsumerList) Add(unit ConsumerUnit) {
*p = append(*p, unit)
p.Lock()
defer p.Unlock()
p.list = append(p.list, unit)
}

// UpdateInstant update rabbitmq connection(also called instant)
func (p *ConsumerList) UpdateInstant(rmq *RabbitMQ) {
for _, unit := range *p {
// UpdateInstance update rabbitmq connection(also called instant)
func (p *ConsumerList) UpdateInstance(rmq *RabbitMQ) {
p.Lock()
defer p.Unlock()
for _, unit := range p.list {
unit.Consumer.RabbitMQ = rmq
}
}

func (p *ConsumerList) Get(uuid string) (*Consumer, bool) {
p.RLock()
defer p.RUnlock()
for _, v := range p.list {
if v.UUID == uuid {
return v.Consumer, true
}
}
return nil, false
}

func (p *ConsumerList) Remove(uuid string) {
p.Lock()
defer p.Unlock()
for i, v := range p.list {
if v.UUID == uuid {
p.list = append(p.list[:i], p.list[i+1:]...)
}
}
}

// ProducerList defines a ProducerUnit List
type ProducerList []ProducerUnit
type ProducerList struct {
list []ProducerUnit
sync.RWMutex
}

// ProducerUnit defines a Producer unit
type ProducerUnit struct {
UUID string
Producer *Producer
}

// Add add a unit to the list
// Add a unit to the list
func (p *ProducerList) Add(unit ProducerUnit) {
*p = append(*p, unit)
p.Lock()
defer p.Unlock()
p.list = append(p.list, unit)
}

// UpdateInstant update rabbitmq connection(also called instant)
func (p *ProducerList) UpdateInstant(rmq *RabbitMQ) {
for _, unit := range *p {
// UpdateInstance update rabbitmq connection(also called instant)
func (p *ProducerList) UpdateInstance(rmq *RabbitMQ) {
p.Lock()
defer p.Unlock()
for _, unit := range p.list {
unit.Producer.RabbitMQ = rmq
}
}

func (p *ProducerList) Get(uuid string) (*Producer, bool) {
p.RLock()
defer p.RUnlock()
for _, v := range p.list {
if v.UUID == uuid {
if !v.Producer.channel.IsClosed() { // 如果通道未关闭,则返回
return v.Producer, true
}
p.Remove(uuid)
}
}
return nil, false
}

func (p *ProducerList) Remove(uuid string) {
p.Lock()
defer p.Unlock()
for i, v := range p.list {
if v.UUID == uuid {
p.list = append(p.list[:i], p.list[i+1:]...)
}
}
}

0 comments on commit 799454a

Please sign in to comment.