Skip to content

Commit

Permalink
ping-pong, 1K events/second locally (read/write)
Browse files Browse the repository at this point in the history
  • Loading branch information
gritzko committed Apr 8, 2024
1 parent c31e7f3 commit 496987e
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 33 deletions.
37 changes: 15 additions & 22 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Options struct {
MaxLogLen int64
}

type Hook func(id rdx.ID) error
type Hook func(cho *Chotki, id rdx.ID) error

type CallHook struct {
hook Hook
Expand Down Expand Up @@ -336,24 +336,14 @@ func (cho *Chotki) Drain(recs toyqueue.Records) (err error) {
}

if len(calls) > 0 {
go cho.fireCalls(calls)
for _, call := range calls {
go call.hook(cho, call.id)
}
}

return
}

func (cho *Chotki) fireCalls(calls []CallHook) {
cho.hlock.Lock()
for i := 0; i < len(calls); i++ {
err := calls[i].hook(calls[i].id)
if err != nil {
calls[i] = calls[len(calls)-1]
calls = calls[:len(calls)-1]
}
}
cho.hlock.Unlock()
}

func (cho *Chotki) VersionVector() (vv rdx.VV, err error) {
key0 := VKey(rdx.ID0)
val, clo, err := cho.db.Get(key0)
Expand Down Expand Up @@ -477,21 +467,24 @@ func (cho *Chotki) AddHook(fid rdx.ID, hook Hook) {
cho.hlock.Unlock()
}

func (cho *Chotki) RemoveAllHooks(fid rdx.ID) {
cho.hlock.Lock()
delete(cho.hooks, fid)
cho.hlock.Unlock()
}

var ErrHookNotFound = errors.New("hook not found")

func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error) {
cho.hlock.Lock()
list := cho.hooks[fid]
i := 0
for i < len(list) {
if &list[i] == &hook { // todo ?
list[i] = list[len(list)-1]
list = list[:len(list)-1]
break
new_list := make([]Hook, 0, len(list))
for _, h := range list {
if &h != &hook {
new_list = append(new_list, h)
}
i++
}
if i == len(list) {
if len(new_list) == len(list) {
err = ErrHookNotFound
}
cho.hooks[fid] = list
Expand Down
2 changes: 2 additions & 0 deletions packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls
fkey,
rebar,
&WriteOptions)
cho.hlock.Lock()
hook, ok := cho.hooks[fid]
cho.hlock.Unlock()
if ok {
for _, h := range hook {
(*calls) = append((*calls), CallHook{h, fid})
Expand Down
73 changes: 63 additions & 10 deletions repl/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,33 @@ func (repl *REPL) CommandPing(arg *rdx.RDX) (id rdx.ID, err error) {
}

var HelpPinc = errors.New("pinc b0b-12-2 // N field id")
var ErrBadField = errors.New("bad field")

func KeepOddEven(oddeven uint64, cho *chotki.Chotki, fid rdx.ID) error {
rdt, tlv, err := cho.ObjectFieldTLV(fid)
if err != nil || rdt != rdx.Natural {
return ErrBadField
}
src := cho.Source()
mine := rdx.Nmine(tlv, src)
sum := rdx.Nnative(tlv)
if (sum & 1) != oddeven {
tlvs := toyqueue.Records{
toytlv.Record('F', rdx.ZipUint64(fid.Off())),
toytlv.Record(rdx.Natural, toytlv.Record(rdx.Term, rdx.ZipUint64Pair(mine+1, src))),
}
_, err = cho.CommitPacket('E', fid.ZeroOff(), tlvs)
}
return err
}

func KeepOdd(cho *chotki.Chotki, fid rdx.ID) error {
return KeepOddEven(1, cho, fid)
}

func KeepEven(cho *chotki.Chotki, fid rdx.ID) error {
return KeepOddEven(0, cho, fid)
}

func (repl *REPL) CommandPinc(arg *rdx.RDX) (id rdx.ID, err error) {
id, err = rdx.BadId, HelpPinc
Expand All @@ -364,21 +391,47 @@ func (repl *REPL) CommandPinc(arg *rdx.RDX) (id rdx.ID, err error) {
if fid.Off() == 0 {
return
}
rdt, tlv, err := repl.Host.ObjectFieldTLV(fid)
if err != nil || rdt != rdx.Natural {
err = KeepOdd(&repl.Host, fid)
if err != nil {
return
}
src := repl.Host.Source()
mine := rdx.Nmine(tlv, src)
tlvs := toyqueue.Records{
toytlv.Record('F', rdx.ZipUint64(uint64(fid.Off()))),
toytlv.Record(rdx.Natural, toytlv.Record(rdx.Term, rdx.ZipUint64Pair(mine+1, src))),
}
id, err = repl.Host.CommitPacket('E', fid.ZeroOff(), tlvs)
repl.Host.AddHook(fid, KeepOdd)
id = fid
err = nil
return
}

func (repl *REPL) CommandPonc(arg *rdx.RDX) (id rdx.ID, err error) {
id, err = rdx.BadId, HelpPinc
if arg == nil || arg.RdxType != rdx.Reference {
return
}
fid := rdx.IDFromText(arg.Text)
if fid.Off() == 0 {
return
}
err = KeepEven(&repl.Host, fid)
if err != nil {
return
}
repl.Host.AddHook(fid, KeepEven)
id = fid
err = nil
return
}

func (repl *REPL) CommandMute(arg *rdx.RDX) (id rdx.ID, err error) {
id, err = rdx.BadId, HelpPinc
if arg == nil || arg.RdxType != rdx.Reference {
return
}
fid := rdx.IDFromText(arg.Text)
if fid.Off() == 0 {
return
}
repl.Host.RemoveAllHooks(fid)
id = rdx.ID0
err = nil
return
}

Expand All @@ -395,7 +448,7 @@ func (repl *REPL) CommandTell(arg *rdx.RDX) (id rdx.ID, err error) {
return
} else if arg.RdxType == rdx.Reference {
id = rdx.IDFromText(arg.Text)
repl.Host.AddHook(id, func(id rdx.ID) error {
repl.Host.AddHook(id, func(cho *chotki.Chotki, id rdx.ID) error {
fmt.Println("field changed")
return nil
})
Expand Down
4 changes: 3 additions & 1 deletion repl/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ func (repl *REPL) REPL() (id rdx.ID, err error) {
case "pinc":
id, err = repl.CommandPinc(arg)
case "ponc":
id, err = repl.CommandPinc(arg)
id, err = repl.CommandPonc(arg)
case "mute":
id, err = repl.CommandMute(arg)
case "tic":
id, err = repl.CommandTic(arg)
default:
Expand Down

0 comments on commit 496987e

Please sign in to comment.