Skip to content

Commit

Permalink
refactor store api.
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar committed Aug 26, 2016
1 parent 76910dd commit b8cc2f5
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 55 deletions.
4 changes: 2 additions & 2 deletions backends/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo
time.Sleep(time.Duration(1000) * time.Millisecond)
continue
}
store.SetBulk("/", val)
store.PutBulk("/", val)
inited = true
}

Expand All @@ -267,7 +267,7 @@ func processSyncChange(prefix string, store store.Store, resp *client.Response)
case "delete":
store.Delete(nodePath)
default:
store.Set(nodePath, false, resp.Node.Value)
store.Put(nodePath, resp.Node.Value)
}
}

Expand Down
6 changes: 3 additions & 3 deletions backends/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo
time.Sleep(time.Duration(1000) * time.Millisecond)
continue
}
store.SetBulk("/", val)
store.PutBulk("/", val)
inited = true
}
for resp := range watchChan {
Expand All @@ -224,12 +224,12 @@ func processSyncChange(prefix string, store store.Store, resp *client.WatchRespo
log.Debug("process sync change, event_type: %s, prefix: %v, nodePath:%v, value: %v ", event.Type, prefix, nodePath, value)
switch event.Type {
case mvccpb.PUT:
store.Set(nodePath, false, value)
store.Put(nodePath, value)
case mvccpb.DELETE:
store.Delete(nodePath)
default:
log.Warning("Unknow watch event type: %s ", event.Type)
store.Set(nodePath, false, value)
store.Put(nodePath, value)

}
}
Expand Down
65 changes: 30 additions & 35 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package store

import (
"fmt"
"github.com/yunify/metadata-proxy/util"
"github.com/yunify/metadata-proxy/util/flatmap"
"path"
"reflect"
"strings"
"sync"
)

type Store interface {
// Get
// return a string (nodePath is a leaf node) or
// a map[string]interface{} (nodePath is dir)
Get(nodePath string) (interface{}, bool)
Set(nodePath string, dir bool, value string)
Sets(nodePath string, values map[string]interface{})
// Put value can be a map[string]interface{} or string
Put(nodePath string, value interface{})
Delete(nodePath string)
SetBulk(nodePath string, value map[string]string)
// PutBulk value should be a flatmap
PutBulk(nodePath string, value map[string]string)
}

type store struct {
Expand Down Expand Up @@ -48,35 +54,27 @@ func (s *store) Get(nodePath string) (interface{}, bool) {
return nil, false
}

// Set creates or replace the node at nodePath, return old value
func (s *store) Set(nodePath string, dir bool, value string) {
// Put creates or update the node at nodePath, value should a map[string]interface{} or a string
func (s *store) Put(nodePath string, value interface{}) {
nodePath = path.Clean(path.Join("/", nodePath))

s.worldLock.Lock()
defer s.worldLock.Unlock()
s.internalSet(nodePath, dir, value)
}

func (s *store) SetBulk(nodePath string, values map[string]string) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
s.internalSetBulk(nodePath, values)
}

func (s *store) internalSetBulk(nodePath string, values map[string]string) {
for k, v := range values {
key := util.AppendPathPrefix(k, nodePath)
s.internalSet(key, false, v)
switch t := value.(type) {
case map[string]interface{}, map[string]string, []interface{}:
flatValues := flatmap.Flatten(t)
s.internalPutBulk(nodePath, flatValues)
case string:
s.internalPut(nodePath, t)
default:
panic(fmt.Sprintf("Unsupport type: %s", reflect.TypeOf(t)))
}
}

func (s *store) Sets(nodePath string, values map[string]interface{}) {
nodePath = path.Clean(path.Join("/", nodePath))

func (s *store) PutBulk(nodePath string, values map[string]string) {
s.worldLock.Lock()
defer s.worldLock.Unlock()
flatValues := flatmap.Flatten(values)
s.internalSetBulk(nodePath, flatValues)
s.internalPutBulk(nodePath, values)
}

// Delete deletes the node at the given path.
Expand Down Expand Up @@ -120,7 +118,7 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string
return curr
}

func (s *store) internalSet(nodePath string, dir bool, value string) *node {
func (s *store) internalPut(nodePath string, value string) *node {

dirName, nodeName := path.Split(nodePath)

Expand All @@ -130,26 +128,23 @@ func (s *store) internalSet(nodePath string, dir bool, value string) *node {

// force will try to replace an existing file
if n != nil {
if dir {
n.AsDir()
}
n.value = value
return n
}

if !dir { // create file

n = newKV(s, nodePath, value, d)
} else { // create directory

n = newDir(s, nodePath, d)
}

n = newKV(s, nodePath, value, d)
d.Add(n)

return n
}

func (s *store) internalPutBulk(nodePath string, values map[string]string) {
for k, v := range values {
key := util.AppendPathPrefix(k, nodePath)
s.internalPut(key, v)
}
}

// InternalGet gets the node of the given nodePath.
func (s *store) internalGet(nodePath string) *node {

Expand Down
27 changes: 12 additions & 15 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestStoreBasic(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, val)

store.Set("/foo", false, "bar")
store.Put("/foo", "bar")

val, ok = store.Get("/foo")
assert.True(t, ok)
Expand All @@ -30,17 +30,14 @@ func TestStoreBasic(t *testing.T) {
func TestStoreDir(t *testing.T) {
store := New()

store.Set("/foo", true, "")
store.Put("/foo/foo1", "")

store.Set("/foo/foo1", true, "")

val, ok := store.Get("/foo/foo1")
val, ok := store.Get("/foo")
assert.True(t, ok)
mapVal, mok := val.(map[string]interface{})
_, mok := val.(map[string]interface{})
assert.True(t, mok)
assert.Equal(t, 0, len(mapVal))

store.Set("/foo/foo1/key1", false, "val1")
store.Put("/foo/foo1/key1", "val1")
val, ok = store.Get("/foo/foo1/key1")
assert.True(t, ok)
assert.Equal(t, "val1", val)
Expand All @@ -62,7 +59,7 @@ func TestStoreBulk(t *testing.T) {
values[fmt.Sprintf("/clusters/%v/ip", i)] = fmt.Sprintf("192.168.0.%v", i)
values[fmt.Sprintf("/clusters/%v/name", i)] = fmt.Sprintf("cluster-%v", i)
}
store.SetBulk("/", values)
store.PutBulk("/", values)

val, ok := store.Get("/clusters/10")
assert.True(t, ok)
Expand All @@ -83,7 +80,7 @@ func TestStoreSets(t *testing.T) {
"name": fmt.Sprintf("cluster-%v", i),
}
}
store.Sets("/clusters", values)
store.Put("/clusters", values)

val, ok := store.Get("/clusters/10")
assert.True(t, ok)
Expand All @@ -97,9 +94,9 @@ func TestStoreSets(t *testing.T) {
func TestStoreNodeToDirPanic(t *testing.T) {
store := New()
// first set a node value.
store.Set("/nodes/6", false, "node6")
store.Put("/nodes/6", "node6")
// create pre node's child's child, will cause panic.
store.Set("/nodes/6/label/key1", false, "value1")
store.Put("/nodes/6/label/key1", "value1")

v, _ := store.Get("/nodes/6")
_, mok := v.(map[string]interface{})
Expand All @@ -113,8 +110,8 @@ func TestStoreRemoveEmptyParent(t *testing.T) {
store := New()

// if dir has children, dir's text value will be hidden.
store.Set("/nodes/6", false, "node6")
store.Set("/nodes/6/label/key1", false, "value1")
store.Put("/nodes/6", "node6")
store.Put("/nodes/6/label/key1", "value1")

store.Delete("/nodes/6/label/key1")

Expand All @@ -127,7 +124,7 @@ func TestStoreRemoveEmptyParent(t *testing.T) {
assert.Equal(t, "node6", v)

// when delete leaf node, empty parent dir will been auto delete.
store.Set("/nodes/7/label/key1", false, "value1")
store.Put("/nodes/7/label/key1", "value1")
store.Delete("/nodes/7/label/key1")

_, ok = store.Get("/nodes/7")
Expand Down

0 comments on commit b8cc2f5

Please sign in to comment.