Skip to content

Commit

Permalink
v0.4.117 recovery deferred location: PanicToErr RecoverErr RecoverDA2…
Browse files Browse the repository at this point in the history
… RecoverDA, pmaps use B-Tree: OrderedMap OrderedMapFunc KeyOrderedMap ThreadSafeOrderedMapFunc, pqs, Go GoGroup uses Awaitable, refactor: ClosableChan g0.goContext Go.WaitCh Go.EntityID parl.GoEntityID sets.Elements iters-iterators CyclicAwaitable
  • Loading branch information
haraldrudell committed Oct 30, 2023
1 parent 0db3e88 commit ae0f641
Show file tree
Hide file tree
Showing 126 changed files with 3,866 additions and 2,648 deletions.
39 changes: 39 additions & 0 deletions annotation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
© 2020–present Harald Rudell <[email protected]> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/

package parl

import (
"fmt"

"github.com/haraldrudell/parl/pruntime"
)

const (
// counts the stack-frame of [parl.Annotation]
parlAnnotationFrames = 1
// counts the stack-frame of [parl.getAnnotation]
getAnnotationFrames = 1
)

// Annotation provides a default reovered-panic code annotation
// - “Recover from panic in mypackage.MyFunc”
// - [base package].[function]: "mypackage.MyFunc"
func Annotation() (a string) {
return getAnnotation(parlAnnotationFrames)
}

// getAnnotation provides a default reovered-panic code getAnnotation
// - frames = 0 means immediate caller of getAnnotation
// - “Recover from panic in mypackage.MyFunc”
// - [base package].[function]: "mypackage.MyFunc"
func getAnnotation(frames int) (a string) {
if frames < 0 {
frames = 0
}
return fmt.Sprintf("Recover from panic in %s:",
pruntime.NewCodeLocation(frames+getAnnotationFrames).PackFunc(),
)
}
14 changes: 13 additions & 1 deletion awaitable-ch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@ All rights reserved

package parl

// AwaitableCh is a channel whose only allowed operation is channel receive
// AwaitableCh is a one-to-many inter-thread wait-mechanic with happens-before
// - AwaitableCh implements a semaphore
// - implementation is a channel whose only allowed operation is channel receive
// - AwaitableCh transfers no data, instead channel close is the significant event
//
// Usage:
//
// <-ch // waits for event
//
// select {
// case <-ch:
// hasHappened = true
// default:
// hasHappened = false
// }
type AwaitableCh <-chan struct{}
14 changes: 4 additions & 10 deletions awaitable.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import "sync/atomic"
// - one-to-many, happens-before
// - the synchronization mechanic is closing channel, allowing consumers to await
// multiple events
// - status can be inspected in a thread-safe manner: isClosed, isAboutToClose
// allows for race-free consumers
// - Close is idempotent, panic-free
// - if atomic.Pointer[Awaitable] is used for retrieval, a cyclic semaphore is achieved
// - IsClosed provides thread-safe observability
// - Close is idempotent and panic-free
// - [parl.CyclicAwaitable] is re-armable, cyclic version
type Awaitable struct {
isClosed atomic.Bool
ch chan struct{}
Expand All @@ -33,18 +32,13 @@ func (a *Awaitable) Ch() (ch AwaitableCh) {

// isClosed inspects whether the awaitable has been triggered
// - isClosed indicates that the channel is closed
// - isAboutToClose indicates that Close has been invoked,
// but that channel close may still be in progress
// - if isClosed is true, isAboutToClose is also true
// - the two values are requried to attain race-free consumers
// - Thread-safe
func (a *Awaitable) IsClosed() (isClosed, isAboutToClose bool) {
func (a *Awaitable) IsClosed() (isClosed bool) {
select {
case <-a.ch:
isClosed = true
default:
}
isAboutToClose = a.isClosed.Load()
return
}

Expand Down
2 changes: 1 addition & 1 deletion channel-send.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func ChannelSend[T any](ch chan<- T, value T, nonBlocking ...bool) (didSend, isN
// - the only way to determine closed channel is to send, which panics
// - a separate function to recover the panic
func channelSend[T any](ch chan<- T, value T, sendNb bool) (didSend bool, err error) {
defer Recover(Annotation(), &err, NoOnError)
defer PanicToErr(&err)

// send non-blocking
if sendNb {
Expand Down
88 changes: 49 additions & 39 deletions closable-chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ ISC License
package parl

import (
"sync"
"sync/atomic"
)

Expand All @@ -29,55 +28,67 @@ import (
// defer errCh.Close(&err) // will not terminate the process
// errCh.Ch() <- err
type ClosableChan[T any] struct {
hasChannel atomic.Bool // hasChannel provides thread-safe lock-free read of ch
chLock sync.Mutex
// ch is the channel object
// - outside the new function, ch is written behind chLock
ch chan T

isCloseInvoked atomic.Bool // indicates the channel being closed or about to close
closeOnce Once // [parl.Once] is an observable sync.Once
// ch0 is the channel object
// - ability to initialize ch0 in the constructor
// - ability to update ch0 after creation
// - ch0 therefore must be pointer
// - ch0 must offer thread-safe access and update

// ch0 as provided by contructor or nil
ch0 chan T
// ch0 provided post-constructor because ch0 nil
chp atomic.Pointer[chan T]

// indicates the channel about to close or closed
// - because the channel may transfer data, it cannot be inspected for being closed
isCloseInvoked atomic.Bool
// [parl.Once] is an observable sync.Once
// - indicates that the channel is closed
closeOnce Once
}

// NewClosableChan returns a channel with idempotent panic-free observable close
// - ch is an optional non-closed channel object
// - if ch is not present, an unbuffered channel will be created
// - cannot use lock in new function
// - if an unbuffered channel is used, NewClosableChan is not required
func NewClosableChan[T any](ch ...chan T) (cl *ClosableChan[T]) {
c := ClosableChan[T]{}
func NewClosableChan[T any](ch ...chan T) (closable *ClosableChan[T]) {
var ch0 chan T
if len(ch) > 0 {
if c.ch = ch[0]; c.ch != nil {
c.hasChannel.Store(true)
}
ch0 = ch[0] // if ch is present, apply it
}
return &c
return &ClosableChan[T]{ch0: ch0}
}

// Ch retrieves the channel. Thread-safe
// Ch retrieves the channel as bi-directional. Thread-safe
// - nil is never returned
// - the channel may already be closed
// - do not close the channel other than using the Close method
// - the channel may be closed, use IsClosed to determine
// - do not close the channel other than using Close method
// - per Go channel close, if one thread is blocked in channel send
// while another thread closes the channel, a data race occurs
// - thread-safe solution is to set an additional indicator of
// close requested and then reading the channel which
// releases the sending thread
func (c *ClosableChan[T]) Ch() (ch chan T) {
return c.getCh()
}

// ReceiveCh retrieves the channel. Thread-safe
// ReceiveCh retrieves the channel as receive-only. Thread-safe
// - nil is never returned
// - the channel may already be closed
// - do not close the channel other than using the Close method
func (c *ClosableChan[T]) ReceiveCh() (ch <-chan T) {
return c.getCh()
}

// SendCh retrieves the channel. Thread-safe
// SendCh retrieves the channel as send-only. Thread-safe
// - nil is never returned
// - the channel may already be closed
// - do not close the channel other than using the Close method
// - per Go channel close, if one thread is blocked in channel send
// while another thread closes the channel, a data race occurs
// - thread-safe solution is to set an additional indicator of
// close requested and then reading the channel which
// releases the sending thread
func (c *ClosableChan[T]) SendCh() (ch chan<- T) {
return c.getCh()
}
Expand All @@ -98,11 +109,12 @@ func (c *ClosableChan[T]) IsClosed(includePending ...bool) (isClosed bool) {
// Close ensures the channel is closed
// - Close does not return until the channel is closed.
// - thread-safe panic-free deferrable observable
// - all invocations have close result in err
// - all invocations have the same close result in err
// - didClose indicates whether this invocation closed the channel
// - if errp is non-nil, it will receive the close result
// - per Go channel close, if one thread is blocked in channel send
// while another thread closes the channel, a data race occurs
// - thread-safe, panic-free, deferrable, idempotent
func (cl *ClosableChan[T]) Close(errp ...*error) (didClose bool, err error) {

// ensure isCloseInvoked true: channel is about to close
Expand Down Expand Up @@ -131,29 +143,27 @@ func (cl *ClosableChan[T]) Close(errp ...*error) (didClose bool, err error) {
}

// getCh gets or initializes the channel object [ClosableChan.ch]
func (cl *ClosableChan[T]) getCh() (ch chan T) {

// wrap lock in performance-friendly atomic
// - by reading hasChannel cl.ch access is thread-safe
// - if channel is closed, return whatever ch is
if cl.hasChannel.Load() || cl.closeOnce.IsDone() {
return cl.ch
func (c *ClosableChan[T]) getCh() (ch chan T) {
if ch = c.ch0; ch != nil {
return // channel from constructor return
}

// ensure a channel is present
cl.chLock.Lock()
defer cl.chLock.Unlock()

if ch = cl.ch; ch == nil {
ch = make(chan T)
cl.ch = ch
cl.hasChannel.Store(true)
for {
if chp := c.chp.Load(); chp != nil {
ch = *chp
return // chp was present return
}
if ch == nil {
ch = make(chan T)
}
if c.chp.CompareAndSwap(nil, &ch) {
return // chp updated return
}
}
return
}

// doClose is behind [ClosableChan.closeOnce] and
// is therefore only invoked once
// - separate function because provided to Once
func (cl *ClosableChan[T]) doClose() (err error) {

// ensure a channel exists and close it
Expand Down
6 changes: 3 additions & 3 deletions closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// Closer handles panics.
// if errp is non-nil, panic values updates it using errors.AppendError.
func Closer[T any](ch chan T, errp *error) {
defer Recover(Annotation(), errp, NoOnError)
defer PanicToErr(errp)

close(ch)
}
Expand All @@ -29,7 +29,7 @@ func Closer[T any](ch chan T, errp *error) {
// CloserSend handles panics.
// if errp is non-nil, panic values updates it using errors.AppendError.
func CloserSend[T any](ch chan<- T, errp *error) {
defer Recover(Annotation(), errp, NoOnError)
defer PanicToErr(errp)

close(ch)
}
Expand All @@ -38,7 +38,7 @@ func CloserSend[T any](ch chan<- T, errp *error) {
// Close handles panics.
// if errp is non-nil, panic values updates it using errors.AppendError.
func Close(closable io.Closer, errp *error) {
defer Recover(Annotation(), errp, NoOnError)
defer PanicToErr(errp)

if e := closable.Close(); e != nil {
*errp = perrors.AppendError(*errp, e)
Expand Down
60 changes: 21 additions & 39 deletions cyclic-awaitable.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ package parl
import "sync/atomic"

const (
CyclicAwaitableClosed = true
// as argument to NewCyclicAwaitable, causes the awaitable ot be initially
// triggered
CyclicAwaitableClosed bool = true
)

// CyclicAwaitable is an awaitable that can be re-initialized
Expand All @@ -19,67 +21,47 @@ const (
// allows for race-free consumers
// - Close is idempotent, panic-free
// - if atomic.Pointer[Awaitable] is used for retrieval, a cyclic semaphore is achieved
type CyclicAwaitable atomic.Pointer[Awaitable]
type CyclicAwaitable struct{ *atomic.Pointer[Awaitable] }

// NewCyclicAwaitable returns an awaitable that can be re-initialized
// - Init must be invoked prior to use
func NewCyclicAwaitable() (awaitable *CyclicAwaitable) {
return &CyclicAwaitable{}
}

// Init sets the initial state of the awaitable
// - default is not triggered
// - if argument [task.CyclicAwaitableClosed], initial state
// is triggered
func (a *CyclicAwaitable) Init(initiallyClosed ...bool) (a2 *CyclicAwaitable) {
a2 = a
var shouldBeClosed = len(initiallyClosed) > 0 && initiallyClosed[0]
var awaitable = NewAwaitable()
if shouldBeClosed {
awaitable.Close()
// - if argument [task.CyclicAwaitableClosed] is provided, the initial state
// of the CyclicAwaitable is triggered
func NewCyclicAwaitable(initiallyClosed ...bool) (awaitable *CyclicAwaitable) {
c := CyclicAwaitable{Pointer: &atomic.Pointer[Awaitable]{}}
c.Store(NewAwaitable())
if len(initiallyClosed) > 0 && initiallyClosed[0] {
c.Close()
}
(*atomic.Pointer[Awaitable])(a).Store(awaitable)
return
return &c
}

// Ch returns an awaitable channel. Thread-safe
func (a *CyclicAwaitable) Ch() (ch AwaitableCh) {
return (*atomic.Pointer[Awaitable])(a).Load().Ch()
}
func (a *CyclicAwaitable) Ch() (ch AwaitableCh) { return a.Pointer.Load().Ch() }

// isClosed inspects whether the awaitable has been triggered
// - isClosed indicates that the channel is closed
// - isAboutToClose indicates that Close has been invoked,
// but that channel close may still be in progress
// - if isClosed is true, isAboutToClose is also true
// - the two values are requried to attain race-free consumers
// - Thread-safe
func (a *CyclicAwaitable) IsClosed() (isClosed, isAboutToClose bool) {
return (*atomic.Pointer[Awaitable])(a).Load().IsClosed()
}
func (a *CyclicAwaitable) IsClosed() (isClosed bool) { return a.Load().IsClosed() }

// Close triggers awaitable by closing the channel
// - upon return, the channel is guaranteed to be closed
// - idempotent, panic-free, thread-safe
func (a *CyclicAwaitable) Close() (didClose bool) {
return (*atomic.Pointer[Awaitable])(a).Load().Close()
}
func (a *CyclicAwaitable) Close() (didClose bool) { return a.Load().Close() }

// Open rearms the awaitable for another cycle
// - upon return, the channel is guarantee to be open
// - idempotent, panic-free, thread-safe
func (a *CyclicAwaitable) Open() (didOpen bool) {
var openp *Awaitable
var openedAwaitable *Awaitable
for {
var ap = (*atomic.Pointer[Awaitable])(a).Load()
var isClosed, _ = ap.IsClosed()
if !isClosed {
var awaitable = a.Load()
if !awaitable.IsClosed() {
return // was open return
}
if openp == nil {
openp = NewAwaitable()
if openedAwaitable == nil {
openedAwaitable = NewAwaitable()
}
if didOpen = (*atomic.Pointer[Awaitable])(a).CompareAndSwap(ap, openp); didOpen {
if didOpen = a.CompareAndSwap(awaitable, openedAwaitable); didOpen {
return // did open the channel return
}
}
Expand Down
Loading

0 comments on commit ae0f641

Please sign in to comment.