Skip to content

Commit

Permalink
[FIXED] Deadlock when accessing subscriptions map on consumer (#1671)
Browse files Browse the repository at this point in the history
This fixes an issue where a deadlock could occur when calling `Stop()` or
`Drain()` on `ConsumeContext` or `MessagesContext` and then calling `Consume` or
`Messages` immediately.
Switched to using a type-safe implementation of `sync.Map` for subscriptions map
instead of locking the whole consumer state.
Additionally, changed the type of atomic flags from `uint32` to `atomic.UInt32`
to avoid accidental non-atomic reads/writes.

Signed-off-by: Piotr Piotrowski <[email protected]>

---------

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Jul 25, 2024
1 parent 94fa0cb commit c693ec3
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 138 deletions.
14 changes: 8 additions & 6 deletions go_test.mod
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
module github.com/nats-io/nats.go

go 1.19
go 1.21

toolchain go1.22.5

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.8
github.com/klauspost/compress v1.17.9
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.16
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.15.0
golang.org/x/text v0.16.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.7 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
24 changes: 14 additions & 10 deletions go_test.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
Expand All @@ -10,38 +11,40 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0=
github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU=
github.com/nats-io/nats-server/v2 v2.10.17 h1:PTVObNBD3TZSNUDgzFb1qQsQX4mOgFmOuG9vhT+KBUY=
github.com/nats-io/nats-server/v2 v2.10.17/go.mod h1:5OUyc4zg42s/p2i92zbbqXvUNsbF0ivdTLKshVMn2YQ=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand All @@ -54,3 +57,4 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
73 changes: 73 additions & 0 deletions internal/syncx/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncx

import "sync"

// Map is a type-safe wrapper around sync.Map.
// It is safe for concurrent use.
// The zero value of Map is an empty map ready to use.
type Map[K comparable, V any] struct {
m sync.Map
}

func (m *Map[K, V]) Load(key K) (V, bool) {
v, ok := m.m.Load(key)
if !ok {
var empty V
return empty, false
}
return v.(V), true
}

func (m *Map[K, V]) Store(key K, value V) {
m.m.Store(key, value)
}

func (m *Map[K, V]) Delete(key K) {
m.m.Delete(key)
}

func (m *Map[K, V]) Range(f func(key K, value V) bool) {
m.m.Range(func(key, value any) bool {
return f(key.(K), value.(V))
})
}

func (m *Map[K, V]) LoadOrStore(key K, value V) (V, bool) {
v, loaded := m.m.LoadOrStore(key, value)
return v.(V), loaded
}

func (m *Map[K, V]) LoadAndDelete(key K) (V, bool) {
v, ok := m.m.LoadAndDelete(key)
if !ok {
var empty V
return empty, false
}
return v.(V), true
}

func (m *Map[K, V]) CompareAndSwap(key K, old, new V) bool {
return m.m.CompareAndSwap(key, old, new)
}

func (m *Map[K, V]) CompareAndDelete(key K, value V) bool {
return m.m.CompareAndDelete(key, value)
}

func (m *Map[K, V]) Swap(key K, value V) (V, bool) {
previous, loaded := m.m.Swap(key, value)
return previous.(V), loaded
}
152 changes: 152 additions & 0 deletions internal/syncx/map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncx

import (
"testing"
)

func TestMapLoad(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")

v, ok := m.Load(1)
if !ok || v != "one" {
t.Errorf("Load(1) = %v, %v; want 'one', true", v, ok)
}

v, ok = m.Load(2)
if ok || v != "" {
t.Errorf("Load(2) = %v, %v; want '', false", v, ok)
}
}

func TestMapStore(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")

v, ok := m.Load(1)
if !ok || v != "one" {
t.Errorf("Load(1) after Store(1, 'one') = %v, %v; want 'one', true", v, ok)
}
}

func TestMapDelete(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")
m.Delete(1)

v, ok := m.Load(1)
if ok || v != "" {
t.Errorf("Load(1) after Delete(1) = %v, %v; want '', false", v, ok)
}
}

func TestMapRange(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")
m.Store(2, "two")

var keys []int
var values []string
m.Range(func(key int, value string) bool {
keys = append(keys, key)
values = append(values, value)
return true
})

if len(keys) != 2 || len(values) != 2 {
t.Errorf("Range() keys = %v, values = %v; want 2 keys and 2 values", keys, values)
}
}

func TestMapLoadOrStore(t *testing.T) {
var m Map[int, string]

v, loaded := m.LoadOrStore(1, "one")
if loaded || v != "one" {
t.Errorf("LoadOrStore(1, 'one') = %v, %v; want 'one', false", v, loaded)
}

v, loaded = m.LoadOrStore(1, "uno")
if !loaded || v != "one" {
t.Errorf("LoadOrStore(1, 'uno') = %v, %v; want 'one', true", v, loaded)
}
}

func TestMapLoadAndDelete(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")

v, ok := m.LoadAndDelete(1)
if !ok || v != "one" {
t.Errorf("LoadAndDelete(1) = %v, %v; want 'one', true", v, ok)
}

v, ok = m.Load(1)
if ok || v != "" {
t.Errorf("Load(1) after LoadAndDelete(1) = %v, %v; want '', false", v, ok)
}

// Test that LoadAndDelete on a missing key returns the zero value.
v, ok = m.LoadAndDelete(2)
if ok || v != "" {
t.Errorf("LoadAndDelete(2) = %v, %v; want '', false", v, ok)
}
}

func TestMapCompareAndSwap(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")

ok := m.CompareAndSwap(1, "one", "uno")
if !ok {
t.Errorf("CompareAndSwap(1, 'one', 'uno') = false; want true")
}

v, _ := m.Load(1)
if v != "uno" {
t.Errorf("Load(1) after CompareAndSwap = %v; want 'uno'", v)
}
}

func TestMapCompareAndDelete(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")

ok := m.CompareAndDelete(1, "one")
if !ok {
t.Errorf("CompareAndDelete(1, 'one') = false; want true")
}

v, _ := m.Load(1)
if v != "" {
t.Errorf("Load(1) after CompareAndDelete = %v; want ''", v)
}
}

func TestMapSwap(t *testing.T) {
var m Map[int, string]
m.Store(1, "one")

v, loaded := m.Swap(1, "uno")
if !loaded || v != "one" {
t.Errorf("Swap(1, 'uno') = %v, %v; want 'one', true", v, loaded)
}

v, _ = m.Load(1)
if v != "uno" {
t.Errorf("Load(1) after Swap = %v; want 'uno'", v)
}
}
25 changes: 13 additions & 12 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"

"github.com/nats-io/nats.go/internal/syncx"
"github.com/nats-io/nuid"
)

Expand Down Expand Up @@ -233,12 +234,12 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}

return &pullConsumer{
jetStream: js,
stream: stream,
name: resp.Name,
durable: cfg.Durable != "",
info: resp.ConsumerInfo,
subscriptions: make(map[string]*pullSubscription),
jetStream: js,
stream: stream,
name: resp.Name,
durable: cfg.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}, nil
}

Expand Down Expand Up @@ -285,12 +286,12 @@ func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consu
}

cons := &pullConsumer{
jetStream: js,
stream: stream,
name: name,
durable: resp.Config.Durable != "",
info: resp.ConsumerInfo,
subscriptions: make(map[string]*pullSubscription, 0),
jetStream: js,
stream: stream,
name: name,
durable: resp.Config.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}

return cons, nil
Expand Down
Loading

0 comments on commit c693ec3

Please sign in to comment.