Skip to content

Commit

Permalink
restart watcher on EOF from read stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Engquist committed Oct 25, 2018
1 parent d7b9dfc commit 048b5b2
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
)

Expand All @@ -29,6 +30,11 @@ type Notification struct {
Object JsonObject
}

type HttpConfig struct {
token string
tr *http.Transport
}

var watchUrlByKind = map[string]string{
"Pod": "/api/v1/watch/pods?watch=true",
"Deployment": "/apis/apps/v1/watch/deployments?watch=true",
Expand Down Expand Up @@ -75,18 +81,18 @@ func getTlsConfig() *tls.Config {
return nil
}

var k8sClient *http.Client
var token = ""
var httpConfig HttpConfig

func initClient() {
func initHttpConfig() {
tr := &http.Transport{}
if tlsConfig := getTlsConfig(); tlsConfig != nil {
tr.TLSClientConfig = tlsConfig
}
k8sClient = &http.Client{Transport: tr}
token := ""
if tok := readSecret("token"); tok != nil {
token = string(tok)
}
httpConfig = HttpConfig{token, tr}
}

func makeWatchRequest(kind string) *http.Request {
Expand All @@ -95,29 +101,33 @@ func makeWatchRequest(kind string) *http.Request {
panic(fmt.Sprintf("watcher http.NewRequest error for kind %s: %s\n",
kind, err.Error()))
}
if token != "" {
req.Header.Add("Authorization", "Bearer "+token)
if httpConfig.token != "" {
req.Header.Add("Authorization", "Bearer "+httpConfig.token)
}
return req
}

func runWatcher(kind string) {
func runWatcher(kind string, endchan chan string) {
req := makeWatchRequest(kind)
k8sClient := &http.Client{Transport: httpConfig.tr}
resp, err := k8sClient.Do(req)
if err != nil {
panic(fmt.Sprintf("http GET error for watch of kind %s: %s",
kind, err.Error()))
}
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)
for {
line, err := reader.ReadBytes('\n')
if err != nil {
panic(fmt.Sprintf("read error on watcher stream: %s\n", err.Error()))
log.Printf("read error on %s watcher stream: %s\n", kind, err.Error())
break
}
var notif Notification
if err := json.Unmarshal(line, &notif); err != nil {
panic(fmt.Sprintf("JSON unmarshal error on watcher input: %s\n",
err.Error()))
log.Printf("JSON unmarshal error on %s watcher input: %s\n",
kind, err.Error())
break
}

if notif.Type == "ADDED" || notif.Type == "MODIFIED" {
Expand All @@ -126,11 +136,24 @@ func runWatcher(kind string) {
GetCache().Remove(&notif.Object)
}
}
endchan <- kind
}

func restartWatchers(endchan chan string) {
// if a watcher terminates, restart it
for {
restartName := <-endchan
log.Printf("restarting terminated watcher: %s\n", restartName)
go runWatcher(restartName, endchan)
}
}

func initWatchers() {
initClient()
initHttpConfig()
endchan := make(chan string)
for key, _ := range watchUrlByKind {
go runWatcher(key)
go runWatcher(key, endchan)
}

go restartWatchers(endchan)
}

0 comments on commit 048b5b2

Please sign in to comment.