Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent deadlock #30

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 52 additions & 22 deletions connector/amqp10/amqp10.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type AMQP10Connector struct {
receivers map[string]*amqp.Receiver
senders map[string]*amqp.Sender
logger *logging.Logger
interrupt chan bool
inInterrupt chan bool
outInterrupt chan bool
}

// AMQP10Message holds received (or to be sent) messages from (to) AMQP-1.0 entity
Expand Down Expand Up @@ -95,7 +96,6 @@ func CreateAMQP10Connector(
logger: logger,
receivers: make(map[string]*amqp.Receiver),
senders: make(map[string]*amqp.Sender),
interrupt: make(chan bool),
}

// connect
Expand Down Expand Up @@ -230,9 +230,12 @@ func (conn *AMQP10Connector) connect(connType string) error {
switch connType {
case "in":
conn.inConnection = sess
conn.inInterrupt = make(chan bool)
case "out":
conn.outConnection = sess
conn.outInterrupt = make(chan bool)
}

return nil
}

Expand Down Expand Up @@ -293,7 +296,7 @@ func (conn *AMQP10Connector) CreateReceiver(address string, prefetch int) error
func (conn *AMQP10Connector) CreateSender(address string) (*amqp.Sender, error) {
channel := strings.TrimPrefix(address, "/")
if s, ok := conn.senders[channel]; ok {
s.Close(nil)
s.Close(context.Background())
delete(conn.senders, channel)
}

Expand Down Expand Up @@ -390,6 +393,12 @@ func (conn *AMQP10Connector) startSenders(inchan chan interface{}, wg *sync.Wait
sndLock.Unlock()
message.SetIdFromCounter(&counter)
go func(sender *amqp.Sender, msg AMQP10Message, sndLock *sync.RWMutex, lfLock *sync.RWMutex, timeout time.Duration) {
var (
connErr *amqp.ConnError
linkErr *amqp.LinkError
sessErr *amqp.SessionError
)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer func(cancel context.CancelFunc) {
cancel()
Expand Down Expand Up @@ -420,7 +429,15 @@ func (conn *AMQP10Connector) startSenders(inchan chan interface{}, wg *sync.Wait
lfLock.Lock()
linkFail += 1
lfLock.Unlock()
msgMeta["reason"] = err.(*amqp.Error).Description
if urr, ok := err.(*amqp.Error); ok {
msgMeta["reason"] = urr.Description
} else if errors.As(err, &connErr) && connErr.RemoteErr != nil {
msgMeta["reason"] = connErr.RemoteErr.Description
} else if errors.As(err, &linkErr) && linkErr.RemoteErr != nil {
msgMeta["reason"] = linkErr.RemoteErr.Description
} else if errors.As(err, &sessErr) && sessErr.RemoteErr != nil {
msgMeta["reason"] = sessErr.RemoteErr.Description
}
conn.logger.Metadata(msgMeta)
conn.logger.Warn("Failed to send message")

Expand All @@ -437,7 +454,7 @@ func (conn *AMQP10Connector) startSenders(inchan chan interface{}, wg *sync.Wait
})
conn.logger.Debug("Skipped processing of sent AMQP1.0 message with invalid type")
}
case <-conn.interrupt:
case <-conn.outInterrupt:
goto doneSend
}
}
Expand Down Expand Up @@ -472,6 +489,12 @@ func (conn *AMQP10Connector) startReceivers(outchan chan interface{}, wg *sync.W
for _, rcv := range conn.receivers {
wg.Add(1)
go func(receiver *amqp.Receiver) {
var (
connErr *amqp.ConnError
linkErr *amqp.LinkError
sessErr *amqp.SessionError
)

defer wg.Done()

connLogMeta := logging.Metadata{
Expand All @@ -480,25 +503,29 @@ func (conn *AMQP10Connector) startReceivers(outchan chan interface{}, wg *sync.W
}
conn.logger.Metadata(connLogMeta)
conn.logger.Warn("Created receiver")
// number of link failures
linkFail := int64(0)
for {
select {
case <-conn.interrupt:
case <-conn.inInterrupt:
goto doneReceive
default:
}
var connErr *amqp.ConnError
if msg, err := receiver.Receive(context.Background(), nil); err == nil {
receiver.AcceptMessage(context.Background(), msg)
conn.processIncomingMessage(msg.GetData(), outchan, receiver)
conn.logger.Debug("Message ACKed")
} else if errors.As(err, &connErr) {
conn.logger.Metadata(connLogMeta)
conn.logger.Warn("Channel closed, reconnecting")
goto reconnectReceive
} else {
connLogMeta["err"] = err
conn.logger.Metadata(connLogMeta)
conn.logger.Error("Received AMQP1.0 error")
ctx, _ := context.WithTimeout(context.Background(), time.Second)
if msg, err := receiver.Receive(ctx, nil); err == nil {
receiver.AcceptMessage(context.Background(), msg)
conn.processIncomingMessage(msg.GetData(), outchan, receiver)
conn.logger.Debug("Message ACKed")
linkFail = int64(0)
} else if errors.As(err, &connErr) || errors.As(err, &linkErr) || errors.As(err, &sessErr) {
linkFail += int64(1)
if linkFail > conn.LinkFailureLimit {
conn.logger.Metadata(connLogMeta)
conn.logger.Warn("Too many link failures in row, reconnecting")
goto reconnectReceive
}
} else {
// receiver wait timed out
}
}
}

Expand All @@ -525,8 +552,8 @@ func (conn *AMQP10Connector) stopReceivers() {
"receiver": conn.receivers[r].Address(),
})
conn.logger.Debug("Closed receiver link")
delete(conn.receivers, r)
}
conn.receivers = map[string]*amqp.Receiver{}
}

// Reconnect tries to reconnect connector to configured AMQP1.0 node. Returns nil if failed
Expand All @@ -535,13 +562,15 @@ func (conn *AMQP10Connector) Reconnect(connectionType string, msgChannel chan in
listen := []string{}
switch connectionType {
case "in":
close(conn.inInterrupt)
for r := range conn.receivers {
listen = append(listen, conn.receivers[r].Address())
}
conn.stopReceivers()
conn.inConnection.Close(ctx)
conn.logger.Debug("Disconnected incoming connection")
case "out":
close(conn.outInterrupt)
conn.stopSenders()
conn.outConnection.Close(ctx)
conn.logger.Debug("Disconnected outgoing connection")
Expand Down Expand Up @@ -577,7 +606,8 @@ func (conn *AMQP10Connector) Reconnect(connectionType string, msgChannel chan in
// Disconnect closes connection in both directions
func (conn *AMQP10Connector) Disconnect() {
ctx := context.Background()
close(conn.interrupt)
close(conn.inInterrupt)
close(conn.outInterrupt)
time.Sleep(time.Second)
conn.inConnection.Close(ctx)
conn.outConnection.Close(ctx)
Expand Down
38 changes: 31 additions & 7 deletions tests/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,35 @@ func TestAMQP10SendAndReceiveMessage(t *testing.T) {
wg.Wait()
})

t.Run("Test reconnect", func(t *testing.T) {
t.Run("Test reconnect of sender", func(t *testing.T) {
var wg sync.WaitGroup

require.NoError(t, conn.Reconnect("out", sender, cwg))

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
data := <-receiver
assert.Equal(t, QDRMsg2, (data.(amqp10.AMQP10Message)).Body)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2}
}
}()

wg.Wait()
})

t.Run("Test reconnect of receiver", func(t *testing.T) {
var wg sync.WaitGroup

require.NoError(t, conn.Reconnect("in", receiver, cwg))
require.NoError(t, conn.Reconnect("out", receiver, cwg))

wg.Add(1)
go func() {
Expand All @@ -258,16 +282,16 @@ func TestAMQP10SendAndReceiveMessage(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2}
sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2}
sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2}
for i := 0; i < 3; i++ {
sender <- amqp10.AMQP10Message{Address: "qdrtest", Body: QDRMsg2}
}
}()

wg.Wait()
conn.Disconnect()
cwg.Wait()
})

conn.Disconnect()
cwg.Wait()
}

func TestLoki(t *testing.T) {
Expand Down
Loading