Skip to content

Commit

Permalink
update pubsub consumer publisher setup
Browse files Browse the repository at this point in the history
  • Loading branch information
ochom committed Oct 15, 2024
1 parent 0d705fd commit a380300
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 130 deletions.
96 changes: 61 additions & 35 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,71 @@ import (
"github.com/streadway/amqp"
)

// Consumer ...
type Consumer struct {
url string
queue string
config Config
// consumer ...
type consumer struct {
connectionName string
url string
queue string

// more
tag string
autoAck bool
exclusive bool
noLocal bool
noWait bool
}

type Config struct {
Type ExchangeType
Tag string
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
// SetConnectionName implements Consumer.
func (c *consumer) SetConnectionName(connectionName string) {
c.connectionName = connectionName
}

var defaultConfig = Config{
Type: Direct,
Tag: "",
AutoAck: true,
Exclusive: false,
NoLocal: false,
NoWait: false,
// SetAutoAck implements Consumer.
func (c *consumer) SetAutoAck(autoAck bool) {
c.autoAck = autoAck
}

// Create a new consumer instance
func NewConsumer(rabbitURL, queue string, config ...Config) *Consumer {
c := Consumer{url: rabbitURL, queue: queue, config: defaultConfig}
// SetExclusive implements Consumer.
func (c *consumer) SetExclusive(exclusive bool) {
c.exclusive = exclusive
}

if len(config) > 0 {
c.config = config[0]
}
// SetNoLocal implements Consumer.
func (c *consumer) SetNoLocal(noLocal bool) {
c.noLocal = noLocal
}

// SetNoWait implements Consumer.
func (c *consumer) SetNoWait(noWait bool) {
c.noWait = noWait
}

return &c
// SetTag implements Consumer.
func (c *consumer) SetTag(tag string) {
c.tag = tag
}

// Create a new consumer instance
func NewConsumer(rabbitURL, queueName string) Consumer {
return &consumer{
url: rabbitURL,
queue: queueName,
autoAck: true,
exclusive: false,
noLocal: false,
noWait: false,
}
}

// Consume consume messages from the channels
func (c *Consumer) Consume(workerFunc func([]byte)) error {
conn, err := amqp.Dial(c.url)
func (c *consumer) Consume(workerFunc func([]byte)) error {
cfg := amqp.Config{
Properties: amqp.Table{
"connection_name": c.connectionName,
},
}

conn, err := amqp.DialConfig(c.url, cfg)
if err != nil {
return fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
}
Expand All @@ -71,13 +97,13 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error {
}

deliveries, err := ch.Consume(
q.Name, // queue
c.config.Tag, // consumerTag
c.config.AutoAck, // auto-ack
c.config.Exclusive, // exclusive
c.config.NoLocal, // no-local
c.config.NoWait, // no-wait
nil, // args
q.Name, // queue
c.tag, // consumerTag
c.autoAck, // auto-ack
c.exclusive, // exclusive
c.noLocal, // no-local
c.noWait, // no-wait
nil, // args
)

if err != nil {
Expand Down
106 changes: 88 additions & 18 deletions pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,108 @@ type publisher struct {
url string
exchange string
queue string
exchangeType string
exchangeType ExchangeType
routingKey string
}

// NewPublisher creates a new publisher to rabbit
func NewPublisher(rabbitURL, exchange, queue string, exchangeType ExchangeType) *publisher {
return &publisher{rabbitURL, exchange, queue, string(exchangeType)}
func NewPublisher(rabbitURL, exchangeName, queueName string) Publisher {
return &publisher{
url: rabbitURL,
exchange: exchangeName,
queue: queueName,
exchangeType: Direct,
}
}

func (p *publisher) SetRoutingKey(routingKey string) {
p.routingKey = routingKey
}

func (p *publisher) SetExchangeType(exchangeType ExchangeType) {
p.exchangeType = exchangeType
}

// PublishWithDelay ...
func (p *publisher) PublishWithDelay(body []byte, delay time.Duration) error {
return p.publish(body, delay)
}

// Publish ...
func (p *publisher) Publish(body []byte) error {
return p.publish(body, 0)
}

// initPubSub ...
func (p *publisher) initPubSub(ch *amqp.Channel) error {
err := ch.ExchangeDeclare(
p.exchange, // name
string(p.exchangeType), // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
amqp.Table{
"x-delayed-type": "direct",
}, // arguments
)
if err != nil {
return fmt.Errorf("exchange Declare: %s", err.Error())
}

// declare queue
q, err := ch.QueueDeclare(
p.queue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("queue Declare: %s", err.Error())
}

// bind queue to exchange
err = ch.QueueBind(
q.Name, // queue name
p.routingKey, // routing key
p.exchange, // exchange
false, // no-wait
nil,
)
if err != nil {
return fmt.Errorf("queue Bind: %s", err.Error())
}

return nil
}

// publish ...
func (p *publisher) publish(body []byte, delay time.Duration) error {
ps, err := initQ(p.url)
conn, err := amqp.Dial(p.url)
if err != nil {
return fmt.Errorf("failed to initialize a connection: %s", err.Error())
return fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
}
defer ps.Close()

if err := initPubSub(ps.ch, p.exchange, p.queue, p.exchangeType); err != nil {
defer conn.Close()

channel, err := conn.Channel()
if err != nil {
return fmt.Errorf("failed to open a channel: %s", err.Error())
}

err = channel.Qos(10, 0, false) // fair dispatch
if err != nil {
return fmt.Errorf("failed to set QoS: %s", err.Error())
}

if err := p.initPubSub(channel); err != nil {
return fmt.Errorf("failed to initialize a pubsub: %s", err.Error())
}

// publish message to exchange
err = ps.ch.Publish(
err = channel.Publish(
p.exchange, // exchange
"", // routing key
true, // mandatory
Expand All @@ -50,13 +130,3 @@ func (p *publisher) publish(body []byte, delay time.Duration) error {

return err
}

// PublishWithDelay ...
func (p *publisher) PublishWithDelay(body []byte, delay time.Duration) error {
return p.publish(body, delay)
}

// Publish ...
func (p *publisher) Publish(body []byte) error {
return p.publish(body, 0)
}
91 changes: 14 additions & 77 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package pubsub

import (
"fmt"

"github.com/streadway/amqp"
)
import "time"

// ExchangeType ...
type ExchangeType string
Expand All @@ -17,78 +13,19 @@ var (
Delayed ExchangeType = "x-delayed-message"
)

// Pubsub ...
type Pubsub struct {
conn *amqp.Connection
ch *amqp.Channel
}

// Close ...
func (p *Pubsub) Close() {
_ = p.conn.Close()
_ = p.ch.Close()
}

func initQ(url string) (*Pubsub, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %s", err.Error())
}

ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("failed to open a channel: %s", err.Error())
}

err = ch.Qos(10, 0, false) // fair dispatch
if err != nil {
return nil, fmt.Errorf("failed to set QoS: %s", err.Error())
}

return &Pubsub{conn: conn, ch: ch}, nil
type Publisher interface {
SetExchangeType(ExchangeType)
SetRoutingKey(string)
Publish([]byte) error
PublishWithDelay([]byte, time.Duration) error
}

// initPubSub ...
func initPubSub(ch *amqp.Channel, exchangeName, queueName, exchangeType string) error {
err := ch.ExchangeDeclare(
exchangeName, // name
exchangeType, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
amqp.Table{
"x-delayed-type": "direct",
}, // arguments
)
if err != nil {
return fmt.Errorf("exchange Declare: %s", err.Error())
}

// declare queue
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("queue Declare: %s", err.Error())
}

// bind queue to exchange
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
exchangeName, // exchange
false, // no-wait
nil,
)
if err != nil {
return fmt.Errorf("queue Bind: %s", err.Error())
}

return nil
type Consumer interface {
SetConnectionName(string)
SetTag(string)
SetAutoAck(bool)
SetExclusive(bool)
SetNoLocal(bool)
SetNoWait(bool)
Consume(func([]byte)) error
}

0 comments on commit a380300

Please sign in to comment.