Skip to content

Commit

Permalink
ForceRedial test helper
Browse files Browse the repository at this point in the history
fix mispellings and ineffassign
  • Loading branch information
Danlock committed Sep 19, 2023
1 parent 2336b6e commit 3a7aee0
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 44 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ jobs:
coverage-threshold: 80
test-args: '["-tags=rabbit"]'

- name: update coverage badge
env:
COVERAGE_PATH: ${{ steps.coverage.outputs.gocov-agg-pathname }}
run: make update-readme-badge

- uses: stefanzweifel/git-auto-commit-action@v4
with:
file_pattern: 'README.md'
commit_message: "Update coverage badge"
# - name: update coverage badge
# env:
# COVERAGE_PATH: ${{ steps.coverage.outputs.gocov-agg-pathname }}
# run: make update-readme-badge

# - uses: stefanzweifel/git-auto-commit-action@v4
# with:
# file_pattern: 'README.md'
# commit_message: "Update coverage badge"
14 changes: 7 additions & 7 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@ func request[T any](connCtx, ctx context.Context, reqChan chan internal.ChanReq[
respChan := make(chan internal.ChanResp[T], 1)
select {
case <-connCtx.Done():
return t, fmt.Errorf("rmq.Connection context finished: %w", context.Cause(connCtx))
return t, fmt.Errorf("rmq.Connection context timed out because %w", context.Cause(connCtx))
case <-ctx.Done():
return t, context.Cause(ctx)
return t, fmt.Errorf("request context timed out because %w", context.Cause(ctx))
case reqChan <- internal.ChanReq[T]{Ctx: ctx, RespChan: respChan}:
}

select {
case <-connCtx.Done():
return t, fmt.Errorf("rmq.Connection context finished: %w", context.Cause(connCtx))
return t, fmt.Errorf("rmq.Connection context timed out because %w", context.Cause(connCtx))
case <-ctx.Done():
return t, context.Cause(ctx)
return t, fmt.Errorf("request context timed out because %w", context.Cause(ctx))
case resp := <-respChan:
return resp.Val, resp.Err
}
Expand Down Expand Up @@ -165,12 +165,12 @@ func (c *Connection) CurrentConnection(ctx context.Context) (AMQPConnection, err
defer cancel()
conn, err := request(c.ctx, ctx, c.currentConReqChan)
if err != nil {
return conn, err
return nil, err
}
if conn.IsClosed() {
return conn, amqp.ErrClosed
return nil, amqp.ErrClosed
}
return conn, err
return conn, nil
}

func (c *Connection) redial(dialFn func() (AMQPConnection, error)) {
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (c *Consumer) forwardDeliveries(ctx context.Context, mqChan *amqp.Channel,
// If the client never listens to outChan, this blocks forever
// Other options include using select with a default and dropping the message if the client doesn't listen, dropping the message after a timeout,
// or buffering messages and sending them again later. Of course the buffer could grow forever in that case without listeners.
// The only thing blocked would be the rmq.Consumer.Consume goroutine listening for reconnects and logging errors, which seem unneccessary without a listener anyway.
// The only thing blocked would be the rmq.Consumer.Consume goroutine listening for reconnects and logging errors, which seem unnecessary without a listener anyway.
// Alls well since we don't lock up the entire amqp.Connection like streadway/amqp with Notify* channels...
outChan <- msg
}
Expand Down
26 changes: 13 additions & 13 deletions consumer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

func ForceRedial(ctx context.Context, rmqConn *rmq.Connection) error {
amqpConn, err := rmqConn.CurrentConnection(ctx)
if err != nil {
return fmt.Errorf("rmqConn.CurrentConnection failed because %w", err)
}
// close the current connection to force a redial
return amqpConn.CloseDeadline(time.Now().Add(time.Minute))
}

func TestConsumer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -57,15 +66,7 @@ func TestConsumer(t *testing.T) {
unreliableRMQPub.Publish(ctx, rmq.Publishing{Exchange: "amq.fanout"})
rmqPub := rmq.NewPublisher(ctx, rmqConn, rmq.PublisherConfig{CommonConfig: connectCfg.CommonConfig})

forceRedial := func() {
amqpConn, err := rmqConn.CurrentConnection(ctx)
if err != nil {
t.Fatalf("failed to get rmqConn's current connection %v", err)
}
// close the current connection to force a redial
amqpConn.CloseDeadline(time.Now().Add(time.Minute))
}
forceRedial()
ForceRedial(ctx, rmqConn)
pubCtx, pubCancel := context.WithTimeout(ctx, 20*time.Second)
defer pubCancel()

Expand All @@ -79,14 +80,14 @@ func TestConsumer(t *testing.T) {
errChan <- rmqPub.PublishUntilAcked(pubCtx, 0, wantedPub)
}()
}
forceRedial()
ForceRedial(ctx, rmqConn)

for i := 0; i < pubCount; i++ {
if err := <-errChan; err != nil {
t.Fatalf("PublishUntilAcked returned unexpected error %v", err)
}
if i%2 == 0 {
forceRedial()
ForceRedial(ctx, rmqConn)
}
}

Expand Down Expand Up @@ -131,8 +132,7 @@ func TestConsumer_Load(t *testing.T) {
case <-ctx.Done():
return
case <-time.After(time.Second):
amqpConn, _ := rmqConn.CurrentConnection(ctx)
amqpConn.CloseDeadline(time.Now().Add(time.Minute))
ForceRedial(ctx, rmqConn)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion healthcheck_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Example() {
panic("where's my message?")
case msg := <-deliveries:
if !reflect.DeepEqual(msg.Body, msgOne.Body) && !reflect.DeepEqual(msg.Body, msgTwo.Body) {
panic("realistically this would probably be an error with another instance using this healthcheck simultaenously. Prevent this with an unique exchange or topic exchange with unique routing keys.")
panic("realistically this would probably be an error with another instance using this healthcheck simultaneously. Prevent this with an unique exchange or topic exchange with unique routing keys.")
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ func (p *Publisher) PublishUntilConfirmed(ctx context.Context, confirmTimeout ti
continue
}
}
// reset the delay on success
pubDelay, attempt = 0, 0
attempt = 0

confirmTimeout := time.NewTimer(confirmTimeout)
defer confirmTimeout.Stop()
Expand Down
12 changes: 2 additions & 10 deletions publisher_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,7 @@ func TestPublisher(t *testing.T) {
NotifyReturn: returnChan,
LogReturns: true,
})
forceRedial := func() {
amqpConn, err := rmqConn.CurrentConnection(ctx)
if err != nil {
t.Fatalf("failed to get rmqConn's current connection %v", err)
}
// close the current connection to force a redial
amqpConn.CloseDeadline(time.Now().Add(time.Minute))
}
forceRedial()
ForceRedial(ctx, rmqConn)
pubCtx, pubCancel := context.WithTimeout(ctx, 10*time.Second)
defer pubCancel()

Expand All @@ -59,7 +51,7 @@ func TestPublisher(t *testing.T) {
if err != nil {
t.Fatalf("PublishUntilAcked failed with %v", err)
}
forceRedial()
ForceRedial(ctx, rmqConn)
select {
case <-pubCtx.Done():
t.Fatalf("didnt get return")
Expand Down
2 changes: 1 addition & 1 deletion topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func DeclareTopology(ctx context.Context, amqpConn AMQPConnection, topology Topo
return
}
err = topology.declare(ctx, mqChan)
// An amqp.Channel must not be used from multiple goroutines simultaenously, so close it inside this goroutine to prevent cryptic RabbitMQ errors.
// An amqp.Channel must not be used from multiple goroutines simultaneously, so close it inside this goroutine to prevent cryptic RabbitMQ errors.
mqChanErr := mqChan.Close()
// Should we join mqChanErr if err is nil? When declare succeeeds a Close error is fairly inconsequential. Maybe just log it in that case? Food for thought.
if mqChanErr != nil && !errors.Is(mqChanErr, amqp.ErrClosed) {
Expand Down

0 comments on commit 3a7aee0

Please sign in to comment.