Skip to content

Commit

Permalink
v0.4.119 refactor AtomicMax closers errorglue.ErrorList perrors.Error…
Browse files Browse the repository at this point in the history
…List
  • Loading branch information
haraldrudell committed Nov 3, 2023
1 parent 4bb102e commit ae2ee21
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 250 deletions.
65 changes: 42 additions & 23 deletions atomic-max.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,80 @@ import (
)

// AtomicMax is a thread-safe max container
type AtomicMax[T constraints.Integer] struct{ value, value0 atomic.Uint64 }
type AtomicMax[T constraints.Integer] struct {
// threshold is an optional minimum value for a new max
// - valid if greater than 0
threshold uint64
// whether [AtomicMax.Value] has been invoked
didValue atomic.Bool
// value is current max or 0 if no value is present
value atomic.Uint64
}

// NewAtomicMax returns a thread-safe max container
// - T underlying type must be int
// - negative values are not allowed
// - to set initial value, use Init
func NewAtomicMax[T constraints.Integer]() (atomicMax *AtomicMax[T]) { return &AtomicMax[T]{} }

// Init performs actions that cannot happen prior to copying AtomicMax
// - supports functional chaining
// - Thread-safe
func (m *AtomicMax[T]) Init(value T) (atomicMax *AtomicMax[T]) {
atomicMax = m
m.value.Store(uint64(value))
m.value0.Store(uint64(value)) // set initial threshold
return
// - negative values are not allowed and cause panic
func NewAtomicMax[T constraints.Integer](threshold T) (atomicMax *AtomicMax[T]) {
m := AtomicMax[T]{}
if threshold != 0 {
m.threshold = m.tToUint64(threshold)
}
return &m
}

// Value updates the container possibly with a new Max value
// - value cannot be negative
// - Thread-safe
func (m *AtomicMax[T]) Value(value T) (isNewMax bool) {

// check if value is a new max
var valueU64, err = ints.Unsigned[uint64](value, "")
if err != nil {
panic(err) // value out of range, ie. negative
// check value against threshold
var valueU64 = m.tToUint64(value)
if valueU64 < m.threshold {
return // below threshold return: isNewMax false
}

// 0 as max case
if isNewMax = m.didValue.CompareAndSwap(false, true) && value == 0; isNewMax {
return // first invocation with 0: isNewMax true
}

// check against present value
var current = m.value.Load()
if isNewMax = valueU64 > current; !isNewMax {
return // not a new max return
return // not a new max return: isNewMax false
}

// store the new max
for {

// try to write value to *max
if m.value.CompareAndSwap(current, valueU64) {
return // new max written return
return // new max written return: isNewMax true
}
if current = m.value.Load(); current >= valueU64 {
return // no longer a need to write return
return // no longer a need to write return: isNewMax true
}
}
}

// Max returns current max and a flag whether a value is present
// - Thread-safe
func (m *AtomicMax[T]) Max() (value T, hasValue bool) {
var u64 = m.value.Load()
value = T(u64)
hasValue = u64 != m.value0.Load()
value = T(m.value.Load())
hasValue = m.didValue.Load()
return
}

// Max1 returns current maximum whether default or set by Value
// - Thread-safe
func (m *AtomicMax[T]) Max1() (value T) { return T(m.value.Load()) }

// tToUint64 converts T value to uint64
// - panic if T value is negative
func (m *AtomicMax[T]) tToUint64(value T) (valueU64 uint64) {
var err error
if valueU64, err = ints.Unsigned[uint64](value, ""); err != nil {
panic(err) // value out of range, ie. negative
}
return
}
74 changes: 74 additions & 0 deletions close-channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
© 2022–present Harald Rudell <[email protected]> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/

package parl

import (
"github.com/haraldrudell/parl/perrors"
"github.com/haraldrudell/parl/pruntime"
)

const (
CloseChannelDrain = true
)

// CloseChannel closes a channel
// - CloseChannel is thread-safe, deferrable and panic-free,
// handles closed-channel panic, nil-channel case and
// has channel drain feature
// - isNilChannel returns true if ch is nil.
// closing a nil channel would cause panic.
// - isCloseOfClosedChannel is true if close paniced due to
// the channel already closed.
// A channel transferring data cannot be inspected for being
// closed
// - if errp is non-nil, panic values updates it using errors.AppendError.
// - if doDrain is [parl.CloseChannelDrain], the channel is drained first.
// Note: closing a channel while a thread is blocked in channel send is
// a data race.
// If a thread is continuously sending items and doDrain is true,
// CloseChannel will block indefinitely.
// - n returns the number of drained items.
func CloseChannel[T any](ch chan T, errp *error, drainChannel ...bool) (
isNilChannel, isCloseOfClosedChannel bool, n int, err error,
) {

// nil channel case
if isNilChannel = ch == nil; isNilChannel {
return // closing of nil channel return
}

// channel drain feature
if len(drainChannel) > 0 && drainChannel[0] {
for {
select {
// read non-blocking from the channel
// - ok true: received item, channel is not closed
// - ok false: channel is closed
case _, ok := <-ch:
if ok {
// the channel is not closed
n++
continue // read next item
}
default: // channel is open but has no items
}
break // closed or no items
}
}

// close channel
if Closer(ch, &err); err == nil {
return // close successful return
}

// handle close error
isCloseOfClosedChannel = pruntime.IsCloseOfClosedChannel(err)
if errp != nil {
*errp = perrors.AppendError(*errp, err)
}

return
}
55 changes: 55 additions & 0 deletions close-channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
© 2022–present Harald Rudell <[email protected]> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/

package parl

import "testing"

func TestCloseChannel(t *testing.T) {
var value = 3
var doDrain = true

var ch chan int
var err, errp error
var n int
var isNilChannel, isCloseOfClosedChannel bool

// close of nil channel should return isNilChannel true
ch = nil
isNilChannel, isCloseOfClosedChannel, n, err = CloseChannel(ch, &errp)
if !isNilChannel {
t.Error("isNilChannel false")
}
_ = err
_ = n
_ = isCloseOfClosedChannel

// n should return number of items when draining
ch = make(chan int, 1)
ch <- value
isNilChannel, isCloseOfClosedChannel, n, err = CloseChannel(ch, &errp, doDrain)
if n != 1 {
t.Errorf("n bad %d exp %d", n, 1)
}
_ = isNilChannel
_ = err
_ = isCloseOfClosedChannel

// close of closed channel should set isCloseOfClosedChannel, err, errp
ch = make(chan int)
close(ch)
isNilChannel, isCloseOfClosedChannel, n, err = CloseChannel(ch, &errp)
if !isCloseOfClosedChannel {
t.Error("isCloseOfClosedChannel false")
}
if err == nil {
t.Error("isCloseOfClosedChannel err nil")
}
if errp == nil {
t.Error("isCloseOfClosedChannel errp nil")
}
_ = isNilChannel
_ = n
}
73 changes: 11 additions & 62 deletions closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,86 +9,35 @@ import (
"io"

"github.com/haraldrudell/parl/perrors"
"github.com/haraldrudell/parl/pruntime"
)

const (
CloseChannelDrain = true
)

// Closer is a deferrable function that closes a channel.
// Closer handles panics.
// if errp is non-nil, panic values updates it using errors.AppendError.
// - if errp is non-nil, panic values updates it using errors.AppendError.
// - Closer is thread-safe, panic-free and deferrable
func Closer[T any](ch chan T, errp *error) {
defer PanicToErr(errp)

close(ch)
}

// CloserSend is a deferrable function that closes a send-channel.
// CloserSend handles panics.
// if errp is non-nil, panic values updates it using errors.AppendError.
// - if errp is non-nil, panic values updates it using errors.AppendError.
// - CloserSend is thread-safe, panic-free and deferrable
func CloserSend[T any](ch chan<- T, errp *error) {
defer PanicToErr(errp)

close(ch)
}

// Close is a deferrable function that closes an io.Closer object.
// Close handles panics.
// if errp is non-nil, panic values updates it using errors.AppendError.
// Close closes an io.Closer object.
// - if errp is non-nil, panic values updates it using errors.AppendError.
// - Close is thread-safe, panic-free and deferrable
// - type Closer interface { Close() error }
func Close(closable io.Closer, errp *error) {
defer PanicToErr(errp)

if e := closable.Close(); e != nil {
*errp = perrors.AppendError(*errp, e)
}
}
var err error
defer RecoverErr(func() DA { return A() }, errp)

// CloseChannel closes a channel recovering panics
// - deferrable
// - if errp is non-nil, panic values updates it using errors.AppendError.
// - if doDrain is CloseChannelDrain or true, the channel is drained first.
// Note: closing a channel while a thread is blocked in channel send is
// a data race.
// If a thread is continuously sending items and doDrain is true,
// CloseChannel will block indefinitely.
// - n returns the number of drained items.
// - isNilChannel returns true if ch is nil.
// No close will be attempted for a nil channel, it would panic.
func CloseChannel[T any](ch chan T, errp *error, drainChannel ...bool) (
isNilChannel, isCloseOfClosedChannel bool, n int, err error,
) {
if isNilChannel = ch == nil; isNilChannel {
return // closing of nil channel return
}
var doDrain bool
if len(drainChannel) > 0 {
doDrain = drainChannel[0]
}
if doDrain {
var hasItems = true
for hasItems {
select {
// read non-blocking from the channel
case _, ok := <-ch:
if ok {
// the channel is not closed
n++
continue // read next item
}
default:
}
hasItems = false
}
}
Closer(ch, &err)
if err == nil {
return // close successful
}
isCloseOfClosedChannel = pruntime.IsCloseOfClosedChannel(err)
if errp != nil {
if err = closable.Close(); perrors.IsPF(&err, "%w", err) {
*errp = perrors.AppendError(*errp, err)
}
return
}
Loading

0 comments on commit ae2ee21

Please sign in to comment.