Dev/rsearch #1

Open: wants to merge 24 commits into master. Showing changes from 13 commits.
1 change: 1 addition & 0 deletions rsearch/.gitignore
@@ -0,0 +1 @@
vendor/

They seem to use godeps instead of vendoring, so this probably isn't needed.

Author
I maintain the Godeps directory, but with VENDOREXPERIMENT the vendor/ folder gets created automatically. This line is there to prevent accidental commits that include this folder.

23 changes: 23 additions & 0 deletions rsearch/Godeps/Godeps.json

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions rsearch/Godeps/Readme

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions rsearch/TODO
@@ -0,0 +1,3 @@
1. When starting server read through namespace list and fire up ns.Producer per namespace.
Read throug resource list per namespace and send them to Processor
2. Import Kubernetes and use Objects from there
1 change: 1 addition & 0 deletions rsearch/bin/.gitignore
@@ -0,0 +1 @@
bin
57 changes: 57 additions & 0 deletions rsearch/bin/main.go
@@ -0,0 +1,57 @@
package main

import (
"flag"
"fmt"
search "github.com/romana/contrib/rsearch"

Similar for this, should potentially be k8s.io/contrib.
It means you need to clone this repo manually to the location it would live in GOPATH.
Not really much choice for an upstream submission.

Also why is the package being renamed?

Author
Historical reasons. Also, I don't see any harm in it. Does anyone feel this needs to be reverted back to the package name?

Also, does anyone have ideas for a better package name?

IMO it's surprising to be aliasing your own package, since you have the option of naming it differently. No better suggestions, just pick one and use it consistently.
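For comparison, a rough sketch of the un-aliased form (assuming the import path stays as-is); the call sites would then read rsearch.X:

    import (
        "encoding/json"
        "flag"
        "fmt"
        "log"

        "github.com/romana/contrib/rsearch"
    )

    // ...
    config, err := rsearch.NewConfig(*cfgFile)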

"log"
// "net/http"
"encoding/json"

// "io"
)

func main() {
var cfgFile = flag.String("c", "", "Kubernetes reverse search config file")
var server = flag.Bool("s", false, "Start a server")

Why isn't this just a server, and a separate tool for a client?

Author
Extra package to maintain. Extra package to push.

var host = flag.String("h", "", "Protocol://host for client to connect to")

A partial URL is neither a host nor a URL.
You should pick one -- either a full URL (including port), or a hostname and port that you'll compose into a URL.
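For example, a sketch of the second option, composing the URL in one place (the -p flag is hypothetical; the 9700 default is taken from config.ini):

    var host = flag.String("h", "localhost", "Server hostname")
    var port = flag.String("p", "9700", "Server port")
    flag.Parse()

    // Compose the full URL once, so everything downstream works with a complete URL.
    serverURL := fmt.Sprintf("http://%s:%s", *host, *port)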

var searchTag = flag.String("r", "", "Search resources by tag")
flag.Parse()

done := make(chan search.Done)

config, err := search.NewConfig(*cfgFile)
if *host != "" {
config.Server.Host = *host

Why is config a useful value if an error occurred?

Author
It's not an error check, though there should be one.

}

if err != nil {

This could go directly below the point where err was assigned.
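That is, a small reshuffle of the existing code: check err immediately after NewConfig and only then apply the -h override:

    config, err := search.NewConfig(*cfgFile)
    if err != nil {
        fmt.Printf("Can not read config file %s, %s\n", *cfgFile, err)
        return
    }
    if *host != "" {
        config.Server.Host = *host
    }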

fmt.Printf("Can not read config file %s, %s\n", *cfgFile, err)

Some things use fmt, others use log. Is there a reason for the difference?

Author
Only bin/main.go uses fmt; the package itself uses log.

But maybe I was a bit less careful with main.go since it's just using the library.

return
}

if *server {
fmt.Println("Starting server")
nsUrl := fmt.Sprintf("%s/%s", config.Api.Url, config.Api.NamespaceUrl)
nsEvents, err := search.NsWatch(done, nsUrl, config)
if err != nil {
log.Fatal("Namespace watcher failed to start", err)
}

events := search.Conductor(nsEvents, done, config)
req := search.Process(events, done, config)
log.Println("All routines started")
search.Serve(config, req)
} else if len(*searchTag) > 0 {
if config.Server.Debug {

config.Server.Debug is controlling the debug level of a client?

Author
And a server.

fmt.Println("Making request t the server")
}
r := search.SearchResource(config, search.SearchRequest{Tag: *searchTag})
response, _ := json.Marshal(r)
if err != nil {

This err is from a higher scope.

panic(err)

Could be log.Fatal instead.

}
fmt.Println(string(response))
}

And if it were neither?
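A minimal sketch of handling the remaining case (the message wording is just an example):

    } else {
        // Neither -s nor -r was given.
        log.Fatal("either -s (server mode) or -r (search tag) must be provided")
    }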


}
5 changes: 5 additions & 0 deletions rsearch/build.sh
@@ -0,0 +1,5 @@
#!/bin/bash

Nothing else has a build.sh, so this is a bit odd.
And because of Godeps, it should probably munge the GOPATH to include the Godeps/_workspace path before running go build.

Author
Oh, that's just a helper for myself; it shouldn't be in the repo.

find . -type d | while read line; do
( echo Building in the $line ....; cd $line && go build .)
done
47 changes: 47 additions & 0 deletions rsearch/client.go
@@ -0,0 +1,47 @@
package rsearch

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
)

// SearchResource connects to instance of a server

Instance of what server?

// and resolves SearchRequest

What does "resolves SearchRequest" mean?

And when I ask something like this, I mean: Please explain in the docstring.

func SearchResource(config Config, req SearchRequest) SearchResponse {
// TODO need to make url configurable
url := config.Server.Host + ":" + config.Server.Port
data := []byte(`{ "tag" : "` + req.Tag + `"}`)

Something's happening with data, but I have no idea what's going on. Some comments would be really useful.

if config.Server.Debug {
log.Println("Making request with", string(data))
}

// Make request
request, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
request.Header.Set("Content-Type", "application/json")
client := &http.Client{}
response, err := client.Do(request)

Might as well use http.DefaultClient if you're not doing any special setup of the client or transport.
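Roughly, the two lines collapse into one (a sketch):

    // No custom transport or timeout is configured, so the package-level default client is enough.
    response, err := http.DefaultClient.Do(request)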

if err != nil {
log.Println("HTTP request failed", url, err)
panic(err)

Why not log.Fatal? This is dumping the error twice and a stack trace.
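Something like this (a sketch) would log once and exit without the panic stack trace:

    if err != nil {
        log.Fatal("HTTP request failed ", url, " ", err)
    }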

}

defer response.Body.Close()
decoder := json.NewDecoder(response.Body)
sr := SearchResponse{}
if config.Server.Debug {
log.Println("Trying to decode", response.Body)

response.Body won't be a stringified version of the response, but some arbitrary pointers and struct values like &{0xc8201040e0 {0 0} false <nil> 0xd4840 0xd47e0}
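One way around that (a sketch, assuming an io/ioutil import is added) is to read the body once, log the raw bytes, and decode from the in-memory copy:

    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        log.Fatal("Failed to read response body: ", err)
    }
    if config.Server.Debug {
        log.Println("Trying to decode", string(body))
    }
    sr := SearchResponse{}
    if err := json.Unmarshal(body, &sr); err != nil {
        log.Fatal("Failed to decode ", string(body), ": ", err)
    }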

}
err = decoder.Decode(&sr)
if err != nil {
log.Println("Failed to decode", response.Body)
panic(err)
}

if config.Server.Debug {
fmt.Println("Decoded response form a server", sr)
}
return sr
}
43 changes: 43 additions & 0 deletions rsearch/conductor.go
@@ -0,0 +1,43 @@
package rsearch

func manageResources(ns Event, terminators map[string]chan Done, config Config, out chan Event) {

docstring

uid := ns.Object.Metadata.Uid
if ns.Type == "ADDED" {
done := make(chan Done)
terminators[uid] = done

What if it already existed?

Author
We shouldn't really run into that case, but if we do, we just update the termination channel for that goroutine and let its previous termination channel be garbage collected.

ns.Object.Produce(out, terminators[uid], config)
} else if ns.Type == "DELETED" {
close(terminators[uid])

What if it doesn't exist?

Author
It shouldn't really happen, but we will blow up in that case. However, spitting out a readable message could be helpful.

delete(terminators, uid)
} else if ns.Type == "_CRASH" {

This could be documented a bit better. It's not really a crash, but an internal management event when the connection went bad, right?

Author
Right now it represents an HTTP GET timeout, which for us is a controlled disaster.
The action needed here (on the receiving end) is to terminate all existing goroutines, which sounds like a crash.

But maybe I could turn it into a type KubeEventType string and then define a bunch of constants like

const (
    KubeEventAdded         KubeEventType = "ADDED"
    KubeEventDeleted       KubeEventType = "DELETED"
    InternalEventDeleteAll KubeEventType = "_DELETE_ALL"
)

What do you think?

You don't need a new type for that.

Normally, closing the channel would be a good way of communicating that. But the current implementation keeps it open even when the connection is re-established.

Defining constants, rather than using string literals, is definitely an improvement.
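In other words, a sketch of the constants without introducing a new type (names taken from the proposal above):

    const (
        KubeEventAdded         = "ADDED"
        KubeEventDeleted       = "DELETED"
        InternalEventDeleteAll = "_DELETE_ALL"
    )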

for uid, c := range terminators {
close(c)
delete(terminators, uid)
}
}
}

// Conductor manages a set of goroutines one per namespace.
func Conductor(in <-chan Event, done <-chan Done, config Config) <-chan Event {

Lots of done's in here. It's a bit confusing to follow the purpose of each of them.

Author
I know. I don't see a better way of doing it, since each goroutine must receive some done channel, and some goroutines manage other goroutines, so those would have their own done channel plus a set of done channels for their children.

For starters, you might add a comment that explains what's happening. Then this could also help to clear up 'done' ambiguities.

Author
done

// Idea of this map is to keep termination channels organized
// so when DELETED event occurs on a namespace it would be possible
// to terminater related goroutine
var terminators map[string]chan Done
terminators = make(map[string]chan Done)

terminators := map[string]chan Done{}

Author
What's wrong with make?

This isn't about avoiding make, but avoiding 'var' and repetition of the type.


ns := Event{}
out := make(chan Event)

go func() {
for {
select {
case ns = <-in:

case ns := <-in:, then you won't need to predeclare it.

Author
This is deliberate, to show what kind of thing is coming out.

Doesn't the declaration of in make that clear?

Author
Technically yes. I just like it when I read it. I understand it's unnecessary; do you think it's also the wrong thing to do?

Not wrong, it was just surprising. It's a subjective thing.

manageResources(ns, terminators, config, out)
case <-done:
return
}
}
}()

return out
}
48 changes: 48 additions & 0 deletions rsearch/config.go
@@ -0,0 +1,48 @@
package rsearch

import (
"gopkg.in/gcfg.v1"
)

// Done is an alias for empty struct, used for termination channels

It's not a good habit to alias the types if you're not adding methods.
struct{} is a fairly ugly type, to be fair.

Author
For that particular use case it's documented practice.

https://blog.golang.org/pipelines

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

What is a "termination channel"?

Author
It's a pattern for sending a broadcast message by closing a shared channel.

There is no process management here, and only two ways of communicating: shared state variables or channels. Shared state variables are bad, so channels are the obvious way. The problem with channels is that you can't send a message to all goroutines listening on a channel.

A termination channel, or "done" channel, is a pattern where you create a channel of an empty type and have an arbitrary number of goroutines sit blocked on it. When you want to send the message, you close the channel, and all goroutines receive the broadcast that the channel was closed.
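A minimal, self-contained sketch of that close-to-broadcast pattern:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        done := make(chan struct{})
        var wg sync.WaitGroup

        // Several goroutines block on the same done channel.
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                <-done // unblocks for every receiver once the channel is closed
                fmt.Println("worker", id, "terminating")
            }(i)
        }

        // Closing the channel is the broadcast: all receivers see it.
        close(done)
        wg.Wait()
    }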

Ok. Can you explain that in some comments, please?

type Done struct{}

// Config is a top level struct describing expected structure of config file.
type Config struct {
Resource Resource
Server Server
Api Api
}

// Server is a config section describing server instance of this package.
type Server struct {
Port string
Host string
Debug bool
}

// API is a config section describing kubernetes API parameters.
type Api struct {
Url string
NamespaceUrl string
}

// Resource is a config section describing kubernetes resource to be cached and searched for.
type Resource struct {
Name string
Type string
Selector string
Namespaced string
UrlPrefix string
UrlPostfix string
}

// NewConfig parsing config file and returning initialized instance of Config structure
func NewConfig(configFile string) (Config, error) {
cfg := Config{}
if err := gcfg.ReadFileInto(&cfg, configFile); err != nil {
return cfg, err

Related to the other comment. What state is cfg in at the point an error occurs? Unchanged or partially changed? Is it appropriate to return it?

Author
I had an issue where it wouldn't accept nil for cfg in return.

Correct, you'd have to return a zeroed value of Config, i.e. return Config{}, err.
That'd be marginally better than returning a potentially invalid value of cfg (or changing it to *Config so you can return nil).
The caller should be checking err anyway.

}

return cfg, nil
}
18 changes: 18 additions & 0 deletions rsearch/config.ini
@@ -0,0 +1,18 @@
; comment
[api]
url=http://192.168.0.10:8080

Couldn't this just be watchURL as a single item? Does it actually vary between deployments?

Author
The Kube API URL does change; the namespace endpoint might not, but we want to be able to define the API URL separately since it's going to be used in a few places.

namespaceUrl=api/v1/namespaces/?watch=true

[resource]
;type=builtin # TBD
type=3rdParty
;namespaced=false # TBD

Could we add some comments after or before each config item to explain what the config is for? For example, it's not clear what "namespaced" means.

namespaced=true
urlPrefix=apis/romana.io/demo/v1/namespaces
urlPostfix=networkpolicys/?watch=true
name=NetworkPolicy
selector=podSelector

[server]
port=9700
debug=false
3 changes: 3 additions & 0 deletions rsearch/doc.go
@@ -0,0 +1,3 @@
// This package prvides means of searching kubernets objects by their selectors

Spelling, and this becomes a single sentence in documentation, ie: it'll appear as "...selectors the goal is..."
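Something along these lines (a sketch of a reworded doc comment) would address both points:

    // Package rsearch provides a means of searching Kubernetes objects by their
    // selectors. The goal is to answer the question: which resources are selecting
    // pods with a given label?
    package rsearch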

// the goal is to give the answer to a question - which resources are selecting pods with given label
package rsearch
97 changes: 97 additions & 0 deletions rsearch/processor.go
@@ -0,0 +1,97 @@
package rsearch

import (
"log"
)

/*
{"type":"ADDED","object":{"apiVersion":"romana.io/demo/v1","kind":"NetworkPolicy","metadata":{"name":"pol1","namespace":"default","selfLink":"/apis/romana.io/demo/v1/namespaces/default/networkpolicys/pol1","uid":"d7036130-e119-11e5-aab8-0213e1312dc5","resourceVersion":"119875","creationTimestamp":"2016-03-03T08:28:00Z","labels":{"owner":"t1"}},"spec":{"allowIncoming":{"from":[{"pods":{"tier":"frontend"}}],"toPorts":[{"port":80,"protocol":"TCP"}]},"podSelector":{"tier":"backend"}}}}

Is this an internal reminder of the request structure?

Author
Both; it's an internal reminder of what the request structure looks like.

If this serves as documentation then:

  • Let's improve the formatting so that it can actually be read.
  • Let's add some comments around it, so the reader knows what this is about.

*/

// Process is a goroutine that consumes resource update events and maintains a searchable
// cache of all known resources. It also accepts search requests and perform searches.
func Process(in <-chan Event, done chan Done, config Config) chan<- SearchRequest {
// Channel to submit SearchRequest's into
req := make(chan SearchRequest)

// storage map is a cache of known KubeObjects
// arranged by NPid
storage := make(map[string]KubeObject)

// search map is a cache of known NPid's
// arranged by Selectors, where selector being
// a field by which we search
search := make(map[string]map[string]bool)

go func() {
for {
select {
case e := <-in:
// On incoming event update caches
updateStorage(e, storage, search, config)
case request := <-req:
// On incoming search request return a list
// of resources with matching Selectors
processSearchRequest(storage, search, request, config)
case <-done:
return
}
}
}()

return req
}

func processSearchRequest(storage map[string]KubeObject, search map[string]map[string]bool, req SearchRequest, config Config) SearchResponse {

docstring

if config.Server.Debug {
log.Println("Received request", req)
}

var resp []KubeObject

if config.Server.Debug {
log.Printf("Index map has following %s, request tag is %s ", search, req.Tag)
}

// Assembling response.
for NPid, _ := range search[string(req.Tag)] {

All the map accesses are going to be racy.

Author
The Process goroutine owns all the maps, and it can perform only one task at a time.

We are either writing or reading, never both at the same time:

                        case e := <-in:
                                // On incoming event update caches
                                updateStorage(e, storage, search, config)
                        case request := <-req:
                                // On incoming search request return a list
                                // of resources with matching Selectors
                                processSearchRequest(storage, search, request, config)

if config.Server.Debug {
log.Printf("Assembling response adding %s to %s", resp, storage[NPid])
}
resp = append(resp, storage[NPid])
}

if config.Server.Debug {
log.Printf("Dispatching final response %s", resp)
}

req.Resp <- resp // TODO see if it may hang up here
return resp
}

func updateStorage(e Event, storage map[string]KubeObject, search map[string]map[string]bool, config Config) {

docstring

NPid := e.Object.makeId()
Selector := e.Object.getSelector(config)

if e.Type == "ADDED" {
if config.Server.Debug {
log.Printf("Processing ADD request for %s", e.Object.Metadata.Name)
}
storage[NPid] = e.Object
if _, ok := search[Selector]; !ok {
m := make(map[string]bool)
search[Selector] = m
}
search[Selector][NPid] = true
} else if e.Type == "DELETED" {
if config.Server.Debug {
log.Printf("Processing DELETE request for %s", e.Object.Metadata.Name)
}
delete(storage, NPid)
delete(search[Selector], NPid)
} else {
if config.Server.Debug {
log.Printf("Received unindentified request %s for %s", e.Type, e.Object.Metadata.Name)
}
}
}