diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..78f951e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*~ +coverage.* +vendor/github.com/* +vendor/golang.org/* \ No newline at end of file diff --git a/cache.go b/cache_private.go similarity index 74% rename from cache.go rename to cache_private.go index f3c0230..0829b14 100644 --- a/cache.go +++ b/cache_private.go @@ -18,17 +18,42 @@ import ( "fmt" ) +// N.B.: this is the cache implementation whose functions are +// not intended to be called directly. +// The intended cache interface +// is in cache_public.go: namely "lookup", "add", "remove", +// and the key-building functions. +// All access to the cache is intended to be via the server mailbox. +// That is our serialization mechanism (akin to an erlang gen_server). +// +// (Deliberately resisting the "internal" package goo...) var cache map[string]interface{} +func runServer(mbox <-chan *CacheRequest) { + for { + req, ok := <-mbox + if ok { + req.replyChan <- req.operation() + } else { + break + } + } +} + func initCache() { cache = make(map[string]interface{}) + serverMbox := make(chan *CacheRequest) + go runServer(serverMbox) + initCacheClient(serverMbox) } func findInList(clist []*JsonObject, target *JsonObject) int { targids := getObjIds(target) for idx, obj := range clist { ids := getObjIds(obj) - if targids.name == ids.name && targids.namespace == ids.namespace { + if targids.name == ids.name && + targids.namespace == ids.namespace && + targids.kind == ids.kind { return idx } } @@ -40,7 +65,7 @@ func deleteFromCacheList(key string, obj *JsonObject) { clist := val.([]*JsonObject) idx := findInList(clist, obj) if idx > -1 { - clist = append(clist[:idx], clist[idx+1:]...) + cache[key] = append(clist[:idx], clist[idx+1:]...) } } } @@ -100,11 +125,3 @@ func cacheLookup(key string) interface{} { return nil } } - -func cacheKey(kind, namespace, name string) string { - return kind + "#" + namespace + "#" + name -} - -func nsCacheKey(kind, namespace string) string { - return kind + "#" + namespace -} diff --git a/cache_public.go b/cache_public.go new file mode 100644 index 0000000..2eb64c9 --- /dev/null +++ b/cache_public.go @@ -0,0 +1,93 @@ +// Copyright (c) 2018 CA. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +var client CacheClient + +type CacheOp func() interface{} + +type CacheRequest struct { + operation CacheOp + replyChan chan interface{} +} + +type clientif interface { + Lookup(key string) interface{} + Remove(obj *JsonObject) + Add(obj *JsonObject) +} + +type CacheClient struct { + serverMbox chan<- *CacheRequest +} + +func initCacheClient(serverMbox chan *CacheRequest) { + client = CacheClient{serverMbox} +} + +func GetCache() *CacheClient { + return &client +} + +func (client *CacheClient) Lookup(key string) interface{} { + replyChan := make(chan interface{}) + req := &CacheRequest{ + func() interface{} { + return cacheLookup(key) + }, replyChan, + } + client.serverMbox <- req + retval := <-replyChan + return retval +} + +func (client *CacheClient) Remove(obj *JsonObject) { + replyChan := make(chan interface{}) + req := &CacheRequest{ + func() interface{} { + removeFromCache(obj) + return true + }, replyChan, + } + client.serverMbox <- req + if retval := <-replyChan; retval != true { + panic("bad return from cache Remove") + } +} + +func (client *CacheClient) Add(obj *JsonObject) { + replyChan := make(chan interface{}) + req := &CacheRequest{ + func() interface{} { + addToCache(obj) + return true + }, replyChan, + } + client.serverMbox <- req + if retval := <-replyChan; retval != true { + panic("bad return from cache Add") + } +} + +// compilation error if we don't implement the i/f properly +var _ clientif = (*CacheClient)(nil) + +func cacheKey(kind, namespace, name string) string { + return kind + "#" + namespace + "#" + name +} + +func nsCacheKey(kind, namespace string) string { + return kind + "#" + namespace +} diff --git a/kubeaccess.go b/kubeaccess.go index 2bed2db..0854d06 100644 --- a/kubeaccess.go +++ b/kubeaccess.go @@ -74,7 +74,7 @@ func lookUpMap( key := cacheKey(kind, namespace, name) var cachedVal interface{} if isWatchedKind(kind) { - cachedVal = cacheLookup(key) + cachedVal = GetCache().Lookup(key) } else { cachedVal = (*cache)[key] } @@ -128,7 +128,7 @@ func getAllK8sObjsOfKind( var objs interface{} if isWatchedKind(kind) { - objs = cacheLookup(kind) + objs = GetCache().Lookup(kind) } else { objs = (*cache)[kind] } @@ -187,7 +187,7 @@ func getAllK8sObjsOfKindInNamespace( key := nsCacheKey(kind, ns) var objs interface{} if isWatchedKind(kind) { - objs = cacheLookup(key) + objs = GetCache().Lookup(key) } else { objs = (*cache)[key] } diff --git a/watchers.go b/watchers.go index 32ac472..21e29fb 100644 --- a/watchers.go +++ b/watchers.go @@ -59,13 +59,13 @@ func buildWatchUrl(kind string) string { if watchurl, ok := watchUrlByKind[kind]; ok { return apiHost + watchurl } - panic(fmt.Sprintf("no url for kind: '%s'", kind)) + panic(fmt.Sprintf("no watcher url for kind: '%s'", kind)) } func readSecret(name string) []byte { b, err := ioutil.ReadFile(secretDir + "/" + name) if err != nil { - panic(fmt.Sprintf("error reading secret %s: %s\n", + panic(fmt.Sprintf("watcher error reading secret %s: %s\n", name, err.Error())) } return b @@ -112,7 +112,7 @@ func initClient() { func makeWatchRequest(kind string) *http.Request { req, err := http.NewRequest("GET", buildWatchUrl(kind), nil) if err != nil { - panic(fmt.Sprintf("http.NewRequest error for kind %s: %s\n", + panic(fmt.Sprintf("watcher http.NewRequest error for kind %s: %s\n", kind, err.Error())) } req.Header.Add("Authorization", "Bearer "+token) @@ -130,20 +130,20 @@ func runWatcher(kind string) { for { line, err := reader.ReadBytes('\n') if err != nil { - panic(err) + panic(fmt.Sprintf("read error on watcher stream: %s\n", err.Error())) } var notif Notification if err := json.Unmarshal(line, ¬if); err != nil { - panic(err) + panic(fmt.Sprintf("JSON unmarshal error on watcher input: %s\n", + err.Error())) } if notif.Type == "ADDED" || notif.Type == "MODIFIED" { - addToCache(¬if.Object) + GetCache().Add(¬if.Object) } else if notif.Type == "DELETED" { - removeFromCache(¬if.Object) + GetCache().Remove(¬if.Object) } } - fmt.Printf("watcher terminates...\n") } func initWatchers() {