Skip to content

Commit

Permalink
fix: generic pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
ochom committed Apr 17, 2024
1 parent 405c292 commit afa31a6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 19 deletions.
17 changes: 11 additions & 6 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import "fmt"

// Consumer ...
type Consumer struct {
url string
exchange string
queue string
url string
exchange string
queue string
enableDelay bool
}

// Create a new consumer instance
func NewConsumer(rabbitURL, exchange, queue string) *Consumer {
return &Consumer{rabbitURL, exchange, queue}
func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer {
if len(config) > 0 {
return &Consumer{rabbitURL, exchange, queue, config[0].EnableDelay}
}

return &Consumer{rabbitURL, exchange, queue, false}
}

// Consume consume messages from the channels
Expand All @@ -23,7 +28,7 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
defer ch.Close()
defer conn.Close()

if err := initPubSub(ch, c.exchange, c.queue); err != nil {
if err := initPubSub(ch, c.exchange, c.queue, c.enableDelay); err != nil {
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
}

Expand Down
17 changes: 11 additions & 6 deletions pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@ import (

// publisher ...
type publisher struct {
url string
exchange string
queue string
url string
exchange string
queue string
enableDelay bool
}

// NewPublisher creates a new publisher to rabbit
func NewPublisher(rabbitURL, exchange, queue string) *publisher {
return &publisher{rabbitURL, exchange, queue}
func NewPublisher(rabbitURL, exchange, queue string, config ...Config) *publisher {
if len(config) > 0 {
return &publisher{rabbitURL, exchange, queue, config[0].EnableDelay}
}

return &publisher{rabbitURL, exchange, queue, false}
}

// publish ...
Expand All @@ -28,7 +33,7 @@ func (p *publisher) publish(body []byte, delay time.Duration) error {
defer ch.Close()
defer conn.Close()

if err := initPubSub(ch, p.exchange, p.queue); err != nil {
if err := initPubSub(ch, p.exchange, p.queue, p.enableDelay); err != nil {
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
}

Expand Down
24 changes: 17 additions & 7 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/streadway/amqp"
)

// Config for publisher and consumer
type Config struct {
EnableDelay bool
}

func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
conn, err := amqp.Dial(url)
if err != nil {
Expand All @@ -26,14 +31,19 @@ func initQ(url string) (*amqp.Connection, *amqp.Channel, error) {
}

// initPubSub ...
func initPubSub(ch *amqp.Channel, exchangeName, queueName string) error {
func initPubSub(ch *amqp.Channel, exchangeName, queueName string, enableDelay bool) error {
exchangeType := "direct"
if enableDelay {
exchangeType = "x-delayed-message"
}

err := ch.ExchangeDeclare(
exchangeName, // name
"x-delayed-message", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
exchangeName, // name
exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
amqp.Table{
"x-delayed-type": "direct",
}, // arguments
Expand Down

0 comments on commit afa31a6

Please sign in to comment.