Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer offsets a bit unclear #326

Open
HustonMmmavr opened this issue Jun 26, 2024 · 1 comment
Open

consumer offsets a bit unclear #326

HustonMmmavr opened this issue Jun 26, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

@HustonMmmavr
Copy link
Contributor

Is your feature request related to a problem? Please describe.

Hello!

In my project I have to track offsets manually. This is done in a different (main) goroutine, while the consumer callback runs in its own goroutine. Because of this asynchronous design, I can't rely on consumer.Offset tracking outside of the consumer goroutine, so I have to do something like this:

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"strconv"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

// CheckErr terminates the program when err is non-nil: the error is printed
// to standard output (followed by a single space) and the process exits with
// status 1. A nil err is a no-op.
func CheckErr(err error) {
	if err == nil {
		return
	}
	fmt.Printf("%s ", err)
	os.Exit(1)
}

// offsetMsg pairs a delivered message with the offset value captured inside
// the consumer callback, so both can be handed to another goroutine together.
type offsetMsg struct {
	// offt is the consumer offset read at callback time (newHandler converts
	// the GetOffset result to int).
	offt int
	// msg is the message passed to the consumer callback.
	msg  *amqp.Message
}

// newHandler returns the consumer callback. For each delivered message it
// reads the consumer's current offset (converted to int), wraps it together
// with the message in an offsetMsg, and sends that value on acceptChan.
func newHandler(acceptChan chan offsetMsg) func(stream.ConsumerContext, *amqp.Message) {
	forward := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		current := int(consumerContext.Consumer.GetOffset())
		acceptChan <- offsetMsg{msg: message, offt: current}
	}
	return forward
}

// storeOffset calls c.StoreCustomOffset(offt) and reports the outcome on
// standard output: the error itself on failure, or "offset stored <offt>" on
// success. The result of StoreCustomOffset is returned to the caller.
func storeOffset(c *stream.Consumer, offt int64) error {
	if err := c.StoreCustomOffset(offt); err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println("offset stored", offt)
	return nil
}

// main demonstrates manual offset tracking with the RabbitMQ stream client.
//
// It declares the stream, publishes five messages from a background
// goroutine, queries the offset stored for the consumer name and resumes one
// past it (when no offset is stored, the value returned alongside
// OffsetNotFoundError is used as-is), then stores the offset after every
// fifth message received. It runs until the process receives an interrupt.
func main() {
	// Only os.Interrupt is registered: os.Kill (SIGKILL) cannot be trapped by
	// a program, so listing it here would have no effect.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	CheckErr(err)

	streamName := "stream"
	err = env.DeclareStream(streamName,
		&stream.StreamOptions{
			MaxLengthBytes: stream.ByteCapacity{}.MB(500),
		},
	)
	CheckErr(err)

	consName := "consumer_"
	producer, err := env.NewProducer(streamName, nil)
	CheckErr(err)

	// Publish the sample messages concurrently with consumer setup.
	go func() {
		for i := 0; i < 5; i++ {
			err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
			CheckErr(err)
		}
	}()

	offt, err := env.QueryOffset(consName, streamName)
	if err != nil {
		// A missing stored offset is expected on first run; anything else is fatal.
		if !errors.Is(err, stream.OffsetNotFoundError) {
			panic(err)
		}
	} else {
		// Resume from the message after the last stored one.
		offt++
	}
	fmt.Println("queried offset")

	acceptCh := make(chan offsetMsg)
	handler := newHandler(acceptCh)

	cons, err := env.NewConsumer(streamName, handler, stream.
		NewConsumerOptions().
		SetManualCommit().
		SetOffset(stream.OffsetSpecification{}.Offset(offt)).SetConsumerName(consName),
	)
	// Check the error before anything can touch cons: the goroutine below
	// calls storeOffset(cons, ...) and must never run with a failed consumer.
	CheckErr(err)
	defer cons.Close()

	go func() {
		counter := 0
		for {
			select {
			case <-ctx.Done():
				return
			case data := <-acceptCh:
				counter++
				fmt.Printf("data: %s, offset: %d, deliveryTag: %v\n", string(data.msg.GetData()), data.offt, data.msg.DeliveryTag)
				if counter%5 == 0 {
					// storeOffset already prints any failure; the demo keeps
					// consuming regardless, so the returned error is dropped.
					_ = storeOffset(cons, int64(data.offt))
				}
			}
		}
	}()

	<-ctx.Done()
}

The output is as follows:

queried offset
data: hello_world_0, offset: 0, deliveryTag: []
data: hello_world_1, offset: 1, deliveryTag: []
data: hello_world_2, offset: 2, deliveryTag: []
data: hello_world_3, offset: 3, deliveryTag: []
data: hello_world_4, offset: 4, deliveryTag: []
offset stored 4

Also, as I saw in the library, the consumer routine works with an array of offsetMessages, where an offsetMessage holds a *amqp.Message pointer and an offset. As a result, there are two unnecessary wraps. Maybe it's worth passing the offsetMessage to the callback, or extending amqp.Message with an offset field (similar to the DeliveryTag field in the amqp library).

Here is the equivalent code using the amqp library:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	amqp "github.com/rabbitmq/amqp091-go"
)

// CheckErr is a fail-fast helper: a nil err returns immediately, while any
// other error is written to standard output (with a trailing space) and the
// process exits with status 1.
func CheckErr(err error) {
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stdout, "%s ", err)
	os.Exit(1)
}

