Skip to content

Commit

Permalink
Simplify sync/hatmap interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Zamony committed Jul 15, 2024
1 parent 069c126 commit aa8746d
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 58 deletions.
22 changes: 12 additions & 10 deletions sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ Additional synchronization primitives:

**Thread-safe map**
```
Map.Len() int
Map.Get(key K) (V, bool)
Map.Set(key K, value V)
Map.SetIf(key K, cond func(value V, exists bool) bool, valfunc func(prev V) V) (value V, ok bool)
Map.Delete(key K)
Map.DeleteIf(key K, cond func(value V) bool) bool
Map.Clear()
Map.All(yield func(key K, value V) bool) bool
Len() int
Get(key K) (V, bool)
Set(key K, value V)
SetIf(key K, newValue V, cond Condition) (actual V, ok bool)
Delete(key K)
DeleteIf(key K, cond Condition) bool
Clear()
All(yield func(key K, value V) bool) bool
```

**Safer waitgroup**
Expand All @@ -31,6 +31,8 @@ Group.Do(key K, fun func() V) V
```
TryLock(ctx context.Context) error
TryRLock(ctx context.Context) error
Lock()/Unlock()
RLock()/RUnlock()
Lock()
Unlock()
RLock()
RUnlock()
```
30 changes: 16 additions & 14 deletions sync/hatmap/map.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
// Package hatmap provides generic goroutine-safe map.
// It is implemented as an ordinary map protected by mutex ("mutex hat" idiom).
package hatmap

import "sync"

// Condition to operate on a current value by the given key.
// May be called multiple times.
type Condition[V any] func(currValue V) bool

// Map is a goroutine-safe map.
//
// Map must not be copied after first use.
Expand Down Expand Up @@ -33,37 +39,34 @@ func (m *Map[K, V]) Set(key K, value V) {
m.mu.Unlock()
}

func (m *Map[K, V]) canSet(key K, cond func(value V, exists bool) bool) (V, bool) {
func (m *Map[K, V]) canSet(key K, cond Condition[V]) (V, bool) {
m.mu.RLock()
defer m.mu.RUnlock()

value, ok := m.data[key]
condOk := cond(value, ok)
value := m.data[key]
condOk := cond(value)
return value, condOk
}

// SetIf conditionally sets the value by the given key.
// Condition function must be pure.
// Returns final value and condition result.
func (m *Map[K, V]) SetIf(key K, cond func(value V, exists bool) bool, valfunc func(prev V) V) (value V, ok bool) {
value, ok = m.canSet(key, cond)
func (m *Map[K, V]) SetIf(key K, newValue V, cond Condition[V]) (actual V, ok bool) {
value, ok := m.canSet(key, cond)
if !ok {
return value, false
}

m.mu.Lock()
defer m.mu.Unlock()
value, ok = m.data[key]
if !cond(value, ok) {
if !cond(value) {
return value, false
}

value = valfunc(value)
if m.data == nil {
m.data = make(map[K]V)
}
m.data[key] = value
return value, true
m.data[key] = newValue
return newValue, true
}

// Get gets value y the given key.
Expand All @@ -82,7 +85,7 @@ func (m *Map[K, V]) Delete(key K) {
m.mu.Unlock()
}

func (m *Map[K, V]) canDelete(key K, cond func(value V) bool) bool {
func (m *Map[K, V]) canDelete(key K, cond Condition[V]) bool {
m.mu.RLock()
defer m.mu.RUnlock()

Expand All @@ -91,9 +94,8 @@ func (m *Map[K, V]) canDelete(key K, cond func(value V) bool) bool {
}

// DeleteIf conditionally deletes the value by the given key.
// Condition function must be pure.
// Returns true if the value was deleted.
func (m *Map[K, V]) DeleteIf(key K, cond func(value V) bool) bool {
func (m *Map[K, V]) DeleteIf(key K, cond Condition[V]) bool {
if !m.canDelete(key, cond) {
return false
}
Expand Down
20 changes: 6 additions & 14 deletions sync/hatmap/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,21 @@ func TestMapSetGet(t *testing.T) {
var m hatmap.Map[string, int]
m.Set("a", 1)

value, ok := m.SetIf("b", func(value int, exists bool) bool {
value, ok := m.SetIf("b", 2, func(currValue int) bool {
return true
}, func(int) int {
return 2
})
equal(t, value, 2)
equal(t, ok, true)

value, ok = m.SetIf("b", func(value int, exists bool) bool {
return !exists
}, func(int) int {
return 3
value, ok = m.SetIf("b", 3, func(currValue int) bool {
return currValue == 0
})
equal(t, value, 2)
equal(t, ok, false)

m.Set("c", 4)
value, ok = m.SetIf("c", func(value int, exists bool) bool {
return value == 4
}, func(int) int {
return 5
value, ok = m.SetIf("c", 5, func(currValue int) bool {
return currValue == 4
})
equal(t, value, 5)
equal(t, ok, true)
Expand Down Expand Up @@ -121,10 +115,8 @@ func TestMapConcurrent(t *testing.T) {
m.Set(key, i)
})
goGroup(&wg, func() {
m.SetIf(key, func(int, bool) bool {
m.SetIf(key, 77, func(currValue int) bool {
return true
}, func(v int) int {
return v + 1
})
})
goGroup(&wg, func() {
Expand Down
40 changes: 21 additions & 19 deletions sync/singleflight/singleflight.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package singleflight provides duplicate function calls suppression.
package singleflight

import (
Expand All @@ -6,41 +7,42 @@ import (
"github.com/Zamony/go/sync/hatmap"
)

type result[T any] struct {
Value T
Done chan struct{}
Waiters int64
type flight[T any] struct {
Result T
Done chan struct{}
WaitersCount int64
}

// Group suppresses duplicate function calls.
//
// Group must not be copied after first use.
type Group[K comparable, V any] struct {
results hatmap.Map[K, *result[V]]
flights hatmap.Map[K, *flight[V]]
}

// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group[K, V]) Do(key K, fun func() V) V {
res, isPrimary := g.results.SetIf(key, func(_ *result[V], exists bool) bool {
return !exists
}, func(*result[V]) *result[V] {
return &result[V]{Waiters: 1, Done: make(chan struct{})}
})
if !isPrimary {
atomic.AddInt64(&res.Waiters, 1)
newFlight := &flight[V]{WaitersCount: 1, Done: make(chan struct{})}
currFlight, isSet := g.flights.SetIf(key, newFlight, notExists)
if !isSet {
atomic.AddInt64(&currFlight.WaitersCount, 1)
} else {
res.Value = fun()
close(res.Done)
currFlight.Result = fun()
close(currFlight.Done)
}

<-res.Done
atomic.AddInt64(&res.Waiters, -1)
g.results.DeleteIf(key, func(value *result[V]) bool {
return atomic.LoadInt64(&value.Waiters) == 0
<-currFlight.Done
atomic.AddInt64(&currFlight.WaitersCount, -1)
g.flights.DeleteIf(key, func(v *flight[V]) bool {
return atomic.LoadInt64(&v.WaitersCount) == 0
})

return res.Value
return currFlight.Result
}

func notExists[V any](currValue *flight[V]) bool {
return currValue == nil
}
2 changes: 1 addition & 1 deletion sync/singleflight/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestSingleFlight(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()
single := singleflight.Group[string, int64]{}
for i := 0; i < 5; i++ {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down

0 comments on commit aa8746d

Please sign in to comment.