Skip to content

Commit

Permalink
Merge pull request #11 from drpcorg/atomic-counters
Browse files Browse the repository at this point in the history
Atomic counters
  • Loading branch information
Termina1 authored Oct 10, 2024
2 parents 1095af5 + 287c8ef commit 1893cea
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 62 deletions.
100 changes: 100 additions & 0 deletions atomic_counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package chotki

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/drpcorg/chotki/protocol"
"github.com/drpcorg/chotki/rdx"
)

var ErrNotCounter error = fmt.Errorf("not a counter")

type AtomicCounter struct {
db *Chotki
rid rdx.ID
offset uint64
localValue atomic.Int64
tlv atomic.Value
rdt atomic.Value
loaded atomic.Bool
lock sync.Mutex
expiration time.Time
updatePeriod time.Duration
}

// creates counter that has two properties
// - its atomic as long as you use single instance to do all increments, creating multiple instances will break this guarantee
// - it can ease CPU load if updatePeiod > 0, in that case it will not read from db backend
// current value of the counter
//
// Because we use LSM backend writes are cheap, reads are expensive. You can trade off up to date value of counter
// for less CPU cycles
func NewAtomicCounter(db *Chotki, rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter {
return &AtomicCounter{
db: db,
rid: rid,
offset: offset,
updatePeriod: updatePeriod,
}
}

func (a *AtomicCounter) load() error {
a.lock.Lock()
defer a.lock.Unlock()
now := time.Now()
if a.loaded.Load() && now.Sub(a.expiration) < 0 {
return nil
}
rdt, tlv, err := a.db.ObjectFieldTLV(a.rid.ToOff(a.offset))
if err != nil {
return err
}
a.rdt.Store(rdt)
a.loaded.Store(true)
a.tlv.Store(tlv)
switch rdt {
case rdx.ZCounter:
a.localValue.Store(rdx.Znative(tlv))
case rdx.Natural:
a.localValue.Store(int64(rdx.Nnative(tlv)))
default:
return ErrNotCounter
}
a.expiration = now.Add(a.updatePeriod)
return nil
}

// Loads (if needed) and increments counter
func (a *AtomicCounter) Increment(ctx context.Context, val int64) (int64, error) {
err := a.load()
if err != nil {
return 0, err
}
if val == 2 {
fmt.Println(1)
}
rdt := a.rdt.Load().(byte)
a.localValue.Add(val)
var dtlv []byte
a.lock.Lock()
tlv := a.tlv.Load().([]byte)
switch rdt {
case rdx.Natural:
dtlv = rdx.Ndelta(tlv, uint64(a.localValue.Load()), a.db.Clock())
case rdx.ZCounter:
dtlv = rdx.Zdelta(tlv, a.localValue.Load(), a.db.Clock())
default:
return 0, ErrNotCounter
}
a.tlv.Store(dtlv)
a.lock.Unlock()
changes := make(protocol.Records, 0)
changes = append(changes, protocol.Record('F', rdx.ZipUint64(uint64(a.offset))))
changes = append(changes, protocol.Record(rdt, dtlv))
a.db.CommitPacket(ctx, 'E', a.rid, changes)
return a.localValue.Load(), nil
}
119 changes: 119 additions & 0 deletions atomic_counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package chotki

import (
"context"
"os"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/drpcorg/chotki/protocol"
"github.com/drpcorg/chotki/rdx"
"github.com/stretchr/testify/assert"
)

func TestAtomicCounter(t *testing.T) {
dir, err := os.MkdirTemp("", "*")
assert.NoError(t, err)

a, err := Open(dir, Options{
Src: 0x1a,
Name: "test replica",
Options: pebble.Options{ErrorIfExists: true},
})
assert.NoError(t, err)

cid, err := a.NewClass(context.Background(), rdx.ID0, Field{Name: "test", RdxType: rdx.Natural})
assert.NoError(t, err)

rid, err := a.NewObjectTLV(context.Background(), cid, protocol.Records{protocol.Record('N', rdx.Ntlv(0))})
assert.NoError(t, err)

counterA := NewAtomicCounter(a, rid, 1, 0)
counterB := NewAtomicCounter(a, rid, 1, 0)

res, err := counterA.Increment(context.Background(), 1)
assert.NoError(t, err)
assert.EqualValues(t, 1, res)

res, err = counterB.Increment(context.Background(), 1)
assert.NoError(t, err)
assert.EqualValues(t, 2, res)

res, err = counterA.Increment(context.Background(), 1)
assert.NoError(t, err)
assert.EqualValues(t, 3, res)
}

func TestAtomicCounterWithPeriodicUpdate(t *testing.T) {
dira, err := os.MkdirTemp("", "*")
assert.NoError(t, err)

a, err := Open(dira, Options{
Src: 0x1a,
Name: "test replica",
Options: pebble.Options{ErrorIfExists: true},
})
assert.NoError(t, err)

dirb, err := os.MkdirTemp("", "*")
assert.NoError(t, err)

b, err := Open(dirb, Options{
Src: 0x1b,
Name: "test replica2",
Options: pebble.Options{ErrorIfExists: true},
})
assert.NoError(t, err)

cid, err := a.NewClass(
context.Background(), rdx.ID0,
Field{Name: "test", RdxType: rdx.Natural},
Field{Name: "test2", RdxType: rdx.ZCounter},
)
assert.NoError(t, err)

rid, err := a.NewObjectTLV(
context.Background(), cid,
protocol.Records{
protocol.Record('N', rdx.Ntlv(0)),
protocol.Record('Z', rdx.Ztlv(0)),
},
)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for i := 1; i <= 2; i++ {

counterA := NewAtomicCounter(a, rid, uint64(i), 100*time.Millisecond)
counterB := NewAtomicCounter(b, rid, uint64(i), 0)

// first increment
res, err := counterA.Increment(ctx, 1)
assert.NoError(t, err)
assert.EqualValues(t, 1, res)

syncData(a, b)

// increment from another replica
res, err = counterB.Increment(ctx, 1)
assert.NoError(t, err)
assert.EqualValues(t, 2, res)

syncData(a, b)

// this increment does not account data from other replica because current value is cached
res, err = counterA.Increment(ctx, 1)
assert.NoError(t, err)
assert.EqualValues(t, 2, res)

time.Sleep(100 * time.Millisecond)

// after wait we increment, and we get actual value
res, err = counterA.Increment(ctx, 1)
assert.NoError(t, err)
assert.EqualValues(t, 4, res)
}
}
39 changes: 29 additions & 10 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/drpcorg/chotki/protocol"
"github.com/drpcorg/chotki/rdx"
"github.com/drpcorg/chotki/utils"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/puzpuzpuz/xsync/v3"
)
Expand Down Expand Up @@ -84,6 +85,7 @@ type Options struct {
ReadMaxBufferSize int
TcpReadBufferSize int
TcpWriteBufferSize int
CounterCacheSize int

TlsConfig *tls.Config
}
Expand All @@ -93,6 +95,10 @@ func (o *Options) SetDefaults() {
o.MaxLogLen = 1 << 23
}

