diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 7317a84..bcddedc 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -4,14 +4,19 @@ import "fmt" // Consumer ... type Consumer struct { - url string - exchange string - queue string + url string + exchange string + queue string + enableDelay bool } // Create a new consumer instance -func NewConsumer(rabbitURL, exchange, queue string) *Consumer { - return &Consumer{rabbitURL, exchange, queue} +func NewConsumer(rabbitURL, exchange, queue string, config ...Config) *Consumer { + if len(config) > 0 { + return &Consumer{rabbitURL, exchange, queue, config[0].EnableDelay} + } + + return &Consumer{rabbitURL, exchange, queue, false} } // Consume consume messages from the channels @@ -23,7 +28,7 @@ func (c *Consumer) Consume(workerFunc func([]byte)) error { defer ch.Close() defer conn.Close() - if err := initPubSub(ch, c.exchange, c.queue); err != nil { + if err := initPubSub(ch, c.exchange, c.queue, c.enableDelay); err != nil { return fmt.Errorf("failed to initialize a pubsub: %s", err.Error()) } diff --git a/pubsub/publisher.go b/pubsub/publisher.go index 7a2518d..f5d8dea 100644 --- a/pubsub/publisher.go +++ b/pubsub/publisher.go @@ -9,14 +9,19 @@ import ( // publisher ... type publisher struct { - url string - exchange string - queue string + url string + exchange string + queue string + enableDelay bool } // NewPublisher creates a new publisher to rabbit -func NewPublisher(rabbitURL, exchange, queue string) *publisher { - return &publisher{rabbitURL, exchange, queue} +func NewPublisher(rabbitURL, exchange, queue string, config ...Config) *publisher { + if len(config) > 0 { + return &publisher{rabbitURL, exchange, queue, config[0].EnableDelay} + } + + return &publisher{rabbitURL, exchange, queue, false} } // publish ... @@ -28,7 +33,7 @@ func (p *publisher) publish(body []byte, delay time.Duration) error { defer ch.Close() defer conn.Close() - if err := initPubSub(ch, p.exchange, p.queue); err != nil { + if err := initPubSub(ch, p.exchange, p.queue, p.enableDelay); err != nil { return fmt.Errorf("failed to initialize a pubsub: %s", err.Error()) } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 88a66d7..22183f5 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -6,6 +6,11 @@ import ( "github.com/streadway/amqp" ) +// Config for publisher and consumer +type Config struct { + EnableDelay bool +} + func initQ(url string) (*amqp.Connection, *amqp.Channel, error) { conn, err := amqp.Dial(url) if err != nil { @@ -26,14 +31,19 @@ func initQ(url string) (*amqp.Connection, *amqp.Channel, error) { } // initPubSub ... -func initPubSub(ch *amqp.Channel, exchangeName, queueName string) error { +func initPubSub(ch *amqp.Channel, exchangeName, queueName string, enableDelay bool) error { + exchangeType := "direct" + if enableDelay { + exchangeType = "x-delayed-message" + } + err := ch.ExchangeDeclare( - exchangeName, // name - "x-delayed-message", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait + exchangeName, // name + exchangeType, // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait amqp.Table{ "x-delayed-type": "direct", }, // arguments