Skip to content

Commit

Permalink
Merge pull request #3 from drpcorg/nikolay/setup-poc
Browse files Browse the repository at this point in the history
Setup PoC
  • Loading branch information
learn-decentralized-systems authored Apr 18, 2024
2 parents c6e637b + 80130b6 commit a9baa17
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 160 deletions.
224 changes: 130 additions & 94 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"sync"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/drpcorg/chotki/rdx"
"github.com/learn-decentralized-systems/toyqueue"
"github.com/learn-decentralized-systems/toytlv"
"net"
"os"
"sync"
)

type Packet []byte
Expand All @@ -19,10 +21,24 @@ type Packet []byte
type Batch [][]byte

type Options struct {
pebble.Options

Orig uint64
Name string
RelaxedOrder bool
MaxLogLen int64
}

func (o *Options) SetDefaults() {
if o.MaxLogLen == 0 {
o.MaxLogLen = 1 << 23
}

if o.Merger == nil {
o.Merger = &pebble.Merger{Name: "CRDT", Merge: merger}
}
}

type Hook func(cho *Chotki, id rdx.ID) error

type CallHook struct {
Expand Down Expand Up @@ -53,9 +69,28 @@ type Chotki struct {
types map[rdx.ID]Fields
}

var ErrCausalityBroken = errors.New("order fail: refs an unknown op")
var ErrOutOfOrder = errors.New("order fail: sequence gap")
var ErrNotImplemented = errors.New("not implemented yet")
var (
ErrDbClosed = errors.New("chotki: db is closed")
ErrDirnameIsFile = errors.New("chotki: the dirname is file")
ErrNotImplemented = errors.New("chotki: not implemented yet")

ErrHookNotFound = errors.New("chotki: hook not found")
ErrBadIRecord = errors.New("chotki: bad id-ref record")
ErrBadHPacket = errors.New("chotki: bad handshake packet")
ErrBadEPacket = errors.New("chotki: bad E packet")
ErrBadVPacket = errors.New("chotki: bad V packet")
ErrBadYPacket = errors.New("chotki: bad Y packet")
ErrBadLPacket = errors.New("chotki: bad L packet")
ErrBadTPacket = errors.New("chotki: bad T packet")
ErrBadOPacket = errors.New("chotki: bad O packet")
ErrSrcUnknown = errors.New("chotki: source unknown")
ErrSyncUnknown = errors.New("chotki: sync session unknown")
ErrBadRRecord = errors.New("chotki: bad ref record")
ErrClosed = errors.New("chotki: no replica open")

ErrOutOfOrder = errors.New("chotki: order fail: sequence gap")
ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op")
)

func OKey(id rdx.ID, rdt byte) (key []byte) {
var ret = [16]byte{'O'}
Expand Down Expand Up @@ -99,14 +134,6 @@ func ReplicaDirName(rno uint64) string {
return fmt.Sprintf("cho%x", rno)
}

var ErrAlreadyOpen = errors.New("the db is already open")

func (o *Options) SetDefaults() {
if o.MaxLogLen == 0 {
o.MaxLogLen = 1 << 23
}
}

func merger(key, value []byte) (pebble.ValueMerger, error) {
/*if len(key) != 10 {
return nil, nil
Expand All @@ -120,84 +147,102 @@ func merger(key, value []byte) (pebble.ValueMerger, error) {
return &pma, nil
}

// Create a replica. orig=0 for read-only replicas.
func (cho *Chotki) Create(orig uint64, name string) (err error) {
opts := pebble.Options{
ErrorIfExists: true,
ErrorIfNotExists: false,
Merger: &pebble.Merger{
Name: "CRDT",
Merge: merger,
}}
cho.opts.SetDefaults() // todo param
path := ReplicaDirName(orig)
cho.db, err = pebble.Open(path, &opts)
func Exists(dirname string) (bool, error) {
stats, err := os.Stat(dirname)
if err != nil {
return
if os.IsNotExist(err) {
return false, nil
}

return false, err
}
var _0 rdx.ID
id0 := rdx.IDFromSrcSeqOff(orig, 0, 0)
rec0 := toytlv.Concat(
toytlv.Record('Y',
toytlv.Record('I', id0.ZipBytes()),
toytlv.Record('R', _0.ZipBytes()),
toytlv.Record('S', rdx.Stlv(name)),
),
)
init := append(Log0, rec0)
err = cho.Drain(init)

if !stats.IsDir() {
return false, ErrDirnameIsFile
}

desc, err := pebble.Peek(dirname, vfs.Default)
if err != nil {
return
return false, err
}
_ = cho.Close()
return cho.Open(orig)
}

// Open a replica. orig=0 for read-only replicas.
func (cho *Chotki) Open(orig uint64) (err error) {
cho.src = orig
opts := pebble.Options{
ErrorIfNotExists: true,
Merger: &pebble.Merger{
Name: "CRDT",
Merge: merger,
}}
cho.opts.SetDefaults() // todo param
path := ReplicaDirName(orig)
cho.db, err = pebble.Open(path, &opts)

return desc.Exists, nil
}

func Open(dirname string, opts Options) (*Chotki, error) {
exists, err := Exists(dirname)
if err != nil {
return
return nil, err
}
cho.dir = path

opts.SetDefaults() // todo param

db, err := pebble.Open(dirname, &opts.Options)
if err != nil {
_ = cho.db.Close()
return err
return nil, err
}

conn := Chotki{
db: db,
src: opts.Orig,
dir: dirname,
opts: opts,
types: make(map[rdx.ID]Fields),
hooks: make(map[rdx.ID][]Hook),
syncs: make(map[rdx.ID]*pebble.Batch),
outq: make(map[string]toyqueue.DrainCloser),
}

if !exists {
id0 := rdx.IDFromSrcSeqOff(opts.Orig, 0, 0)

init := append(toyqueue.Records(nil), Log0...)
init = append(init, toytlv.Record('Y',
toytlv.Record('I', id0.ZipBytes()),
toytlv.Record('R', rdx.ID0.ZipBytes()),
toytlv.Record('S', rdx.Stlv(opts.Name)),
))

if err = conn.Drain(init); err != nil {
return nil, err
}
}
cho.types = make(map[rdx.ID]Fields)
cho.syncs = make(map[rdx.ID]*pebble.Batch)
cho.hooks = make(map[rdx.ID][]Hook)
var vv rdx.VV
vv, err = cho.VersionVector()

vv, err := conn.VersionVector()
if err != nil {
return
return nil, err
}
cho.last = vv.GetID(cho.src)
cho.outq = make(map[string]toyqueue.DrainCloser)
// repl.last = repl.heads.GetID(orig) todo root VV
return

conn.last = vv.GetID(conn.src)

return &conn, nil
}

func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) {
func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

tcp.Open(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{Host: cho, Name: conn.RemoteAddr().String(), Mode: SyncRWLive}
})

return nil
}

func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) {
cho.OpenTCP(tcp)
func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

if err := cho.OpenTCP(tcp); err != nil {
return err
}
// ...
io := pebble.IterOptions{}
i := cho.db.NewIter(&io)
defer i.Close()

for i.SeekGE([]byte{'l'}); i.Valid() && i.Key()[0] == 'L'; i.Next() {
address := string(i.Key()[1:])
err := tcp.Listen(address)
Expand All @@ -212,7 +257,7 @@ func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
}
_ = i.Close()
return nil
}

func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) {
Expand Down Expand Up @@ -326,10 +371,14 @@ func (cho *Chotki) Drain(recs toyqueue.Records) (err error) {
default:
return fmt.Errorf("unsupported packet type %c", lit)
}

if !yvh && err == nil {
_ = cho.db.Apply(&pb, &WriteOptions)
if err := cho.db.Apply(&pb, &WriteOptions); err != nil {
return err
}
}
}

if err != nil { // fixme separate packets
return
}
Expand All @@ -356,21 +405,7 @@ func (cho *Chotki) VersionVector() (vv rdx.VV, err error) {
return
}

var WriteOptions = pebble.WriteOptions{Sync: false}

var ErrBadIRecord = errors.New("bad id-ref record")

var ErrBadHPacket = errors.New("bad handshake packet")
var ErrBadEPacket = errors.New("bad E packet")
var ErrBadVPacket = errors.New("bad V packet")
var ErrBadYPacket = errors.New("bad Y packet")
var ErrBadLPacket = errors.New("bad L packet")
var ErrBadTPacket = errors.New("bad T packet")
var ErrBadOPacket = errors.New("bad O packet")
var ErrSrcUnknown = errors.New("source unknown")
var ErrSyncUnknown = errors.New("sync session unknown")
var ErrBadRRecord = errors.New("bad ref record")
var ErrClosed = errors.New("no replica open")
var WriteOptions = pebble.WriteOptions{Sync: true}

var KeyLogLen = []byte("Mloglen")

Expand All @@ -388,7 +423,9 @@ func (cho *Chotki) Close() error {
cho.lock.Unlock()
return ErrClosed
}
_ = cho.db.Close()
if err := cho.db.Close(); err != nil {
return err
}
cho.db = nil
// todo
cho.last = rdx.ID0
Expand Down Expand Up @@ -437,10 +474,11 @@ func (cho *Chotki) ObjectIterator(oid rdx.ID) *pebble.Iterator {
if it.SeekGE(fro) {
id, rdt := OKeyIdRdt(it.Key())
if rdt == 'O' && id == oid {
// An iterator is returned from a function, it cannot be closed
return it
}
}
_ = it.Close()
it.Close()
return nil
}

Expand All @@ -454,14 +492,14 @@ func GetFieldTLV(reader pebble.Reader, id rdx.ID) (rdt byte, tlv []byte) {
LowerBound: []byte{'O'},
UpperBound: []byte{'P'},
})
defer it.Close()
if it.SeekGE(key) {
fact, r := OKeyIdRdt(it.Key())
if fact == id {
tlv = it.Value()
rdt = r
}
}
_ = it.Close()
return
}

Expand All @@ -479,8 +517,6 @@ func (cho *Chotki) RemoveAllHooks(fid rdx.ID) {
cho.hlock.Unlock()
}

var ErrHookNotFound = errors.New("hook not found")

func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error) {
cho.hlock.Lock()
list := cho.hooks[fid]
Expand Down
Loading

0 comments on commit a9baa17

Please sign in to comment.