if o.CounterCacheSize == 0 {
o.CounterCacheSize = 1000
}

if o.PingPeriod == 0 {
o.PingPeriod = 30 * time.Second
}
Expand Down Expand Up @@ -152,12 +158,13 @@ type Chotki struct {
src uint64
clock rdx.Clock

lock sync.Mutex
db *pebble.DB
net *protocol.Net
dir string
opts Options
log utils.Logger
lock sync.Mutex
db *pebble.DB
net *protocol.Net
dir string
opts Options
log utils.Logger
counterCache *lru.Cache[rdx.ID, *AtomicCounter]

outq *xsync.MapOf[string, protocol.DrainCloser] // queues to broadcast all new packets
syncs *xsync.MapOf[rdx.ID, *pebble.Batch]
Expand Down Expand Up @@ -205,6 +212,10 @@ func Open(dirname string, opts Options) (*Chotki, error) {
return nil, err
}

lruCache, err := lru.New[rdx.ID, *AtomicCounter](opts.CounterCacheSize)
if err != nil {
return nil, err
}
cho := Chotki{
db: db,
src: opts.Src,
Expand All @@ -213,10 +224,11 @@ func Open(dirname string, opts Options) (*Chotki, error) {
opts: opts,
clock: &rdx.LocalLogicalClock{Source: opts.Src},

outq: xsync.NewMapOf[string, protocol.DrainCloser](),
syncs: xsync.NewMapOf[rdx.ID, *pebble.Batch](),
hooks: xsync.NewMapOf[rdx.ID, []Hook](),
types: xsync.NewMapOf[rdx.ID, Fields](),
outq: xsync.NewMapOf[string, protocol.DrainCloser](),
syncs: xsync.NewMapOf[rdx.ID, *pebble.Batch](),
hooks: xsync.NewMapOf[rdx.ID, []Hook](),
types: xsync.NewMapOf[rdx.ID, Fields](),
counterCache: lruCache,
}

cho.net = protocol.NewNet(cho.log,
Expand Down Expand Up @@ -314,6 +326,13 @@ func (cho *Chotki) Close() error {
return nil
}

func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *AtomicCounter {
counter, _, _ := cho.counterCache.PeekOrAdd(rid.ToOff(offset),
NewAtomicCounter(cho, rid, offset, updatePeriod))
cho.counterCache.Get(rid.ToOff(offset))
return counter
}

func (cho *Chotki) KeepAliveLoop() {
var err error
for err == nil {
Expand Down
Loading

0 comments on commit 1893cea

Please sign in to comment.