Skip to content

Commit

Permalink
added hang_int_test.go example to demonstrate a problem with amqp091-go
Browse files Browse the repository at this point in the history
  • Loading branch information
Danlock committed Nov 14, 2023
1 parent 6f6aafc commit 17f5efe
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ topology := rmq.Topology{
rmqConn := rmq.ConnectWithURLs(ctx, rmq.ConnectArgs{Args: cfg, Topology: topology}, os.Getenv("AMQP_URL"))
```

Take a look at healthcheck_int_test.go for a more complete example of using all of danlock/rmq together.
Take a look at healthcheck_int_test.go for a more complete example of using all of danlock/rmq together, or hang_int_test.go for an example of danlock/rmq being more network-aware than amqp091-go.

# Logging

Expand Down
111 changes: 111 additions & 0 deletions hang_int_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//go:build rabbit

package rmq_test

import (
"context"
"errors"
"log"
"log/slog"
"net"
"os"
"testing"
"time"

"github.com/danlock/rmq"
amqp "github.com/rabbitmq/amqp091-go"
)

func TestHanging(t *testing.T) {
Example_hanging()
}

func panicOnErr(err error) {
if err != nil {
panic(err)
}
}

func Example_hanging() {
var tcpConn *net.TCPConn
amqp091Config := amqp.Config{
// Set dial so we have access to the net.Conn
// This is the same as amqp091.DefaultDial(time.Second) except we also grab the connection
Dial: func(network, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(network, addr, time.Second)
if err != nil {
return nil, err
}
if err := conn.SetDeadline(time.Now().Add(time.Second)); err != nil {
return nil, err
}
tcpConn = conn.(*net.TCPConn)
return conn, nil
},
}
// Create an innocent, unsuspecting amqp091 connection
amqp091Conn, err := amqp.DialConfig(os.Getenv("TEST_AMQP_URI"), amqp091Config)
panicOnErr(err)
// Create a channel to ensure the connection's working.
amqp091Chan, err := amqp091Conn.Channel()
panicOnErr(err)
panicOnErr(amqp091Chan.Close())
// Betray amqp091Conn expectations by dawdling. While this is unnatural API usage, the intention is to emulate a connection hang.
dawdlingBegins := make(chan struct{}, 1)
// hangTime is 3 seconds for faster tests, but this could easily be much longer...
hangTime := 3 * time.Second
hangConnection := func(tcpConn *net.TCPConn) {
go func() {
sysConn, err := tcpConn.SyscallConn()
panicOnErr(err)
// sysConn.Write blocks the whole connection until it finishes
err = sysConn.Write(func(fd uintptr) bool {
dawdlingBegins <- struct{}{}
time.Sleep(hangTime)
return true
})
panicOnErr(err)
}()
select {
case <-time.After(time.Second):
panic("sysConn.Write took too long!")
case <-dawdlingBegins:
}
}
hangConnection(tcpConn)
// The unsuspecting amqp091Conn.Channel() dutifully waits for hangTime.
// Doesn't matter what amqp.DefaultDial(connectionTimeout) was (only 1 second...)
chanStart := time.Now()
amqp091Chan, err = amqp091Conn.Channel()
panicOnErr(err)
panicOnErr(amqp091Chan.Close())
panicOnErr(amqp091Conn.Close())
// test our expectation that amqp091Conn.Channel hung for at least 90% of hangTime, to prevent flaky tests.
if time.Since(chanStart) < (hangTime - (hangTime / 10)) {
panic("amqp091Conn.Channel returned faster than expected")
}
// The above demonstrates one of the biggest issues with amqp091, since your applications stuck if the connection hangs,
// and you don't have any options to prevent this.
ctx := context.Background()
// danlock/rmq gives you 2 ways to prevent unbound hangs, Args.AMQPTimeout and the context passed into each function call.
rmqConnCfg := rmq.ConnectArgs{Args: rmq.Args{Log: slog.Log, AMQPTimeout: time.Second}}
// Create a paranoid AMQP connection
rmqConn := rmq.ConnectWithAMQPConfig(ctx, rmqConnCfg, os.Getenv("TEST_AMQP_URI"), amqp091Config)
// Grab a channel to ensure the connection is working
amqp091Chan, err = rmqConn.Channel(ctx)
panicOnErr(err)
panicOnErr(amqp091Chan.Close())
// a hung connection, just like we've always feared
hangConnection(tcpConn)
// However we will simply error long before hangTime.
chanStart = time.Now()
_, err = rmqConn.Channel(ctx)
if !errors.Is(err, context.DeadlineExceeded) {
log.Fatalf("rmqConn.Channel returned unexpected error %v", err)
}
chanDur := time.Since(chanStart)
// rmqConn is too paranoid to hang for 90% of hangTime, but double check anyway
if time.Since(chanStart) > (hangTime - (hangTime / 10)) {
log.Fatalf("rmqConn.Channel hung for (%s)", chanDur)
}
}
4 changes: 2 additions & 2 deletions healthcheck_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
)

func TestHealthcheck(t *testing.T) {
Example()
Example_healthcheck()
}

// Example shows how to write an unsophisticated healthcheck for a service intending to ensure it's rmq.Connection is capable of processing messages.
// Even though rmq.Connection reconnects on errors, there can always be unforeseen networking/DNS/RNGesus issues
// that necessitate a docker/kubernetes healthcheck restarting the service when unhealthy.
// While this is useful as an example, it wouldn't be used on production for several reasons, only one of which is the lack of reuse of AMQP connections and AMQP channels.
func Example() {
func Example_healthcheck() {
// Real applications should use a real context. If this healthcheck was called via HTTP request for example,
// that HTTP request's context would be a good candidate.
ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
Expand Down
10 changes: 10 additions & 0 deletions internal/test/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package test

import "testing"

func FailOnError(t testing.TB, err error) {
if err != nil {
t.Helper()
t.Fatalf("%+v", err)
}
}

0 comments on commit 17f5efe

Please sign in to comment.