diff --git a/backends/client.go b/backends/client.go index 0e04162..38d84f0 100644 --- a/backends/client.go +++ b/backends/client.go @@ -12,18 +12,17 @@ import ( // The StoreClient interface is implemented by objects that can retrieve // key/value pairs from a backend store. type StoreClient interface { - GetValues(nodePath string) (map[string]interface{}, error) - GetValue(nodePath string) (string, error) - Sync(store store.Store, stopChan chan bool) - SetValues(nodePath string, values map[string]interface{}, replace bool) error - SetValue(nodePath string, value string) error - Set(nodePath string, value interface{}, replace bool) error + Get(nodePath string, dir bool) (interface{}, error) + Put(nodePath string, value interface{}, replace bool) error // Delete // if the 'key' represent a dir, 'dir' should be true. Delete(nodePath string, dir bool) error - SyncMapping(mapping store.Store, stopChan chan bool) - UpdateMapping(nodePath string, mapping interface{}, replace bool) error + Sync(store store.Store, stopChan chan bool) + + GetMapping(nodePath string, dir bool) (interface{}, error) + PutMapping(nodePath string, mapping interface{}, replace bool) error DeleteMapping(nodePath string, dir bool) error + SyncMapping(mapping store.Store, stopChan chan bool) } // New is used to create a storage client based on our configuration. diff --git a/backends/client_test.go b/backends/client_test.go index 152e6e1..5a00d05 100644 --- a/backends/client_test.go +++ b/backends/client_test.go @@ -24,7 +24,7 @@ func init() { rand.Seed(int64(time.Now().Nanosecond())) } -func TestClientGetSet(t *testing.T) { +func TestClientGetPut(t *testing.T) { for _, backend := range backendNodes { println("Test backend: ", backend) @@ -42,15 +42,15 @@ func TestClientGetSet(t *testing.T) { storeClient.Delete("/", true) - err = storeClient.SetValue("testkey", "testvalue") + err = storeClient.Put("testkey", "testvalue", false) assert.NoError(t, err) - val, getErr := storeClient.GetValue("testkey") + val, getErr := storeClient.Get("testkey", false) assert.NoError(t, getErr) assert.Equal(t, "testvalue", val) // test no exist key - val, getErr = storeClient.GetValue("noexistkey") + val, getErr = storeClient.Get("noexistkey", false) assert.NoError(t, getErr) assert.Equal(t, "", val) @@ -58,7 +58,7 @@ func TestClientGetSet(t *testing.T) { } } -func TestClientGetsSets(t *testing.T) { +func TestClientGetsPuts(t *testing.T) { for _, backend := range backendNodes { println("Test backend: ", backend) @@ -83,10 +83,10 @@ func TestClientGetsSets(t *testing.T) { }, } - err = storeClient.SetValues("testkey", values, true) + err = storeClient.Put("testkey", values, true) assert.NoError(t, err) - val, getErr := storeClient.GetValues("testkey") + val, getErr := storeClient.Get("testkey", true) assert.NoError(t, getErr) assert.Equal(t, values, val) @@ -98,7 +98,7 @@ func TestClientGetsSets(t *testing.T) { }, } - err = storeClient.SetValues("testkey", values2, false) + err = storeClient.Put("testkey", values2, false) assert.NoError(t, err) values3 := map[string]interface{}{ @@ -109,16 +109,16 @@ func TestClientGetsSets(t *testing.T) { }, } - val, getErr = storeClient.GetValues("testkey") + val, getErr = storeClient.Get("testkey", true) assert.NoError(t, getErr) assert.Equal(t, values3, val) //test replace - err = storeClient.SetValues("testkey", values2, true) + err = storeClient.Put("testkey", values2, true) assert.NoError(t, err) - val, getErr = storeClient.GetValues("testkey") + val, getErr = storeClient.Get("testkey", true) assert.NoError(t, getErr) assert.Equal(t, values2, val) @@ -126,7 +126,7 @@ func TestClientGetsSets(t *testing.T) { } } -func TestClientSet(t *testing.T) { +func TestClientPutJSON(t *testing.T) { for _, backend := range backendNodes { println("Test backend: ", backend) @@ -156,10 +156,10 @@ func TestClientSet(t *testing.T) { err = json.Unmarshal(jsonVal, &values) assert.NoError(t, err) - err = storeClient.Set("testkey", values, true) + err = storeClient.Put("testkey", values, true) assert.NoError(t, err) - val, getErr := storeClient.GetValues("testkey") + val, getErr := storeClient.Get("testkey", true) assert.NoError(t, getErr) assert.Equal(t, values, val) @@ -177,7 +177,7 @@ func TestClientSet(t *testing.T) { err = json.Unmarshal(jsonVal2, &values2) assert.NoError(t, err) - err = storeClient.Set("testkey", values2, false) + err = storeClient.Put("testkey", values2, false) assert.NoError(t, err) values3 := map[string]interface{}{ @@ -188,16 +188,16 @@ func TestClientSet(t *testing.T) { }, } - val, getErr = storeClient.GetValues("testkey") + val, getErr = storeClient.Get("testkey", true) assert.NoError(t, getErr) assert.Equal(t, values3, val) //test replace - err = storeClient.Set("testkey", values2, true) + err = storeClient.Put("testkey", values2, true) assert.NoError(t, err) - val, getErr = storeClient.GetValues("testkey") + val, getErr = storeClient.Get("testkey", true) assert.NoError(t, getErr) assert.Equal(t, values2, val) @@ -261,6 +261,48 @@ func TestMapping(t *testing.T) { prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) + for _, backend := range backendNodes { + println("Test backend: ", backend) + nodes := GetDefaultBackends(backend) + + config := Config{ + Backend: backend, + BackendNodes: nodes, + Prefix: prefix, + } + storeClient, err := New(config) + assert.NoError(t, err) + mappings := make(map[string]interface{}) + for i := 0; i < 10; i++ { + ip := fmt.Sprintf("192.168.1.%v", i) + mapping := map[string]string{ + "instance": fmt.Sprintf("/instances/%v", i), + "config": fmt.Sprintf("/configs/%v", i), + } + mappings[ip] = mapping + } + storeClient.PutMapping("/", mappings, true) + + val, err := storeClient.GetMapping("/", true) + assert.NoError(t, err) + m, mok := val.(map[string]interface{}) + assert.True(t, mok) + assert.True(t, m["192.168.1.0"] != nil) + + ip := fmt.Sprintf("192.168.1.%v", 1) + nodePath := "/" + ip + "/" + "instance" + storeClient.PutMapping(nodePath, "/instances/new1", true) + time.Sleep(1000 * time.Millisecond) + val, err = storeClient.GetMapping(nodePath, false) + assert.NoError(t, err) + assert.Equal(t, "/instances/new1", val) + } +} + +func TestMappingSync(t *testing.T) { + + prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) + for _, backend := range backendNodes { println("Test backend: ", backend) stopChan := make(chan bool) @@ -287,7 +329,7 @@ func TestMapping(t *testing.T) { "instance": fmt.Sprintf("/instances/%v", i), "config": fmt.Sprintf("/configs/%v", i), } - storeClient.UpdateMapping(ip, mapping, true) + storeClient.PutMapping(ip, mapping, true) } time.Sleep(1000 * time.Millisecond) storeClient.SyncMapping(metastore, stopChan) @@ -309,7 +351,7 @@ func TestMapping(t *testing.T) { "instance": fmt.Sprintf("/instances/%v", i), "config": fmt.Sprintf("/configs/%v", i), } - storeClient.UpdateMapping(ip, mapping, true) + storeClient.PutMapping(ip, mapping, true) } time.Sleep(1000 * time.Millisecond) for i := 10; i < 20; i++ { @@ -323,7 +365,7 @@ func TestMapping(t *testing.T) { } ip := fmt.Sprintf("192.168.1.%v", 1) nodePath := ip + "/" + "instance" - storeClient.UpdateMapping(nodePath, "/instances/new1", true) + storeClient.PutMapping(nodePath, "/instances/new1", true) time.Sleep(1000 * time.Millisecond) val, ok := metastore.Get(nodePath) assert.True(t, ok) @@ -340,7 +382,7 @@ func FillTestData(storeClient StoreClient) map[string]string { } testData[fmt.Sprintf("%v", i)] = ci } - err := storeClient.SetValues("/", testData, true) + err := storeClient.Put("/", testData, true) if err != nil { log.Error("SetValues error", err.Error()) } @@ -358,7 +400,7 @@ func RandomUpdate(testData map[string]string, storeClient StoreClient, times int key := keys[idx] val := testData[key] newVal := fmt.Sprintf("%s-%v", val, 0) - storeClient.SetValue(key, newVal) + storeClient.Put(key, newVal, true) testData[key] = newVal } } diff --git a/backends/etcd/client.go b/backends/etcd/client.go index 97ec049..7953ffc 100644 --- a/backends/etcd/client.go +++ b/backends/etcd/client.go @@ -88,16 +88,57 @@ func NewEtcdClient(prefix string, machines []string, cert, key, caCert string, b return &Client{kapi, prefix}, nil } -// GetValues queries etcd for nodePath Recursive:true. -func (c *Client) GetValues(nodePath string) (map[string]interface{}, error) { - m, err := c.internalGetValues(c.prefix, nodePath) - if err != nil { - return nil, err +// Get queries etcd for nodePath. Dir for query is recursive. +func (c *Client) Get(nodePath string, dir bool) (interface{}, error) { + if dir { + m, err := c.internalGets(c.prefix, nodePath) + if err != nil { + return nil, err + } + return flatmap.Expand(m, nodePath), nil + } else { + return c.internalGet(c.prefix, nodePath) + } +} + +func (c *Client) Put(nodePath string, value interface{}, replace bool) error { + return c.internalPut(c.prefix, nodePath, value, replace) +} + +func (c *Client) Delete(nodePath string, dir bool) error { + return c.internalDelete(c.prefix, nodePath, dir) +} + +func (c *Client) Sync(store store.Store, stopChan chan bool) { + go c.internalSync(c.prefix, store, stopChan) +} + +func (c *Client) GetMapping(nodePath string, dir bool) (interface{}, error) { + if dir { + m, err := c.internalGets(SELF_MAPPING_PATH, nodePath) + if err != nil { + return nil, err + } + return flatmap.Expand(m, nodePath), nil + } else { + return c.internalGet(SELF_MAPPING_PATH, nodePath) } - return flatmap.Expand(m, nodePath), nil } -func (c *Client) internalGetValues(prefix, nodePath string) (map[string]string, error) { +func (c *Client) PutMapping(nodePath string, mapping interface{}, replace bool) error { + return c.internalPut(SELF_MAPPING_PATH, nodePath, mapping, replace) +} + +func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { + go c.internalSync(SELF_MAPPING_PATH, mapping, stopChan) +} + +func (c *Client) DeleteMapping(nodePath string, dir bool) error { + nodePath = path.Join("/", nodePath) + return c.internalDelete(SELF_MAPPING_PATH, nodePath, dir) +} + +func (c *Client) internalGets(prefix, nodePath string) (map[string]string, error) { vars := make(map[string]string) nodePath = util.AppendPathPrefix(nodePath, prefix) resp, err := c.client.Get(context.Background(), nodePath, &client.GetOptions{ @@ -140,12 +181,7 @@ func nodeWalk(prefix string, node *client.Node, vars map[string]string) error { return nil } -// GetValue queries etcd for nodePath -func (c *Client) GetValue(nodePath string) (string, error) { - return c.internalGetValue(c.prefix, nodePath) -} - -func (c *Client) internalGetValue(prefix, nodePath string) (string, error) { +func (c *Client) internalGet(prefix, nodePath string) (string, error) { resp, err := c.client.Get(context.Background(), util.AppendPathPrefix(nodePath, prefix), nil) if err != nil { switch e := err.(type) { @@ -188,7 +224,7 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo }() for !inited { - val, err := c.internalGetValues(prefix, "/") + val, err := c.internalGets(prefix, "/") if err != nil { log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) switch e := err.(type) { @@ -235,31 +271,28 @@ func processSyncChange(prefix string, store store.Store, resp *client.Response) } } -func (c *Client) Sync(store store.Store, stopChan chan bool) { - go c.internalSync(c.prefix, store, stopChan) -} - -func (c *Client) Set(nodePath string, value interface{}, replace bool) error { - return c.internalSet(c.prefix, nodePath, value, replace) -} - -func (c *Client) internalSet(prefix, nodePath string, value interface{}, replace bool) error { +func (c *Client) internalPut(prefix, nodePath string, value interface{}, replace bool) error { switch t := value.(type) { case map[string]interface{}, map[string]string, []interface{}: flatValues := flatmap.Flatten(t) - return c.internalSetValues(prefix, nodePath, flatValues, replace) + return c.internalPutValues(prefix, nodePath, flatValues, replace) default: val := fmt.Sprintf("%v", t) - return c.internalSetValue(prefix, nodePath, val) + return c.internalPutValue(prefix, nodePath, val) } } -func (c *Client) SetValues(nodePath string, values map[string]interface{}, replace bool) error { - flatValue := flatmap.Flatten(values) - return c.internalSetValues(c.prefix, nodePath, flatValue, replace) +func (c *Client) internalPutValue(prefix string, nodePath string, value string) error { + nodePath = util.AppendPathPrefix(nodePath, prefix) + resp, err := c.client.Set(context.TODO(), nodePath, value, nil) + log.Debug("SetValue nodePath: %s, value:%s, resp:%v", nodePath, value, resp) + if err != nil { + return err + } + return nil } -func (c *Client) internalSetValues(prefix string, nodePath string, values map[string]string, replace bool) error { +func (c *Client) internalPutValues(prefix string, nodePath string, values map[string]string, replace bool) error { if replace { c.internalDelete(prefix, nodePath, true) } @@ -275,40 +308,9 @@ func (c *Client) internalSetValues(prefix string, nodePath string, values map[st return nil } -func (c *Client) SetValue(nodePath string, value string) error { - return c.internalSetValue(c.prefix, nodePath, value) -} - -func (c *Client) internalSetValue(prefix string, nodePath string, value string) error { - nodePath = util.AppendPathPrefix(nodePath, prefix) - resp, err := c.client.Set(context.TODO(), nodePath, value, nil) - log.Debug("SetValue nodePath: %s, value:%s, resp:%v", nodePath, value, resp) - if err != nil { - return err - } - return nil -} - -func (c *Client) Delete(nodePath string, dir bool) error { - return c.internalDelete(c.prefix, nodePath, dir) -} - func (c *Client) internalDelete(prefix, nodePath string, dir bool) error { nodePath = util.AppendPathPrefix(nodePath, prefix) log.Debug("Delete from backend, nodePath:%s, dir:%v", nodePath, dir) _, err := c.client.Delete(context.Background(), nodePath, &client.DeleteOptions{Recursive: dir}) return err } - -func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { - go c.internalSync(SELF_MAPPING_PATH, mapping, stopChan) -} - -func (c *Client) UpdateMapping(nodePath string, mapping interface{}, replace bool) error { - return c.internalSet(SELF_MAPPING_PATH, nodePath, mapping, replace) -} - -func (c *Client) DeleteMapping(nodePath string, dir bool) error { - nodePath = path.Join("/", nodePath) - return c.internalDelete(SELF_MAPPING_PATH, nodePath, dir) -} diff --git a/backends/etcdv3/client.go b/backends/etcdv3/client.go index 2209489..f02d9e7 100644 --- a/backends/etcdv3/client.go +++ b/backends/etcdv3/client.go @@ -81,16 +81,58 @@ func NewEtcdClient(prefix string, machines []string, cert, key, caCert string, b return &Client{c, prefix}, nil } -// GetValues queries etcd for nodePath prefix. -func (c *Client) GetValues(nodePath string) (map[string]interface{}, error) { - m, err := c.internalGetValues(c.prefix, nodePath) - if err != nil { - return nil, err +// Get queries etcd for nodePath. +func (c *Client) Get(nodePath string, dir bool) (interface{}, error) { + if dir { + m, err := c.internalGets(c.prefix, nodePath) + if err != nil { + return nil, err + } + return flatmap.Expand(m, nodePath), nil + } else { + return c.internalGet(c.prefix, nodePath) } - return flatmap.Expand(m, nodePath), nil } -func (c *Client) internalGetValues(prefix, nodePath string) (map[string]string, error) { +func (c *Client) Put(nodePath string, value interface{}, replace bool) error { + return c.internalPut(c.prefix, nodePath, value, replace) +} + +func (c *Client) Delete(nodePath string, dir bool) error { + return c.internalDelete(c.prefix, nodePath, dir) +} + +func (c *Client) Sync(store store.Store, stopChan chan bool) { + go c.internalSync(c.prefix, store, stopChan) +} + +func (c *Client) GetMapping(nodePath string, dir bool) (interface{}, error) { + if dir { + m, err := c.internalGets(SELF_MAPPING_PATH, nodePath) + if err != nil { + return nil, err + } + return flatmap.Expand(m, nodePath), nil + } else { + return c.internalGet(SELF_MAPPING_PATH, nodePath) + } +} + +func (c *Client) PutMapping(nodePath string, mapping interface{}, replace bool) error { + log.Debug("UpdateMapping nodePath:%s, mapping:%v, replace:%v", nodePath, mapping, replace) + return c.internalPut(SELF_MAPPING_PATH, nodePath, mapping, replace) +} + +func (c *Client) DeleteMapping(nodePath string, dir bool) error { + nodePath = path.Join("/", nodePath) + return c.internalDelete(SELF_MAPPING_PATH, nodePath, dir) +} + +func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { + go c.internalSync(SELF_MAPPING_PATH, mapping, stopChan) +} + +func (c *Client) internalGets(prefix, nodePath string) (map[string]string, error) { vars := make(map[string]string) resp, err := c.client.Get(context.Background(), util.AppendPathPrefix(nodePath, prefix), client.WithPrefix()) if err != nil { @@ -105,12 +147,7 @@ func (c *Client) internalGetValues(prefix, nodePath string) (map[string]string, return vars, nil } -// GetValue queries etcd for nodePath -func (c *Client) GetValue(nodePath string) (string, error) { - return c.internalGetValue(c.prefix, nodePath) -} - -func (c *Client) internalGetValue(prefix, nodePath string) (string, error) { +func (c *Client) internalGet(prefix, nodePath string) (string, error) { resp, err := c.client.Get(context.Background(), util.AppendPathPrefix(nodePath, prefix)) if err != nil { return "", err @@ -129,7 +166,7 @@ func handleGetResp(prefix string, resp *client.GetResponse, vars map[string]stri for _, kv := range kvs { vars[util.TrimPathPrefix(string(kv.Key), prefix)] = string(kv.Value) } - //TODO handle resp.More + //TODO handle resp.More for pages } return nil } @@ -163,7 +200,7 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo }() for !inited { - val, err := c.internalGetValues(prefix, "/") + val, err := c.internalGets(prefix, "/") if err != nil { log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) time.Sleep(time.Duration(1000) * time.Millisecond) @@ -198,34 +235,21 @@ func processSyncChange(prefix string, store store.Store, resp *client.WatchRespo } } -func (c *Client) Sync(store store.Store, stopChan chan bool) { - go c.internalSync(c.prefix, store, stopChan) -} - -func (c *Client) Set(nodePath string, value interface{}, replace bool) error { - return c.internalSet(c.prefix, nodePath, value, replace) -} - -func (c *Client) internalSet(prefix, nodePath string, value interface{}, replace bool) error { +func (c *Client) internalPut(prefix, nodePath string, value interface{}, replace bool) error { switch t := value.(type) { case map[string]interface{}, map[string]string, []interface{}: flatValues := flatmap.Flatten(t) - return c.internalSetValues(prefix, nodePath, flatValues, replace) + return c.internalPutValues(prefix, nodePath, flatValues, replace) case string: - return c.internalSetValue(prefix, nodePath, t) + return c.internalPutValue(prefix, nodePath, t) default: log.Warning("Set unexpect value type: %s", reflect.TypeOf(value)) val := fmt.Sprintf("%v", t) - return c.internalSetValue(prefix, nodePath, val) + return c.internalPutValue(prefix, nodePath, val) } } -func (c *Client) SetValues(nodePath string, values map[string]interface{}, replace bool) error { - flatValues := flatmap.Flatten(values) - return c.internalSetValues(c.prefix, nodePath, flatValues, replace) -} - -func (c *Client) internalSetValues(prefix string, nodePath string, values map[string]string, replace bool) error { +func (c *Client) internalPutValues(prefix string, nodePath string, values map[string]string, replace bool) error { new_prefix := util.AppendPathPrefix(nodePath, prefix) ops := make([]client.Op, 0, len(values)+1) @@ -259,11 +283,7 @@ func (c *Client) internalSetValues(prefix string, nodePath string, values map[st return nil } -func (c *Client) SetValue(nodePath string, value string) error { - return c.internalSetValue(c.prefix, nodePath, value) -} - -func (c *Client) internalSetValue(prefix string, nodePath string, value string) error { +func (c *Client) internalPutValue(prefix string, nodePath string, value string) error { nodePath = util.AppendPathPrefix(nodePath, prefix) resp, err := c.client.Put(context.TODO(), nodePath, value) log.Debug("SetValue nodePath: %s, value:%s, resp:%v", nodePath, value, resp) @@ -273,10 +293,6 @@ func (c *Client) internalSetValue(prefix string, nodePath string, value string) return nil } -func (c *Client) Delete(nodePath string, dir bool) error { - return c.internalDelete(c.prefix, nodePath, dir) -} - func (c *Client) internalDelete(prefix, nodePath string, dir bool) error { log.Debug("Delete from backend, prefix:%s, nodePath:%s, dir:%v", prefix, nodePath, dir) nodePath = util.AppendPathPrefix(nodePath, prefix) @@ -291,17 +307,3 @@ func (c *Client) internalDelete(prefix, nodePath string, dir bool) error { } return err } - -func (c *Client) SyncMapping(mapping store.Store, stopChan chan bool) { - go c.internalSync(SELF_MAPPING_PATH, mapping, stopChan) -} - -func (c *Client) UpdateMapping(nodePath string, mapping interface{}, replace bool) error { - log.Debug("UpdateMapping nodePath:%s, mapping:%v, replace:%v", nodePath, mapping, replace) - return c.internalSet(SELF_MAPPING_PATH, nodePath, mapping, replace) -} - -func (c *Client) DeleteMapping(nodePath string, dir bool) error { - nodePath = path.Join("/", nodePath) - return c.internalDelete(SELF_MAPPING_PATH, nodePath, dir) -} diff --git a/main.go b/main.go index 7ca0350..92825fd 100644 --- a/main.go +++ b/main.go @@ -74,16 +74,13 @@ func main() { router.HandleFunc("/favicon.ico", http.NotFound) router.HandleFunc("/self", selfHandler). - Methods("GET", "HEAD"). - Name("SelfRoot") + Methods("GET", "HEAD") router.HandleFunc("/self/{key:.*}", selfHandler). - Methods("GET", "HEAD"). - Name("Self") + Methods("GET", "HEAD") - router.HandleFunc("/{key:.*}", metadataHandler). - Methods("GET", "HEAD"). - Name("Metadata") + router.HandleFunc("/{key:.*}", rootHandler). + Methods("GET", "HEAD") log.Info("Listening on %s", config.Listen) log.Fatal("%v", http.ListenAndServe(config.Listen, router)) @@ -205,7 +202,7 @@ func dataUpdate(w http.ResponseWriter, req *http.Request) { // POST means replace old value // PUT means merge to old value replace := "POST" == strings.ToUpper(req.Method) - err = metadataRepo.UpdateData(nodePath, data, replace) + err = metadataRepo.PutData(nodePath, data, replace) if err != nil { msg := fmt.Sprintf("Update data error:%s", err.Error()) log.Error("dataUpdate nodePath:%s, data:%v, error:%s", nodePath, data, err.Error()) @@ -270,7 +267,7 @@ func mappingUpdate(w http.ResponseWriter, req *http.Request) { // POST means replace old value // PUT means merge to old value replace := "POST" == strings.ToUpper(req.Method) - err = metadataRepo.UpdateMapping(nodePath, data, replace) + err = metadataRepo.PutMapping(nodePath, data, replace) if err != nil { msg := fmt.Sprintf("Update mapping error:%s", err.Error()) log.Error("mappingUpdate nodePath:%s, data:%v, error:%s", nodePath, data, err.Error()) @@ -323,12 +320,12 @@ func contentType(req *http.Request) int { } } -func metadataHandler(w http.ResponseWriter, req *http.Request) { +func rootHandler(w http.ResponseWriter, req *http.Request) { clientIP := requestIP(req) requestPath := req.URL.EscapedPath() //strings.TrimRight(req.URL.EscapedPath()[1:], "/") log.Debug("clientIP: %s, requestPath: %s", clientIP, requestPath) - val, ok := metadataRepo.Get(clientIP, requestPath) + val, ok := metadataRepo.Root(clientIP, requestPath) if !ok { log.Warning("%s not found %s", requestPath, clientIP) respondError(w, req, "Not found", http.StatusNotFound) @@ -343,7 +340,7 @@ func selfHandler(w http.ResponseWriter, req *http.Request) { clientIP := requestIP(req) requestPath := strings.TrimLeft(req.URL.EscapedPath(), "/self") - val, ok := metadataRepo.GetSelf(clientIP, requestPath) + val, ok := metadataRepo.Self(clientIP, requestPath) if !ok { log.Warning("self not found %s", clientIP) respondError(w, req, "Not found", http.StatusNotFound) diff --git a/metadata/metarepo.go b/metadata/metarepo.go index 3e81335..5825c8d 100644 --- a/metadata/metarepo.go +++ b/metadata/metarepo.go @@ -65,14 +65,14 @@ func (r *MetadataRepo) StopSync() { r.mappingStopChan <- true } -func (r *MetadataRepo) Get(clientIP string, metapath string) (interface{}, bool) { +func (r *MetadataRepo) Root(clientIP string, metapath string) (interface{}, bool) { log.Debug("Get clientIP:%s metapath:%s", clientIP, metapath) metapath = path.Clean(path.Join("/", metapath)) if r.onlySelf { if metapath == "/" { val := make(map[string]interface{}) - selfVal, ok := r.GetSelf(clientIP, "/") + selfVal, ok := r.Self(clientIP, "/") if ok { val["self"] = selfVal } @@ -86,7 +86,7 @@ func (r *MetadataRepo) Get(clientIP string, metapath string) (interface{}, bool) return nil, false } else { if metapath == "/" { - selfVal, ok := r.GetSelf(clientIP, "/") + selfVal, ok := r.Self(clientIP, "/") if ok { mapVal, ok := val.(map[string]interface{}) if ok { @@ -99,10 +99,10 @@ func (r *MetadataRepo) Get(clientIP string, metapath string) (interface{}, bool) } } -func (r *MetadataRepo) GetSelf(clientIP string, metapath string) (interface{}, bool) { +func (r *MetadataRepo) Self(clientIP string, metapath string) (interface{}, bool) { metapath = path.Clean(path.Join("/", metapath)) log.Debug("GetSelf clientIP:%s metapath:%s", clientIP, metapath) - mapping, ok := r.SelfMapping(clientIP) + mapping, ok := r.getIPMapping(clientIP) if !ok { log.Warning("Can not find mapping for %s", clientIP) return nil, false @@ -137,7 +137,7 @@ func (r *MetadataRepo) GetSelf(clientIP string, metapath string) (interface{}, b } } -func (r *MetadataRepo) SelfMapping(clientIP string) (map[string]string, bool) { +func (r *MetadataRepo) getIPMapping(clientIP string) (map[string]string, bool) { mappingVal, ok := r.mapping.Get(clientIP) if !ok { return nil, false @@ -158,8 +158,8 @@ func (r *MetadataRepo) GetData(nodePath string) (interface{}, bool) { return r.data.Get(nodePath) } -func (r *MetadataRepo) UpdateData(nodePath string, data interface{}, replace bool) error { - return r.storeClient.Set(nodePath, data, replace) +func (r *MetadataRepo) PutData(nodePath string, data interface{}, replace bool) error { + return r.storeClient.Put(nodePath, data, replace) } func (r *MetadataRepo) DeleteData(nodePath string, subs ...string) error { @@ -196,7 +196,7 @@ func (r *MetadataRepo) GetMapping(nodePath string) (interface{}, bool) { return r.mapping.Get(nodePath) } -func (r *MetadataRepo) UpdateMapping(nodePath string, data interface{}, replace bool) error { +func (r *MetadataRepo) PutMapping(nodePath string, data interface{}, replace bool) error { nodePath = path.Join("/", nodePath) if nodePath == "/" { m, ok := data.(map[string]interface{}) @@ -238,7 +238,7 @@ func (r *MetadataRepo) UpdateMapping(nodePath string, data interface{}, replace } } } - return r.storeClient.UpdateMapping(nodePath, data, replace) + return r.storeClient.PutMapping(nodePath, data, replace) } func (r *MetadataRepo) DeleteMapping(nodePath string, subs ...string) error { diff --git a/metadata/metarepo_test.go b/metadata/metarepo_test.go index c4d96f3..dac90e0 100644 --- a/metadata/metarepo_test.go +++ b/metadata/metarepo_test.go @@ -45,7 +45,7 @@ func TestMetarepoData(t *testing.T) { time.Sleep(1000 * time.Millisecond) ValidTestData(t, testData, metarepo.data) - val, ok := metarepo.Get("192.168.0.1", "/nodes/0") + val, ok := metarepo.Root("192.168.0.1", "/nodes/0") assert.True(t, ok) assert.NotNil(t, val) @@ -113,7 +113,7 @@ func TestMetarepoMapping(t *testing.T) { mappings[ip] = mapping } // batch update - err = metarepo.UpdateMapping("/", mappings, true) + err = metarepo.PutMapping("/", mappings, true) assert.NoError(t, err) time.Sleep(1000 * time.Millisecond) @@ -176,7 +176,7 @@ func TestMetarepoSelf(t *testing.T) { mappings[ip] = mapping } // batch update - err = metarepo.UpdateMapping("/", mappings, true) + err = metarepo.PutMapping("/", mappings, true) assert.NoError(t, err) time.Sleep(1000 * time.Millisecond) @@ -190,13 +190,13 @@ func TestMetarepoSelf(t *testing.T) { p := rand.Intn(maxNode) ip := fmt.Sprintf("192.168.1.%v", p) - val, ok := metarepo.GetSelf(ip, "/") + val, ok := metarepo.Self(ip, "/") mapVal, mok := val.(map[string]interface{}) assert.True(t, mok) assert.NotNil(t, mapVal[key]) - val, ok = metarepo.GetSelf(ip, "/node/name") + val, ok = metarepo.Self(ip, "/node/name") assert.True(t, ok) assert.Equal(t, fmt.Sprintf("node%v", p), val) @@ -208,7 +208,7 @@ func TestMetarepoSelf(t *testing.T) { metarepo.ReSync() } time.Sleep(1000 * time.Millisecond) - val, ok = metarepo.GetSelf(ip, "/node/name") + val, ok = metarepo.Self(ip, "/node/name") assert.False(t, ok) assert.Nil(t, val) @@ -218,7 +218,7 @@ func TestMetarepoSelf(t *testing.T) { } // test update replace(false) - err = metarepo.UpdateMapping(ip, map[string]interface{}{"node2": "/nodes/2"}, false) + err = metarepo.PutMapping(ip, map[string]interface{}{"node2": "/nodes/2"}, false) assert.NoError(t, err) expectMapping1 := map[string]interface{}{ @@ -232,7 +232,7 @@ func TestMetarepoSelf(t *testing.T) { assert.Equal(t, expectMapping1, mapping) // test update key - err = metarepo.UpdateMapping(ip+"/node3", "/nodes/3", false) + err = metarepo.PutMapping(ip+"/node3", "/nodes/3", false) assert.NoError(t, err) expectMapping2 := map[string]interface{}{ @@ -254,7 +254,7 @@ func TestMetarepoSelf(t *testing.T) { assert.Equal(t, expectMapping1, mapping) // test update replace(true) - err = metarepo.UpdateMapping(ip, expectMapping0, true) + err = metarepo.PutMapping(ip, expectMapping0, true) assert.NoError(t, err) time.Sleep(1000 * time.Millisecond) mapping, ok = metarepo.GetMapping(fmt.Sprintf("/%s", ip)) @@ -292,11 +292,11 @@ func TestMetarepoRoot(t *testing.T) { ip := "192.168.1.0" mapping := make(map[string]interface{}) mapping["node"] = "/nodes/0" - err = metarepo.UpdateMapping(ip, mapping, true) + err = metarepo.PutMapping(ip, mapping, true) assert.NoError(t, err) time.Sleep(1000 * time.Millisecond) - val, ok := metarepo.Get(ip, "/") + val, ok := metarepo.Root(ip, "/") assert.True(t, ok) mapVal, mok := val.(map[string]interface{}) assert.True(t, mok) @@ -306,7 +306,7 @@ func TestMetarepoRoot(t *testing.T) { metarepo.SetOnlySelf(true) - val, ok = metarepo.Get(ip, "/") + val, ok = metarepo.Root(ip, "/") mapVal = val.(map[string]interface{}) selfVal = mapVal["self"] assert.NotNil(t, selfVal) @@ -327,7 +327,7 @@ func FillTestData(metarepo *MetadataRepo) map[string]string { testData := map[string]interface{}{ "nodes": nodes, } - err := metarepo.UpdateData("/", testData, true) + err := metarepo.PutData("/", testData, true) if err != nil { log.Error("SetValues error", err.Error()) panic(err)