Skip to content

Commit

Permalink
chore: use init(), add concurrency test
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Jun 26, 2024
1 parent 21ef534 commit b119b61
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 45 deletions.
47 changes: 47 additions & 0 deletions events_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -216,3 +217,49 @@ func TestEvenHandler10(t *testing.T) {

eh.Unsubscribe(id)
}

func TestEvenHandlerConcurrent(t *testing.T) {
eh, id := NewEventBus()
ch := make(chan Event, 100)
eh2, id2 := NewEventBus()
ch2 := make(chan Event, 100)

wg := &sync.WaitGroup{}
wg.Add(3)

go func() {
defer wg.Done()
err := eh.SubscribeP(id, "http.EventWorkerError", ch)
require.NoError(t, err)

for e := range ch {
t.Log(e)
}
}()

go func() {
defer wg.Done()
err := eh2.SubscribeP(id2, "http.EventWorkerError", ch2)
require.NoError(t, err)

for e2 := range ch2 {
t.Log(e2)
}
}()

go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
eh.Send(NewEvent(EventWorkerError, "http", "foo"))
eh2.Send(NewEvent(EventWorkerError, "http", "foo"))
}
}()

time.Sleep(time.Second)

eh.Unsubscribe(id)
eh2.Unsubscribe(id2)
close(ch)
close(ch2)
wg.Wait()
}
71 changes: 34 additions & 37 deletions eventsbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ type sub struct {
}

type Bus struct {
sync.RWMutex
subscribers sync.Map
mu sync.RWMutex
subscribers map[string][]*sub
internalEvCh chan Event
stop chan struct{}
}

func newEventsBus() *Bus {
return &Bus{
subscribers: make(map[string][]*sub, 10),
internalEvCh: make(chan Event, 100),
stop: make(chan struct{}),
}
Expand Down Expand Up @@ -80,24 +81,27 @@ func (eb *Bus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
}

func (eb *Bus) Unsubscribe(subID string) {
eb.subscribers.Delete(subID)
eb.mu.Lock()
defer eb.mu.Unlock()
delete(eb.subscribers, subID)
}

func (eb *Bus) UnsubscribeP(subID, pattern string) {
if sb, ok := eb.subscribers.Load(subID); ok {
eb.Lock()
defer eb.Unlock()

sbArr := sb.([]*sub)

for i := 0; i < len(sbArr); i++ {
if sbArr[i].pattern == pattern {
sbArr[i] = sbArr[len(sbArr)-1]
sbArr = sbArr[:len(sbArr)-1]
// replace with new array
eb.subscribers.Store(subID, sbArr)
return
}
eb.mu.Lock()
defer eb.mu.Unlock()

if _, ok := eb.subscribers[subID]; !ok {
return
}

sbArr := eb.subscribers[subID]

for i := 0; i < len(sbArr); i++ {
if sbArr[i].pattern == pattern {
sbArr[i] = sbArr[len(sbArr)-1]
sbArr = sbArr[:len(sbArr)-1]
// replace with new array
eb.subscribers[subID] = sbArr
}
}
}
Expand All @@ -113,34 +117,29 @@ func (eb *Bus) Send(ev Event) {
}

func (eb *Bus) Len() uint {
var ln uint

eb.subscribers.Range(func(key, value any) bool {
ln++
return true
})

return ln
eb.mu.RLock()
defer eb.mu.RUnlock()
return uint(len(eb.subscribers))
}

func (eb *Bus) subscribe(subID string, pattern string, ch chan<- Event) error {
eb.Lock()
defer eb.Unlock()
eb.mu.Lock()
defer eb.mu.Unlock()

w, err := newWildcard(pattern)
if err != nil {
return err
}

if sb, ok := eb.subscribers.Load(subID); ok {
if subArr, ok := eb.subscribers[subID]; ok {
// at this point we are confident that sb is a []*sub type
subArr := sb.([]*sub)
subArr = append(subArr, &sub{
pattern: pattern,
w: w,
events: ch,
})

eb.subscribers.Store(subID, subArr)
eb.subscribers[subID] = subArr

return nil
}
Expand All @@ -152,7 +151,7 @@ func (eb *Bus) subscribe(subID string, pattern string, ch chan<- Event) error {
events: ch,
})

eb.subscribers.Store(subID, subArr)
eb.subscribers[subID] = subArr

return nil
}
Expand All @@ -162,21 +161,19 @@ func (eb *Bus) handleEvents() {
// http.WorkerError for example
wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())

eb.subscribers.Range(func(key, value any) bool {
vsub := value.([]*sub)
eb.mu.RLock()

for _, vsub := range eb.subscribers {
for i := 0; i < len(vsub); i++ {
if vsub[i].w.match(wc) {
select {
case vsub[i].events <- ev:
return true
default:
return true
}
}
}
}

return true
})
eb.mu.RUnlock()
}
}
13 changes: 5 additions & 8 deletions init.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
package events

import (
"sync"

"github.com/google/uuid"
)

var evBus *Bus
var onInit = &sync.Once{}

func NewEventBus() (*Bus, string) {
onInit.Do(func() {
evBus = newEventsBus()
go evBus.handleEvents()
})
func init() {
evBus = newEventsBus()
go evBus.handleEvents()
}

func NewEventBus() (*Bus, string) {
// return events bus with subscriberID
return evBus, uuid.NewString()
}

0 comments on commit b119b61

Please sign in to comment.