diff --git a/client/db.go b/client/db.go index 4ec9643fff..e52dfed60a 100644 --- a/client/db.go +++ b/client/db.go @@ -75,7 +75,7 @@ type DB interface { // // It may be used to monitor database events - a new event will be yielded for each mutation. // Note: it does not copy the queue, just the reference to it. - Events() event.Bus + Events() *event.Bus // MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to // make in the event of a transaction conflict in certain scenarios. diff --git a/event/buffered_bus.go b/event/buffered_bus.go deleted file mode 100644 index 5ab7f0aafa..0000000000 --- a/event/buffered_bus.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2024 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package event - -import ( - "sync" - "sync/atomic" -) - -type subscribeCommand *Subscription - -type unsubscribeCommand *Subscription - -type publishCommand Message - -type closeCommand struct{} - -// bufferedBus must implement the Bus interface -var _ (Bus) = (*bufferedBus)(nil) - -// bufferedBus is a bus that uses a buffered channel to manage subscribers and publish messages. -type bufferedBus struct { - // subID is incremented for each subscriber and used to set subscriber ids. - subID atomic.Uint64 - // subs is a mapping of subscriber ids to subscriptions. - subs map[uint64]*Subscription - // events is a mapping of event names to subscriber ids. - events map[Name]map[uint64]struct{} - // commandChannel manages all commands sent to the bufferedBus. - // - // It is important that all stuff gets sent through this single channel to ensure - // that the order of operations is preserved. - // - // This does mean that non-event commands can block the database if the buffer - // size is breached (e.g. if many subscribe commands occupy the buffer). - commandChannel chan any - eventBufferSize int - hasClosedChan chan struct{} - isClosed bool - // closeMutex is only locked when the bus is closing. - closeMutex sync.RWMutex -} - -// NewBufferedBus creates a new event bus with the given commandBufferSize and -// eventBufferSize. -// -// Should the buffers be filled, subsequent calls on this bus will block. -func NewBufferedBus(commandBufferSize int, eventBufferSize int) *bufferedBus { - bus := bufferedBus{ - subs: make(map[uint64]*Subscription), - events: make(map[Name]map[uint64]struct{}), - commandChannel: make(chan any, commandBufferSize), - hasClosedChan: make(chan struct{}), - eventBufferSize: eventBufferSize, - } - go bus.handleChannel() - return &bus -} - -func (b *bufferedBus) Publish(msg Message) { - b.closeMutex.RLock() - defer b.closeMutex.RUnlock() - - if b.isClosed { - return - } - b.commandChannel <- publishCommand(msg) -} - -func (b *bufferedBus) Subscribe(events ...Name) (*Subscription, error) { - b.closeMutex.RLock() - defer b.closeMutex.RUnlock() - - if b.isClosed { - return nil, ErrSubscribedToClosedChan - } - sub := &Subscription{ - id: b.subID.Add(1), - value: make(chan Message, b.eventBufferSize), - events: events, - } - b.commandChannel <- subscribeCommand(sub) - return sub, nil -} - -func (b *bufferedBus) Unsubscribe(sub *Subscription) { - b.closeMutex.RLock() - defer b.closeMutex.RUnlock() - - if b.isClosed { - return - } - b.commandChannel <- unsubscribeCommand(sub) -} - -func (b *bufferedBus) Close() { - b.closeMutex.Lock() - defer b.closeMutex.Unlock() - - if b.isClosed { - return - } - b.isClosed = true - b.commandChannel <- closeCommand{} - // Wait for the close command to be handled, in order, before returning - <-b.hasClosedChan -} - -func (b *bufferedBus) handleChannel() { - for cmd := range b.commandChannel { - switch t := cmd.(type) { - case closeCommand: - for _, subscriber := range b.subs { - close(subscriber.value) - } - close(b.commandChannel) - close(b.hasClosedChan) - return - - case subscribeCommand: - for _, event := range t.events { - if _, ok := b.events[event]; !ok { - b.events[event] = make(map[uint64]struct{}) - } - b.events[event][t.id] = struct{}{} - } - b.subs[t.id] = t - - case unsubscribeCommand: - if _, ok := b.subs[t.id]; !ok { - continue // not subscribed - } - for _, event := range t.events { - delete(b.events[event], t.id) - } - delete(b.subs, t.id) - close(t.value) - - case publishCommand: - for id := range b.events[WildCardName] { - b.subs[id].value <- Message(t) - } - for id := range b.events[t.Name] { - if _, ok := b.events[WildCardName][id]; ok { - continue - } - b.subs[id].value <- Message(t) - } - } - } -} diff --git a/event/bus.go b/event/bus.go index e202cc2f8b..371173686d 100644 --- a/event/bus.go +++ b/event/bus.go @@ -10,53 +10,154 @@ package event -// Bus is an event bus used to broadcasts messages to subscribers. -type Bus interface { - // Subscribe subscribes to the Channel, returning a channel by which events can - // be read from, or an error should one occur (e.g. if this object is closed). - // - // This function is non-blocking unless the subscription-buffer is full. - Subscribe(events ...Name) (*Subscription, error) +import ( + "sync" + "sync/atomic" +) - // Unsubscribe unsubscribes from the Channel, closing the provided channel. - // - // Will do nothing if this object is already closed. - Unsubscribe(sub *Subscription) +type subscribeCommand *Subscription + +type unsubscribeCommand *Subscription + +type publishCommand Message - // Publish pushes the given item into this channel. Non-blocking. - Publish(msg Message) +type closeCommand struct{} + +// Bus uses a buffered channel to manage subscribers and publish messages. +type Bus struct { + // subID is incremented for each subscriber and used to set subscriber ids. + subID atomic.Uint64 + // subs is a mapping of subscriber ids to subscriptions. + subs map[uint64]*Subscription + // events is a mapping of event names to subscriber ids. + events map[Name]map[uint64]struct{} + // commandChannel manages all commands sent to the bufferedBus. + // + // It is important that all stuff gets sent through this single channel to ensure + // that the order of operations is preserved. + // + // This does mean that non-event commands can block the database if the buffer + // size is breached (e.g. if many subscribe commands occupy the buffer). + commandChannel chan any + eventBufferSize int + hasClosedChan chan struct{} + isClosed bool + // closeMutex is only locked when the bus is closing. + closeMutex sync.RWMutex +} - // Close closes this Channel, and any owned or subscribing channels. - Close() +// NewBus creates a new event bus with the given commandBufferSize and +// eventBufferSize. +// +// Should the buffers be filled, subsequent calls on this bus will block. +func NewBus(commandBufferSize int, eventBufferSize int) *Bus { + bus := Bus{ + subs: make(map[uint64]*Subscription), + events: make(map[Name]map[uint64]struct{}), + commandChannel: make(chan any, commandBufferSize), + hasClosedChan: make(chan struct{}), + eventBufferSize: eventBufferSize, + } + go bus.handleChannel() + return &bus } -// Message contains event info. -type Message struct { - // Name is the name of the event this message was generated from. - Name Name +// Publish broadcasts the given message to the bus subscribers. Non-blocking. +func (b *Bus) Publish(msg Message) { + b.closeMutex.RLock() + defer b.closeMutex.RUnlock() - // Data contains optional event information. - Data any + if b.isClosed { + return + } + b.commandChannel <- publishCommand(msg) } -// NewMessage returns a new message with the given name and optional data. -func NewMessage(name Name, data any) Message { - return Message{name, data} +// Subscribe returns a new subscription that will receive all of the events +// contained in the given list of events. +func (b *Bus) Subscribe(events ...Name) (*Subscription, error) { + b.closeMutex.RLock() + defer b.closeMutex.RUnlock() + + if b.isClosed { + return nil, ErrSubscribedToClosedChan + } + sub := &Subscription{ + id: b.subID.Add(1), + value: make(chan Message, b.eventBufferSize), + events: events, + } + b.commandChannel <- subscribeCommand(sub) + return sub, nil } -// Subscription is a read-only event stream. -type Subscription struct { - id uint64 - value chan Message - events []Name +// Unsubscribe removes all event subscriptions and closes the subscription channel. +// +// Will do nothing if this object is already closed. +func (b *Bus) Unsubscribe(sub *Subscription) { + b.closeMutex.RLock() + defer b.closeMutex.RUnlock() + + if b.isClosed { + return + } + b.commandChannel <- unsubscribeCommand(sub) } -// Message returns the next event value from the subscription. -func (s *Subscription) Message() <-chan Message { - return s.value +// Close unsubscribes all active subscribers and closes the command channel. +func (b *Bus) Close() { + b.closeMutex.Lock() + defer b.closeMutex.Unlock() + + if b.isClosed { + return + } + b.isClosed = true + b.commandChannel <- closeCommand{} + // Wait for the close command to be handled, in order, before returning + <-b.hasClosedChan } -// Events returns the names of all subscribed events. -func (s *Subscription) Events() []Name { - return s.events +func (b *Bus) handleChannel() { + for cmd := range b.commandChannel { + switch t := cmd.(type) { + case closeCommand: + for _, subscriber := range b.subs { + close(subscriber.value) + } + close(b.commandChannel) + close(b.hasClosedChan) + return + + case subscribeCommand: + for _, event := range t.events { + if _, ok := b.events[event]; !ok { + b.events[event] = make(map[uint64]struct{}) + } + b.events[event][t.id] = struct{}{} + } + b.subs[t.id] = t + + case unsubscribeCommand: + if _, ok := b.subs[t.id]; !ok { + continue // not subscribed + } + for _, event := range t.events { + delete(b.events[event], t.id) + } + delete(b.subs, t.id) + close(t.value) + + case publishCommand: + for id := range b.events[WildCardName] { + b.subs[id].value <- Message(t) + } + for id := range b.events[t.Name] { + if _, ok := b.events[WildCardName][id]; ok { + continue + } + b.subs[id].value <- Message(t) + } + } + } } diff --git a/event/buffered_bus_test.go b/event/bus_test.go similarity index 80% rename from event/buffered_bus_test.go rename to event/bus_test.go index e976186485..4d5352bc94 100644 --- a/event/buffered_bus_test.go +++ b/event/bus_test.go @@ -18,8 +18,8 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBufferedBus_PushIsNotBlockedWithoutSubscribers_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 0) +func TestBus_PushIsNotBlockedWithoutSubscribers_Succeed(t *testing.T) { + bus := NewBus(0, 0) defer bus.Close() msg := NewMessage("test", 1) @@ -29,8 +29,8 @@ func TestBufferedBus_PushIsNotBlockedWithoutSubscribers_Succeed(t *testing.T) { assert.True(t, true) } -func TestBufferedBus_SubscribersAreNotBlockedAfterClose_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 0) +func TestBus_SubscribersAreNotBlockedAfterClose_Succeed(t *testing.T) { + bus := NewBus(0, 0) defer bus.Close() sub, err := bus.Subscribe("test") @@ -44,8 +44,8 @@ func TestBufferedBus_SubscribersAreNotBlockedAfterClose_Succeed(t *testing.T) { assert.True(t, true) } -func TestBufferedBus_UnsubscribeTwice_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 0) +func TestBus_UnsubscribeTwice_Succeed(t *testing.T) { + bus := NewBus(0, 0) defer bus.Close() sub, err := bus.Subscribe(WildCardName) @@ -55,8 +55,8 @@ func TestBufferedBus_UnsubscribeTwice_Succeed(t *testing.T) { bus.Unsubscribe(sub) } -func TestBufferedBus_WildCardDeduplicates_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 0) +func TestBus_WildCardDeduplicates_Succeed(t *testing.T) { + bus := NewBus(0, 0) defer bus.Close() sub, err := bus.Subscribe("test", WildCardName) @@ -76,8 +76,8 @@ func TestBufferedBus_WildCardDeduplicates_Succeed(t *testing.T) { } } -func TestBufferedBus_EachSubscribersRecievesEachItem_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 0) +func TestBus_EachSubscribersRecievesEachItem_Succeed(t *testing.T) { + bus := NewBus(0, 0) defer bus.Close() msg1 := NewMessage("test", 1) @@ -130,8 +130,8 @@ func TestBufferedBus_EachSubscribersRecievesEachItem_Succeed(t *testing.T) { assert.Equal(t, msg2, event2) } -func TestBufferedBus_EachSubscribersRecievesEachItemGivenBufferedEventChan_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 2) +func TestBus_EachSubscribersRecievesEachItemGivenBufferedEventChan_Succeed(t *testing.T) { + bus := NewBus(0, 2) defer bus.Close() msg1 := NewMessage("test", 1) @@ -159,8 +159,8 @@ func TestBufferedBus_EachSubscribersRecievesEachItemGivenBufferedEventChan_Succe assert.Equal(t, msg2, output2Ch2) } -func TestBufferedBus_SubscribersDontRecieveItemsAfterUnsubscribing_Succeed(t *testing.T) { - bus := NewBufferedBus(0, 0) +func TestBus_SubscribersDontRecieveItemsAfterUnsubscribing_Succeed(t *testing.T) { + bus := NewBus(0, 0) defer bus.Close() sub, err := bus.Subscribe("test") diff --git a/event/event.go b/event/event.go index cf29859012..e9afdf1a57 100644 --- a/event/event.go +++ b/event/event.go @@ -84,3 +84,29 @@ type Merge struct { // SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated. SchemaRoot string } + +// Message contains event info. +type Message struct { + // Name is the name of the event this message was generated from. + Name Name + + // Data contains optional event information. + Data any +} + +// NewMessage returns a new message with the given name and optional data. +func NewMessage(name Name, data any) Message { + return Message{name, data} +} + +// Subscription is a read-only event stream. +type Subscription struct { + id uint64 + value chan Message + events []Name +} + +// Message returns the next event value from the subscription. +func (s *Subscription) Message() <-chan Message { + return s.value +} diff --git a/http/client.go b/http/client.go index e1fb1034ce..2843ee4f2d 100644 --- a/http/client.go +++ b/http/client.go @@ -451,7 +451,7 @@ func (c *Client) Headstore() ds.Read { panic("client side database") } -func (c *Client) Events() event.Bus { +func (c *Client) Events() *event.Bus { panic("client side database") } diff --git a/internal/db/db.go b/internal/db/db.go index bb4308c1d9..574e748819 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -58,7 +58,7 @@ type db struct { rootstore datastore.RootStore multistore datastore.MultiStore - events event.Bus + events *event.Bus parser core.Parser @@ -109,7 +109,7 @@ func newDB( lensRegistry: lens, parser: parser, options: options, - events: event.NewBufferedBus(commandBufferSize, eventBufferSize), + events: event.NewBus(commandBufferSize, eventBufferSize), } // apply options @@ -251,7 +251,7 @@ func (db *db) initialize(ctx context.Context) error { } // Events returns the events Channel. -func (db *db) Events() event.Bus { +func (db *db) Events() *event.Bus { return db.events } diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index 1681dd6e7b..18e306c0f4 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -525,7 +525,7 @@ func (w *Wrapper) Close() { w.node.Close() } -func (w *Wrapper) Events() event.Bus { +func (w *Wrapper) Events() *event.Bus { return w.node.Events() } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index 7288781ba5..89b5bce5e7 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -221,7 +221,7 @@ func (w *Wrapper) Close() { w.node.Close() } -func (w *Wrapper) Events() event.Bus { +func (w *Wrapper) Events() *event.Bus { return w.node.Events() }