Reimplement watches without extraneous goroutines
The original implementation of Hypcast's watch.Value used a long-running
goroutine per watch, along with a second goroutine to protect against
runtime.Goexit breaking the watch invariants. I wanted to challenge
myself to reimplement watches with one goroutine active only while
executing a handler, without giving up correct runtime.Goexit handling.

The core idea seems simple, at least to me: use a mutex-protected
"running" flag to track whether there's a handler goroutine ready to
deal with new values, and simultaneously hand off those values without
data races. On runtime.Goexit, start up a new copy of the same goroutine
to avoid missing any new values.
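
In a condensed, self-contained form, the hand-off looks something like
this (cancellation, the WaitGroup, and the surrounding Value are left
out, and the names are made up for illustration; the real code is in
the diff below):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	type sketch struct {
		handler func(int)

		mu      sync.Mutex
		next    int
		ok      bool // next holds a value that hasn't been handled yet
		running bool // a goroutine is (or will be) responsible for new values
	}

	func (s *sketch) update(x int) {
		s.mu.Lock()
		start := !s.running
		s.next, s.ok, s.running = x, true, true
		s.mu.Unlock()
		if start {
			go s.run() // nobody is responsible for new values yet, so start a goroutine
		}
	}

	func (s *sketch) run() {
		var unwind bool
		defer func() {
			if unwind {
				// The handler panicked or called runtime.Goexit while this goroutine
				// was still responsible for new values, so hand that responsibility
				// to a fresh copy of itself.
				go s.run()
			}
		}()
		for {
			s.mu.Lock()
			next, ok := s.next, s.ok
			s.running = ok // with nothing pending, give up responsibility before unlocking
			s.ok = false
			s.mu.Unlock()
			if !ok {
				return
			}
			unwind = true
			s.handler(next) // may panic or call runtime.Goexit
			unwind = false
		}
	}

	func main() {
		s := &sketch{handler: func(x int) { fmt.Println("handled", x) }}
		for i := range 5 {
			s.update(i)
		}
		time.Sleep(100 * time.Millisecond) // crude wait, just for this demo
	}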

The WaitGroup handling might be the trickiest part, since it could be
marked done by either Cancel or the goroutine. But the invariant is that
the first party to see (w.running == false && w.cancel == true) is the
one responsible for marking it done.
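
Condensed from the change below, these are the two places that can
observe that condition:

	// In Cancel, after unregistering the watch (so no further update
	// calls can arrive):
	w.mu.Lock()
	finish := !w.running && !w.cancel // no goroutine will ever see w.cancel, so it's on us
	w.cancel = true
	w.mu.Unlock()
	if finish {
		w.wg.Done()
	}

	// In run, on the loop iteration that observes the cancellation:
	if cancel {
		w.wg.Done() // we were running when Cancel set the flag, so it left this to us
	}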

I brought back and cleaned up some old benchmarks to see how this
compared with the old implementation under load, and while it does
better in a lot of cases it's not a *complete* win for every combination
of watcher count and GOMAXPROCS. Or maybe my data is just bad: I ran
this on too noisy of a laptop and don't even have the raw data to tell
whether the differences are significant. I have to say that after using
a Rust
port of Haskell's Criterion in another project and seeing the level of
statistical analysis it does, I'm not impressed with the capabilities of
the Go test framework here. But I can confidently say this is nowhere
close to a net negative.

I had to add a few new unit tests to restore 100% mutation testing
coverage (via go-mutesting), which was a good opportunity to clean up
the existing tests with newer patterns like range over int. One gap
existed even in the original: I didn't fully test that Cancel stops
new handler executions. Others were related to the new ability to safely
double-Cancel a watch.
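
As a sketch of that first case (the test name is made up, and the real
tests may be shaped differently): after Cancel and Wait return, new Sets
must never reach the handler.

	package watch

	import (
		"sync/atomic"
		"testing"
		"time"
	)

	func TestCancelStopsNewExecutions(t *testing.T) {
		var calls atomic.Int32
		v := NewValue(0)
		w := v.Watch(func(int) { calls.Add(1) })

		w.Cancel()
		w.Wait() // no handler execution is pending or in flight after this returns

		before := calls.Load()
		for i := range 3 {
			v.Set(i + 1)
		}
		time.Sleep(10 * time.Millisecond) // give any buggy execution a chance to show up
		if extra := calls.Load() - before; extra != 0 {
			t.Errorf("handler ran %d times after Cancel", extra)
		}
	}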

Finally, I wrote a Promela model to validate the "mutual exclusion of
handlers" and "handler executed for at least one value" properties of
this approach, which I might document and commit here later.
ahamlinman committed Aug 31, 2024
2 parents bdc5e14 + b930605 commit 2e8d680
Showing 2 changed files with 174 additions and 79 deletions.
119 changes: 65 additions & 54 deletions internal/watch/watch.go
@@ -8,12 +8,10 @@ import "sync"
//
// The zero value of a Value is valid and stores the zero value of T.
type Value[T any] struct {
// Invariant: Every Watch must receive one update call for every value of the
// Value from the time it is added to the watchers set to the time it is
// removed.
//
// mu protects this invariant, and prevents data races on value.
mu sync.RWMutex
// mu prevents data races on the value, and protects the invariant that every
// Watch receives one update call for every value of the Value from the time
// it's added to the watchers set to the time it's removed.
mu sync.Mutex
value T
watchers map[*watch[T]]struct{}
}
@@ -25,8 +23,8 @@ func NewValue[T any](x T) *Value[T] {

// Get returns the current value stored in v.
func (v *Value[T]) Get() T {
v.mu.RLock()
defer v.mu.RUnlock()
v.mu.Lock()
defer v.mu.Unlock()

return v.value
}
@@ -46,31 +44,30 @@ func (v *Value[T]) Set(x T) {
//
// Each active watch executes up to one instance of handler at a time in a new
// goroutine, first with the value stored in v upon creation of the watch, then
// with subsequent values stored in v by calls to Set. If the value stored in v
// changes while a handler execution is in flight, handler will be called once
// more with the latest value stored in v following its current execution.
// Intermediate updates preceding the latest value will be dropped.
// with subsequent values stored in v by calls to [Value.Set]. If the value
// stored in v changes while a handler execution is in flight, handler will be
// called once more with the latest value stored in v following its current
// execution. Intermediate updates preceding the latest value will be dropped.
//
// Values are not recovered by the garbage collector until all of their
// associated watches have terminated. A watch is terminated after it has been
// canceled by a call to Watch.Cancel, and any pending or in-flight handler
// canceled by a call to [Watch.Cancel], and any pending or in-flight handler
// execution has finished.
func (v *Value[T]) Watch(handler func(x T)) Watch {
w := newWatch(handler, v.unregisterWatch)
v.updateAndRegisterWatch(w)
v.registerAndUpdateWatch(w)
return w
}

func (v *Value[T]) updateAndRegisterWatch(w *watch[T]) {
func (v *Value[T]) registerAndUpdateWatch(w *watch[T]) {
v.mu.Lock()
defer v.mu.Unlock()

w.update(v.value)

if v.watchers == nil {
v.watchers = make(map[*watch[T]]struct{})
}
v.watchers[w] = struct{}{}
w.update(v.value)
}

func (v *Value[T]) unregisterWatch(w *watch[T]) {
@@ -80,7 +77,7 @@ func (v *Value[T]) unregisterWatch(w *watch[T]) {
delete(v.watchers, w)
}

// Watch represents a single watch on a Value. See Value.Watch for details.
// Watch represents a single watch on a Value. See [Value.Watch] for details.
type Watch interface {
// Cancel requests that this watch be terminated as soon as possible,
// potentially after a pending or in-flight handler execution has finished.
@@ -98,61 +95,75 @@ type Watch interface {
type watch[T any] struct {
handler func(T)
unregister func(*watch[T])
pending chan T
done chan struct{}

mu sync.Mutex
wg sync.WaitGroup
next T
ok bool // There is a valid value in next.
running bool // There is (or will be) a goroutine responsible for handling values.
cancel bool // The WaitGroup must be marked done as soon as running == false.
}

func newWatch[T any](handler func(T), unregister func(*watch[T])) *watch[T] {
w := &watch[T]{
handler: handler,
unregister: unregister,
pending: make(chan T, 1),
done: make(chan struct{}),
}
go w.run()
w.wg.Add(1)
return w
}

func (w *watch[T]) run() {
var wg sync.WaitGroup
defer close(w.done)

for next := range w.pending {
x := next
wg.Add(1)
// Insulate the handler from the main loop, e.g. if it calls runtime.Goexit
// it should not terminate this loop and break the processing of new values.
go func() {
defer wg.Done()
w.handler(x)
}()
wg.Wait()
func (w *watch[T]) update(x T) {
w.mu.Lock()
start := !w.running
w.next, w.ok, w.running = x, true, true
w.mu.Unlock()
if start {
go w.run()
}
}

func (w *watch[T]) update(x T) {
// It's important that this call not block, so we assume w.pending is buffered
// and drop a pending update to free space if necessary.
select {
case <-w.pending:
w.pending <- x
case w.pending <- x:
func (w *watch[T]) run() {
var unwind bool
defer func() {
if unwind {
// Only possible if w.running == true, so we must maintain the invariant.
go w.run()
}
}()

for {
w.mu.Lock()
next, cancel := w.next, w.cancel
stop := !w.ok || cancel
w.running = !stop
w.next, w.ok = *new(T), false
w.mu.Unlock()

if cancel {
w.wg.Done()
}
if stop {
return
}

unwind = true
w.handler(next) // May panic or call runtime.Goexit.
unwind = false
}
}

func (w *watch[T]) Cancel() {
w.unregister(w)
w.clearPending()
close(w.pending)
}

func (w *watch[T]) clearPending() {
select {
case <-w.pending:
default:
w.unregister(w) // After this, we are guaranteed no new w.update calls.
w.mu.Lock()
finish := !w.running && !w.cancel
w.cancel = true
w.mu.Unlock()
if finish {
w.wg.Done()
}
}

func (w *watch[T]) Wait() {
<-w.done
w.wg.Wait()
}
