Skip to content

Commit

Permalink
all access to cache map via single goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Engquist committed Sep 20, 2018
1 parent 45b490f commit b4574e9
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 21 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*~
coverage.*
vendor/github.com/*
vendor/golang.org/*
37 changes: 27 additions & 10 deletions cache.go → cache_private.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,42 @@ import (
"fmt"
)

// N.B.: this is the cache implementation whose functions are
// not intended to be called directly.
// The intended cache interface
// is in cache_public.go: namely "lookup", "add", "remove",
// and the key-building functions.
// All access to the cache is intended to be via the server mailbox.
// That is our serialization mechanism (akin to an erlang gen_server).
//
// (Deliberately resisting the "internal" package goo...)
var cache map[string]interface{}

func runServer(mbox <-chan *CacheRequest) {
for {
req, ok := <-mbox
if ok {
req.replyChan <- req.operation()
} else {
break
}
}
}

func initCache() {
cache = make(map[string]interface{})
serverMbox := make(chan *CacheRequest)
go runServer(serverMbox)
initCacheClient(serverMbox)
}

func findInList(clist []*JsonObject, target *JsonObject) int {
targids := getObjIds(target)
for idx, obj := range clist {
ids := getObjIds(obj)
if targids.name == ids.name && targids.namespace == ids.namespace {
if targids.name == ids.name &&
targids.namespace == ids.namespace &&
targids.kind == ids.kind {
return idx
}
}
Expand All @@ -40,7 +65,7 @@ func deleteFromCacheList(key string, obj *JsonObject) {
clist := val.([]*JsonObject)
idx := findInList(clist, obj)
if idx > -1 {
clist = append(clist[:idx], clist[idx+1:]...)
cache[key] = append(clist[:idx], clist[idx+1:]...)
}
}
}
Expand Down Expand Up @@ -100,11 +125,3 @@ func cacheLookup(key string) interface{} {
return nil
}
}

func cacheKey(kind, namespace, name string) string {
return kind + "#" + namespace + "#" + name
}

func nsCacheKey(kind, namespace string) string {
return kind + "#" + namespace
}
93 changes: 93 additions & 0 deletions cache_public.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2018 CA. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

var client CacheClient

type CacheOp func() interface{}

type CacheRequest struct {
operation CacheOp
replyChan chan interface{}
}

type clientif interface {
Lookup(key string) interface{}
Remove(obj *JsonObject)
Add(obj *JsonObject)
}

type CacheClient struct {
serverMbox chan<- *CacheRequest
}

func initCacheClient(serverMbox chan *CacheRequest) {
client = CacheClient{serverMbox}
}

func GetCache() *CacheClient {
return &client
}

func (client *CacheClient) Lookup(key string) interface{} {
replyChan := make(chan interface{})
req := &CacheRequest{
func() interface{} {
return cacheLookup(key)
}, replyChan,
}
client.serverMbox <- req
retval := <-replyChan
return retval
}

func (client *CacheClient) Remove(obj *JsonObject) {
replyChan := make(chan interface{})
req := &CacheRequest{
func() interface{} {
removeFromCache(obj)
return true
}, replyChan,
}
client.serverMbox <- req
if retval := <-replyChan; retval != true {
panic("bad return from cache Remove")
}
}

func (client *CacheClient) Add(obj *JsonObject) {
replyChan := make(chan interface{})
req := &CacheRequest{
func() interface{} {
addToCache(obj)
return true
}, replyChan,
}
client.serverMbox <- req
if retval := <-replyChan; retval != true {
panic("bad return from cache Add")
}
}

// compilation error if we don't implement the i/f properly
var _ clientif = (*CacheClient)(nil)

func cacheKey(kind, namespace, name string) string {
return kind + "#" + namespace + "#" + name
}

func nsCacheKey(kind, namespace string) string {
return kind + "#" + namespace
}
6 changes: 3 additions & 3 deletions kubeaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func lookUpMap(
key := cacheKey(kind, namespace, name)
var cachedVal interface{}
if isWatchedKind(kind) {
cachedVal = cacheLookup(key)
cachedVal = GetCache().Lookup(key)
} else {
cachedVal = (*cache)[key]
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func getAllK8sObjsOfKind(

var objs interface{}
if isWatchedKind(kind) {
objs = cacheLookup(kind)
objs = GetCache().Lookup(kind)
} else {
objs = (*cache)[kind]
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func getAllK8sObjsOfKindInNamespace(
key := nsCacheKey(kind, ns)
var objs interface{}
if isWatchedKind(kind) {
objs = cacheLookup(key)
objs = GetCache().Lookup(key)
} else {
objs = (*cache)[key]
}
Expand Down
16 changes: 8 additions & 8 deletions watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func buildWatchUrl(kind string) string {
if watchurl, ok := watchUrlByKind[kind]; ok {
return apiHost + watchurl
}
panic(fmt.Sprintf("no url for kind: '%s'", kind))
panic(fmt.Sprintf("no watcher url for kind: '%s'", kind))
}

func readSecret(name string) []byte {
b, err := ioutil.ReadFile(secretDir + "/" + name)
if err != nil {
panic(fmt.Sprintf("error reading secret %s: %s\n",
panic(fmt.Sprintf("watcher error reading secret %s: %s\n",
name, err.Error()))
}
return b
Expand Down Expand Up @@ -112,7 +112,7 @@ func initClient() {
func makeWatchRequest(kind string) *http.Request {
req, err := http.NewRequest("GET", buildWatchUrl(kind), nil)
if err != nil {
panic(fmt.Sprintf("http.NewRequest error for kind %s: %s\n",
panic(fmt.Sprintf("watcher http.NewRequest error for kind %s: %s\n",
kind, err.Error()))
}
req.Header.Add("Authorization", "Bearer "+token)
Expand All @@ -130,20 +130,20 @@ func runWatcher(kind string) {
for {
line, err := reader.ReadBytes('\n')
if err != nil {
panic(err)
panic(fmt.Sprintf("read error on watcher stream: %s\n", err.Error()))
}
var notif Notification
if err := json.Unmarshal(line, &notif); err != nil {
panic(err)
panic(fmt.Sprintf("JSON unmarshal error on watcher input: %s\n",
err.Error()))
}

if notif.Type == "ADDED" || notif.Type == "MODIFIED" {
addToCache(&notif.Object)
GetCache().Add(&notif.Object)
} else if notif.Type == "DELETED" {
removeFromCache(&notif.Object)
GetCache().Remove(&notif.Object)
}
}
fmt.Printf("watcher terminates...\n")
}

func initWatchers() {
Expand Down

0 comments on commit b4574e9

Please sign in to comment.