diff --git a/backends/etcd/client.go b/backends/etcd/client.go index 7953ffc..5bfd296 100644 --- a/backends/etcd/client.go +++ b/backends/etcd/client.go @@ -244,7 +244,7 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo time.Sleep(time.Duration(1000) * time.Millisecond) continue } - store.SetBulk("/", val) + store.PutBulk("/", val) inited = true } @@ -267,7 +267,7 @@ func processSyncChange(prefix string, store store.Store, resp *client.Response) case "delete": store.Delete(nodePath) default: - store.Set(nodePath, false, resp.Node.Value) + store.Put(nodePath, resp.Node.Value) } } diff --git a/backends/etcdv3/client.go b/backends/etcdv3/client.go index f02d9e7..59044ea 100644 --- a/backends/etcdv3/client.go +++ b/backends/etcdv3/client.go @@ -206,7 +206,7 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo time.Sleep(time.Duration(1000) * time.Millisecond) continue } - store.SetBulk("/", val) + store.PutBulk("/", val) inited = true } for resp := range watchChan { @@ -224,12 +224,12 @@ func processSyncChange(prefix string, store store.Store, resp *client.WatchRespo log.Debug("process sync change, event_type: %s, prefix: %v, nodePath:%v, value: %v ", event.Type, prefix, nodePath, value) switch event.Type { case mvccpb.PUT: - store.Set(nodePath, false, value) + store.Put(nodePath, value) case mvccpb.DELETE: store.Delete(nodePath) default: log.Warning("Unknow watch event type: %s ", event.Type) - store.Set(nodePath, false, value) + store.Put(nodePath, value) } } diff --git a/store/store.go b/store/store.go index 7228146..b5113ed 100644 --- a/store/store.go +++ b/store/store.go @@ -1,19 +1,25 @@ package store import ( + "fmt" "github.com/yunify/metadata-proxy/util" "github.com/yunify/metadata-proxy/util/flatmap" "path" + "reflect" "strings" "sync" ) type Store interface { + // Get + // return a string (nodePath is a leaf node) or + // a map[string]interface{} (nodePath is dir) Get(nodePath string) (interface{}, bool) - Set(nodePath string, dir bool, value string) - Sets(nodePath string, values map[string]interface{}) + // Put value can be a map[string]interface{} or string + Put(nodePath string, value interface{}) Delete(nodePath string) - SetBulk(nodePath string, value map[string]string) + // PutBulk value should be a flatmap + PutBulk(nodePath string, value map[string]string) } type store struct { @@ -48,35 +54,27 @@ func (s *store) Get(nodePath string) (interface{}, bool) { return nil, false } -// Set creates or replace the node at nodePath, return old value -func (s *store) Set(nodePath string, dir bool, value string) { +// Put creates or update the node at nodePath, value should a map[string]interface{} or a string +func (s *store) Put(nodePath string, value interface{}) { nodePath = path.Clean(path.Join("/", nodePath)) s.worldLock.Lock() defer s.worldLock.Unlock() - s.internalSet(nodePath, dir, value) -} - -func (s *store) SetBulk(nodePath string, values map[string]string) { - s.worldLock.Lock() - defer s.worldLock.Unlock() - s.internalSetBulk(nodePath, values) -} - -func (s *store) internalSetBulk(nodePath string, values map[string]string) { - for k, v := range values { - key := util.AppendPathPrefix(k, nodePath) - s.internalSet(key, false, v) + switch t := value.(type) { + case map[string]interface{}, map[string]string, []interface{}: + flatValues := flatmap.Flatten(t) + s.internalPutBulk(nodePath, flatValues) + case string: + s.internalPut(nodePath, t) + default: + panic(fmt.Sprintf("Unsupport type: %s", reflect.TypeOf(t))) } } -func (s *store) Sets(nodePath string, values map[string]interface{}) { - nodePath = path.Clean(path.Join("/", nodePath)) - +func (s *store) PutBulk(nodePath string, values map[string]string) { s.worldLock.Lock() defer s.worldLock.Unlock() - flatValues := flatmap.Flatten(values) - s.internalSetBulk(nodePath, flatValues) + s.internalPutBulk(nodePath, values) } // Delete deletes the node at the given path. @@ -120,7 +118,7 @@ func (s *store) walk(nodePath string, walkFunc func(prev *node, component string return curr } -func (s *store) internalSet(nodePath string, dir bool, value string) *node { +func (s *store) internalPut(nodePath string, value string) *node { dirName, nodeName := path.Split(nodePath) @@ -130,26 +128,23 @@ func (s *store) internalSet(nodePath string, dir bool, value string) *node { // force will try to replace an existing file if n != nil { - if dir { - n.AsDir() - } n.value = value return n } - if !dir { // create file - - n = newKV(s, nodePath, value, d) - } else { // create directory - - n = newDir(s, nodePath, d) - } - + n = newKV(s, nodePath, value, d) d.Add(n) return n } +func (s *store) internalPutBulk(nodePath string, values map[string]string) { + for k, v := range values { + key := util.AppendPathPrefix(k, nodePath) + s.internalPut(key, v) + } +} + // InternalGet gets the node of the given nodePath. func (s *store) internalGet(nodePath string) *node { diff --git a/store/store_test.go b/store/store_test.go index c523b6e..33035f1 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -14,7 +14,7 @@ func TestStoreBasic(t *testing.T) { assert.False(t, ok) assert.Nil(t, val) - store.Set("/foo", false, "bar") + store.Put("/foo", "bar") val, ok = store.Get("/foo") assert.True(t, ok) @@ -30,17 +30,14 @@ func TestStoreBasic(t *testing.T) { func TestStoreDir(t *testing.T) { store := New() - store.Set("/foo", true, "") + store.Put("/foo/foo1", "") - store.Set("/foo/foo1", true, "") - - val, ok := store.Get("/foo/foo1") + val, ok := store.Get("/foo") assert.True(t, ok) - mapVal, mok := val.(map[string]interface{}) + _, mok := val.(map[string]interface{}) assert.True(t, mok) - assert.Equal(t, 0, len(mapVal)) - store.Set("/foo/foo1/key1", false, "val1") + store.Put("/foo/foo1/key1", "val1") val, ok = store.Get("/foo/foo1/key1") assert.True(t, ok) assert.Equal(t, "val1", val) @@ -62,7 +59,7 @@ func TestStoreBulk(t *testing.T) { values[fmt.Sprintf("/clusters/%v/ip", i)] = fmt.Sprintf("192.168.0.%v", i) values[fmt.Sprintf("/clusters/%v/name", i)] = fmt.Sprintf("cluster-%v", i) } - store.SetBulk("/", values) + store.PutBulk("/", values) val, ok := store.Get("/clusters/10") assert.True(t, ok) @@ -83,7 +80,7 @@ func TestStoreSets(t *testing.T) { "name": fmt.Sprintf("cluster-%v", i), } } - store.Sets("/clusters", values) + store.Put("/clusters", values) val, ok := store.Get("/clusters/10") assert.True(t, ok) @@ -97,9 +94,9 @@ func TestStoreSets(t *testing.T) { func TestStoreNodeToDirPanic(t *testing.T) { store := New() // first set a node value. - store.Set("/nodes/6", false, "node6") + store.Put("/nodes/6", "node6") // create pre node's child's child, will cause panic. - store.Set("/nodes/6/label/key1", false, "value1") + store.Put("/nodes/6/label/key1", "value1") v, _ := store.Get("/nodes/6") _, mok := v.(map[string]interface{}) @@ -113,8 +110,8 @@ func TestStoreRemoveEmptyParent(t *testing.T) { store := New() // if dir has children, dir's text value will be hidden. - store.Set("/nodes/6", false, "node6") - store.Set("/nodes/6/label/key1", false, "value1") + store.Put("/nodes/6", "node6") + store.Put("/nodes/6/label/key1", "value1") store.Delete("/nodes/6/label/key1") @@ -127,7 +124,7 @@ func TestStoreRemoveEmptyParent(t *testing.T) { assert.Equal(t, "node6", v) // when delete leaf node, empty parent dir will been auto delete. - store.Set("/nodes/7/label/key1", false, "value1") + store.Put("/nodes/7/label/key1", "value1") store.Delete("/nodes/7/label/key1") _, ok = store.Get("/nodes/7")