// main publishes the sample messages, starts the consumer, installs the
// close handler, and then blocks until an interrupt cancels the context.
func main() {
	// Only os.Interrupt is registered: os.Kill (SIGKILL) cannot be trapped by
	// a program, so listing it here would have no effect.
	ctx, done := signal.NotifyContext(context.Background(), os.Interrupt)
	defer done()
	produce("amqp://localhost:5672", "exch", "fanout", "key")
	c := NewConsumer("amqp://localhost:5672", "exch", "fanout", "q", "key", "tag")
	SetupCloseHandler(c)
	<-ctx.Done()
}

// produce connects to the broker at uri, declares the exchange, and publishes
// five messages ("body_0" through "body_4") with the given routing key. Any
// failure terminates the process via CheckErr. The connection is closed when
// the function returns.
func produce(uri, exchange, exchangeType, routingKey string) {
	conn, err := amqp.Dial(uri)
	CheckErr(err)
	defer conn.Close()

	ch, err := conn.Channel()
	CheckErr(err)
	CheckErr(ch.ExchangeDeclare(exchange, exchangeType, true, false, false, false, nil))

	const total = 5
	for n := 0; n < total; n++ {
		msg := amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(fmt.Sprintf("body_%d", n)),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,              // 0-9
		}
		// The two false arguments are the mandatory and immediate flags.
		CheckErr(ch.Publish(exchange, routingKey, false, false, msg))
	}
}

// Consumer bundles the AMQP connection and channel opened by NewConsumer with
// the consumer tag and the channel used to signal that delivery handling ended.
type Consumer struct {
	// conn is the connection obtained from amqp.DialConfig in NewConsumer.
	conn    *amqp.Connection
	// channel is the AMQP channel opened on conn.
	channel *amqp.Channel
	// tag is the consumer tag passed to Consume.
	tag     string
	// done receives a nil from handle once the deliveries channel is closed.
	done    chan error
}

// SetupCloseHandler starts a goroutine that waits for os.Interrupt or SIGTERM.
// On the first signal it prints a notice, closes the consumer's channel and
// connection, waits for the value sent on consumer.done, prints it, and exits
// the process with status 0.
func SetupCloseHandler(consumer *Consumer) {
	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

	shutdown := func() {
		<-sigs
		fmt.Printf("Ctrl+C pressed in Terminal")
		consumer.channel.Close()
		consumer.conn.Close()
		// Wait for the delivery handler to report that it has finished.
		result := <-consumer.done
		fmt.Println(result)
		os.Exit(0)
	}
	go shutdown()
}

// NewConsumer dials amqpURI, opens a channel, declares the exchange and the
// queue, binds the queue to the exchange with key, starts consuming under the
// tag ctag, and launches handle in a goroutine to process the deliveries.
// Every setup error terminates the process through CheckErr, so a returned
// *Consumer has its connection and channel populated.
func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) *Consumer {
	c := &Consumer{
		conn: nil, channel: nil, tag: ctag, done: make(chan error),
	}

	var err error
	// Label the connection "sample-consumer" in its client properties.
	config := amqp.Config{Properties: amqp.NewConnectionProperties()}
	config.Properties.SetClientConnectionName("sample-consumer")

	c.conn, err = amqp.DialConfig(amqpURI, config)
	CheckErr(err)

	// Block until the connection delivers a close notification, then print it.
	go func() {
		fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
	}()

	c.channel, err = c.conn.Channel()
	CheckErr(err)

	// NOTE(review): the positional booleans follow the library's
	// ExchangeDeclare/QueueDeclare signatures — confirm their meaning against
	// the amqp091-go documentation.
	err = c.channel.ExchangeDeclare(exchange, exchangeType, true, false, false, false, nil)
	CheckErr(err)

	queue, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
	CheckErr(err)

	err = c.channel.QueueBind(queue.Name, key, exchange, false, nil)
	CheckErr(err)

	// handle acknowledges each delivery explicitly; presumably the third
	// argument (false) disables auto-ack — confirm against the library docs.
	deliveries, err := c.channel.Consume(queue.Name, c.tag, false, false, false, false, nil)
	CheckErr(err)

	// Process deliveries in the background; handle signals completion on c.done.
	go handle(deliveries, c.done)

	return c
}

// handle prints and acknowledges every delivery until the deliveries channel
// is closed. On return it prints a notice and sends nil on done. A failed
// acknowledgement terminates the process via CheckErr.
func handle(deliveries <-chan amqp.Delivery, done chan error) {
	defer func() {
		fmt.Printf("handle: deliveries channel closed")
		done <- nil
	}()

	for {
		d, open := <-deliveries
		if !open {
			return
		}
		fmt.Printf("data: %s, deliveryTag: %d\n", string(d.Body), d.DeliveryTag)
		CheckErr(d.Ack(false))
	}
}

Its output is as follows:

data: body_0, deliveryTag: 1
data: body_1, deliveryTag: 2
data: body_2, deliveryTag: 3
data: body_3, deliveryTag: 4
data: body_4, deliveryTag: 5

Describe the solution you'd like

Maybe it's worth making the message API similar to the amqp library's, or extending the current amqp message with a new Offset field.
A second proposal: maybe it's worth letting users work with a channel-based approach, as is done in the amqp library (for example, a new API method that returns a consumer channel to the user application).

Describe alternatives you've considered

No response

Additional context

No response

@HustonMmmavr HustonMmmavr added the enhancement New feature or request label Jun 26, 2024
@Gsantomaggio
Copy link
Member

hi @HustonMmmavr ,
The offset is outside the message context, so this information cannot be added to the message. The other clients have the same behaviour.

Having a channel on the consumer side with all the information is correct, as you have done.

maybe it's worth to add the ability for user to interact with channel based approach

That was the intention for the version 2.x of this client, but at the moment, the implementation is blocked due to other priorities.

You could propose a PR if you like, but it could be confusing for users to have two ways to consume messages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants