Skip to content

Commit

Permalink
Merge pull request #141 from ipfs/fix/ancestor-sep
Browse files Browse the repository at this point in the history
Only count a key as an ancestor if there is a separator
  • Loading branch information
Stebalien authored Feb 11, 2020
2 parents e9f6023 + 2fe8b68 commit 83f9a95
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 108 deletions.
19 changes: 13 additions & 6 deletions key.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,27 @@ func (k Key) ChildString(s string) Key {
// NewKey("/Comedy").IsAncestorOf("/Comedy/MontyPython")
// true
func (k Key) IsAncestorOf(other Key) bool {
if other.string == k.string {
// equivalent to HasPrefix(other, k.string + "/")

if len(other.string) <= len(k.string) {
// We're not long enough to be a child.
return false
}
return strings.HasPrefix(other.string, k.string)

if k.string == "/" {
// We're the root and the other key is longer.
return true
}

// "other" starts with /k.string/
return other.string[len(k.string)] == '/' && other.string[:len(k.string)] == k.string
}

// IsDescendantOf returns whether this key contains another as a prefix.
// NewKey("/Comedy/MontyPython").IsDescendantOf("/Comedy")
// true
func (k Key) IsDescendantOf(other Key) bool {
if other.string == k.string {
return false
}
return strings.HasPrefix(k.string, other.string)
return other.IsAncestorOf(k)
}

// IsTopLevel returns whether this key has only one namespace.
Expand Down
20 changes: 12 additions & 8 deletions key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,23 @@ func CheckTrue(c *C, cond bool) {
func (ks *KeySuite) TestKeyAncestry(c *C) {
k1 := NewKey("/A/B/C")
k2 := NewKey("/A/B/C/D")
k3 := NewKey("/AB")
k4 := NewKey("/A")

c.Check(k1.String(), Equals, "/A/B/C")
c.Check(k2.String(), Equals, "/A/B/C/D")
CheckTrue(c, k1.IsAncestorOf(k2))
CheckTrue(c, k2.IsDescendantOf(k1))
CheckTrue(c, NewKey("/A").IsAncestorOf(k2))
CheckTrue(c, NewKey("/A").IsAncestorOf(k1))
CheckTrue(c, !NewKey("/A").IsDescendantOf(k2))
CheckTrue(c, !NewKey("/A").IsDescendantOf(k1))
CheckTrue(c, k2.IsDescendantOf(NewKey("/A")))
CheckTrue(c, k1.IsDescendantOf(NewKey("/A")))
CheckTrue(c, !k2.IsAncestorOf(NewKey("/A")))
CheckTrue(c, !k1.IsAncestorOf(NewKey("/A")))
CheckTrue(c, k4.IsAncestorOf(k2))
CheckTrue(c, k4.IsAncestorOf(k1))
CheckTrue(c, !k4.IsDescendantOf(k2))
CheckTrue(c, !k4.IsDescendantOf(k1))
CheckTrue(c, !k3.IsDescendantOf(k4))
CheckTrue(c, !k4.IsAncestorOf(k3))
CheckTrue(c, k2.IsDescendantOf(k4))
CheckTrue(c, k1.IsDescendantOf(k4))
CheckTrue(c, !k2.IsAncestorOf(k4))
CheckTrue(c, !k1.IsAncestorOf(k4))
CheckTrue(c, !k2.IsAncestorOf(k2))
CheckTrue(c, !k1.IsAncestorOf(k1))
c.Check(k1.Child(NewKey("D")).String(), Equals, k2.String())
Expand Down
45 changes: 45 additions & 0 deletions mount/lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package mount

import (
"testing"

datastore "github.com/ipfs/go-datastore"
)

func TestLookup(t *testing.T) {
mapds0 := datastore.NewMapDatastore()
mapds1 := datastore.NewMapDatastore()
mapds2 := datastore.NewMapDatastore()
mapds3 := datastore.NewMapDatastore()
m := New([]Mount{
{Prefix: datastore.NewKey("/"), Datastore: mapds0},
{Prefix: datastore.NewKey("/foo"), Datastore: mapds1},
{Prefix: datastore.NewKey("/bar"), Datastore: mapds2},
{Prefix: datastore.NewKey("/baz"), Datastore: mapds3},
})
_, mnts, _ := m.lookupAll(datastore.NewKey("/bar"))
if len(mnts) != 1 || mnts[0] != datastore.NewKey("/bar") {
t.Errorf("expected to find the mountpoint /bar, got %v", mnts)
}

_, mnts, _ = m.lookupAll(datastore.NewKey("/fo"))
if len(mnts) != 1 || mnts[0] != datastore.NewKey("/") {
t.Errorf("expected to find the mountpoint /, got %v", mnts)
}

_, mnt, _ := m.lookup(datastore.NewKey("/fo"))
if mnt != datastore.NewKey("/") {
t.Errorf("expected to find the mountpoint /, got %v", mnt)
}

// /foo lives in /, /foo/bar lives in /foo. Most systems don't let us use the key "" or /.
_, mnt, _ = m.lookup(datastore.NewKey("/foo"))
if mnt != datastore.NewKey("/") {
t.Errorf("expected to find the mountpoint /, got %v", mnt)
}

_, mnt, _ = m.lookup(datastore.NewKey("/foo/bar"))
if mnt != datastore.NewKey("/foo") {
t.Errorf("expected to find the mountpoint /foo, got %v", mnt)
}
}
105 changes: 84 additions & 21 deletions mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@ var (
ErrNoMount = errors.New("no datastore mounted for this key")
)

// Mount defines a datastore mount. It mounts the given datastore at the given
// prefix.
type Mount struct {
Prefix ds.Key
Datastore ds.Datastore
}

// New creates a new mount datstore from the given mounts. See the documentation
// on Datastore for details.
//
// The order of the mounts does not matter, they will be applied most specific
// to least specific.
func New(mounts []Mount) *Datastore {
// make a copy so we're sure it doesn't mutate
m := make([]Mount, len(mounts))
Expand All @@ -31,15 +38,39 @@ func New(mounts []Mount) *Datastore {
return &Datastore{mounts: m}
}

// Datastore is a mount datastore. In this datastore, keys live under the most
// specific mounted sub-datastore. That is, given sub-datastores mounted under:
//
// * /
// * /foo
// * /foo/bar
//
// Keys would be written as follows:
//
// * /foo, /foobar, /baz would all live under /.
// * /foo/baz, /foo/bar, etc. would live under /foo.
// * /foo/bar/baz would live under /foo/bar.
//
// Additionally, even if the datastore mounted at / contains the key /foo/thing,
// the datastore mounted at /foo would mask this value in get, deletes, and
// query results.
//
// Finally, if no root (/) mount is provided, operations on keys living outside
// all of the provided mounts will behave as follows:
//
// * Get - Returns datastore.ErrNotFound.
// * Query - Returns no results.
// * Put - Returns ErrNoMount.
type Datastore struct {
mounts []Mount
}

var _ ds.Datastore = (*Datastore)(nil)

// lookup looks up the datastore in which the given key lives.
func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
for _, m := range d.mounts {
if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
if m.Prefix.IsAncestorOf(key) {
s := strings.TrimPrefix(key.String(), m.Prefix.String())
k := ds.NewKey(s)
return m.Datastore, m.Prefix, k
Expand Down Expand Up @@ -147,38 +178,58 @@ func (h *querySet) next() (query.Result, bool) {
return next, true
}

// lookupAll returns all mounts that might contain keys that are descendant of <key>
// lookupAll returns all mounts that might contain keys that are strict
// descendants of <key>. It will not return mounts that match key exactly.
//
// Specifically, this function will return three slices:
//
// * The matching datastores.
// * The prefixes where each matching datastore has been mounted.
// * The prefix within these datastores at which descendants of the passed key
// live. If the mounted datastore is fully contained within the given key,
// this will be /.
//
// By example, given the datastores:
//
// Matching: /ao/e
// * / - root
// * /foo -
// * /bar
// * /foo/bar
//
// / B /ao/e
// /a/ not matching
// /ao/ B /e
// /ao/e/ A /
// /ao/e/uh/ A /
// /aoe/ not matching
// This function function will behave as follows:
//
// * key -> ([mountpoints], [rests]) # comment
// * / -> ([/, /foo, /bar, /foo/bar], [/, /, /, /]) # all datastores
// * /foo -> ([/foo, /foo/bar], [/, /]) # all datastores under /foo
// * /foo/bar -> ([/foo/bar], [/]) # /foo/bar
// * /bar/foo -> ([/bar], [/foo]) # the datastore mounted at /bar, rest is /foo
// * /ba -> ([/], [/]) # the root; only full components are matched.
func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) {
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}

if strings.HasPrefix(p, key.String()) {
if m.Prefix.IsDescendantOf(key) {
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) {
} else if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())

dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey(r))

// We've found an ancestor (or equal) key. We might have
// more general datastores, but they won't contain keys
// with this prefix so there's no point in searching them.
break
}
}
return dst, mountpoint, rest
}

// Put puts the given value into the datastore at the given key.
//
// Returns ErrNoMount if there no datastores are mounted at the appropriate
// prefix for the given key.
func (d *Datastore) Put(key ds.Key, value []byte) error {
cds, _, k := d.lookup(key)
if cds == nil {
Expand All @@ -191,20 +242,17 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
func (d *Datastore) Sync(prefix ds.Key) error {
// Sync all mount points below the prefix
// Sync the mount point right at (or above) the prefix
dstores, mountPts, rest := d.lookupAll(prefix)
dstores, _, rest := d.lookupAll(prefix)
for i, suffix := range rest {
if err := dstores[i].Sync(suffix); err != nil {
return err
}

if mountPts[i].Equal(prefix) || suffix.String() != "/" {
return nil
}
}

return nil
}

// Get returns the value associated with the key from the appropriate datastore.
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
Expand All @@ -213,6 +261,8 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return cds.Get(k)
}

// Has returns the true if there exists a value associated with key in the
// appropriate datastore.
func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
Expand All @@ -221,6 +271,8 @@ func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
return cds.Has(k)
}

// Get returns the size of the value associated with the key in the appropriate
// datastore.
func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
Expand All @@ -229,6 +281,10 @@ func (d *Datastore) GetSize(key ds.Key) (size int, err error) {
return cds.GetSize(k)
}

// Delete deletes the value associated with the key in the appropriate
// datastore.
//
// Delete returns no error if there is no value associated with the given key.
func (d *Datastore) Delete(key ds.Key) error {
cds, _, k := d.lookup(key)
if cds == nil {
Expand All @@ -237,6 +293,11 @@ func (d *Datastore) Delete(key ds.Key) error {
return cds.Delete(k)
}

// Query queries the appropriate mounted datastores, merging the results
// according to the given orders.
//
// If a query prefix is specified, Query will avoid querying datastores mounted
// outside that prefix.
func (d *Datastore) Query(master query.Query) (query.Results, error) {
childQuery := query.Query{
Prefix: master.Prefix,
Expand Down Expand Up @@ -292,6 +353,7 @@ func (d *Datastore) Query(master query.Query) (query.Results, error) {
return qr, nil
}

// Close closes all mounted datastores.
func (d *Datastore) Close() error {
for _, d := range d.mounts {
err := d.Datastore.Close()
Expand Down Expand Up @@ -323,6 +385,7 @@ type mountBatch struct {
d *Datastore
}

// Batch returns a batch that operates over all mounted datastores.
func (d *Datastore) Batch() (ds.Batch, error) {
return &mountBatch{
mounts: make(map[string]ds.Batch),
Expand Down
Loading

0 comments on commit 83f9a95

Please sign in to comment.