Skip to content

Commit

Permalink
remove event.Bus interface
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 17, 2024
1 parent 2dfa4d2 commit 56136c5
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 215 deletions.
2 changes: 1 addition & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type DB interface {
//
// It may be used to monitor database events - a new event will be yielded for each mutation.
// Note: it does not copy the queue, just the reference to it.
Events() event.Bus
Events() *event.Bus

// MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to
// make in the event of a transaction conflict in certain scenarios.
Expand Down
159 changes: 0 additions & 159 deletions event/buffered_bus.go

This file was deleted.

171 changes: 136 additions & 35 deletions event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,154 @@

package event

// Bus is an event bus used to broadcasts messages to subscribers.
type Bus interface {
// Subscribe subscribes to the Channel, returning a channel by which events can
// be read from, or an error should one occur (e.g. if this object is closed).
//
// This function is non-blocking unless the subscription-buffer is full.
Subscribe(events ...Name) (*Subscription, error)
import (
"sync"
"sync/atomic"
)

// Unsubscribe unsubscribes from the Channel, closing the provided channel.
//
// Will do nothing if this object is already closed.
Unsubscribe(sub *Subscription)
type subscribeCommand *Subscription

type unsubscribeCommand *Subscription

type publishCommand Message

// Publish pushes the given item into this channel. Non-blocking.
Publish(msg Message)
type closeCommand struct{}

// Bus uses a buffered channel to manage subscribers and publish messages.
type Bus struct {
// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[Name]map[uint64]struct{}
// commandChannel manages all commands sent to the bufferedBus.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed bool
// closeMutex is only locked when the bus is closing.
closeMutex sync.RWMutex
}

// Close closes this Channel, and any owned or subscribing channels.
Close()
// NewBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled, subsequent calls on this bus will block.
func NewBus(commandBufferSize int, eventBufferSize int) *Bus {
bus := Bus{
subs: make(map[uint64]*Subscription),
events: make(map[Name]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Name Name
// Publish broadcasts the given message to the bus subscribers. Non-blocking.
func (b *Bus) Publish(msg Message) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

// Data contains optional event information.
Data any
if b.isClosed {
return
}
b.commandChannel <- publishCommand(msg)
}

// NewMessage returns a new message with the given name and optional data.
func NewMessage(name Name, data any) Message {
return Message{name, data}
// Subscribe returns a new subscription that will receive all of the events
// contained in the given list of events.
func (b *Bus) Subscribe(events ...Name) (*Subscription, error) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return nil, ErrSubscribedToClosedChan

Check warning on line 83 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L83

Added line #L83 was not covered by tests
}
sub := &Subscription{
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}
b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

// Subscription is a read-only event stream.
type Subscription struct {
id uint64
value chan Message
events []Name
// Unsubscribe removes all event subscriptions and closes the subscription channel.
//
// Will do nothing if this object is already closed.
func (b *Bus) Unsubscribe(sub *Subscription) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return
}
b.commandChannel <- unsubscribeCommand(sub)
}

// Message returns the next event value from the subscription.
func (s *Subscription) Message() <-chan Message {
return s.value
// Close unsubscribes all active subscribers and closes the command channel.
func (b *Bus) Close() {
b.closeMutex.Lock()
defer b.closeMutex.Unlock()

if b.isClosed {
return
}
b.isClosed = true
b.commandChannel <- closeCommand{}
// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
}

// Events returns the names of all subscribed events.
func (s *Subscription) Events() []Name {
return s.events
func (b *Bus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed
}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
for id := range b.events[WildCardName] {
b.subs[id].value <- Message(t)
}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardName][id]; ok {
continue
}
b.subs[id].value <- Message(t)
}
}
}
}
Loading

0 comments on commit 56136c5

Please sign in to comment.