From 85bdd9e9e7df79ad578a5482da67e6dd7685b728 Mon Sep 17 00:00:00 2001 From: Pablo Aguilar Date: Mon, 18 Mar 2024 19:48:39 -0300 Subject: [PATCH] update `amqp091-go` from `v1.7.0` to `v1.9.0` --- go.mod | 2 +- go.sum | 4 + .../rabbitmq/amqp091-go/CHANGELOG.md | 46 ++++++ .../rabbitmq/amqp091-go/CONTRIBUTING.md | 18 ++- .../github.com/rabbitmq/amqp091-go/Makefile | 21 +++ .../github.com/rabbitmq/amqp091-go/RELEASE.md | 11 ++ .../rabbitmq/amqp091-go/allocator.go | 40 +++-- .../github.com/rabbitmq/amqp091-go/certs.sh | 8 +- .../github.com/rabbitmq/amqp091-go/channel.go | 147 +++++++++++++++++- .../rabbitmq/amqp091-go/confirms.go | 23 ++- .../rabbitmq/amqp091-go/connection.go | 105 ++++++++++--- .../rabbitmq/amqp091-go/consumers.go | 27 ++++ .../github.com/rabbitmq/amqp091-go/spec091.go | 128 +++++++-------- .../github.com/rabbitmq/amqp091-go/types.go | 52 +++++-- .../github.com/rabbitmq/amqp091-go/write.go | 3 +- vendor/modules.txt | 2 +- 16 files changed, 506 insertions(+), 131 deletions(-) diff --git a/go.mod b/go.mod index 3c21622..5bee736 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/wagslane/go-rabbitmq go 1.20 -require github.com/rabbitmq/amqp091-go v1.7.0 +require github.com/rabbitmq/amqp091-go v1.9.0 diff --git a/go.sum b/go.sum index e627709..ee6e1e8 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -13,6 +15,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= diff --git a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md index 1165775..db633d4 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md @@ -1,5 +1,51 @@ # Changelog +## [v1.8.1](https://github.com/rabbitmq/amqp091-go/tree/v1.8.1) (2023-05-04) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.8.0...v1.8.1) + +**Fixed bugs:** + +- Fixed incorrect version reported in client properties [52ce2efd03c53dcf77d5496977da46840e9abd24](https://github.com/rabbitmq/amqp091-go/commit/52ce2efd03c53dcf77d5496977da46840e9abd24) + +**Merged pull requests:** + +- Fix Example Client not reconnecting [\#186](https://github.com/rabbitmq/amqp091-go/pull/186) ([frankfil](https://github.com/frankfil)) + +## [v1.8.0](https://github.com/rabbitmq/amqp091-go/tree/v1.8.0) (2023-03-21) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.7.0...v1.8.0) + +**Closed issues:** + +- memory leak [\#179](https://github.com/rabbitmq/amqp091-go/issues/179) +- the publishWithContext interface will not return when it times out [\#178](https://github.com/rabbitmq/amqp091-go/issues/178) + +**Merged pull requests:** + +- Fix race condition on confirms [\#183](https://github.com/rabbitmq/amqp091-go/pull/183) ([calloway-jacob](https://github.com/calloway-jacob)) +- Add a CloseDeadline function to Connection [\#181](https://github.com/rabbitmq/amqp091-go/pull/181) ([Zerpet](https://github.com/Zerpet)) +- Fix memory leaks [\#180](https://github.com/rabbitmq/amqp091-go/pull/180) ([GXKe](https://github.com/GXKe)) +- Bump go.uber.org/goleak from 1.2.0 to 1.2.1 [\#177](https://github.com/rabbitmq/amqp091-go/pull/177) ([dependabot[bot]](https://github.com/apps/dependabot)) + +## [v1.7.0](https://github.com/rabbitmq/amqp091-go/tree/v1.7.0) (2023-02-09) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1...v1.7.0) + +**Closed issues:** + +- \#31 resurfacing \(?\) [\#170](https://github.com/rabbitmq/amqp091-go/issues/170) +- Deprecate QueueInspect [\#167](https://github.com/rabbitmq/amqp091-go/issues/167) +- v1.6.0 causing rabbit connection errors [\#160](https://github.com/rabbitmq/amqp091-go/issues/160) + +**Merged pull requests:** + +- Set channels and allocator to nil in shutdown [\#172](https://github.com/rabbitmq/amqp091-go/pull/172) ([lukebakken](https://github.com/lukebakken)) +- Fix racing in Open [\#171](https://github.com/rabbitmq/amqp091-go/pull/171) ([Zerpet](https://github.com/Zerpet)) +- adding go 1.20 to tests [\#169](https://github.com/rabbitmq/amqp091-go/pull/169) ([halilylm](https://github.com/halilylm)) +- Deprecate the QueueInspect function [\#168](https://github.com/rabbitmq/amqp091-go/pull/168) ([lukebakken](https://github.com/lukebakken)) +- Check if channel is nil before updating it [\#150](https://github.com/rabbitmq/amqp091-go/pull/150) ([julienschmidt](https://github.com/julienschmidt)) + ## [v1.6.1](https://github.com/rabbitmq/amqp091-go/tree/v1.6.1) (2023-02-01) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.6.1-rc.2...v1.6.1) diff --git a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md index ed1b971..ec86fe5 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md @@ -9,11 +9,13 @@ Here is the recommended workflow: 1. Run Static Checks 1. Run integration tests (see below) 1. **Implement tests** -1. Implement fixs -1. Commit your changes (`git commit -am 'Add some feature'`) +1. Implement fixes +1. Commit your changes. Use a [good, descriptive, commit message][good-commit]. 1. Push to a branch (`git push -u origin my-new-feature`) 1. Submit a pull request +[good-commit]: https://cbea.ms/git-commit/ + ## Running Static Checks golangci-lint must be installed to run the static checks. See [installation @@ -43,6 +45,18 @@ The integration tests can be run via: make tests ``` +Some tests require access to `rabbitmqctl` CLI. Use the environment variable +`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests. + +If you have Docker available in your machine, you can run: + +```shell +make tests-docker +``` + +This target will start a RabbitMQ container, run the test suite with the environment +variable setup, and stop RabbitMQ container after a successful run. + All integration tests should use the `integrationConnection(...)` test helpers defined in `integration_test.go` to setup the integration environment and logging. diff --git a/vendor/github.com/rabbitmq/amqp091-go/Makefile b/vendor/github.com/rabbitmq/amqp091-go/Makefile index 7342731..7dc71bc 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/Makefile +++ b/vendor/github.com/rabbitmq/amqp091-go/Makefile @@ -19,6 +19,11 @@ fmt: ## Run go fmt against code tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test go test -race -v -tags integration $(GO_TEST_FLAGS) +.PHONY: tests-docker +tests-docker: rabbitmq-server + RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS) + $(MAKE) stop-rabbitmq-server + .PHONY: check check: golangci-lint run ./... @@ -34,3 +39,19 @@ rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be .PHONY: stop-rabbitmq-server stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit docker stop $(CONTAINER_NAME) + +certs: + ./certs.sh + +.PHONY: certs-rm +certs-rm: + rm -r ./certs/ + +.PHONY: rabbitmq-server-tls +rabbitmq-server-tls: | certs ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit + docker run --detach --rm --name $(CONTAINER_NAME) \ + --publish 5672:5672 --publish 5671:5671 --publish 15672:15672 \ + --mount type=bind,src=./certs/server,dst=/certs \ + --mount type=bind,src=./certs/ca/cacert.pem,dst=/certs/cacert.pem,readonly \ + --mount type=bind,src=./rabbitmq-confs/tls/90-tls.conf,dst=/etc/rabbitmq/conf.d/90-tls.conf \ + --pull always rabbitmq:3-management diff --git a/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md b/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md index a1b1ae0..1378d68 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md +++ b/vendor/github.com/rabbitmq/amqp091-go/RELEASE.md @@ -1,3 +1,14 @@ +# Guide to release a new version + +1. Update the `buildVersion` constant in [connection.go](https://github.com/rabbitmq/amqp091-go/blob/4886c35d10b273bd374e3ed2356144ad41d27940/connection.go#L31) +2. Commit and push. Include the version in the commit message e.g. [this commit](https://github.com/rabbitmq/amqp091-go/commit/52ce2efd03c53dcf77d5496977da46840e9abd24) +3. Create a new [GitHub Release](https://github.com/rabbitmq/amqp091-go/releases). Create a new tag as `v..` + 1. Use auto-generate release notes feature in GitHub +4. Generate the change log, see [Changelog Generation](#changelog-generation) +5. Review the changelog. Watch out for issues closed as "not-fixed" or without a PR +6. Commit and Push. Pro-tip: include `[skip ci]` in the commit message to skip the CI run, since it's only documentation +7. Send an announcement to the mailing list. Take inspiration from [this message](https://groups.google.com/g/rabbitmq-users/c/EBGYGOWiSgs/m/0sSFuAGICwAJ) + ## Changelog Generation ``` diff --git a/vendor/github.com/rabbitmq/amqp091-go/allocator.go b/vendor/github.com/rabbitmq/amqp091-go/allocator.go index 0688e4b..f2925e7 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/allocator.go +++ b/vendor/github.com/rabbitmq/amqp091-go/allocator.go @@ -18,10 +18,10 @@ const ( // allocator maintains a bitset of allocated numbers. type allocator struct { - pool *big.Int - last int - low int - high int + pool *big.Int + follow int + low int + high int } // NewAllocator reserves and frees integers out of a range between low and @@ -31,10 +31,10 @@ type allocator struct { // sizeof(big.Word) func newAllocator(low, high int) *allocator { return &allocator{ - pool: big.NewInt(0), - last: low, - low: low, - high: high, + pool: big.NewInt(0), + follow: low, + low: low, + high: high, } } @@ -69,21 +69,29 @@ func (a allocator) String() string { // O(N) worst case runtime where N is allocated, but usually O(1) due to a // rolling index into the oldest allocation. func (a *allocator) next() (int, bool) { - wrapped := a.last + wrapped := a.follow + defer func() { + // make a.follow point to next value + if a.follow == a.high { + a.follow = a.low + } else { + a.follow += 1 + } + }() // Find trailing bit - for ; a.last <= a.high; a.last++ { - if a.reserve(a.last) { - return a.last, true + for ; a.follow <= a.high; a.follow++ { + if a.reserve(a.follow) { + return a.follow, true } } // Find preceding free'd pool - a.last = a.low + a.follow = a.low - for ; a.last < wrapped; a.last++ { - if a.reserve(a.last) { - return a.last, true + for ; a.follow < wrapped; a.follow++ { + if a.reserve(a.follow) { + return a.follow, true } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/certs.sh b/vendor/github.com/rabbitmq/amqp091-go/certs.sh index 403e80c..0bbb1c6 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/certs.sh +++ b/vendor/github.com/rabbitmq/amqp091-go/certs.sh @@ -71,12 +71,12 @@ keyUsage = keyCertSign, cRLSign [ client_ca_extensions ] basicConstraints = CA:false -keyUsage = digitalSignature +keyUsage = keyEncipherment,digitalSignature extendedKeyUsage = 1.3.6.1.5.5.7.3.2 [ server_ca_extensions ] basicConstraints = CA:false -keyUsage = keyEncipherment +keyUsage = keyEncipherment,digitalSignature extendedKeyUsage = 1.3.6.1.5.5.7.3.1 subjectAltName = @alt_names @@ -106,7 +106,7 @@ openssl req \ -new \ -nodes \ -config openssl.cnf \ - -subj "/CN=127.0.0.1/O=server/" \ + -subj "/CN=localhost/O=server/" \ -key $root/server/key.pem \ -out $root/server/req.pem \ -outform PEM @@ -115,7 +115,7 @@ openssl req \ -new \ -nodes \ -config openssl.cnf \ - -subj "/CN=127.0.0.1/O=client/" \ + -subj "/CN=localhost/O=client/" \ -key $root/client/key.pem \ -out $root/client/req.pem \ -outform PEM diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index 8ba9bab..0dcec90 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -41,6 +41,7 @@ type Channel struct { // closed is set to 1 when the channel has been closed - see Channel.send() closed int32 + close chan struct{} // true when we will never notify again noNotify bool @@ -86,6 +87,7 @@ func newChannel(c *Connection, id uint16) *Channel { confirms: newConfirms(), recv: (*Channel).recvMethod, errors: make(chan *Error, 1), + close: make(chan struct{}), } } @@ -146,6 +148,7 @@ func (ch *Channel) shutdown(e *Error) { } close(ch.errors) + close(ch.close) ch.noNotify = true }) } @@ -368,7 +371,11 @@ func (ch *Channel) dispatch(msg message) { // deliveries are in flight and a no-wait cancel has happened default: - ch.rpc <- msg + select { + case <-ch.close: + return + case ch.rpc <- msg: + } } } @@ -468,6 +475,10 @@ code set to '200'. It is safe to call this method multiple times. */ func (ch *Channel) Close() error { + if ch.IsClosed() { + return nil + } + defer ch.connection.closeChannel(ch, nil) return ch.call( &channelClose{ReplyCode: replySuccess}, @@ -1085,7 +1096,8 @@ Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan. When the Channel or Connection is closed, all buffered and inflight messages will -be dropped. +be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped +messages in this way won't be lost. When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed. @@ -1126,6 +1138,121 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, return deliveries, nil } +/* +ConsumeWithContext immediately starts delivering queued messages. + +This function is similar to Channel.Consume, and accepts a context to control +consumer lifecycle. When the context passed to this function is canceled, the +consumer associated with the deliveries channel will be canceled too. When the +context passed to this function is cancelled, the deliveries channel will be closed. + +An application is advised to keep on receiving messages from the delivery channel +until the channel is empty. This is specially important to avoid memory leaks from +unconsumed messages from the delivery channel. + +Begin receiving on the returned chan Delivery before any other operation on the +Connection or Channel. + +Continues deliveries to the returned chan Delivery until Channel.Cancel, +Connection.Close, Channel.Close, context is cancelled, or an AMQP exception +occurs. Consumers must range over the chan to ensure all deliveries are +received. Unreceived deliveries will block all methods on the same connection. + +All deliveries in AMQP must be acknowledged. It is expected of the consumer to +call Delivery.Ack after it has successfully processed the delivery. If the +consumer is cancelled or the channel or connection is closed any unacknowledged +deliveries will be requeued at the end of the same queue. + +The consumer is identified by a string that is unique and scoped for all +consumers on this channel. If you wish to eventually cancel the consumer, use +the same non-empty identifier in Channel.Cancel. An empty string will cause +the library to generate a unique identity. The consumer identity will be +included in every Delivery in the ConsumerTag field + +When autoAck (also known as noAck) is true, the server will acknowledge +deliveries to this consumer prior to writing the delivery to the network. When +autoAck is true, the consumer should not call Delivery.Ack. Automatically +acknowledging deliveries means that some deliveries may get lost if the +consumer is unable to process them after the server delivers them. +See http://www.rabbitmq.com/confirms.html for more details. + +When exclusive is true, the server will ensure that this is the sole consumer +from this queue. When exclusive is false, the server will fairly distribute +deliveries across multiple consumers. + +The noLocal flag is not supported by RabbitMQ. + +It's advisable to use separate connections for Channel.Publish and +Channel.Consume so not to have TCP pushback on publishing affect the ability to +consume messages, so this parameter is here mostly for completeness. + +When noWait is true, do not wait for the server to confirm the request and +immediately begin deliveries. If it is not possible to consume, a channel +exception will be raised and the channel will be closed. + +Optional arguments can be provided that have specific semantics for the queue +or server. + +Inflight messages, limited by Channel.Qos will be buffered until received from +the returned chan. + +When the Channel or Connection is closed, all buffered and inflight messages will +be dropped. RabbitMQ will requeue messages not acknowledged. In other words, dropped +messages in this way won't be lost. +*/ +func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) { + // When we return from ch.call, there may be a delivery already for the + // consumer that hasn't been added to the consumer hash yet. Because of + // this, we never rely on the server picking a consumer tag for us. + + if err := args.Validate(); err != nil { + return nil, err + } + + if consumer == "" { + consumer = uniqueConsumerTag() + } + + req := &basicConsume{ + Queue: queue, + ConsumerTag: consumer, + NoLocal: noLocal, + NoAck: autoAck, + Exclusive: exclusive, + NoWait: noWait, + Arguments: args, + } + res := &basicConsumeOk{} + + select { + default: + case <-ctx.Done(): + return nil, ctx.Err() + } + + deliveries := make(chan Delivery) + + ch.consumers.add(consumer, deliveries) + + if err := ch.call(req, res); err != nil { + ch.consumers.cancel(consumer) + return nil, err + } + + go func() { + select { + case <-ch.consumers.closed: + return + case <-ctx.Done(): + if ch != nil { + _ = ch.Cancel(consumer, false) + } + } + }() + + return deliveries, nil +} + /* ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server @@ -1167,7 +1294,7 @@ Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable. -Exchanges declared as `internal` do not accept accept publishings. Internal +Exchanges declared as `internal` do not accept publishings. Internal exchanges are useful when you wish to implement inter-exchange topologies that should not be exposed to users of the broker. @@ -1435,6 +1562,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex ch.m.Lock() defer ch.m.Unlock() + var dc *DeferredConfirmation + if ch.confirming { + dc = ch.confirms.publish() + } + if err := ch.send(&basicPublish{ Exchange: exchange, RoutingKey: key, @@ -1457,14 +1589,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex AppId: msg.AppId, }, }); err != nil { + if ch.confirming { + ch.confirms.unpublish() + } return nil, err } - if ch.confirming { - return ch.confirms.Publish(), nil - } - - return nil, nil + return dc, nil } /* diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go index f9973b7..577e042 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/confirms.go +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -39,7 +39,7 @@ func (c *confirms) Listen(l chan Confirmation) { } // Publish increments the publishing counter -func (c *confirms) Publish() *DeferredConfirmation { +func (c *confirms) publish() *DeferredConfirmation { c.publishedMut.Lock() defer c.publishedMut.Unlock() @@ -47,6 +47,15 @@ func (c *confirms) Publish() *DeferredConfirmation { return c.deferredConfirmations.Add(c.published) } +// unpublish decrements the publishing counter and removes the +// DeferredConfirmation. It must be called immediately after a publish fails. +func (c *confirms) unpublish() { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + c.deferredConfirmations.remove(c.published) + c.published-- +} + // confirm confirms one publishing, increments the expecting delivery tag, and // removes bookkeeping for that delivery tag. func (c *confirms) confirm(confirmation Confirmation) { @@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation { return dc } +// remove is only used to drop a tag whose publish failed +func (d *deferredConfirmations) remove(tag uint64) { + d.m.Lock() + defer d.m.Unlock() + dc, found := d.confirmations[tag] + if !found { + return + } + close(dc.done) + delete(d.confirmations, tag) +} + func (d *deferredConfirmations) Confirm(confirmation Confirmation) { d.m.Lock() defer d.m.Unlock() diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index def2260..c8bb820 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -28,7 +28,7 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "AMQP 0.9.1 Client" - buildVersion = "1.6.0" + buildVersion = "1.9.0" platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. @@ -112,6 +112,8 @@ type Connection struct { blocks []chan Blocking errors chan *Error + // if connection is closed should close this chan + close chan struct{} Config Config // The negotiated Config after connection.open @@ -263,6 +265,7 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { rpc: make(chan message), sends: make(chan time.Time), errors: make(chan *Error, 1), + close: make(chan struct{}), deadlines: make(chan readDeadliner, 1), } go c.reader(conn) @@ -399,12 +402,47 @@ func (c *Connection) Close() error { ) } +// CloseDeadline requests and waits for the response to close this AMQP connection. +// +// Accepts a deadline for waiting the server response. The deadline is passed +// to the low-level connection i.e. network socket. +// +// Regardless of the error returned, the connection is considered closed, and it +// should not be used after calling this function. +// +// In the event of an I/O timeout, connection-closed listeners are NOT informed. +// +// After returning from this call, all resources associated with this connection, +// including the underlying io, Channels, Notify listeners and Channel consumers +// will also be closed. +func (c *Connection) CloseDeadline(deadline time.Time) error { + if c.IsClosed() { + return ErrClosed + } + + defer c.shutdown(nil) + + err := c.setDeadline(deadline) + if err != nil { + return err + } + + return c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) +} + func (c *Connection) closeWith(err *Error) error { if c.IsClosed() { return ErrClosed } defer c.shutdown(err) + return c.call( &connectionClose{ ReplyCode: uint16(err.Code), @@ -420,6 +458,18 @@ func (c *Connection) IsClosed() bool { return atomic.LoadInt32(&c.closed) == 1 } +// setDeadline is a wrapper to type assert Connection.conn and set an I/O +// deadline in the underlying TCP connection socket, by calling +// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails, +// although this should never happen. +func (c *Connection) setDeadline(t time.Time) error { + con, ok := c.conn.(net.Conn) + if !ok { + return errInvalidTypeAssertion + } + return con.SetDeadline(t) +} + func (c *Connection) send(f frame) error { if c.IsClosed() { return ErrClosed @@ -550,6 +600,8 @@ func (c *Connection) shutdown(err *Error) { } c.conn.Close() + // reader exit + close(c.close) c.channels = nil c.allocator = nil @@ -587,15 +639,23 @@ func (c *Connection) dispatch0(f frame) { c <- Blocking{Active: false} } default: - c.rpc <- m + select { + case <-c.close: + return + case c.rpc <- m: + } + } case *heartbeatFrame: // kthx - all reads reset our deadline. so we can drop this default: // lolwat - channel0 only responds to methods and heartbeats - if err := c.closeWith(ErrUnexpectedFrame); err != nil { - Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err) - } + // closeWith use call don't block reader + go func() { + if err := c.closeWith(ErrUnexpectedFrame); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err) + } + }() } } @@ -642,9 +702,12 @@ func (c *Connection) dispatchClosed(f frame) { // we are already closed, so do nothing default: // unexpected method on closed channel - if err := c.closeWith(ErrClosed); err != nil { - Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err) - } + // closeWith use call don't block reader + go func() { + if err := c.closeWith(ErrClosed); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err) + } + }() } } } @@ -766,13 +829,16 @@ func (c *Connection) allocateChannel() (*Channel, error) { // releaseChannel removes a channel from the registry as the final part of the // channel lifecycle -func (c *Connection) releaseChannel(id uint16) { +func (c *Connection) releaseChannel(ch *Channel) { c.m.Lock() defer c.m.Unlock() if !c.IsClosed() { - delete(c.channels, id) - c.allocator.release(int(id)) + got, ok := c.channels[ch.id] + if ok && got == ch { + delete(c.channels, ch.id) + c.allocator.release(int(ch.id)) + } } } @@ -784,7 +850,7 @@ func (c *Connection) openChannel() (*Channel, error) { } if err := ch.open(); err != nil { - c.releaseChannel(ch.id) + c.releaseChannel(ch) return nil, err } return ch, nil @@ -795,7 +861,7 @@ func (c *Connection) openChannel() (*Channel, error) { // this connection. func (c *Connection) closeChannel(ch *Channel, e *Error) { ch.shutdown(e) - c.releaseChannel(ch.id) + c.releaseChannel(ch) } /* @@ -816,13 +882,14 @@ func (c *Connection) call(req message, res ...message) error { } } - msg, ok := <-c.rpc - if !ok { - err, errorsChanIsOpen := <-c.errors - if !errorsChanIsOpen { - return ErrClosed + var msg message + select { + case e, ok := <-c.errors: + if ok { + return e } - return err + return ErrClosed + case msg = <-c.rpc: } // Try to match one of the result types diff --git a/vendor/github.com/rabbitmq/amqp091-go/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go index 8c23fad..c352fec 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -75,6 +75,33 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { } case out <- *queue[0]: + /* + * https://github.com/rabbitmq/amqp091-go/issues/179 + * https://github.com/rabbitmq/amqp091-go/pull/180 + * + * Comment from @lars-t-hansen: + * + * Given Go's slice semantics, and barring any information + * available to the compiler that proves that queue is the only + * pointer to the memory it references, the only meaning that + * queue = queue[1:] can have is basically queue += sizeof(queue + * element), ie, it bumps a pointer. Looking at the generated + * code for a simple example (on ARM64 in this case) bears this + * out. So what we're left with is an array that we have a + * pointer into the middle of. When the GC traces this pointer, + * it too does not know whether the array has multiple + * referents, and so its only sensible choice is to find the + * beginning of the array, and if the array is not already + * visited, mark every element in it, including the "dead" + * pointer. + * + * (Depending on the program dynamics, an element may eventually + * be appended to the queue when the queue is at capacity, and + * in this case the live elements are copied into a new array + * and the old array is left to be GC'd eventually, along with + * the dead object. But that can take time.) + */ + queue[0] = nil queue = queue[1:] } } diff --git a/vendor/github.com/rabbitmq/amqp091-go/spec091.go b/vendor/github.com/rabbitmq/amqp091-go/spec091.go index d86e753..6e02ba9 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/spec091.go +++ b/vendor/github.com/rabbitmq/amqp091-go/spec091.go @@ -2817,7 +2817,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // connection start - //fmt.Println("NextMethod: class:10 method:10") + // fmt.Println("NextMethod: class:10 method:10") method := &connectionStart{} if err = method.read(r.r); err != nil { return @@ -2825,7 +2825,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // connection start-ok - //fmt.Println("NextMethod: class:10 method:11") + // fmt.Println("NextMethod: class:10 method:11") method := &connectionStartOk{} if err = method.read(r.r); err != nil { return @@ -2833,7 +2833,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // connection secure - //fmt.Println("NextMethod: class:10 method:20") + // fmt.Println("NextMethod: class:10 method:20") method := &connectionSecure{} if err = method.read(r.r); err != nil { return @@ -2841,7 +2841,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // connection secure-ok - //fmt.Println("NextMethod: class:10 method:21") + // fmt.Println("NextMethod: class:10 method:21") method := &connectionSecureOk{} if err = method.read(r.r); err != nil { return @@ -2849,7 +2849,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // connection tune - //fmt.Println("NextMethod: class:10 method:30") + // fmt.Println("NextMethod: class:10 method:30") method := &connectionTune{} if err = method.read(r.r); err != nil { return @@ -2857,7 +2857,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // connection tune-ok - //fmt.Println("NextMethod: class:10 method:31") + // fmt.Println("NextMethod: class:10 method:31") method := &connectionTuneOk{} if err = method.read(r.r); err != nil { return @@ -2865,7 +2865,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // connection open - //fmt.Println("NextMethod: class:10 method:40") + // fmt.Println("NextMethod: class:10 method:40") method := &connectionOpen{} if err = method.read(r.r); err != nil { return @@ -2873,7 +2873,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 41: // connection open-ok - //fmt.Println("NextMethod: class:10 method:41") + // fmt.Println("NextMethod: class:10 method:41") method := &connectionOpenOk{} if err = method.read(r.r); err != nil { return @@ -2881,7 +2881,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 50: // connection close - //fmt.Println("NextMethod: class:10 method:50") + // fmt.Println("NextMethod: class:10 method:50") method := &connectionClose{} if err = method.read(r.r); err != nil { return @@ -2889,7 +2889,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 51: // connection close-ok - //fmt.Println("NextMethod: class:10 method:51") + // fmt.Println("NextMethod: class:10 method:51") method := &connectionCloseOk{} if err = method.read(r.r); err != nil { return @@ -2897,7 +2897,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 60: // connection blocked - //fmt.Println("NextMethod: class:10 method:60") + // fmt.Println("NextMethod: class:10 method:60") method := &connectionBlocked{} if err = method.read(r.r); err != nil { return @@ -2905,7 +2905,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 61: // connection unblocked - //fmt.Println("NextMethod: class:10 method:61") + // fmt.Println("NextMethod: class:10 method:61") method := &connectionUnblocked{} if err = method.read(r.r); err != nil { return @@ -2913,7 +2913,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 70: // connection update-secret - //fmt.Println("NextMethod: class:10 method:70") + // fmt.Println("NextMethod: class:10 method:70") method := &connectionUpdateSecret{} if err = method.read(r.r); err != nil { return @@ -2921,7 +2921,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 71: // connection update-secret-ok - //fmt.Println("NextMethod: class:10 method:71") + // fmt.Println("NextMethod: class:10 method:71") method := &connectionUpdateSecretOk{} if err = method.read(r.r); err != nil { return @@ -2936,7 +2936,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // channel open - //fmt.Println("NextMethod: class:20 method:10") + // fmt.Println("NextMethod: class:20 method:10") method := &channelOpen{} if err = method.read(r.r); err != nil { return @@ -2944,7 +2944,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // channel open-ok - //fmt.Println("NextMethod: class:20 method:11") + // fmt.Println("NextMethod: class:20 method:11") method := &channelOpenOk{} if err = method.read(r.r); err != nil { return @@ -2952,7 +2952,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // channel flow - //fmt.Println("NextMethod: class:20 method:20") + // fmt.Println("NextMethod: class:20 method:20") method := &channelFlow{} if err = method.read(r.r); err != nil { return @@ -2960,7 +2960,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // channel flow-ok - //fmt.Println("NextMethod: class:20 method:21") + // fmt.Println("NextMethod: class:20 method:21") method := &channelFlowOk{} if err = method.read(r.r); err != nil { return @@ -2968,7 +2968,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // channel close - //fmt.Println("NextMethod: class:20 method:40") + // fmt.Println("NextMethod: class:20 method:40") method := &channelClose{} if err = method.read(r.r); err != nil { return @@ -2976,7 +2976,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 41: // channel close-ok - //fmt.Println("NextMethod: class:20 method:41") + // fmt.Println("NextMethod: class:20 method:41") method := &channelCloseOk{} if err = method.read(r.r); err != nil { return @@ -2991,7 +2991,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // exchange declare - //fmt.Println("NextMethod: class:40 method:10") + // fmt.Println("NextMethod: class:40 method:10") method := &exchangeDeclare{} if err = method.read(r.r); err != nil { return @@ -2999,7 +2999,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // exchange declare-ok - //fmt.Println("NextMethod: class:40 method:11") + // fmt.Println("NextMethod: class:40 method:11") method := &exchangeDeclareOk{} if err = method.read(r.r); err != nil { return @@ -3007,7 +3007,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // exchange delete - //fmt.Println("NextMethod: class:40 method:20") + // fmt.Println("NextMethod: class:40 method:20") method := &exchangeDelete{} if err = method.read(r.r); err != nil { return @@ -3015,7 +3015,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // exchange delete-ok - //fmt.Println("NextMethod: class:40 method:21") + // fmt.Println("NextMethod: class:40 method:21") method := &exchangeDeleteOk{} if err = method.read(r.r); err != nil { return @@ -3023,7 +3023,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // exchange bind - //fmt.Println("NextMethod: class:40 method:30") + // fmt.Println("NextMethod: class:40 method:30") method := &exchangeBind{} if err = method.read(r.r); err != nil { return @@ -3031,7 +3031,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // exchange bind-ok - //fmt.Println("NextMethod: class:40 method:31") + // fmt.Println("NextMethod: class:40 method:31") method := &exchangeBindOk{} if err = method.read(r.r); err != nil { return @@ -3039,7 +3039,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // exchange unbind - //fmt.Println("NextMethod: class:40 method:40") + // fmt.Println("NextMethod: class:40 method:40") method := &exchangeUnbind{} if err = method.read(r.r); err != nil { return @@ -3047,7 +3047,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 51: // exchange unbind-ok - //fmt.Println("NextMethod: class:40 method:51") + // fmt.Println("NextMethod: class:40 method:51") method := &exchangeUnbindOk{} if err = method.read(r.r); err != nil { return @@ -3062,7 +3062,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // queue declare - //fmt.Println("NextMethod: class:50 method:10") + // fmt.Println("NextMethod: class:50 method:10") method := &queueDeclare{} if err = method.read(r.r); err != nil { return @@ -3070,7 +3070,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // queue declare-ok - //fmt.Println("NextMethod: class:50 method:11") + // fmt.Println("NextMethod: class:50 method:11") method := &queueDeclareOk{} if err = method.read(r.r); err != nil { return @@ -3078,7 +3078,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // queue bind - //fmt.Println("NextMethod: class:50 method:20") + // fmt.Println("NextMethod: class:50 method:20") method := &queueBind{} if err = method.read(r.r); err != nil { return @@ -3086,7 +3086,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // queue bind-ok - //fmt.Println("NextMethod: class:50 method:21") + // fmt.Println("NextMethod: class:50 method:21") method := &queueBindOk{} if err = method.read(r.r); err != nil { return @@ -3094,7 +3094,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 50: // queue unbind - //fmt.Println("NextMethod: class:50 method:50") + // fmt.Println("NextMethod: class:50 method:50") method := &queueUnbind{} if err = method.read(r.r); err != nil { return @@ -3102,7 +3102,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 51: // queue unbind-ok - //fmt.Println("NextMethod: class:50 method:51") + // fmt.Println("NextMethod: class:50 method:51") method := &queueUnbindOk{} if err = method.read(r.r); err != nil { return @@ -3110,7 +3110,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // queue purge - //fmt.Println("NextMethod: class:50 method:30") + // fmt.Println("NextMethod: class:50 method:30") method := &queuePurge{} if err = method.read(r.r); err != nil { return @@ -3118,7 +3118,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // queue purge-ok - //fmt.Println("NextMethod: class:50 method:31") + // fmt.Println("NextMethod: class:50 method:31") method := &queuePurgeOk{} if err = method.read(r.r); err != nil { return @@ -3126,7 +3126,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // queue delete - //fmt.Println("NextMethod: class:50 method:40") + // fmt.Println("NextMethod: class:50 method:40") method := &queueDelete{} if err = method.read(r.r); err != nil { return @@ -3134,7 +3134,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 41: // queue delete-ok - //fmt.Println("NextMethod: class:50 method:41") + // fmt.Println("NextMethod: class:50 method:41") method := &queueDeleteOk{} if err = method.read(r.r); err != nil { return @@ -3149,7 +3149,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // basic qos - //fmt.Println("NextMethod: class:60 method:10") + // fmt.Println("NextMethod: class:60 method:10") method := &basicQos{} if err = method.read(r.r); err != nil { return @@ -3157,7 +3157,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // basic qos-ok - //fmt.Println("NextMethod: class:60 method:11") + // fmt.Println("NextMethod: class:60 method:11") method := &basicQosOk{} if err = method.read(r.r); err != nil { return @@ -3165,7 +3165,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // basic consume - //fmt.Println("NextMethod: class:60 method:20") + // fmt.Println("NextMethod: class:60 method:20") method := &basicConsume{} if err = method.read(r.r); err != nil { return @@ -3173,7 +3173,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // basic consume-ok - //fmt.Println("NextMethod: class:60 method:21") + // fmt.Println("NextMethod: class:60 method:21") method := &basicConsumeOk{} if err = method.read(r.r); err != nil { return @@ -3181,7 +3181,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // basic cancel - //fmt.Println("NextMethod: class:60 method:30") + // fmt.Println("NextMethod: class:60 method:30") method := &basicCancel{} if err = method.read(r.r); err != nil { return @@ -3189,7 +3189,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // basic cancel-ok - //fmt.Println("NextMethod: class:60 method:31") + // fmt.Println("NextMethod: class:60 method:31") method := &basicCancelOk{} if err = method.read(r.r); err != nil { return @@ -3197,7 +3197,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 40: // basic publish - //fmt.Println("NextMethod: class:60 method:40") + // fmt.Println("NextMethod: class:60 method:40") method := &basicPublish{} if err = method.read(r.r); err != nil { return @@ -3205,7 +3205,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 50: // basic return - //fmt.Println("NextMethod: class:60 method:50") + // fmt.Println("NextMethod: class:60 method:50") method := &basicReturn{} if err = method.read(r.r); err != nil { return @@ -3213,7 +3213,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 60: // basic deliver - //fmt.Println("NextMethod: class:60 method:60") + // fmt.Println("NextMethod: class:60 method:60") method := &basicDeliver{} if err = method.read(r.r); err != nil { return @@ -3221,7 +3221,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 70: // basic get - //fmt.Println("NextMethod: class:60 method:70") + // fmt.Println("NextMethod: class:60 method:70") method := &basicGet{} if err = method.read(r.r); err != nil { return @@ -3229,7 +3229,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 71: // basic get-ok - //fmt.Println("NextMethod: class:60 method:71") + // fmt.Println("NextMethod: class:60 method:71") method := &basicGetOk{} if err = method.read(r.r); err != nil { return @@ -3237,7 +3237,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 72: // basic get-empty - //fmt.Println("NextMethod: class:60 method:72") + // fmt.Println("NextMethod: class:60 method:72") method := &basicGetEmpty{} if err = method.read(r.r); err != nil { return @@ -3245,7 +3245,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 80: // basic ack - //fmt.Println("NextMethod: class:60 method:80") + // fmt.Println("NextMethod: class:60 method:80") method := &basicAck{} if err = method.read(r.r); err != nil { return @@ -3253,7 +3253,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 90: // basic reject - //fmt.Println("NextMethod: class:60 method:90") + // fmt.Println("NextMethod: class:60 method:90") method := &basicReject{} if err = method.read(r.r); err != nil { return @@ -3261,7 +3261,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 100: // basic recover-async - //fmt.Println("NextMethod: class:60 method:100") + // fmt.Println("NextMethod: class:60 method:100") method := &basicRecoverAsync{} if err = method.read(r.r); err != nil { return @@ -3269,7 +3269,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 110: // basic recover - //fmt.Println("NextMethod: class:60 method:110") + // fmt.Println("NextMethod: class:60 method:110") method := &basicRecover{} if err = method.read(r.r); err != nil { return @@ -3277,7 +3277,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 111: // basic recover-ok - //fmt.Println("NextMethod: class:60 method:111") + // fmt.Println("NextMethod: class:60 method:111") method := &basicRecoverOk{} if err = method.read(r.r); err != nil { return @@ -3285,7 +3285,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 120: // basic nack - //fmt.Println("NextMethod: class:60 method:120") + // fmt.Println("NextMethod: class:60 method:120") method := &basicNack{} if err = method.read(r.r); err != nil { return @@ -3300,7 +3300,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // tx select - //fmt.Println("NextMethod: class:90 method:10") + // fmt.Println("NextMethod: class:90 method:10") method := &txSelect{} if err = method.read(r.r); err != nil { return @@ -3308,7 +3308,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // tx select-ok - //fmt.Println("NextMethod: class:90 method:11") + // fmt.Println("NextMethod: class:90 method:11") method := &txSelectOk{} if err = method.read(r.r); err != nil { return @@ -3316,7 +3316,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 20: // tx commit - //fmt.Println("NextMethod: class:90 method:20") + // fmt.Println("NextMethod: class:90 method:20") method := &txCommit{} if err = method.read(r.r); err != nil { return @@ -3324,7 +3324,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 21: // tx commit-ok - //fmt.Println("NextMethod: class:90 method:21") + // fmt.Println("NextMethod: class:90 method:21") method := &txCommitOk{} if err = method.read(r.r); err != nil { return @@ -3332,7 +3332,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 30: // tx rollback - //fmt.Println("NextMethod: class:90 method:30") + // fmt.Println("NextMethod: class:90 method:30") method := &txRollback{} if err = method.read(r.r); err != nil { return @@ -3340,7 +3340,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 31: // tx rollback-ok - //fmt.Println("NextMethod: class:90 method:31") + // fmt.Println("NextMethod: class:90 method:31") method := &txRollbackOk{} if err = method.read(r.r); err != nil { return @@ -3355,7 +3355,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err switch mf.MethodId { case 10: // confirm select - //fmt.Println("NextMethod: class:85 method:10") + // fmt.Println("NextMethod: class:85 method:10") method := &confirmSelect{} if err = method.read(r.r); err != nil { return @@ -3363,7 +3363,7 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err mf.Method = method case 11: // confirm select-ok - //fmt.Println("NextMethod: class:85 method:11") + // fmt.Println("NextMethod: class:85 method:11") method := &confirmSelectOk{} if err = method.read(r.r); err != nil { return diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 427eefb..8f43a72 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -11,6 +11,8 @@ import ( "time" ) +// DefaultExchange is the default direct exchange that binds every queue by its +// name. Applications can route to a queue using the queue name as routing key. const DefaultExchange = "" // Constants for standard AMQP 0-9-1 exchange types. @@ -63,6 +65,11 @@ var ( ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ) +// internal errors used inside the library +var ( + errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true} +) + // Error captures the code and reason a channel or connection has been closed // by the server. type Error struct { @@ -209,29 +216,39 @@ type Decimal struct { // Most common queue argument keys in queue declaration. For a comprehensive list // of queue arguments, visit [RabbitMQ Queue docs]. // -// QueueTypeArg queue argument is used to declare quorum and stream queues. -// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and -// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their +// [QueueTypeArg] queue argument is used to declare quorum and stream queues. +// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and +// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their // Classic Queues counterparts. Check [feature comparison] docs for more // information. // -// Queues can define their [max length] using QueueMaxLenArg and -// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using -// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default), -// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX. +// Queues can define their [max length] using [QueueMaxLenArg] and +// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using +// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default), +// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX]. // -// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an -// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg. -// This will set a time-to-live for **messages** in the queue. +// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an +// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg]. +// This will set a time-to-live for messages in the queue. // -// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the +// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the // maximum size of the stream. Please note that stream queues always keep, at -// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg, +// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg], // to set time-based retention. Values are string with unit suffix. Valid // suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment -// size can be set using StreamMaxSegmentSizeBytesArg. The default value is +// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is // 500_000_000 bytes ~= 500 megabytes // +// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue +// argument. This is the timeout for a consumer to acknowledge a message. The +// value is the time in milliseconds. The timeout is evaluated periodically, +// at one minute intervals. Values lower than one minute are not supported. +// See the [consumer timeout] guide for more information. +// +// [Single Active Consumer] on quorum and classic queues can be configured +// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is +// false by default. +// // [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html // [Stream retention]: https://rabbitmq.com/streams.html#retention // [max length]: https://rabbitmq.com/maxlength.html @@ -239,6 +256,8 @@ type Decimal struct { // [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl // [Quorum Queues]: https://rabbitmq.com/quorum-queues.html // [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison +// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout +// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer const ( QueueTypeArg = "x-queue-type" QueueMaxLenArg = "x-max-length" @@ -249,6 +268,11 @@ const ( QueueTTLArg = "x-expires" StreamMaxAgeArg = "x-max-age" StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes" + // QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2. + QueueVersionArg = "x-queue-version" + // ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument. + ConsumerTimeoutArg = "x-consumer-timeout" + SingleActiveConsumerArg = "x-single-active-consumer" ) // Values for queue arguments. Use as values for queue arguments during queue declaration. @@ -260,6 +284,8 @@ const ( // amqp.QueueMaxLenArg: 100, // amqp.QueueTTLArg: 1800000, // } +// +// Refer to [Channel.QueueDeclare] for more examples. const ( QueueTypeClassic = "classic" QueueTypeQuorum = "quorum" diff --git a/vendor/github.com/rabbitmq/amqp091-go/write.go b/vendor/github.com/rabbitmq/amqp091-go/write.go index d0011f8..dcec314 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/write.go +++ b/vendor/github.com/rabbitmq/amqp091-go/write.go @@ -72,7 +72,6 @@ func (f *heartbeatFrame) write(w io.Writer) (err error) { // short short long long short remainder... func (f *headerFrame) write(w io.Writer) (err error) { var payload bytes.Buffer - var zeroTime time.Time if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil { return @@ -118,7 +117,7 @@ func (f *headerFrame) write(w io.Writer) (err error) { if len(f.Properties.MessageId) > 0 { mask = mask | flagMessageId } - if f.Properties.Timestamp != zeroTime { + if !f.Properties.Timestamp.IsZero() { mask = mask | flagTimestamp } if len(f.Properties.Type) > 0 { diff --git a/vendor/modules.txt b/vendor/modules.txt index 8765d0d..c57d426 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,3 +1,3 @@ -# github.com/rabbitmq/amqp091-go v1.7.0 +# github.com/rabbitmq/amqp091-go v1.9.0 ## explicit; go 1.16 github.com/rabbitmq/amqp091-go