Skip to content

Commit

Permalink
STOMP connection recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Aug 13, 2024
1 parent 9ac0b3e commit 948d03d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 59 deletions.
7 changes: 4 additions & 3 deletions pkg/amqp10_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ func (c *Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
err = c.Receiver.AcceptMessage(ctx, msg)
if err != nil {
log.Error("message NOT accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
} else {
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
i++
log.Debug("message accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
}
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
i++
log.Debug("message accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
}
}

Expand Down
94 changes: 68 additions & 26 deletions pkg/stomp_client/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,92 @@ type StompConsumer struct {
Subscription *stomp.Subscription
Topic string
Config config.Config
whichUri int
}

func NewConsumer(cfg config.Config, id int) *StompConsumer {
parsedUri := utils.ParseURI(cfg.ConsumerUri[0], "stomp", "61613")

var o []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login(parsedUri.Username, parsedUri.Password),
stomp.ConnOpt.Host("/"), // TODO
consumer := &StompConsumer{
Id: id,
Connection: nil,
Subscription: nil,
Topic: topic.CalculateTopic(cfg.ConsumeFrom, id),
Config: cfg,
whichUri: 0,
}

conn, err := stomp.Dial("tcp", parsedUri.Broker, o...)
consumer.Connect()

if err != nil {
log.Error("consumer connection failed", "protocol", "STOMP", "consumerId", id, "error", err.Error())
return nil
return consumer
}

func (c *StompConsumer) Connect() {
if c.Subscription != nil {
_ = c.Subscription.Unsubscribe()
}
if c.Connection != nil {
_ = c.Connection.Disconnect()
}
c.Subscription = nil
c.Connection = nil

for c.Connection == nil {
if c.whichUri >= len(c.Config.ConsumerUri) {
c.whichUri = 0
}
uri := c.Config.ConsumerUri[c.whichUri]
c.whichUri++
parsedUri := utils.ParseURI(uri, "stomp", "61613")

var o []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login(parsedUri.Username, parsedUri.Password),
stomp.ConnOpt.Host("/"), // TODO
}

topic := topic.CalculateTopic(cfg.ConsumeFrom, id)
log.Debug("connecting to broker", "protocol", "STOMP", "consumerId", c.Id, "broker", parsedUri.Broker)
conn, err := stomp.Dial("tcp", parsedUri.Broker, o...)

return &StompConsumer{
Id: id,
Connection: conn,
Topic: topic,
Config: cfg,
if err != nil {
log.Error("consumer connection failed", "protocol", "STOMP", "consumerId", c.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
c.Connection = conn
}
}

}

func (c StompConsumer) Start(ctx context.Context, subscribed chan bool) {
func (c *StompConsumer) Subscribe() {
var sub *stomp.Subscription
var err error

log.Info("subscribing to durable queue", "protocol", "STOMP", "consumerId", c.Id, "queue", c.Topic, "offset", c.Config.StreamOffset, "credits", c.Config.ConsumerCredits)
sub, err = c.Connection.Subscribe(c.Topic, stomp.AckClient, buildSubscribeOpts(c.Config)...)
if err != nil {
log.Error("subscription failed", "protocol", "STOMP", "consumerId", c.Id, "queue", c.Topic, "error", err.Error())
return
}
c.Subscription = sub
}

func (c *StompConsumer) Start(ctx context.Context, subscribed chan bool) {
c.Subscribe()
close(subscribed)
log.Info("consumer started", "protocol", "STOMP", "consumerId", c.Id, "destination", c.Topic)

previousMessageTimeSent := time.Unix(0, 0)
m := metrics.EndToEndLatency.With(prometheus.Labels{"protocol": "stomp"})

log.Info("consumer started", "protocol", "STOMP", "consumerId", c.Id, "destination", c.Topic)
previousMessageTimeSent := time.Unix(0, 0)
for i := 1; i <= c.Config.ConsumeCount; {
if c.Subscription == nil {
c.Subscribe()
}

for i := 1; i <= c.Config.ConsumeCount; i++ {
select {
case msg := <-sub.C:
case msg := <-c.Subscription.C:
if msg.Err != nil {
log.Error("failed to receive a message", "protocol", "STOMP", "consumerId", c.Id, "c.Topic", c.Topic, "error", msg.Err)
return
c.Connect()
continue
}

timeSent, latency := utils.CalculateEndToEndLatency(&msg.Body)
Expand All @@ -93,12 +128,15 @@ func (c StompConsumer) Start(ctx context.Context, subscribed chan bool) {
time.Sleep(c.Config.ConsumerLatency)
}

err = c.Connection.Ack(msg)
err := c.Connection.Ack(msg)
if err != nil {
log.Error("message NOT acknowledged", "protocol", "stomp", "consumerId", c.Id, "destination", c.Topic)

} else {
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "stomp", "priority": priority}).Inc()
i++
log.Debug("message acknowledged", "protocol", "stomp", "consumerId", c.Id, "terminus", c.Topic)
}
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "stomp", "priority": priority}).Inc()
case <-ctx.Done():
c.Stop("time limit reached")
return
Expand All @@ -111,10 +149,14 @@ func (c StompConsumer) Start(ctx context.Context, subscribed chan bool) {

}

func (c StompConsumer) Stop(reason string) {
func (c *StompConsumer) Stop(reason string) {
log.Debug("closing connection", "protocol", "stomp", "consumerId", c.Id, "reason", reason)
_ = c.Subscription.Unsubscribe()
_ = c.Connection.Disconnect()
if c.Subscription != nil {
_ = c.Subscription.Unsubscribe()
}
if c.Connection != nil {
_ = c.Connection.Disconnect()
}
}

func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error {
Expand Down
88 changes: 58 additions & 30 deletions pkg/stomp_client/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,52 @@ type StompPublisher struct {
Topic string
Config config.Config
msg []byte
whichUri int
}

func NewPublisher(cfg config.Config, id int) *StompPublisher {
parsedUri := utils.ParseURI(cfg.PublisherUri[0], "stomp", "61613")

var o []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login(parsedUri.Username, parsedUri.Password),
stomp.ConnOpt.Host("/"), // TODO
publisher := &StompPublisher{
Id: id,
Connection: nil,
Topic: topic.CalculateTopic(cfg.PublishTo, id),
Config: cfg,
}

conn, err := stomp.Dial("tcp", parsedUri.Broker, o...)
if err != nil {
log.Error("publisher connection failed", "protocol", "STOMP", "publisherId", id, "error", err.Error())
return nil
publisher.Connect()

return publisher
}

func (p *StompPublisher) Connect() {
if p.Connection != nil {
_ = p.Connection.Disconnect()
}
log.Info("publisher connected", "protocol", "STOMP", "publisherId", id)

topic := topic.CalculateTopic(cfg.PublishTo, id)
for p.Connection == nil {
if p.whichUri >= len(p.Config.PublisherUri) {
p.whichUri = 0
}
uri := p.Config.PublisherUri[p.whichUri]
p.whichUri++
parsedUri := utils.ParseURI(uri, "stomp", "61613")

return &StompPublisher{
Id: id,
Connection: conn,
Topic: topic,
Config: cfg,
var o []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login(parsedUri.Username, parsedUri.Password),
stomp.ConnOpt.Host("/"), // TODO
}

conn, err := stomp.Dial("tcp", parsedUri.Broker, o...)
if err != nil {
log.Error("publisher connection failed", "protocol", "STOMP", "publisherId", p.Id, "error", err.Error())
time.Sleep(1 * time.Second)
} else {
p.Connection = conn
}
log.Info("connection established", "protocol", "STOMP", "publisherId", p.Id)
}
}

func (p StompPublisher) Start(ctx context.Context) {
func (p *StompPublisher) Start(ctx context.Context) {
// sleep random interval to avoid all publishers publishing at the same time
s := rand.Intn(1000)
time.Sleep(time.Duration(s) * time.Millisecond)
Expand All @@ -66,27 +84,32 @@ func (p StompPublisher) Start(ctx context.Context) {
}
}

func (p StompPublisher) StartFullSpeed(ctx context.Context) {
func (p *StompPublisher) StartFullSpeed(ctx context.Context) {
log.Info("publisher started", "protocol", "STOMP", "publisherId", p.Id, "rate", "unlimited", "destination", p.Topic)

for i := 1; i <= p.Config.PublishCount; i++ {
for i := 1; i <= p.Config.PublishCount; {
select {
case <-ctx.Done():
return
default:
p.Send()
err := p.Send()
if err != nil {
p.Connect()
} else {
i++
}
}
}
log.Debug("publisher completed", "protocol", "stomp", "publisherId", p.Id)
}

func (p StompPublisher) StartIdle(ctx context.Context) {
func (p *StompPublisher) StartIdle(ctx context.Context) {
log.Info("publisher started", "protocol", "STOMP", "publisherId", p.Id, "rate", "-", "destination", p.Topic)

_ = ctx.Done()
}

func (p StompPublisher) StartRateLimited(ctx context.Context) {
func (p *StompPublisher) StartRateLimited(ctx context.Context) {
log.Info("publisher started", "protocol", "STOMP", "publisherId", p.Id, "rate", p.Config.Rate, "destination", p.Topic)
ticker := time.NewTicker(time.Duration(1000/float64(p.Config.Rate)) * time.Millisecond)

Expand All @@ -97,32 +120,37 @@ func (p StompPublisher) StartRateLimited(ctx context.Context) {
p.Stop("time limit reached")
return
case <-ticker.C:
p.Send()
msgSent++
if msgSent >= p.Config.PublishCount {
p.Stop("publish count reached")
return
err := p.Send()
if err != nil {
p.Connect()
} else {
msgSent++
if msgSent >= p.Config.PublishCount {
p.Stop("publish count reached")
return
}
}
}
}
}

func (p StompPublisher) Send() {
func (p *StompPublisher) Send() error {
utils.UpdatePayload(p.Config.UseMillis, &p.msg)

timer := prometheus.NewTimer(metrics.PublishingLatency.With(prometheus.Labels{"protocol": "stomp"}))
err := p.Connection.Send(p.Topic, "", p.msg, buildHeaders(p.Config)...)
timer.ObserveDuration()
if err != nil {
log.Error("message sending failure", "protocol", "STOMP", "publisherId", p.Id, "error", err)
return
return err
}
log.Debug("message sent", "protocol", "STOMP", "publisherId", p.Id, "destination", p.Topic)

metrics.MessagesPublished.With(prometheus.Labels{"protocol": "stomp"}).Inc()
return nil
}

func (p StompPublisher) Stop(reason string) {
func (p *StompPublisher) Stop(reason string) {
log.Debug("closing connection", "protocol", "stomp", "publisherId", p.Id, "reason", reason)
_ = p.Connection.Disconnect()
}
Expand Down

0 comments on commit 948d03d

Please sign in to comment.