Skip to content

Commit

Permalink
Use generic method for open db
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Apr 16, 2024
1 parent 1520713 commit bd5d449
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 115 deletions.
196 changes: 113 additions & 83 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"sync"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/drpcorg/chotki/rdx"
"github.com/learn-decentralized-systems/toyqueue"
"github.com/learn-decentralized-systems/toytlv"
"net"
"os"
"sync"
)

type Packet []byte
Expand Down Expand Up @@ -53,9 +55,28 @@ type Chotki struct {
types map[rdx.ID]Fields
}

var ErrCausalityBroken = errors.New("order fail: refs an unknown op")
var ErrOutOfOrder = errors.New("order fail: sequence gap")
var ErrNotImplemented = errors.New("not implemented yet")
var (
ErrDbClosed = errors.New("chotki: db is closed")
ErrDirnameIsFile = errors.New("chotki: the dirname is file")
ErrNotImplemented = errors.New("chotki: not implemented yet")

ErrHookNotFound = errors.New("chotki: hook not found")
ErrBadIRecord = errors.New("chotki: bad id-ref record")
ErrBadHPacket = errors.New("chotki: bad handshake packet")
ErrBadEPacket = errors.New("chotki: bad E packet")
ErrBadVPacket = errors.New("chotki: bad V packet")
ErrBadYPacket = errors.New("chotki: bad Y packet")
ErrBadLPacket = errors.New("chotki: bad L packet")
ErrBadTPacket = errors.New("chotki: bad T packet")
ErrBadOPacket = errors.New("chotki: bad O packet")
ErrSrcUnknown = errors.New("chotki: source unknown")
ErrSyncUnknown = errors.New("chotki: sync session unknown")
ErrBadRRecord = errors.New("chotki: bad ref record")
ErrClosed = errors.New("chotki: no replica open")

ErrOutOfOrder = errors.New("chotki: order fail: sequence gap")
ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op")
)

func OKey(id rdx.ID, rdt byte) (key []byte) {
var ret = [16]byte{'O'}
Expand Down Expand Up @@ -99,8 +120,6 @@ func ReplicaDirName(rno uint64) string {
return fmt.Sprintf("cho%x", rno)
}

var ErrAlreadyOpen = errors.New("the db is already open")

func (o *Options) SetDefaults() {
if o.MaxLogLen == 0 {
o.MaxLogLen = 1 << 23
Expand All @@ -120,81 +139,99 @@ func merger(key, value []byte) (pebble.ValueMerger, error) {
return &pma, nil
}

// Create a replica. orig=0 for read-only replicas.
func (cho *Chotki) Create(orig uint64, name string) (err error) {
opts := pebble.Options{
ErrorIfExists: true,
ErrorIfNotExists: false,
Merger: &pebble.Merger{
Name: "CRDT",
Merge: merger,
}}
cho.opts.SetDefaults() // todo param
path := ReplicaDirName(orig)
cho.db, err = pebble.Open(path, &opts)
func Exists(dirname string) (bool, error) {
stats, err := os.Stat(dirname)
if err != nil {
return
if os.IsNotExist(err) {
return false, nil
}

return false, err
}
var _0 rdx.ID
id0 := rdx.IDFromSrcSeqOff(orig, 0, 0)
rec0 := toytlv.Concat(
toytlv.Record('Y',
toytlv.Record('I', id0.ZipBytes()),
toytlv.Record('R', _0.ZipBytes()),
toytlv.Record('S', rdx.Stlv(name)),
),
)
init := append(Log0, rec0)
err = cho.Drain(init)

if !stats.IsDir() {
return false, ErrDirnameIsFile
}

desc, err := pebble.Peek(dirname, vfs.Default)
if err != nil {
return
return false, err
}
_ = cho.Close()
return cho.Open(orig)
}

// Open a replica. orig=0 for read-only replicas.
func (cho *Chotki) Open(orig uint64) (err error) {
cho.src = orig
opts := pebble.Options{
ErrorIfNotExists: true,
Merger: &pebble.Merger{
Name: "CRDT",
Merge: merger,
}}
cho.opts.SetDefaults() // todo param
path := ReplicaDirName(orig)
cho.db, err = pebble.Open(path, &opts)

return desc.Exists, nil
}

func Open(orig uint64, name, dirname string) (*Chotki, bool, error) {
exists, err := Exists(dirname)
if err != nil {
return
return nil, false, err
}
cho.dir = path

db, err := pebble.Open(dirname, &pebble.Options{
ErrorIfExists: false,
ErrorIfNotExists: false,
Merger: &pebble.Merger{Name: "CRDT", Merge: merger},
})
if err != nil {
_ = cho.db.Close()
return err
return nil, exists, err
}
cho.types = make(map[rdx.ID]Fields)
cho.syncs = make(map[rdx.ID]*pebble.Batch)
cho.hooks = make(map[rdx.ID][]Hook)
var vv rdx.VV
vv, err = cho.VersionVector()

conn := Chotki{
db: db,
src: orig,
dir: dirname,
types: make(map[rdx.ID]Fields),
hooks: make(map[rdx.ID][]Hook),
syncs: make(map[rdx.ID]*pebble.Batch),
outq: make(map[string]toyqueue.DrainCloser),
}
conn.opts.SetDefaults() // todo param

if !exists {
id0 := rdx.IDFromSrcSeqOff(orig, 0, 0)

init := append(toyqueue.Records(nil), Log0...)
init = append(init, toytlv.Record('Y',
toytlv.Record('I', id0.ZipBytes()),
toytlv.Record('R', rdx.ID0.ZipBytes()),
toytlv.Record('S', rdx.Stlv(name)),
))

if err = conn.Drain(init); err != nil {
return nil, exists, err
}
}

vv, err := conn.VersionVector()
if err != nil {
return
return nil, exists, err
}
cho.last = vv.GetID(cho.src)
cho.outq = make(map[string]toyqueue.DrainCloser)
// repl.last = repl.heads.GetID(orig) todo root VV
return

conn.last = vv.GetID(conn.src)

return &conn, exists, nil
}

func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) {
func (cho *Chotki) OpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

tcp.Open(func(conn net.Conn) toyqueue.FeedDrainCloser {
return &Syncer{Host: cho, Name: conn.RemoteAddr().String(), Mode: SyncRWLive}
})

return nil
}

func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) {
cho.OpenTCP(tcp)
func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) error {
if cho.db == nil {
return ErrDbClosed
}

if err := cho.OpenTCP(tcp); err != nil {
return err
}
// ...
io := pebble.IterOptions{}
i := cho.db.NewIter(&io)
Expand All @@ -214,6 +251,7 @@ func (cho *Chotki) ReOpenTCP(tcp *toytlv.TCPDepot) {
_, _ = fmt.Fprintln(os.Stderr, err.Error())
}
}
return nil
}

func (cho *Chotki) AddPacketHose(name string) (feed toyqueue.FeedCloser) {
Expand Down Expand Up @@ -327,10 +365,18 @@ func (cho *Chotki) Drain(recs toyqueue.Records) (err error) {
default:
return fmt.Errorf("unsupported packet type %c", lit)
}

if !yvh && err == nil {
_ = cho.db.Apply(&pb, &WriteOptions)
if err := cho.db.Apply(&pb, &WriteOptions); err != nil {
return err
}
}
}

if err := cho.db.Flush(); err != nil {
return err
}

if err != nil { // fixme separate packets
return
}
Expand Down Expand Up @@ -359,20 +405,6 @@ func (cho *Chotki) VersionVector() (vv rdx.VV, err error) {

var WriteOptions = pebble.WriteOptions{Sync: false}

var ErrBadIRecord = errors.New("bad id-ref record")

var ErrBadHPacket = errors.New("bad handshake packet")
var ErrBadEPacket = errors.New("bad E packet")
var ErrBadVPacket = errors.New("bad V packet")
var ErrBadYPacket = errors.New("bad Y packet")
var ErrBadLPacket = errors.New("bad L packet")
var ErrBadTPacket = errors.New("bad T packet")
var ErrBadOPacket = errors.New("bad O packet")
var ErrSrcUnknown = errors.New("source unknown")
var ErrSyncUnknown = errors.New("sync session unknown")
var ErrBadRRecord = errors.New("bad ref record")
var ErrClosed = errors.New("no replica open")

var KeyLogLen = []byte("Mloglen")

func (cho *Chotki) Last() rdx.ID {
Expand Down Expand Up @@ -483,8 +515,6 @@ func (cho *Chotki) RemoveAllHooks(fid rdx.ID) {
cho.hlock.Unlock()
}

var ErrHookNotFound = errors.New("hook not found")

func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error) {
cho.hlock.Lock()
list := cho.hooks[fid]
Expand Down
25 changes: 14 additions & 11 deletions chotki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,34 @@ func TestChotki_Debug(t *testing.T) {
}

func TestChotki_Create(t *testing.T) {
_ = os.RemoveAll("cho1a")
var a Chotki
err := a.Create(0x1a, "test replica")
dirname := ReplicaDirName(0x1a)
_ = os.RemoveAll(dirname)
a, exists, err := Open(0x1a, "test replica", dirname)
assert.Nil(t, err)
assert.Equal(t, exists, false)
//a.DumpAll()
_ = a.Close()
_ = os.RemoveAll("cho1a")
_ = os.RemoveAll(dirname)
}

type KVMerger interface {
Merge(key, value []byte) error
}

func TestChotki_Sync(t *testing.T) {
_ = os.RemoveAll("choa")
_ = os.RemoveAll("chob")
var a, b Chotki
err := a.Create(0xa, "test replica A")
adir, bdir := ReplicaDirName(0xa), ReplicaDirName(0xb)
_ = os.RemoveAll(adir)
_ = os.RemoveAll(bdir)

a, _, err := Open(0xa, "test replica A", adir)
assert.Nil(t, err)
//a.DumpAll()
err = b.Create(0xb, "test replica B")

b, _, err := Open(0xb, "test replica B", bdir)
assert.Nil(t, err)

synca := Syncer{Host: &a, Mode: SyncRW, Name: "a"}
syncb := Syncer{Host: &b, Mode: SyncRW, Name: "b"}
synca := Syncer{Host: a, Mode: SyncRW, Name: "a"}
syncb := Syncer{Host: b, Mode: SyncRW, Name: "b"}
err = toyqueue.Relay(&syncb, &synca)
assert.Nil(t, err)
err = toyqueue.Pump(&synca, &syncb)
Expand Down
14 changes: 7 additions & 7 deletions object_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
)

func TestORMExample(t *testing.T) {
edir, fdir := ReplicaDirName(0x1e), ReplicaDirName(0x1f)
_ = os.RemoveAll("cho1e")
_ = os.RemoveAll("cho1f")
defer func() {
_ = os.RemoveAll("cho1e")
_ = os.RemoveAll("cho1f")
}()

var a, b Chotki
err := a.Create(0x1e, "test replica")
a, _, err := Open(0x1e, "test replica", edir)
assert.Nil(t, err)
var tid, oid rdx.ID
tid, err = a.NewClass(rdx.ID0,
Expand All @@ -35,9 +35,9 @@ func TestORMExample(t *testing.T) {

err = a.Close()
assert.Nil(t, err)
err = a.Open(0x1e)

a, _, err = Open(0x1e, "test replica", edir)
assert.Nil(t, err)
//a.DumpAll()

var exa Example
ita := a.ObjectIterator(rdx.IDFromString("1e-2"))
Expand All @@ -50,11 +50,11 @@ func TestORMExample(t *testing.T) {
exa.Score = 103
// todo save the object

err = b.Create(0x1f, "another test replica")
b, _, err := Open(0x1f, "another test replica", fdir)
assert.Nil(t, err)

syncera := Syncer{Host: &a, Mode: SyncRW}
syncerb := Syncer{Host: &b, Mode: SyncRW}
syncera := Syncer{Host: a, Mode: SyncRW}
syncerb := Syncer{Host: b, Mode: SyncRW}
err = toyqueue.Relay(&syncerb, &syncera)
assert.Nil(t, err)
err = toyqueue.Pump(&syncera, &syncerb)
Expand Down
7 changes: 4 additions & 3 deletions objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
)

func TestTypes(t *testing.T) {
_ = os.RemoveAll("cho1a")
var a Chotki
err := a.Create(0x1a, "test replica A")
adir := ReplicaDirName(0x1a)

_ = os.RemoveAll(adir)
a, _, err := Open(0x1a, "test replica A", adir)
assert.Nil(t, err)

var tid, oid rdx.ID
Expand Down
Loading

0 comments on commit bd5d449

Please sign in to comment.