Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Heartbeats misleading behaviour #325

Open
HustonMmmavr opened this issue Jun 24, 2024 · 1 comment
Open

Heartbeats misleading behaviour #325

HustonMmmavr opened this issue Jun 24, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

@HustonMmmavr
Copy link
Contributor

Is your feature request related to a problem? Please describe.

Hello!

There is a question about hearbeats, for example the next code would raise to logs next messages

Code:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func CheckErr(err error) {
	if err != nil {
		fmt.Printf("%s ", err)
		os.Exit(1)
	}
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest").SetRequestedHeartbeat(time.Second * 1))
	CheckErr(err)

	streamName := uuid.New().String()
	err = env.DeclareStream(streamName,
		&stream.StreamOptions{
			MaxLengthBytes: stream.ByteCapacity{}.MB(500),
		},
	)
	CheckErr(err)

	consName := "consumer_"
	producer, err := env.NewProducer(streamName, nil)
	CheckErr(err)

	go func() {
		for i := 0; i < 1000; i++ {
			err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
			CheckErr(err)
			time.Sleep(5 * time.Second)
		}
	}()

	handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		err := consumerContext.Consumer.StoreCustomOffset(consumerContext.Consumer.GetOffset())
		if err != nil {
			CheckErr(err)
		}
		fmt.Println(string(message.GetData()))
	}

	cons, err := env.NewConsumer(streamName, handleMessages, stream.
		NewConsumerOptions().
		SetManualCommit().
		SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName),
	)
	CheckErr(err)
	defer cons.Close()

	<-ctx.Done()
}
go run main.go
hello_world_0
hello_world_1
hello_world_2
hello_world_3
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
hello_world_4
hello_world_5
hello_world_6
hello_world_7
2024/06/24 18:24:55 [warn] - Missing heart beat: 2
2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2
2024/06/24 18:24:55 [warn] - Missing heart beat: 2
2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2
2024/06/24 18:24:55 [error] - Producer BatchSend error during flush: write tcp 127.0.0.1:37404->127.0.0.1:5552: use of closed network connection
producer id: 0  closed exit status 1

Im a bit confused, because the demo rabbit is alive and connection seems too - messages still consuming.
Is it a correct behaviour?

Seems the heartBeat uses a hardcoded heartbeat intervals and the heartbeat condition fails

Describe the solution you'd like

Seems hearbeat should be called every interval specified by user, but I may be wrong

Describe alternatives you've considered

No response

Additional context

No response

@HustonMmmavr HustonMmmavr added the enhancement New feature or request label Jun 24, 2024
@HustonMmmavr HustonMmmavr changed the title Heartbeats Heartbeats misleading behaviour Jun 24, 2024
@Gsantomaggio
Copy link
Member

Hi @HustonMmmavr
Thank you for testing and reporting the issue.
This is a bug. When we set SetRequestedHeartbeat, we should also change the ticket timer.
Even we should put a limit; for example, 1 second is too low.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants