From 0db3e8859cee99d91a8053521b7cef81c008976a Mon Sep 17 00:00:00 2001 From: Harald Rudell Date: Thu, 26 Oct 2023 08:57:11 -0700 Subject: [PATCH] v0.4.116 deprecate AtomicBool AtomicReference --- adderror.go => add-error.go | 3 + android-serial.go | 16 ++- android-status.go | 20 ++-- atomic-bool.go | 48 --------- atomic-bool_test.go | 26 ----- atomic-counter.go | 58 +++++----- atomic-max-duration.go | 75 ------------- atomic-max.go | 45 +++++--- atomic-min.go | 68 +++++++----- atomic-reference.go | 43 -------- awaitable.go | 10 +- channel-send.go | 34 ++++-- cyclic-awaitable.go | 22 ++-- cyclic-waiter.go | 9 +- enum/enum-item-impl.go | 3 + future.go | 7 +- g0/go-chain.go | 2 +- g0/go-group.go | 71 ++++++------ g0/go-group_test.go | 12 +-- g0/thread-logger.go | 4 +- g0/thread-safe-thread-data.go | 7 +- go.mod | 2 +- go.sum | 4 +- if-adb-serverette.go | 2 +- internal/cyclebreaker/atomic-bool.go | 48 --------- internal/cyclebreaker/atomic-max.go | 6 +- mains/go.mod | 2 +- mutex-wait.go | 11 +- once.go | 19 ++-- parl.go | 102 +++++++++--------- pfmt/no-recurse-v-print.go | 4 +- pfs/directory-lister.go | 4 +- pio/closer-callback.go | 7 +- pio/tee-writer.go | 7 +- pio/write-closer-to-string.go | 8 +- pnet/http.go | 23 ++-- pnet/https.go | 2 +- pnet/udp.go | 27 ++--- pterm/go.mod | 2 +- ...est.go => recover-invocation-panic_test.go | 0 slow-detector-core.go | 35 +++--- slow-detector-invocation.go | 8 +- slow-detector-thread.go | 5 +- slow-detector.go | 10 +- sqliter/go.mod | 4 +- sqliter/go.sum | 4 +- watchfs/go.mod | 4 +- watchfs/go.sum | 4 +- watchfs/watcher.go | 4 +- win-or-waiter-core.go | 23 ++-- win-or-waiter-core_test.go | 16 +-- yamler/generic-yaml.go | 12 ++- yamler/go.mod | 2 +- yamler/unmarshaler.go | 7 +- yamler/visited-references-map.go | 54 ++++++---- 55 files changed, 448 insertions(+), 607 deletions(-) rename adderror.go => add-error.go (56%) delete mode 100644 atomic-bool.go delete mode 100644 atomic-bool_test.go delete mode 100644 atomic-max-duration.go delete mode 100644 atomic-reference.go delete mode 100644 internal/cyclebreaker/atomic-bool.go rename callbacks_test.go => recover-invocation-panic_test.go (100%) diff --git a/adderror.go b/add-error.go similarity index 56% rename from adderror.go rename to add-error.go index 3ade7e62..76cd0930 100644 --- a/adderror.go +++ b/add-error.go @@ -5,8 +5,11 @@ ISC License package parl +// AddError is a function to submit non-fatal errors type AddError func(err error) +// AddErrorIf is an interface implementing the AddError function type AddErrorIf interface { + // AddError is a function to submit non-fatal errors AddError(err error) } diff --git a/android-serial.go b/android-serial.go index 35d87a1d..5884ad36 100644 --- a/android-serial.go +++ b/android-serial.go @@ -5,14 +5,12 @@ ISC License package parl -func NewAndroidSerial(s string) (serial AndroidSerial) { - return AndroidSerial(s) -} +// NewAndroidSerial returns Android serial for s +// - typically a string of a dozen or so 8-bit chanacters consisting of +// lower and upper case a-zA-Z0-9 +func NewAndroidSerial(s string) (serial AndroidSerial) { return AndroidSerial(s) } -func (a AndroidSerial) String() (s string) { - return string(a) -} +// IsValid() returns whether a contains a valid Android serial +func (a AndroidSerial) IsValid() (isValid bool) { return len(string(a)) > 0 } -func (a AndroidSerial) IsValid() (isValid bool) { - return len(string(a)) > 0 -} +func (a AndroidSerial) String() (s string) { return string(a) } diff --git a/android-status.go b/android-status.go index 53743f8d..92002115 100644 --- a/android-status.go +++ b/android-status.go @@ -5,18 +5,14 @@ ISC License package parl -func NewAndroidStatus(s string) (status AndroidStatus) { - return AndroidStatus(s) -} +// NewAndroidStatus returns Anddroid status of s +// - AndroidStatus is a single word of ANSII-set characters +func NewAndroidStatus(s string) (status AndroidStatus) { return AndroidStatus(s) } -func (a AndroidStatus) String() (s string) { - return string(a) -} +// IsValid returns whether a conatins a valid Android device status +func (a AndroidStatus) IsValid() (isValid bool) { return len(string(a)) > 0 } -func (a AndroidStatus) IsValid() (isValid bool) { - return len(string(a)) > 0 -} +// IsOnline returns whether the Android status is device online, ie. ready for interactions +func (a AndroidStatus) IsOnline() (isOnline bool) { return a == AndroidOnline } -func (a AndroidStatus) IsOnline() (isOnline bool) { - return a == AndroidOnline -} +func (a AndroidStatus) String() (s string) { return string(a) } diff --git a/atomic-bool.go b/atomic-bool.go deleted file mode 100644 index 55d00ea6..00000000 --- a/atomic-bool.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -© 2021–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/) -ISC License -*/ - -package parl - -import "sync/atomic" - -/* -AtomicBool is a thread-safe flag. -AtomicBool requires no initialization - - var isDone parl.AtomicBool - if isDone.Set() // isDone was not set, but is set now - … - if !isDone.IsTrue() // isDone is not set -*/ -type AtomicBool struct { - value int32 // atomic -} - -const ( - abTrue = int32(1) - abFalse = int32(0) -) - -// IsTrue returns the flag’s current bool value. thread-safe -func (ab *AtomicBool) IsTrue() (isTrue bool) { - return atomic.LoadInt32(&ab.value) == abTrue -} - -// IsFalse returns the flag’s current bool value. thread-safe -func (ab *AtomicBool) IsFalse() (isFalse bool) { - return atomic.LoadInt32(&ab.value) == abFalse -} - -// Set sets the flag to true and returns true if the flag was not already true. -// thread-safe -func (ab *AtomicBool) Set() (wasNotSet bool) { - return atomic.SwapInt32(&ab.value, abTrue) != abTrue -} - -// Clear sets the flag to false and returns true if the flag was not already false. -// thread-safe -func (ab *AtomicBool) Clear() (wasSet bool) { - return atomic.SwapInt32(&ab.value, abFalse) == abTrue -} diff --git a/atomic-bool_test.go b/atomic-bool_test.go deleted file mode 100644 index 75ce5a48..00000000 --- a/atomic-bool_test.go +++ /dev/null @@ -1,26 +0,0 @@ -/* -© 2022–present Harald Rudell https://haraldrudell.github.io/haraldrudell/) -ISC License -*/ - -package parl - -import "testing" - -func TestAtomicBool(t *testing.T) { - var b AtomicBool - - if b.IsTrue() { - t.Error("default AtomicBool value is true") - } - - if !b.Set() { - t.Error("Set returned false") - } - if b.Set() { - t.Error("Set returned true second time") - } - if !b.Clear() { - t.Error("Clear returned true second time") - } -} diff --git a/atomic-counter.go b/atomic-counter.go index ced15242..1f00c55b 100644 --- a/atomic-counter.go +++ b/atomic-counter.go @@ -10,51 +10,47 @@ import ( "sync/atomic" ) -type AtomicCounter uint64 +// AtomicCounter is a uint64 thread-safe counter +type AtomicCounter atomic.Uint64 -func (max *AtomicCounter) Inc() (value uint64) { - value = atomic.AddUint64((*uint64)(max), 1) - return -} +// Inc increments with wrap-around. Thread-Safe +// - value is new value +func (a *AtomicCounter) Inc() (value uint64) { return (*atomic.Uint64)(a).Add(1) } -func (max *AtomicCounter) Inc2() (value uint64, didInc bool) { +// Inc2 is increment without wrap-around. Thread-Safe +// - at math.MaxUint64, increments are ineffective +func (a *AtomicCounter) Inc2() (value uint64, didInc bool) { for { - var beforeValue = atomic.LoadUint64((*uint64)(max)) + var beforeValue = (*atomic.Uint64)(a).Load() if beforeValue == math.MaxUint64 { - return - } else if didInc = atomic.CompareAndSwapUint64((*uint64)(max), beforeValue, beforeValue+1); didInc { - return + return // at max + } else if didInc = (*atomic.Uint64)(a).CompareAndSwap(beforeValue, beforeValue+1); didInc { + return // inc successful return } } } -func (max *AtomicCounter) Dec() (value uint64) { - value = atomic.AddUint64((*uint64)(max), math.MaxUint64) - return -} +// Dec is decrement with wrap-around. Thread-Safe +func (a *AtomicCounter) Dec() (value uint64) { return (*atomic.Uint64)(a).Add(math.MaxUint64) } -func (max *AtomicCounter) Dec2() (value uint64, didDec bool) { +// Dec2 is decrement with no wrap-around. Thread-Safe +// - at 0, decrements are ineffective +func (a *AtomicCounter) Dec2() (value uint64, didDec bool) { for { - var beforeValue = atomic.LoadUint64((*uint64)(max)) + var beforeValue = (*atomic.Uint64)(a).Load() if beforeValue == 0 { - return - } else if didDec = atomic.CompareAndSwapUint64((*uint64)(max), beforeValue, beforeValue-1); didDec { - return + return // no dec return + } else if didDec = (*atomic.Uint64)(a).CompareAndSwap(beforeValue, beforeValue-1); didDec { + return // dec successful return } } } -func (max *AtomicCounter) Add(value uint64) (newValue uint64) { - newValue = atomic.AddUint64((*uint64)(max), value) - return -} +// Add is add with wrap-around. Thread-Safe +func (a *AtomicCounter) Add(value uint64) (newValue uint64) { return (*atomic.Uint64)(a).Add(value) } -func (max *AtomicCounter) Set(value uint64) (oldValue uint64) { - oldValue = atomic.SwapUint64((*uint64)(max), value) - return -} +// Set sets a new aggregate value. Thread-Safe +func (a *AtomicCounter) Set(value uint64) (oldValue uint64) { return (*atomic.Uint64)(a).Swap(value) } -func (max *AtomicCounter) Value() (value uint64) { - value = atomic.LoadUint64((*uint64)(max)) - return -} +// Value returns current counter-value +func (a *AtomicCounter) Value() (value uint64) { return (*atomic.Uint64)(a).Load() } diff --git a/atomic-max-duration.go b/atomic-max-duration.go deleted file mode 100644 index 67fa996f..00000000 --- a/atomic-max-duration.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -© 2023–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/) -ISC License -*/ - -package parl - -import ( - "time" - - "github.com/haraldrudell/parl/perrors" -) - -// AtomicMaxDuration calculates durations maintaining max duration value -// - Thread-Safe but designed for single thread -// - for re-entrant timer, use SlowDetector -type AtomicMaxDuration struct { - t0Reference AtomicReference[time.Time] - dMax AtomicMax[time.Duration] -} - -// Start returns the effective start time for a new timing cycle -// - value is optional start time, default time.Now() -func (ad *AtomicMaxDuration) Start(value ...time.Time) (tStart time.Time) { - - // deteremine tStart and store in atomic reference - var previousReference *time.Time - if tStart, previousReference = ad.do(false, value...); previousReference != nil { - panic(perrors.ErrorfPF("two Start without Stop: %s", (*previousReference).Format(Rfc3339ns))) - } - - return -} - -// Stop returns the duration of a timing cycle -func (ad *AtomicMaxDuration) Stop(value ...time.Time) (duration time.Duration, isNewMax bool) { - - // determine tStop, retrieve atomic reference abnd calculate duration - if tStop, previousReference := ad.do(true, value...); previousReference == nil { - panic(perrors.ErrorfPF("Stop without Start: %s", tStop.Format(Rfc3339ns))) - } else { - duration = tStop.Sub(*previousReference) - } - - // calculate maximum duration - isNewMax = ad.dMax.Value(duration) - - return -} - -// Stop returns the duration of a timing cycle -func (ad *AtomicMaxDuration) Max() (max time.Duration, hasValue bool) { - max, hasValue = ad.dMax.Max() - return -} - -// do returns the previous reference and the active time for a Start or Stop operation -func (ad *AtomicMaxDuration) do(isStop bool, value ...time.Time) (activeTime time.Time, previousReference *time.Time) { - - // get time value for this operation - if len(value) > 0 { - activeTime = value[0] - } else { - activeTime = time.Now() - } - - // do reference swap - var newReference *time.Time - if !isStop { - newReference = &activeTime // start time to store for Start operation - } - previousReference = ad.t0Reference.Put(newReference) - - return -} diff --git a/atomic-max.go b/atomic-max.go index 10ddb9d4..48e44741 100644 --- a/atomic-max.go +++ b/atomic-max.go @@ -12,23 +12,32 @@ import ( "golang.org/x/exp/constraints" ) -type AtomicMax[T constraints.Integer] struct { - value atomic.Uint64 - hasValue atomic.Bool -} +// AtomicMax is a thread-safe max container +type AtomicMax[T constraints.Integer] struct{ value, value0 atomic.Uint64 } -func NewAtomicMax[T constraints.Integer](value T) (atomicMax *AtomicMax[T]) { - m := AtomicMax[T]{} - if value != 0 { - m.value.Store(uint64(value)) // set initial threshold - } - return &m +// NewAtomicMax returns a thread-safe max container +// - T underlying type must be int +// - negative values are not allowed +// - to set initial value, use Init +func NewAtomicMax[T constraints.Integer]() (atomicMax *AtomicMax[T]) { return &AtomicMax[T]{} } + +// Init performs actions that cannot happen prior to copying AtomicMax +// - supports functional chaining +// - Thread-safe +func (m *AtomicMax[T]) Init(value T) (atomicMax *AtomicMax[T]) { + atomicMax = m + m.value.Store(uint64(value)) + m.value0.Store(uint64(value)) // set initial threshold + return } +// Value updates the container possibly with a new Max value +// - value cannot be negative +// - Thread-safe func (m *AtomicMax[T]) Value(value T) (isNewMax bool) { // check if value is a new max - valueU64, err := ints.Unsigned[uint64](value, "") + var valueU64, err = ints.Unsigned[uint64](value, "") if err != nil { panic(err) // value out of range, ie. negative } @@ -36,7 +45,6 @@ func (m *AtomicMax[T]) Value(value T) (isNewMax bool) { if isNewMax = valueU64 > current; !isNewMax { return // not a new max return } - m.hasValue.CompareAndSwap(false, true) // store the new max for { @@ -51,10 +59,15 @@ func (m *AtomicMax[T]) Value(value T) (isNewMax bool) { } } +// Max returns current max and a flag whether a value is present +// - Thread-safe func (m *AtomicMax[T]) Max() (value T, hasValue bool) { - return T(m.value.Load()), m.hasValue.Load() + var u64 = m.value.Load() + value = T(u64) + hasValue = u64 != m.value0.Load() + return } -func (m *AtomicMax[T]) Max1() (value T) { - return T(m.value.Load()) -} +// Max1 returns current maximum whether default or set by Value +// - Thread-safe +func (m *AtomicMax[T]) Max1() (value T) { return T(m.value.Load()) } diff --git a/atomic-min.go b/atomic-min.go index e28fccb9..26600094 100644 --- a/atomic-min.go +++ b/atomic-min.go @@ -12,36 +12,33 @@ import ( "golang.org/x/exp/constraints" ) -const ( - stateUninitialized = 0 - stateHasValue = 1 -) - +// AtomicMin is a thread-safe container for a minimum value of any integer type type AtomicMin[T constraints.Integer] struct { - state uint32 - once sync.Once - value uint64 + isInitialized atomic.Bool // whether a value is present + value atomic.Uint64 // current min value as uint64 + initLock sync.Mutex // thread selector and wait for wtriting initial value } -func (min *AtomicMin[T]) Value(value T) (isNewMin bool) { +// Value notes a new min-candidate +// - if not a new minima, state is not changed +// - Thread-safe +func (a *AtomicMin[T]) Value(value T) (isNewMin bool) { + // value-valueU64 is candidate min-value var valueU64 uint64 = uint64(value) // ensure initialized - if atomic.LoadUint32(&min.state) == stateUninitialized { - min.once.Do(func() { - atomic.StoreUint64(&min.value, valueU64) - atomic.StoreUint32(&min.state, stateHasValue) - isNewMin = true - }) - if isNewMin { - return // value-initializing invocation always has min value + if !a.isInitialized.Load() { + if isNewMin = a.init(valueU64); isNewMin { + return // this thread set initial value return } } // aggregate minimum - var current uint64 = atomic.LoadUint64(&min.value) - if isNewMin = valueU64 < current; !isNewMin { + var current = a.value.Load() + var currentT = T(current) + // make comparison in T domain + if isNewMin = value < currentT; !isNewMin { return // too large value, nothing to do return } @@ -49,21 +46,38 @@ func (min *AtomicMin[T]) Value(value T) (isNewMin bool) { for { // try to write - if atomic.CompareAndSwapUint64(&min.value, current, valueU64) { - return // min.value updated return + if a.value.CompareAndSwap(current, valueU64) { + return // min-value updated return } // load new copy of value - current = atomic.LoadUint64(&min.value) - if current <= valueU64 { - return // min.value now ok return + current = a.value.Load() + currentT = T(current) + if currentT <= value { + return // ok min-value written by other thread return } } } -func (min *AtomicMin[T]) Min() (value T, hasValue bool) { - if hasValue = atomic.LoadUint32(&min.state) == stateHasValue; hasValue { - value = T(atomic.LoadUint64(&min.value)) +// Min returns current minimum value and a flag whether a value is present +// - Thread-safe +func (a *AtomicMin[T]) Min() (value T, hasValue bool) { + if hasValue = a.isInitialized.Load(); !hasValue { + return // no min yet return + } + value = T(a.value.Load()) + return +} + +// init uses lock to have loser threads wait until winner thread has updated value +func (a *AtomicMin[T]) init(valueU64 uint64) (didStore bool) { + a.initLock.Lock() + defer a.initLock.Unlock() + + if didStore = !a.isInitialized.Load(); !didStore { + return // another thread was first } + a.value.Store(valueU64) + a.isInitialized.Store(true) return } diff --git a/atomic-reference.go b/atomic-reference.go deleted file mode 100644 index 9320e734..00000000 --- a/atomic-reference.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -© 2022–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/) -ISC License -*/ - -package parl - -import ( - "sync/atomic" - "unsafe" -) - -// AtomicReference holds a typed reference that is accessed atomically. -type AtomicReference[T any] struct { - reference *T -} - -func MakeAtomicReference[T any]() (reference AtomicReference[T]) { - return AtomicReference[T]{} -} - -func (ref *AtomicReference[T]) Get() (reference *T) { - return (*T)(atomic.LoadPointer( - (*unsafe.Pointer)(unsafe.Pointer(&ref.reference)), - )) -} - -func (ref *AtomicReference[T]) Put(reference *T) (r0 *T) { - r0 = (*T)(atomic.SwapPointer( - (*unsafe.Pointer)(unsafe.Pointer(&ref.reference)), - unsafe.Pointer(reference), - )) - return -} - -func (ref *AtomicReference[T]) PutIf(reference *T, expected *T) (swapped bool) { - swapped = atomic.CompareAndSwapPointer( - (*unsafe.Pointer)(unsafe.Pointer(&ref.reference)), - unsafe.Pointer(expected), - unsafe.Pointer(reference), - ) - return -} diff --git a/awaitable.go b/awaitable.go index 77b85eea..a0f3d448 100644 --- a/awaitable.go +++ b/awaitable.go @@ -10,7 +10,7 @@ import "sync/atomic" // Awaitable is a semaphore that allows any number of threads to inspect // and await an event // - one-to-many, happens-before -// - the synchronization mechanic is closing channel, allowing threads to await +// - the synchronization mechanic is closing channel, allowing consumers to await // multiple events // - status can be inspected in a thread-safe manner: isClosed, isAboutToClose // allows for race-free consumers @@ -35,8 +35,8 @@ func (a *Awaitable) Ch() (ch AwaitableCh) { // - isClosed indicates that the channel is closed // - isAboutToClose indicates that Close has been invoked, // but that channel close may still be in progress -// - the two values are requried to attain race-free consumers // - if isClosed is true, isAboutToClose is also true +// - the two values are requried to attain race-free consumers // - Thread-safe func (a *Awaitable) IsClosed() (isClosed, isAboutToClose bool) { select { @@ -49,10 +49,10 @@ func (a *Awaitable) IsClosed() (isClosed, isAboutToClose bool) { } // Close triggers awaitable by closing the channel -// - upon return, the channel is guarantee to be closed +// - upon return, the channel is guaranteed to be closed // - idempotent, panic-free, thread-safe -func (a *Awaitable) Close() (wasClosed bool) { - if wasClosed = !a.isClosed.CompareAndSwap(false, true); wasClosed { +func (a *Awaitable) Close() (didClose bool) { + if didClose = a.isClosed.CompareAndSwap(false, true); !didClose { <-a.ch // wait to make certain the channel is closed return // already closed return } diff --git a/channel-send.go b/channel-send.go index ed9358b8..06ab7356 100644 --- a/channel-send.go +++ b/channel-send.go @@ -10,7 +10,8 @@ import ( "github.com/haraldrudell/parl/pruntime" ) -const SendNonBlocking = true // with nonBlocking set to SendNonBlocking, ChannelSend will never block +// with nonBlocking set to SendNonBlocking, ChannelSend will never block +const SendNonBlocking = true // ChannelSend is channel send without panics and possibly non-blocking // - if nonBlocking is SendNonBlocking or true, channel send will be attempted but not block @@ -20,21 +21,36 @@ const SendNonBlocking = true // with nonBlocking set to SendNonBlocking, Channel // - isClosedChannel is true if the panic was caused by ch being closed // - there should be no panics other than from ch being closed func ChannelSend[T any](ch chan<- T, value T, nonBlocking ...bool) (didSend, isNilChannel, isClosedChannel bool, err error) { + + // check for nil channel + if isNilChannel = ch == nil; isNilChannel { + err = perrors.NewPF("ch channel nil") + return + } + + // get non-blocking flag var sendNb bool if len(nonBlocking) > 0 { sendNb = nonBlocking[0] } - defer Recover(Annotation(), &err, func(e error) { - if pruntime.IsSendOnClosedChannel(e) { - isClosedChannel = true - } - }) - if isNilChannel = ch == nil; isNilChannel { - err = perrors.NewPF("ch channel nil") - return + // send, recovering panics + didSend, err = channelSend(ch, value, sendNb) + + // set isClosed flag + if err != nil && pruntime.IsSendOnClosedChannel(err) { + isClosedChannel = true } + return +} + +// channelSend sends possibly non-blocking +// - the only way to determine closed channel is to send, which panics +// - a separate function to recover the panic +func channelSend[T any](ch chan<- T, value T, sendNb bool) (didSend bool, err error) { + defer Recover(Annotation(), &err, NoOnError) + // send non-blocking if sendNb { select { diff --git a/cyclic-awaitable.go b/cyclic-awaitable.go index 7fc69c32..ba86ee3f 100644 --- a/cyclic-awaitable.go +++ b/cyclic-awaitable.go @@ -19,9 +19,7 @@ const ( // allows for race-free consumers // - Close is idempotent, panic-free // - if atomic.Pointer[Awaitable] is used for retrieval, a cyclic semaphore is achieved -type CyclicAwaitable struct { - p atomic.Pointer[Awaitable] -} +type CyclicAwaitable atomic.Pointer[Awaitable] // NewCyclicAwaitable returns an awaitable that can be re-initialized // - Init must be invoked prior to use @@ -40,31 +38,31 @@ func (a *CyclicAwaitable) Init(initiallyClosed ...bool) (a2 *CyclicAwaitable) { if shouldBeClosed { awaitable.Close() } - a.p.Store(awaitable) + (*atomic.Pointer[Awaitable])(a).Store(awaitable) return } // Ch returns an awaitable channel. Thread-safe func (a *CyclicAwaitable) Ch() (ch AwaitableCh) { - return a.p.Load().Ch() + return (*atomic.Pointer[Awaitable])(a).Load().Ch() } // isClosed inspects whether the awaitable has been triggered // - isClosed indicates that the channel is closed // - isAboutToClose indicates that Close has been invoked, // but that channel close may still be in progress -// - the two values are requried to attain race-free consumers // - if isClosed is true, isAboutToClose is also true +// - the two values are requried to attain race-free consumers // - Thread-safe func (a *CyclicAwaitable) IsClosed() (isClosed, isAboutToClose bool) { - return a.p.Load().IsClosed() + return (*atomic.Pointer[Awaitable])(a).Load().IsClosed() } // Close triggers awaitable by closing the channel -// - upon return, the channel is guarantee to be closed +// - upon return, the channel is guaranteed to be closed // - idempotent, panic-free, thread-safe -func (a *CyclicAwaitable) Close() (wasClosed bool) { - return a.p.Load().Close() +func (a *CyclicAwaitable) Close() (didClose bool) { + return (*atomic.Pointer[Awaitable])(a).Load().Close() } // Open rearms the awaitable for another cycle @@ -73,7 +71,7 @@ func (a *CyclicAwaitable) Close() (wasClosed bool) { func (a *CyclicAwaitable) Open() (didOpen bool) { var openp *Awaitable for { - var ap = a.p.Load() + var ap = (*atomic.Pointer[Awaitable])(a).Load() var isClosed, _ = ap.IsClosed() if !isClosed { return // was open return @@ -81,7 +79,7 @@ func (a *CyclicAwaitable) Open() (didOpen bool) { if openp == nil { openp = NewAwaitable() } - if didOpen = a.p.CompareAndSwap(ap, openp); didOpen { + if didOpen = (*atomic.Pointer[Awaitable])(a).CompareAndSwap(ap, openp); didOpen { return // did open the channel return } } diff --git a/cyclic-waiter.go b/cyclic-waiter.go index a9c2202b..fe420a56 100644 --- a/cyclic-waiter.go +++ b/cyclic-waiter.go @@ -8,6 +8,7 @@ package parl import ( "context" "sync" + "sync/atomic" "github.com/haraldrudell/parl/perrors" ) @@ -25,7 +26,7 @@ import ( // - the cycles can be permanently canceled or trigged and rearmed type CyclicWait struct { parentContext context.Context - isCancel AtomicBool + isCancel atomic.Bool lock sync.RWMutex ow OnceWaiter @@ -91,14 +92,14 @@ func (cw *CyclicWait) Cancel() { defer cw.lock.Unlock() // trig this cycle - cw.isCancel.Set() + cw.isCancel.Store(true) cw.ow.Cancel() } // IsCancel returns whether Cancel has been invoked. // ISCancel will return false during CancelAndRearm cycles. func (cw *CyclicWait) IsCancel() (isCancel bool) { - return cw.isCancel.IsTrue() + return cw.isCancel.Load() } // CancelAndRearm trigs the object and then rearms unless @@ -110,7 +111,7 @@ func (cw *CyclicWait) CancelAndRearm() (wasRearmed bool) { // trig this cycle cw.ow.Cancel() - if cw.parentContext.Err() != nil || cw.isCancel.IsTrue() { + if cw.parentContext.Err() != nil || cw.isCancel.Load() { return // ream false: parent context has been canceled } diff --git a/enum/enum-item-impl.go b/enum/enum-item-impl.go index ed34dcdd..6e8d3a02 100644 --- a/enum/enum-item-impl.go +++ b/enum/enum-item-impl.go @@ -7,6 +7,7 @@ package enum import ( "github.com/haraldrudell/parl" + "github.com/haraldrudell/parl/sets" "golang.org/x/exp/constraints" ) @@ -19,6 +20,8 @@ type EnumItemImpl[K constraints.Ordered, V any] struct { Full string // sentence describing this flag } +var _ sets.Element[int] = &EnumItemImpl[int, int]{} + func NewEnumItemImpl[K constraints.Ordered, V any](value V, key K, full string) (item parl.EnumItem[K, V]) { return &EnumItemImpl[K, V]{KeyK: key, Full: full, ValueV: value} } diff --git a/future.go b/future.go index 9b2bf40c..40606df3 100644 --- a/future.go +++ b/future.go @@ -7,13 +7,14 @@ package parl import ( "sync" + "sync/atomic" ) // Future contains an awaitable calculation using performant // sync.RWMutex and atomics. Thread-safe type Future[T any] struct { // isCompleted makes lock atomic-access observable - isCompleted AtomicBool + isCompleted atomic.Bool // lock’s write lock is held by the calculating thread until the calculation completes // - other threads are held waiting in Result using RLock for the calculation to complete @@ -37,7 +38,7 @@ func NewFuture[T any]() (calculation *Future[T]) { // IsCompleted returns whether the calculation is complete. Thread-safe func (cn *Future[T]) IsCompleted() (isCompleted bool) { - return cn.isCompleted.IsTrue() + return cn.isCompleted.Load() } // Result retrieves the calculation’s result. May block. Thread-safe @@ -54,7 +55,7 @@ func (cn *Future[T]) Result() (result T, isValid bool) { // - result is considered valid if errp is nil or *errp is nil func (cn *Future[T]) End(result *T, errp *error) { defer cn.lock.Unlock() - defer cn.isCompleted.Set() + defer cn.isCompleted.Store(true) // store value if calculation successful if cn.isValid = errp == nil || *errp == nil; cn.isValid { diff --git a/g0/go-chain.go b/g0/go-chain.go index 39321ee0..384ea7f6 100644 --- a/g0/go-chain.go +++ b/g0/go-chain.go @@ -49,7 +49,7 @@ func GoNo(g0 parl.GoGen) (goNo string) { case *Go: goNo = "Go" + g.id.String() + ":" + g.GoID().String() case *GoGroup: - if g.hasErrorChannel.IsFalse() { + if !g.hasErrorChannel.Load() { goNo = "SubGo" } else if g.parent != nil { goNo = "SubGroup" diff --git a/g0/go-group.go b/g0/go-group.go index ca1d2a33..71e126d9 100644 --- a/g0/go-group.go +++ b/g0/go-group.go @@ -8,6 +8,7 @@ package g0 import ( "context" "sync" + "sync/atomic" "github.com/haraldrudell/parl" "github.com/haraldrudell/parl/parli" @@ -41,16 +42,16 @@ const ( type GoGroup struct { creator pruntime.CodeLocation parent goGroupParent - hasErrorChannel parl.AtomicBool // this GoGroup uses its error channel: NewGoGroup() or SubGroup() - isSubGroup parl.AtomicBool // is SubGroup(): not NewGoGroup() or SubGo() - hadFatal parl.AtomicBool + hasErrorChannel atomic.Bool // this GoGroup uses its error channel: NewGoGroup() or SubGroup() + isSubGroup atomic.Bool // is SubGroup(): not NewGoGroup() or SubGo() + hadFatal atomic.Bool onFirstFatal parl.GoFatalCallback gos parli.ThreadSafeMap[GoEntityID, *ThreadData] goContext // Cancel() Context() ch parl.NBChan[parl.GoError] - noTermination parl.AtomicBool - isDebug parl.AtomicBool - aggregateThreads parl.AtomicBool + noTermination atomic.Bool + isDebug atomic.Bool + aggregateThreads atomic.Bool // doneLock ensures: // - consistency of data during GoDone // - change in isWaitGroupDone and g0.goEntityID.wg.DoneBool is atomic @@ -179,18 +180,18 @@ func new( gos: pmaps.NewRWMap[GoEntityID, *ThreadData](), } if parl.IsThisDebug() { - g.isDebug.Set() + g.isDebug.Store(true) } if len(onFirstFatal) > 0 { g.onFirstFatal = onFirstFatal[0] } if hasErrorChannel { - g.hasErrorChannel.Set() + g.hasErrorChannel.Store(true) } if isSubGroup { - g.isSubGroup.Set() + g.isSubGroup.Store(true) } - if g.isDebug.IsTrue() { + if g.isDebug.Load() { s := "new:" + g.typeString() if parent != nil { if p, ok := parent.(*GoGroup); ok { @@ -208,10 +209,10 @@ func (g0 *GoGroup) Add(goEntityID GoEntityID, threadData *ThreadData) { defer g0.doneLock.Unlock() g0.wg.Add(1) - if g0.isDebug.IsTrue() { + if g0.isDebug.Load() { parl.Log("goGroup#%s:Add(id%s:%s)#%d", g0.G0ID(), goEntityID, threadData.Short(), g0.goEntityID.wg.Count()) } - if g0.aggregateThreads.IsTrue() { + if g0.aggregateThreads.Load() { g0.gos.Put(goEntityID, threadData) } if g0.parent != nil { @@ -220,7 +221,7 @@ func (g0 *GoGroup) Add(goEntityID GoEntityID, threadData *ThreadData) { } func (g0 *GoGroup) UpdateThread(goEntityID GoEntityID, threadData *ThreadData) { - if g0.aggregateThreads.IsTrue() { + if g0.aggregateThreads.Load() { g0.gos.Put(goEntityID, threadData) } if g0.parent != nil { @@ -235,7 +236,7 @@ func (g0 *GoGroup) GoDone(thread parl.Go, err error) { } // first fatal thread-exit of this thread-group - if err != nil && g0.hadFatal.Set() { + if err != nil && g0.hadFatal.CompareAndSwap(false, true) { // handle FirstFatal() g0.setFirstFatal() @@ -260,7 +261,7 @@ func (g0 *GoGroup) GoDone(thread parl.Go, err error) { panic(perrors.ErrorfPF("in GoGroup after termination: %s", perrors.Short(err))) } - if g0.isDebug.IsTrue() { + if g0.isDebug.Load() { var threadData parl.ThreadData var id string if thread != nil { @@ -278,7 +279,7 @@ func (g0 *GoGroup) GoDone(thread parl.Go, err error) { panic(perrors.NewPF("type assertion failed")) } g0.gos.Delete(goImpl.G0ID(), parli.MapDeleteWithZeroValue) - if g0.isSubGroup.IsTrue() { + if g0.isSubGroup.Load() { // SubGroup with its own error channel with fatals not affecting parent // send fatal error to parent as non-fatal error with error context GeLocalChan @@ -288,7 +289,7 @@ func (g0 *GoGroup) GoDone(thread parl.Go, err error) { // pretend good thread exit to parent g0.parent.GoDone(thread, nil) } - if g0.hasErrorChannel.IsTrue() { + if g0.hasErrorChannel.Load() { // emit on local error channel var context parl.GoErrorContext @@ -307,10 +308,10 @@ func (g0 *GoGroup) GoDone(thread parl.Go, err error) { g0.parent.GoDone(thread, err) } - if g0.isDebug.IsTrue() { + if g0.isDebug.Load() { s := "goGroup#" + g0.G0ID().String() + ":" if isTermination { - s += parl.Sprintf("Terminated:isSubGroup:%t:hasEc:%t", g0.isSubGroup.IsTrue(), g0.hasErrorChannel.IsTrue()) + s += parl.Sprintf("Terminated:isSubGroup:%t:hasEc:%t", g0.isSubGroup.Load(), g0.hasErrorChannel.Load()) } else { s += Shorts(g0.Threads()) } @@ -363,7 +364,7 @@ func (g0 *GoGroup) FirstFatal() (firstFatal *parl.OnceWaiterRO) { if g0.onceWaiter == nil { g0.onceWaiter = parl.NewOnceWaiter(context.Background()) - if g0.hadFatal.IsTrue() { + if g0.hadFatal.Load() { g0.onceWaiter.Cancel() } } @@ -371,20 +372,20 @@ func (g0 *GoGroup) FirstFatal() (firstFatal *parl.OnceWaiterRO) { } func (g0 *GoGroup) EnableTermination(allowTermination bool) { - if g0.isDebug.IsTrue() { + if g0.isDebug.Load() { parl.Log("goGroup%s#:EnableTermination:%t", g0.G0ID(), allowTermination) } if g0.endCh.IsClosed() { return // GoGroup is already shutdown return } else if !allowTermination { - if g0.noTermination.Set() { // prevent termination, it was previously allowed + if g0.noTermination.CompareAndSwap(false, true) { // prevent termination, it was previously allowed g0.CascadeEnableTermination(1) } return // prevent termination complete } // now allow termination - if !g0.noTermination.Clear() { + if !g0.noTermination.CompareAndSwap(true, false) { return // termination allowed already } @@ -397,7 +398,7 @@ func (g0 *GoGroup) EnableTermination(allowTermination bool) { return // GoGroup did not terminate } - if g0.hasErrorChannel.IsTrue() { + if g0.hasErrorChannel.Load() { g0.ch.Close() // close local error channel } // mark GoGroup terminated @@ -405,7 +406,7 @@ func (g0 *GoGroup) EnableTermination(allowTermination bool) { g0.goContext.Cancel() } -func (g0 *GoGroup) IsEnableTermination() (mayTerminate bool) { return !g0.noTermination.IsTrue() } +func (g0 *GoGroup) IsEnableTermination() (mayTerminate bool) { return !g0.noTermination.Load() } // CascadeEnableTermination manipulates wait groups of this goGroup and // those of its parents to allow or prevent termination @@ -462,24 +463,24 @@ func (g0 *GoGroup) NamedThreads() (threads []parl.ThreadData) { func (g0 *GoGroup) SetDebug(debug parl.GoDebug) { if debug == parl.DebugPrint { - g0.isDebug.Set() - g0.aggregateThreads.Set() + g0.isDebug.Store(true) + g0.aggregateThreads.Store(true) return } - g0.isDebug.Clear() + g0.isDebug.Store(false) if debug == parl.AggregateThread { - g0.aggregateThreads.Set() + g0.aggregateThreads.Store(true) return } - g0.aggregateThreads.Clear() + g0.aggregateThreads.Store(false) } // Cancel signals shutdown to all threads of a thread-group. func (g *GoGroup) Cancel() { g.goContext.Cancel() - if g.isEnd() || g.goContext.wg.Count() > 0 || g.noTermination.IsTrue() { + if g.isEnd() || g.goContext.wg.Count() > 0 || g.noTermination.Load() { return // already ended or have child object or termination off return } @@ -489,10 +490,10 @@ func (g *GoGroup) Cancel() { g.doneLock.Lock() defer g.doneLock.Unlock() - if g.isEnd() || g.goContext.wg.Count() > 0 || g.noTermination.IsTrue() { + if g.isEnd() || g.goContext.wg.Count() > 0 || g.noTermination.Load() { return // already ended or have child object or termination off return } - if g.hasErrorChannel.IsTrue() { + if g.hasErrorChannel.Load() { g.ch.Close() // close local error channel } // mark GoGroup terminated @@ -537,7 +538,7 @@ func (g0 *GoGroup) setFirstFatal() { func (g0 *GoGroup) isEnd() (isEnd bool) { // SubGo termination flag - if !g0.hasErrorChannel.IsTrue() { + if !g0.hasErrorChannel.Load() { return g0.endCh.IsClosed() } @@ -549,7 +550,7 @@ func (g0 *GoGroup) isEnd() (isEnd bool) { func (g0 *GoGroup) typeString() (s string) { if g0.parent == nil { s = "goGroup" - } else if g0.isSubGroup.IsTrue() { + } else if g0.isSubGroup.Load() { s = "subGroup" } else { s = "subGo" diff --git a/g0/go-group_test.go b/g0/go-group_test.go index b563f9ee..8ad273ab 100644 --- a/g0/go-group_test.go +++ b/g0/go-group_test.go @@ -137,18 +137,18 @@ func TestGoGroup(t *testing.T) { goGroup = NewGoGroup(context.Background()) subGo = goGroup.SubGo() goGroupImpl = subGo.(*GoGroup) - if goGroupImpl.isSubGroup.IsTrue() { + if goGroupImpl.isSubGroup.Load() { t.Error("SubGo returned SubGroup") } - if goGroupImpl.hasErrorChannel.IsTrue() { + if goGroupImpl.hasErrorChannel.Load() { t.Error("SubGo has error channel") } subGroup = goGroup.SubGroup() goGroupImpl = subGroup.(*GoGroup) - if goGroupImpl.isSubGroup.IsFalse() { + if !goGroupImpl.isSubGroup.Load() { t.Error("SubGroup did not return SubGroup") } - if goGroupImpl.hasErrorChannel.IsFalse() { + if !goGroupImpl.hasErrorChannel.Load() { t.Error("SubGroup does not have error channel") } @@ -183,11 +183,11 @@ func TestGoGroup(t *testing.T) { t.Error("goGroup no-debug collects threads") } goGroup.SetDebug(parl.DebugPrint) - if goGroupImpl.isDebug.IsFalse() { + if !goGroupImpl.isDebug.Load() { t.Error("goGroup.SetDebug DebugPrint failed") } goGroup.SetDebug(parl.AggregateThread) - if goGroupImpl.isDebug.IsTrue() { + if goGroupImpl.isDebug.Load() { t.Error("goGroup.SetDebug AggregateThread failed") } goGroup.Go().Register(label) diff --git a/g0/thread-logger.go b/g0/thread-logger.go index 5ce78ceb..18955a22 100644 --- a/g0/thread-logger.go +++ b/g0/thread-logger.go @@ -82,7 +82,7 @@ func (t *ThreadLogger) Log() (t2 *ThreadLogger) { close(t.endCh) return // thread-group already ended } - g.aggregateThreads.Set() + g.aggregateThreads.Store(true) if g.Context().Err() == nil { g.goContext.cancelListener = t.cancelListener @@ -125,7 +125,7 @@ func (t *ThreadLogger) printThread() { defer ticker.Stop() var endCh <-chan struct{} - if g.hasErrorChannel.IsTrue() { + if g.hasErrorChannel.Load() { endCh = g.ch.WaitForCloseCh() } else { endCh = g.endCh.Ch() diff --git a/g0/thread-safe-thread-data.go b/g0/thread-safe-thread-data.go index fd2ed4bd..1a8bce54 100644 --- a/g0/thread-safe-thread-data.go +++ b/g0/thread-safe-thread-data.go @@ -7,6 +7,7 @@ package g0 import ( "sync" + "sync/atomic" "github.com/haraldrudell/parl" "github.com/haraldrudell/parl/pruntime" @@ -16,7 +17,7 @@ import ( // - ThreadSafeThreadData does not have initialization // - haveThreadID indicates whether data is present type ThreadSafeThreadData struct { - haveThreadID parl.AtomicBool + haveThreadID atomic.Bool lock sync.RWMutex td ThreadData @@ -25,7 +26,7 @@ type ThreadSafeThreadData struct { // HaveThreadID indicates whether Update has been invoked on this ThreadDataWrap // object. func (tw *ThreadSafeThreadData) HaveThreadID() (haveThreadID bool) { - return tw.haveThreadID.IsTrue() + return tw.haveThreadID.Load() } // Update populates the wrapped ThreadData from the stack trace. @@ -40,7 +41,7 @@ func (tw *ThreadSafeThreadData) Update( tw.td.Update(threadID, createInvocation, goFunction, label) if threadID.IsValid() { - tw.haveThreadID.Set() // if we know have a vald ThreadID + tw.haveThreadID.Store(true) // if we know have a vald ThreadID } } diff --git a/go.mod b/go.mod index 4907eb9f..441b3155 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ toolchain go1.21.3 require ( github.com/google/btree v1.1.2 - github.com/haraldrudell/parl/yamler v0.4.114 + github.com/haraldrudell/parl/yamler v0.4.115 golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/sys v0.13.0 golang.org/x/text v0.13.0 diff --git a/go.sum b/go.sum index ac958bcf..fc7b0eba 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/haraldrudell/parl/yamler v0.4.114 h1:Ptpjvn5/38zA+ozoqSdVpirgvDqLWrlGW4ZbkgrYL5M= -github.com/haraldrudell/parl/yamler v0.4.114/go.mod h1:Bd+PCMUuBonpyQX7VMxK5WzWM2gmJ+HxDP53BRxwsv4= +github.com/haraldrudell/parl/yamler v0.4.115 h1:D0BUn2pkON3k2yLm0yBC/EE/1cirtZtVUcDwkcKDPgQ= +github.com/haraldrudell/parl/yamler v0.4.115/go.mod h1:N130FM4CNuyAwuPDT0YvF+zlF3tvCRPYtvhQOEExAyc= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/if-adb-serverette.go b/if-adb-serverette.go index 5636ecc5..4cc5535e 100644 --- a/if-adb-serverette.go +++ b/if-adb-serverette.go @@ -71,7 +71,7 @@ type ServerFactory interface { // AndroidStatus indicates the current status of a device // known to a Server or Serverette -// it is a single word of ANSII-set characters +// - AndroidStatus is a single word of ANSII-set characters type AndroidStatus string // AndroidOnline is the Android device status diff --git a/internal/cyclebreaker/atomic-bool.go b/internal/cyclebreaker/atomic-bool.go deleted file mode 100644 index 59d5a9d2..00000000 --- a/internal/cyclebreaker/atomic-bool.go +++ /dev/null @@ -1,48 +0,0 @@ -/* -© 2021–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/) -ISC License -*/ - -package cyclebreaker - -import "sync/atomic" - -/* -AtomicBool is a thread-safe flag. -AtomicBool requires no initialization - - var isDone parl.AtomicBool - if isDone.Set() // isDone was not set, but is set now - … - if !isDone.IsTrue() // isDone is not set -*/ -type AtomicBool struct { - value int32 // atomic -} - -const ( - abTrue = int32(1) - abFalse = int32(0) -) - -// IsTrue returns the flag’s current bool value. thread-safe -func (ab *AtomicBool) IsTrue() (isTrue bool) { - return atomic.LoadInt32(&ab.value) == abTrue -} - -// IsFalse returns the flag’s current bool value. thread-safe -func (ab *AtomicBool) IsFalse() (isFalse bool) { - return atomic.LoadInt32(&ab.value) == abFalse -} - -// Set sets the flag to true and returns true if the flag was not already true. -// thread-safe -func (ab *AtomicBool) Set() (wasNotSet bool) { - return atomic.SwapInt32(&ab.value, abTrue) != abTrue -} - -// Clear sets the flag to false and returns true if the flag was not already false. -// thread-safe -func (ab *AtomicBool) Clear() (wasSet bool) { - return atomic.SwapInt32(&ab.value, abFalse) == abTrue -} diff --git a/internal/cyclebreaker/atomic-max.go b/internal/cyclebreaker/atomic-max.go index 3ddd42ff..6348777a 100644 --- a/internal/cyclebreaker/atomic-max.go +++ b/internal/cyclebreaker/atomic-max.go @@ -14,7 +14,7 @@ import ( type AtomicMax[T constraints.Integer] struct { value uint64 - hasValue AtomicBool + hasValue atomic.Bool } func NewAtomicMax[T constraints.Integer](value T) (atomicMax *AtomicMax[T]) { @@ -35,7 +35,7 @@ func (max *AtomicMax[T]) Value(value T) (isNewMax bool) { if isNewMax = valueU64 > current; !isNewMax { return // not a new max return } - max.hasValue.Set() + max.hasValue.Store(true) // store the new max for { @@ -52,7 +52,7 @@ func (max *AtomicMax[T]) Value(value T) (isNewMax bool) { func (max *AtomicMax[T]) Max() (value T, hasValue bool) { value = T(atomic.LoadUint64((*uint64)(&max.value))) - hasValue = max.hasValue.IsTrue() + hasValue = max.hasValue.Load() return } diff --git a/mains/go.mod b/mains/go.mod index 38679f5a..af16ed64 100644 --- a/mains/go.mod +++ b/mains/go.mod @@ -7,7 +7,7 @@ toolchain go1.21.3 replace github.com/haraldrudell/parl => ../../parl require ( - github.com/haraldrudell/parl v0.4.114 + github.com/haraldrudell/parl v0.4.115 golang.org/x/sys v0.13.0 ) diff --git a/mutex-wait.go b/mutex-wait.go index 6514b18d..a449d6c1 100644 --- a/mutex-wait.go +++ b/mutex-wait.go @@ -5,12 +5,15 @@ ISC License package parl -import "sync" +import ( + "sync" + "sync/atomic" +) // MutexWait is maximum-lightweight observable single-fire Mutex. Thread-Safe type MutexWait struct { lock sync.Mutex - isUnlocked AtomicBool + isUnlocked atomic.Bool } // NewMutexWait returns a maximum-lightweight observable single-fire Mutex. Thread-Safe @@ -22,7 +25,7 @@ func NewMutexWait() (mutexWait *MutexWait) { // IsUnlocked returns whether the MutexWait has fired func (mw *MutexWait) IsUnlocked() (isUnlocked bool) { - return mw.isUnlocked.IsTrue() + return mw.isUnlocked.Load() } // Wait blocks until MutexWait has fired @@ -33,7 +36,7 @@ func (mw *MutexWait) Wait() { // Unlock fires MutexWait func (mw *MutexWait) Unlock() { - if mw.isUnlocked.Set() { + if mw.isUnlocked.CompareAndSwap(false, true) { mw.lock.Unlock() } } diff --git a/once.go b/once.go index 32b1e434..19afa2e1 100644 --- a/once.go +++ b/once.go @@ -5,7 +5,10 @@ ISC License package parl -import "sync" +import ( + "sync" + "sync/atomic" +) // parl.Once is an observable sync.Once with an alternative DoErr method. Thread-safe // - parl.Once is thread-safe and does not require initialization @@ -15,9 +18,9 @@ import "sync" // - Result returns DoErr outcome, hasValue indicates if values are present. Atomic eprformance type Once struct { once sync.Once - isDone AtomicBool // isDone indicates if the Once has completed, either by Do or Doerr + isDone atomic.Bool // isDone indicates if the Once has completed, either by Do or Doerr - hasResult AtomicBool // hasResult is true when Once has been completed by DoErr + hasResult atomic.Bool // hasResult is true when Once has been completed by DoErr // result is the outcome of a possible DoErr invocation // - thread-safe by hasResult atomic result InvokeResult @@ -59,13 +62,13 @@ func (o *Once) DoErr(f func() (err error)) (didOnce, isPanic bool, err error) { // IsDone returns whether the Once did execute, provided with atomic performance func (o *Once) IsDone() (isDone bool) { - return o.isDone.IsTrue() + return o.isDone.Load() } // Result returns the DoErr outcome provided with atomic performance // - only available if hasResult is true func (o *Once) Result() (isPanic bool, hasResult bool, err error) { - if hasResult = o.hasResult.IsTrue(); !hasResult { + if hasResult = o.hasResult.Load(); !hasResult { return // no result available return } @@ -86,7 +89,7 @@ type doErrData struct { // invokeFErr is behind o.once func (d *doErrData) invokeFErr() { - defer d.isDone.Set() + defer d.isDone.Store(true) defer d.updateResult() *d.didOnce = true @@ -94,7 +97,7 @@ func (d *doErrData) invokeFErr() { } func (d *doErrData) updateResult() { - defer d.hasResult.Set() + defer d.hasResult.Store(true) d.result.IsPanic = *d.isPanic d.result.Err = *d.errp @@ -107,7 +110,7 @@ type doErrF struct { // invokeF is behind o.once func (d *doErrF) invokeF() { - defer d.isDone.Set() + defer d.isDone.Store(true) d.f() } diff --git a/parl.go b/parl.go index 04405613..c94effff 100644 --- a/parl.go +++ b/parl.go @@ -7,19 +7,21 @@ ISC License Package parl handles inter-thread communication and controls parallelism parl has sub-packages augmenting the Go standard library: - perrors pfs plog pnet pos pruntime psql pstrings - psyscall pterm ptime + + perrors pfs plog pnet pos pruntime psql pstrings + psyscall pterm ptime parl has feature packages: - ev — handling of goroutine-based functions - goid — unique goroutine IDs - mains — functions for writing command-line utilities and services - parlca — self-signed certificate authority - progress — monitor work progress for a large number of threads - sqlite — local SQL database - threadprof — profiling and counters for what threads are doing - // statuser: thread hang detector - tracer — event lists by task rather than by time or thread + + ev — handling of goroutine-based functions + goid — unique goroutine IDs + mains — functions for writing command-line utilities and services + parlca — self-signed certificate authority + progress — monitor work progress for a large number of threads + sqlite — local SQL database + threadprof — profiling and counters for what threads are doing + // statuser: thread hang detector + tracer — event lists by task rather than by time or thread parl features per-writer thread-safe logging with topic and per-package output control: @@ -30,20 +32,21 @@ One argument is output as string, two or more arguments is Printf. The location matched against the regular expression is full package path, optional type receiver and the funtion name: “github.com/haraldrudell/mypackage.(*MyType).MyFunc” - Out(string, ...interface{}) — Standard output - Log(string, ...interface{}) — Always outputs to stderr - parl.D(string, ...interface{}) — Same as Log, intended for temporary use - Info(string, ...interface{}) — Informational progress messages - SetSilent(true) — removes Info output - IsSilent() — deteremines if Info printing applies + Out(string, ...interface{}) — Standard output + Log(string, ...interface{}) — Always outputs to stderr + parl.D(string, ...interface{}) — Same as Log, intended for temporary use + + Info(string, ...interface{}) — Informational progress messages + SetSilent(true) — removes Info output + IsSilent() — deteremines if Info printing applies - Debug(string, ...interface{}) — only prints for locations where SetDebug(true) - SetDebug(true) — Control Debug() globally, code location for all prints, long stack traces - SetRegexp(regExp string) (err error) — Regular expression controlling local Debug() printing - IsThisDebug() — Determines if debug is active for the executing function + Debug(string, ...interface{}) — only prints for locations where SetDebug(true) + SetDebug(true) — Control Debug() globally, code location for all prints, long stack traces + SetRegexp(regExp string) (err error) — Regular expression controlling local Debug() printing + IsThisDebug() — Determines if debug is active for the executing function - Console(string, ...interface{}) — terminal interactivity output + Console(string, ...interface{}) — terminal interactivity output parl.Recover() and parl.Recover2() thread recovery and mains.Executable.Recover() process recovery: @@ -53,36 +56,37 @@ plain error channels, parl.NBChan[error] or parl.ClosableChan[error]. parl.Recover and parl.Recover2 convert thread panic to error along with regular errors, annotating, retrieving and storing those errors and invoking error handling functions for them. mains.Recover is similar for the process. - func thread(errCh *parl.NBChan[error]) { // real-time non-blocking error channel - defer errCh.Close() // non-blocking close effective on send complete - var err error - defer parl.Recover2(parl.Annotation(), &err, errCh.Send) - errCh.Ch() <- err // non-blocking - if err = someFunc(); err != nil { - err = perrors.Errorf("someFunc: %w", err) // labels and attaches a stack - return - … - func myThreadSafeThread(wg *sync.WaitGroup, errs *perrors.ParlError) { // ParlError: thread-safe error store - defer wg.Done() - var err error - defer parl.Recover(parl.Annotation(), &err, errs.AddErrorProc) - … + func thread(errCh *parl.NBChan[error]) { // real-time non-blocking error channel + defer errCh.Close() // non-blocking close effective on send complete + var err error + defer parl.Recover2(parl.Annotation(), &err, errCh.Send) + errCh.Ch() <- err // non-blocking + if err = someFunc(); err != nil { + err = perrors.Errorf("someFunc: %w", err) // labels and attaches a stack + return + … + func myThreadSafeThread(wg *sync.WaitGroup, errs *perrors.ParlError) { // ParlError: thread-safe error store + defer wg.Done() + var err error + defer parl.Recover(parl.Annotation(), &err, errs.AddErrorProc) + … parl package features: - AtomicBool — Thread-safe boolean - Closer — Deferrable, panic-free channel close - ClosableChan — Initialization-free channel with observable deferrable panic-free close - Moderator — A ticketing system for limited parallelism - NBChan — A non-blocking channel with trillion-size dynamic buffer - SerialDo — Serialization of invocations - WaitGroup —Observable WaitGroup - Debouncer — Invocation debouncer, pre-generics - Sprintf — Supporting thousands separator - -Parl is about 15,000 lines of Go code with first line written on November 21, 2018 - -On March 16th, 2022, parl was open-sourced under an ISC License + + atomic.Bool — Thread-safe boolean + Closer — Deferrable, panic-free channel close + ClosableChan — Initialization-free channel with observable deferrable panic-free close + Moderator — A ticketing system for limited parallelism + NBChan — A non-blocking channel with trillion-size dynamic buffer + SerialDo — Serialization of invocations + WaitGroup —Observable WaitGroup + Debouncer — Invocation debouncer, pre-generics + Sprintf — Supporting thousands separator + +# Parl is about 15,000 lines of Go code with first line written on November 21, 2018 + +# On March 16th, 2022, parl was open-sourced under an ISC License © 2018–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/) */ diff --git a/pfmt/no-recurse-v-print.go b/pfmt/no-recurse-v-print.go index 2b85efd7..aff8969f 100644 --- a/pfmt/no-recurse-v-print.go +++ b/pfmt/no-recurse-v-print.go @@ -10,9 +10,7 @@ import ( "strings" ) -type structWithPrivateFieldAny struct { - any -} +type structWithPrivateFieldAny struct{ any } // NoRecurseVPrint returns the reflection string representation of value // without invoking the String method. diff --git a/pfs/directory-lister.go b/pfs/directory-lister.go index 41cb51b6..09d138a9 100644 --- a/pfs/directory-lister.go +++ b/pfs/directory-lister.go @@ -23,7 +23,7 @@ type DirectoryLister struct { Abs string Results chan *EntryResult sdChan chan struct{} - isShutdown parl.AtomicBool + isShutdown atomic.Bool output chan *EntryResult pinger chan struct{} readEnd bool @@ -59,7 +59,7 @@ func NewDirStream(path string, chanSize int) (dir *DirectoryLister) { } func (dir *DirectoryLister) Shutdown() { - if dir.isShutdown.Set() { + if dir.isShutdown.CompareAndSwap(false, true) { close(dir.sdChan) } } diff --git a/pio/closer-callback.go b/pio/closer-callback.go index 7259eb51..360e950c 100644 --- a/pio/closer-callback.go +++ b/pio/closer-callback.go @@ -8,6 +8,7 @@ package pio import ( "io" "sync" + "sync/atomic" "github.com/haraldrudell/parl" "github.com/haraldrudell/parl/perrors" @@ -25,7 +26,7 @@ var _ io.Closer // CloserCallbacker implements a close callback for io.Closer type CloserCallbacker struct { closeCallback func(err error) (e error) - isClosed parl.AtomicBool + isClosed atomic.Bool wg sync.WaitGroup } @@ -33,7 +34,7 @@ func (cc *CloserCallbacker) Close(closer io.Closer) (err error) { parl.RecoverInvocationPanic(func() { err = closer.Close() }, &err) - if cc.isClosed.Set() { + if cc.isClosed.CompareAndSwap(false, true) { if cc.closeCallback != nil { var e error parl.RecoverInvocationPanic(func() { @@ -47,7 +48,7 @@ func (cc *CloserCallbacker) Close(closer io.Closer) (err error) { } func (cc *CloserCallbacker) IsClosed() (isClosed bool) { - return cc.isClosed.IsTrue() + return cc.isClosed.Load() } func (cc *CloserCallbacker) Wait() { diff --git a/pio/tee-writer.go b/pio/tee-writer.go index 9be3af12..b9dcb202 100644 --- a/pio/tee-writer.go +++ b/pio/tee-writer.go @@ -9,6 +9,7 @@ package pio import ( "io" "sync" + "sync/atomic" "github.com/haraldrudell/parl" "github.com/haraldrudell/parl/perrors" @@ -18,7 +19,7 @@ import ( type TeeWriter struct { closeCallback func() (err error) writers []io.Writer - isClosed parl.AtomicBool + isClosed atomic.Bool wg sync.WaitGroup } @@ -40,7 +41,7 @@ func NewTeeWriter(closeCallback func() (err error), writers ...io.Writer) (teeWr } func (tw *TeeWriter) Write(p []byte) (n int, err error) { - if tw.isClosed.IsTrue() { + if tw.isClosed.Load() { err = perrors.NewPF("Write after Close") return } @@ -61,7 +62,7 @@ func (tw *TeeWriter) Write(p []byte) (n int, err error) { func (tw *TeeWriter) Close() (err error) { // prevent multiple Close invocations - if !tw.isClosed.Set() { + if !tw.isClosed.CompareAndSwap(false, true) { err = perrors.NewPF("Second Close invocation") return } diff --git a/pio/write-closer-to-string.go b/pio/write-closer-to-string.go index 5fee372b..ce34c47d 100644 --- a/pio/write-closer-to-string.go +++ b/pio/write-closer-to-string.go @@ -9,8 +9,8 @@ import ( "errors" "io" "sync" + "sync/atomic" - "github.com/haraldrudell/parl" "github.com/haraldrudell/parl/perrors" ) @@ -22,7 +22,7 @@ var ErrFileAlreadyClosed = errors.New("file alread closed") // WriteCloserToString is an io.WriteCloser that aggregates its oputput in a string. Thread-safe. // - the string is available using the Data method. type WriteCloserToString struct { - isClosed parl.AtomicBool + isClosed atomic.Bool lock sync.Mutex s string } @@ -34,7 +34,7 @@ func NewWriteCloserToString() io.WriteCloser { // Write always succeeds func (wc *WriteCloserToString) Write(p []byte) (n int, err error) { - if wc.isClosed.IsTrue() { + if wc.isClosed.Load() { err = perrors.ErrorfPF(ErrFileAlreadyClosed.Error()) return } @@ -50,7 +50,7 @@ func (wc *WriteCloserToString) Write(p []byte) (n int, err error) { // Close should only be invoked once. // Close is not required for releasing resources. func (wc *WriteCloserToString) Close() (err error) { - wc.isClosed.Set() + wc.isClosed.Store(true) return } diff --git a/pnet/http.go b/pnet/http.go index 96cb8eb5..2b160a45 100644 --- a/pnet/http.go +++ b/pnet/http.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/haraldrudell/parl" @@ -19,14 +20,14 @@ import ( type Http struct { Network string // "tcp", "tcp4", "tcp6", "unix" or "unixpacket" http.Server - ListenInvoked parl.AtomicBool + ListenInvoked atomic.Bool ReadyWg sync.WaitGroup ErrCh chan<- error ErrChMutex sync.Mutex - ErrChClosed parl.AtomicBool + ErrChClosed atomic.Bool net.Addr // interface - IsListening parl.AtomicBool - IsShutdown parl.AtomicBool + IsListening atomic.Bool + IsShutdown atomic.Bool } // NewHttp creates http server host is host:port, default ":http" @@ -73,7 +74,7 @@ func (hp *Http) Listen() (errCh <-chan error) { func (hp *Http) SubListen() (errCh <-chan error) { hp.ReadyWg.Add(1) defer hp.ReadyWg.Done() - if !hp.ListenInvoked.Set() { + if !hp.ListenInvoked.CompareAndSwap(false, true) { panic(perrors.New("multiple http.Run invocations")) } errChan := make(chan error) @@ -99,7 +100,7 @@ func (hp *Http) listenerThread() { } hp.ReadyWg.Done() didReadyWg = true - hp.IsListening.Set() + hp.IsListening.Store(true) if err := hp.Server.Serve(listener); err != nil { // blocking until Shutdown or Close if err != http.ErrServerClosed { @@ -121,11 +122,11 @@ func (hp *Http) Listener() (listener net.Listener, err error) { } func (hp *Http) WaitForUp() (isUp bool, addr net.Addr) { - if !hp.ListenInvoked.IsTrue() { + if !hp.ListenInvoked.Load() { return // Listen has not been invoked } hp.ReadyWg.Wait() - if isUp = hp.IsListening.IsTrue(); isUp { + if isUp = hp.IsListening.Load(); isUp { addr = hp.Addr } return @@ -134,13 +135,13 @@ func (hp *Http) WaitForUp() (isUp bool, addr net.Addr) { func (hp *Http) SendErr(err error) { hp.ErrChMutex.Lock() defer hp.ErrChMutex.Unlock() - if !hp.ErrChClosed.IsTrue() { + if !hp.ErrChClosed.Load() { hp.ErrCh <- err } } func (hp *Http) CloseErr() { - if hp.ErrChClosed.Set() { + if hp.ErrChClosed.CompareAndSwap(false, true) { hp.ErrChMutex.Lock() close(hp.ErrCh) hp.ErrChMutex.Unlock() @@ -148,7 +149,7 @@ func (hp *Http) CloseErr() { } func (hp *Http) Shutdown() { - if !hp.IsShutdown.Set() { + if !hp.IsShutdown.CompareAndSwap(false, true) { return // already shutdown } ctx, cancel := context.WithTimeout(context.Background(), httpShutdownTimeout) diff --git a/pnet/https.go b/pnet/https.go index 80d094d5..b6daade5 100644 --- a/pnet/https.go +++ b/pnet/https.go @@ -61,7 +61,7 @@ func (hp *Https) listenerThread() { } hp.ReadyWg.Done() didReadyWg = true - hp.IsListening.Set() + hp.IsListening.Store(true) srv := &hp.Server if err := srv.Serve(tlsListener); err != nil { // blocking until Shutdown or Close diff --git a/pnet/udp.go b/pnet/udp.go index 8c144d48..600ae711 100644 --- a/pnet/udp.go +++ b/pnet/udp.go @@ -8,6 +8,7 @@ package pnet import ( "net" "sync" + "sync/atomic" "github.com/haraldrudell/parl" "github.com/haraldrudell/parl/perrors" @@ -18,14 +19,14 @@ type UDP struct { F UDPFunc MaxSize int net.UDPAddr // struct IP Port Zone - ListenInvoked parl.AtomicBool + ListenInvoked atomic.Bool StartingListen sync.WaitGroup ErrCh chan<- error - IsListening parl.AtomicBool + IsListening atomic.Bool NetUDPConn *net.UDPConn connMutex sync.RWMutex Addr net.Addr - IsShutdown parl.AtomicBool + IsShutdown atomic.Bool } type UDPFunc func(b []byte, oob []byte, flags int, addr *net.UDPAddr) @@ -52,11 +53,11 @@ const ( func (udp *UDP) Listen() (errCh <-chan error) { udp.StartingListen.Add(1) - if !udp.ListenInvoked.Set() { + if !udp.ListenInvoked.CompareAndSwap(false, true) { udp.StartingListen.Done() panic(perrors.New("multiple udp.Listen invocations")) } - if udp.IsShutdown.IsTrue() { + if udp.IsShutdown.Load() { udp.StartingListen.Done() panic(perrors.New("udp.Listen after Shutdown")) } @@ -94,16 +95,16 @@ func (udp *UDP) listenThread() { return } udp.Addr = netUDPConn.LocalAddr() - udp.IsListening.Set() + udp.IsListening.Store(true) udp.StartingListen.Done() startingDone = true defer func() { - if !udp.IsShutdown.IsTrue() { + if !udp.IsShutdown.Load() { if err := netUDPConn.Close(); err != nil { errCh <- err } } - udp.IsListening.Clear() + udp.IsListening.Store(false) }() // read datagrams @@ -117,7 +118,7 @@ func (udp *UDP) listenThread() { var err error n, oobn, flags, addr, err = netUDPConn.ReadMsgUDP(b, oob) if err != nil { - if udp.IsShutdown.IsTrue() && udp.isClosedErr(err) { + if udp.IsShutdown.Load() && udp.isClosedErr(err) { return // we are shutdown } errCh <- perrors.Errorf("ReadMsgUDP: '%w'", err) @@ -132,11 +133,11 @@ func (udp *UDP) listenThread() { } func (udp *UDP) WaitForUp() (isUp bool, addr net.Addr) { - if !udp.ListenInvoked.IsTrue() { + if !udp.ListenInvoked.Load() { return // Listen has not been invoked } udp.StartingListen.Wait() - if isUp = udp.IsListening.IsTrue(); isUp { + if isUp = udp.IsListening.Load(); isUp { addr = udp.Addr } return @@ -164,7 +165,7 @@ func (udp *UDP) isClosedErr(err error) (isClose bool) { func (udp *UDP) setConn(conn *net.UDPConn) (isShutdown bool) { udp.connMutex.Lock() defer udp.connMutex.Unlock() - isShutdown = udp.IsShutdown.IsTrue() + isShutdown = udp.IsShutdown.Load() if !isShutdown { udp.NetUDPConn = conn } @@ -174,7 +175,7 @@ func (udp *UDP) setConn(conn *net.UDPConn) (isShutdown bool) { func (udp *UDP) Shutdown() { udp.connMutex.RLock() defer udp.connMutex.RUnlock() - if !udp.IsShutdown.Set() { + if !udp.IsShutdown.CompareAndSwap(false, true) { return // it was already shutdown } conn := udp.NetUDPConn diff --git a/pterm/go.mod b/pterm/go.mod index bfab25e4..fa15bbcc 100644 --- a/pterm/go.mod +++ b/pterm/go.mod @@ -5,7 +5,7 @@ go 1.21 replace github.com/haraldrudell/parl => ../../parl require ( - github.com/haraldrudell/parl v0.4.114 + github.com/haraldrudell/parl v0.4.115 golang.org/x/term v0.13.0 ) diff --git a/callbacks_test.go b/recover-invocation-panic_test.go similarity index 100% rename from callbacks_test.go rename to recover-invocation-panic_test.go diff --git a/slow-detector-core.go b/slow-detector-core.go index 7a543f3f..f409de96 100644 --- a/slow-detector-core.go +++ b/slow-detector-core.go @@ -37,36 +37,41 @@ type SlowDetectorCore struct { average ptime.Averager[time.Duration] } -func NewSlowDetectorCore(callback CbSlowDetector, slowTyp slowType, goGen GoGen, threshold ...time.Duration) (slowDetector *SlowDetectorCore) { +func NewSlowDetectorCore(callback CbSlowDetector, slowTyp slowType, goGen GoGen, nonReturnPeriod ...time.Duration) (slowDetector *SlowDetectorCore) { if callback == nil { panic(perrors.NewPF("callback cannot be nil")) } - // threshold0: minimum slowness to report, default 100 ms - var threshold0 time.Duration - if len(threshold) > 0 { - threshold0 = threshold[0] - } else { - threshold0 = defaultMinReportDuration - } - // threshold 1: time between non-return reports, default 1 minute - var nonReturnPeriod time.Duration - if len(threshold) > 1 { - nonReturnPeriod = threshold[1] + var nonReturnPeriod0 time.Duration + if len(nonReturnPeriod) > 0 { + nonReturnPeriod0 = nonReturnPeriod[0] } else { - nonReturnPeriod = defaultNonReturnPeriod + nonReturnPeriod0 = defaultNonReturnPeriod } return &SlowDetectorCore{ ID: slowIDGenerator.ID(), callback: callback, - thread: NewSlowDetectorThread(slowTyp, nonReturnPeriod, goGen), - max: *NewAtomicMax(threshold0), + thread: NewSlowDetectorThread(slowTyp, nonReturnPeriod0, goGen), + max: *NewAtomicMax[time.Duration](), average: *ptime.NewAverager[time.Duration](), } } +func (s *SlowDetectorCore) Init(threshold ...time.Duration) (slowDetector *SlowDetectorCore) { + slowDetector = s + // threshold0: minimum slowness to report, default 100 ms + var threshold0 time.Duration + if len(threshold) > 0 { + threshold0 = threshold[0] + } else { + threshold0 = defaultMinReportDuration + } + s.max.Init(threshold0) + return +} + // Start returns the effective start time for a new timing cycle // - value is optional start time, default time.Now() func (sd *SlowDetectorCore) Start(invoLabel string, value ...time.Time) (invocation *SlowDetectorInvocation) { diff --git a/slow-detector-invocation.go b/slow-detector-invocation.go index 0f926c30..08834b9f 100644 --- a/slow-detector-invocation.go +++ b/slow-detector-invocation.go @@ -11,7 +11,6 @@ import ( "sync" "sync/atomic" "time" - "unsafe" "github.com/haraldrudell/parl/ptime" ) @@ -25,7 +24,7 @@ type SlowDetectorInvocation struct { stop func(sdi *SlowDetectorInvocation, value ...time.Time) sd *SlowDetectorCore - tx AtomicReference[time.Time] + tx atomic.Pointer[time.Time] lock sync.Mutex intervals []Interval } @@ -76,12 +75,11 @@ func (sdi *SlowDetectorInvocation) Label() (label string) { // T0 returns the effective time of the invocation of Start func (sdi *SlowDetectorInvocation) Time(t time.Time) (previousT time.Time) { - addr := (*unsafe.Pointer)(unsafe.Pointer(&sdi.tx)) var tp *time.Time if t.IsZero() { - tp = (*time.Time)(atomic.LoadPointer(addr)) + tp = sdi.tx.Load() } else { - tp = (*time.Time)(atomic.SwapPointer(addr, unsafe.Pointer(&t))) + tp = sdi.tx.Swap(&t) } if tp != nil { previousT = *tp diff --git a/slow-detector-thread.go b/slow-detector-thread.go index 33bebd70..36ad215c 100644 --- a/slow-detector-thread.go +++ b/slow-detector-thread.go @@ -7,6 +7,7 @@ package parl import ( "sync" + "sync/atomic" "time" "github.com/haraldrudell/parl/parli" @@ -30,7 +31,7 @@ type SlowDetectorThread struct { slowTyp slowType nonReturnPeriod time.Duration slowMap pmaps.RWMap[slowID, *SlowDetectorInvocation] - hasThread AtomicBool + hasThread atomic.Bool slowLock sync.Mutex goGen GoGen @@ -74,7 +75,7 @@ func (sdt *SlowDetectorThread) Start(sdi *SlowDetectorInvocation) { // store in map sdt.slowMap.Put(sdi.sID, sdi) - if !sdt.hasThread.Set() { + if !sdt.hasThread.CompareAndSwap(false, true) { return // thread already running return } diff --git a/slow-detector.go b/slow-detector.go index d043ba4c..1aa419b4 100644 --- a/slow-detector.go +++ b/slow-detector.go @@ -49,7 +49,15 @@ func NewSlowDetector(label string, slowTyp slowType, printf PrintfFunc, goGen Go label: label, printf: printf, } - sd.sd = *NewSlowDetectorCore(sd.callback, slowTyp, goGen, threshold...) + var threshold0, nonReturnPeriod []time.Duration + if len(threshold) > 0 { + threshold0 = threshold[0:1] + if len(threshold) > 1 { + nonReturnPeriod = threshold[1:2] + } + } + sd.sd = *NewSlowDetectorCore(sd.callback, slowTyp, goGen, nonReturnPeriod...) + sd.sd.Init(threshold0...) return &sd } diff --git a/sqliter/go.mod b/sqliter/go.mod index c3fa1ca2..7ad8b663 100644 --- a/sqliter/go.mod +++ b/sqliter/go.mod @@ -7,8 +7,8 @@ toolchain go1.21.3 replace github.com/haraldrudell/parl => ../../parl require ( - github.com/google/uuid v1.3.1 - github.com/haraldrudell/parl v0.4.114 + github.com/google/uuid v1.4.0 + github.com/haraldrudell/parl v0.4.115 modernc.org/sqlite v1.26.0 ) diff --git a/sqliter/go.sum b/sqliter/go.sum index 5200d960..833ab98e 100644 --- a/sqliter/go.sum +++ b/sqliter/go.sum @@ -6,8 +6,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= diff --git a/watchfs/go.mod b/watchfs/go.mod index d7608e38..ab1f63b5 100644 --- a/watchfs/go.mod +++ b/watchfs/go.mod @@ -6,8 +6,8 @@ replace github.com/haraldrudell/parl => ../../parl require ( github.com/fsnotify/fsnotify v1.7.0 - github.com/google/uuid v1.3.1 - github.com/haraldrudell/parl v0.4.114 + github.com/google/uuid v1.4.0 + github.com/haraldrudell/parl v0.4.115 ) require ( diff --git a/watchfs/go.sum b/watchfs/go.sum index da5f2fed..b7c67612 100644 --- a/watchfs/go.sum +++ b/watchfs/go.sum @@ -2,8 +2,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= diff --git a/watchfs/watcher.go b/watchfs/watcher.go index b3694c82..d21a4277 100644 --- a/watchfs/watcher.go +++ b/watchfs/watcher.go @@ -29,7 +29,7 @@ type Watcher struct { eventFn0 func(event *WatchEvent) watcher *fsnotify.Watcher ID int64 - watcherClosed parl.AtomicBool + watcherClosed atomic.Bool wg sync.WaitGroup } @@ -62,7 +62,7 @@ func (w *Watcher) List() (paths []string) { } func (w *Watcher) Shutdown() { - if w.watcherClosed.Set() { + if w.watcherClosed.CompareAndSwap(false, true) { var err error if err = w.watcher.Close(); err != nil { w.errFn(perrors.Errorf("watcher.Close: %w", err)) diff --git a/win-or-waiter-core.go b/win-or-waiter-core.go index bc20e6c2..b7b77fe4 100644 --- a/win-or-waiter-core.go +++ b/win-or-waiter-core.go @@ -7,6 +7,7 @@ package parl import ( "context" + "sync/atomic" "time" "github.com/haraldrudell/parl/perrors" @@ -37,19 +38,19 @@ type WinOrWaiterCore struct { ctx context.Context // isCalculationPut indicates that calculation field has value. atomic access - isCalculationPut AtomicBool + isCalculationPut atomic.Bool // calculationPut makes threads wait until calculation has value calculationPut Once // calculation allow to wait for the result of a winner calculation // - winner holds lock.Lock until the calculation is complete // - loser threads wait for lock.RLock to check the result - calculation AtomicReference[Future[time.Time]] + calculation atomic.Pointer[Future[time.Time]] // winnerPicker picks winner thread using atomic access // - winner is the thread that on Set gets wasNotSet true // - true while a winner calculates next data value // - set to zero when winnerFunc returns - winnerPicker AtomicBool + winnerPicker atomic.Bool } // WinOrWaiter returns a semaphore used for completing an on-demand task by @@ -91,10 +92,10 @@ func (ww *WinOrWaiterCore) WinOrWait() (err error) { } // seenCalculation is the calculation present when this thread arrived. // seenCalculation may be nil - var seenCalculation = ww.calculation.Get() + var seenCalculation = ww.calculation.Load() // ensure that ww.calculation holds a calculation - if ww.isCalculationPut.IsFalse() { + if !ww.isCalculationPut.Load() { // invocation prior to first calculation started // start the first calculation, or wait for it to be started if another thread already started it @@ -110,7 +111,7 @@ func (ww *WinOrWaiterCore) WinOrWait() (err error) { for { // check for valid calculation result - calculation = ww.calculation.Get() + calculation = ww.calculation.Load() // calculation.Result may block if result, isValid := calculation.Result(); isValid { switch ww.strategy { @@ -127,7 +128,7 @@ func (ww *WinOrWaiterCore) WinOrWait() (err error) { } // ensure data processing is in progress - if isWinner := ww.winnerPicker.Set(); isWinner { + if isWinner := ww.winnerPicker.CompareAndSwap(false, true); isWinner { return ww.winnerFunc() // this thread completed the task return } @@ -143,13 +144,13 @@ func (ww *WinOrWaiterCore) IsCancel() (isCancel bool) { } func (ww *WinOrWaiterCore) winnerFunc() (err error) { - ww.winnerPicker.Set() - defer ww.winnerPicker.Clear() + ww.winnerPicker.Store(true) + defer ww.winnerPicker.Store(false) // get calculation var calculation = NewFuture[time.Time]() - ww.calculation.Put(calculation) - ww.isCalculationPut.Set() + ww.calculation.Store(calculation) + ww.isCalculationPut.Store(true) // calculate result := time.Now() diff --git a/win-or-waiter-core_test.go b/win-or-waiter-core_test.go index 7661ebcb..3bf5d490 100644 --- a/win-or-waiter-core_test.go +++ b/win-or-waiter-core_test.go @@ -30,13 +30,13 @@ func TestNewWinOrWaiterCore(t *testing.T) { if winOrWaiter.strategy != winOrWaiterStrategy { t.Errorf("bad startegy: %d exp %d", winOrWaiter.strategy, winOrWaiterStrategy) } - if winOrWaiter.isCalculationPut.IsTrue() { + if winOrWaiter.isCalculationPut.Load() { t.Error("winOrWaiter.isCalculationPut true") } - if winOrWaiter.calculation.Get() != nil { + if winOrWaiter.calculation.Load() != nil { t.Error("winOrWaiter.calculation not nil") } - if winOrWaiter.winnerPicker.IsTrue() { + if winOrWaiter.winnerPicker.Load() { t.Error("winOrWaiter.winnerPicker true") } @@ -50,13 +50,13 @@ func TestNewWinOrWaiterCore(t *testing.T) { t.Log("waiting for calculator invocation…") calculatorInvoked.Wait() - if winOrWaiter.isCalculationPut.IsFalse() { + if !winOrWaiter.isCalculationPut.Load() { t.Error("2 winOrWaiter.isCalculationPut false") } - if winOrWaiter.calculation.Get() == nil { + if winOrWaiter.calculation.Load() == nil { t.Error("2 winOrWaiter.calculation nil") } - if winOrWaiter.winnerPicker.IsFalse() { + if !winOrWaiter.winnerPicker.Load() { t.Error("2 winOrWaiter.winnerPicker false") } @@ -86,7 +86,7 @@ func TestNewWinOrWaiterCore(t *testing.T) { calculatorThread.Wait() loserThread.Wait() - calculationReference = winOrWaiter.calculation.Get() + calculationReference = winOrWaiter.calculation.Load() if calculationReference == nil { t.Error("3 winOrWaiter.calculation.Get nil") t.FailNow() @@ -101,7 +101,7 @@ func TestNewWinOrWaiterCore(t *testing.T) { if result.IsZero() { t.Error("3 result IsZero") } - if winOrWaiter.winnerPicker.IsTrue() { + if winOrWaiter.winnerPicker.Load() { t.Error("3 winOrWaiter.winnerPicker true") } } diff --git a/yamler/generic-yaml.go b/yamler/generic-yaml.go index 1207854b..77ffb213 100644 --- a/yamler/generic-yaml.go +++ b/yamler/generic-yaml.go @@ -6,7 +6,7 @@ ISC License package yamler // GenericYaml is a wrapper for [yamlo.Unmarshaler] that allows the -// yamler package to unmarshal yaml to an unimported type +// yamlo package to unmarshal yaml to an unimported type type GenericYaml interface { // Unmarshal updates [yamlo.Unmarshaler]’s value pointer with // data from yaml @@ -16,9 +16,13 @@ type GenericYaml interface { // - hasData indicates that unmarshal succeeded and yamlDictionaryKey // was present Unmarshal(yamlText []byte, yamlDictionaryKey string) (hasData bool, err error) - // VisitedReferencesMap unmarshals yaml to an any object and then - // build a visited references map by comparring that object to its - // value pointer + // VisitedReferencesMap returns a map of + // key: any-typed pointers to fields of u.y, + // value: lower-case field names + // - unmarshals yaml again to an any object and then + // builds the visited references map by comparing the unmarshaled object to the + // u.y struct-pointer VisitedReferencesMap(yamlText []byte, yamlDictionaryKey string) (yamlVisistedReferences map[any]string, err error) + // YDump returns field names and values for the yaml value struct YDump() (yamlVPrint string) } diff --git a/yamler/go.mod b/yamler/go.mod index ac78c89c..37a51474 100644 --- a/yamler/go.mod +++ b/yamler/go.mod @@ -7,7 +7,7 @@ replace github.com/haraldrudell/parl => ../../parl replace github.com/haraldrudell/parl/mains => ../mains require ( - github.com/haraldrudell/parl v0.4.114 + github.com/haraldrudell/parl v0.4.115 golang.org/x/exp v0.0.0-20231006140011-7918f672742d gopkg.in/yaml.v3 v3.0.1 ) diff --git a/yamler/unmarshaler.go b/yamler/unmarshaler.go index 3b35165c..f377a901 100644 --- a/yamler/unmarshaler.go +++ b/yamler/unmarshaler.go @@ -31,7 +31,7 @@ func NewUnmarshaler[T any](y *T) (unmarshaler GenericYaml) { // - yamlDictionaryKey is the name of the top-level dictionary-key // typically “options” // - hasData indicates that unmarshal succeeded and yamlDictionaryKey -// was present +// had value func (u *Unmarshaler[T]) Unmarshal(yamlText []byte, yamlDictionaryKey string) (hasData bool, err error) { // top-level object in yaml is supposed to be a dictionary @@ -48,15 +48,16 @@ func (u *Unmarshaler[T]) Unmarshal(yamlText []byte, yamlDictionaryKey string) (h // get the value of the T type read from yaml if it exists var yamlDataPointer = yamlContentObject[yamlDictionaryKey] // pick out the options dictionary value if hasData = yamlDataPointer != nil; !hasData { - return // dictionary key not present return + return // value for dictionary key not present return } // assign the unmarshaled yaml values to effective yaml values *u.y = *yamlDataPointer - return // yaml loaded return + return // yaml loaded successfully return } +// YDump returns field names and values for the yaml value struct func (u *Unmarshaler[T]) YDump() (yamlVPrint string) { return parl.Sprintf("%+v", *u.y) } diff --git a/yamler/visited-references-map.go b/yamler/visited-references-map.go index 3c1ab634..9198a50a 100644 --- a/yamler/visited-references-map.go +++ b/yamler/visited-references-map.go @@ -14,11 +14,16 @@ import ( ) const ( + // the reflect.Type.String value for the any type anyString = "interface\x20{}" ) -// structMapPath contains a struct-map[string]any pair and its path -// - struct1 is traversed and the map2 is checked for fields existing meaning they are visited +// structMapPath is a work item comparing a struct with an unmarshaled +// free-form object consisting of values any, map[string] and and []any +// - struct1 is traversed and the +// - map2 is checked for the existence of struct1 fields +// - meaning those fields were visited +// - path is a field-name list for nested structs type structMapPath struct { // y is the struct’s type struct1 *reflect.Value @@ -30,9 +35,12 @@ type structMapPath struct { path []string } -// VisitedReferencesMap unmarshals yaml to an any object and then -// build a visited references map by comparring that object to its -// value pointer +// VisitedReferencesMap returns a map of +// key: any-typed pointers to fields of u.y, +// value: lower-case field names +// - unmarshals yaml again to an any object and then +// builds the visited references map by comparing the unmarshaled object to the +// u.y struct-pointer func (u *Unmarshaler[T]) VisitedReferencesMap(yamlText []byte, yamlDictionaryKey string) (yamlVisistedReferences map[any]string, err error) { // unmarshal a freeform object: any, map[string]any, []any @@ -47,7 +55,7 @@ func (u *Unmarshaler[T]) VisitedReferencesMap(yamlText []byte, yamlDictionaryKey var freeformValue any if topLevelDictionary, ok := freeFormYaml.(map[string]any); !ok { err = perrors.NewPF("yaml top-level object not dictionary") - return + return // bad yaml return } else if freeformValue = topLevelDictionary[yamlDictionaryKey]; freeformValue == nil { yamlVisistedReferences = make(map[any]string) return // options value not present: empty map return @@ -55,10 +63,10 @@ func (u *Unmarshaler[T]) VisitedReferencesMap(yamlText []byte, yamlDictionaryKey yamlText = nil freeFormYaml = nil - // now company u.y and freeFormValue to get referenced fields of y + // now compare u.y and freeFormValue to get referenced fields of y u.refs = make(map[any]string) if err = u.compareStructs(u.y, freeformValue); err != nil { - return + return // nil or not struct* error during struct compare } yamlVisistedReferences = u.refs @@ -75,10 +83,10 @@ func (u *Unmarshaler[T]) compareStructs(y, anyYaml any) (err error) { // the structMapPath being processed var struct0 structMapPath if struct0.struct1, err = u.structp(y); perrors.Is(&err, "y: %w", err) { - return + return // y nil or not struct* } if struct0.map2, err = u.mapStringAny(anyYaml); perrors.Is(&err, "anyYaml: %w", err) { - return + return // anyYaml nil or not map[string]any } // list of struct-map-path values being processed @@ -98,11 +106,12 @@ func (u *Unmarshaler[T]) compareStructs(y, anyYaml any) (err error) { // fieldName is lower-case as in yaml: “fieldone” not ”FieldOne” var fieldName = strings.ToLower(struct0.struct1.Type().Field(i).Name) if fieldName == "" { - perrors.NewPF("") + err = perrors.ErrorfPF("field#d: field name empty", i) + return // empty field name return } var fieldKind = field.Kind() - // field from map: mapValue is the any type + // field from map: map[string]any returns the any type var mapValue = struct0.map2.MapIndex(reflect.ValueOf(fieldName)) if !mapValue.IsValid() { continue // freeFormYaml does not have the field @@ -119,7 +128,7 @@ func (u *Unmarshaler[T]) compareStructs(y, anyYaml any) (err error) { map2: mp, path: append(append([]string{}, struct0.path...), fieldName), // clone }) - continue + continue // structPair stired for processing, check next field } } } @@ -137,7 +146,7 @@ func (u *Unmarshaler[T]) compareStructs(y, anyYaml any) (err error) { // make reference to field var fieldAddr = field.Addr() - // make type any + // make type any (exits reflect domain) var anyPointerToField = fieldAddr.Interface() u.refs[anyPointerToField] = fieldName } @@ -153,44 +162,45 @@ func (u *Unmarshaler[T]) mapStringAny(m any) (reflectMapStringAny *reflect.Value var reflectTypeM = reflect.TypeOf(m) if reflectTypeM == nil { err = perrors.NewPF("m cannot be nil, must be map[string]any") - return + return // nil return } else if reflectTypeM.Kind() != reflect.Map || reflectTypeM.Key().Kind() != reflect.String || reflectTypeM.Elem().String() != anyString { err = perrors.ErrorfPF("m must be map[string]any: %T", m) - return + return // not map[string]any return } // get reflect value of m var reflectValueM = reflect.ValueOf(m) reflectMapStringAny = &reflectValueM - return + return // good return } -// structp ensures v is pointer to struct using reflection or error +// structp ensures v is pointer to struct using reflection, if not: error +// - error on v nil or not *struct func (u *Unmarshaler[T]) structp(v any) (reflectStruct *reflect.Value, err error) { // get the non-nil reflect value of v var reflectValueV = reflect.ValueOf(v) if !reflectValueV.IsValid() { err = perrors.NewPF("v cannot be nil, must be *struct") - return + return // nil return } // get the struct that supposedly v points to var structValue reflect.Value if reflectValueV.Kind() != reflect.Pointer { err = perrors.NewPF("v not pointer, must be *struct") - return + return // not pointer return } structValue = reflectValueV.Elem() if structValue.Kind() != reflect.Struct { err = perrors.ErrorfPF("v points to non-struct value, must be *struct") - return + return // not struct* return } reflectStruct = &structValue - return + return // good return }