diff --git a/main.go b/main.go index 661880c..605f929 100644 --- a/main.go +++ b/main.go @@ -5,15 +5,11 @@ package main 1. [x] serve HTTP /v1/instances/:app_id to retrieve ip:port pairs for given app 2. [x] implement UDP proxy (one-way/two-way, fanout (& roundrobin)) 3. [x] implement upstream-conf.d file management -4. [ ] implement TCP proxy (with pluggable impls: haproxy, lvs, ...) -5. [ ] tcp-proxy: add `accept-proxy` support -6. [ ] tcp-proxy: add `proxy-protocol` support -7. [ ] logging: add readable up/down notices, such as: - - "APP_ID: Task $TASK_STATUS on host $HOSTNAME ($TASK_ID)." - with TASK_STATUS translating into readable ("killed", "finished", "running", ...) - - "APP_ID: Task on $HOSTNAME:$PORT is unhealthy ($TASK_ID)." - - "APP_ID: Task on $HOSTNAME:$PORT is healthy ($TASK_ID)." -8. [ ] implement HTTP(S) gateway support +4. [x] logging: add readable up/down notices, such as: +5. [ ] implement TCP proxy (with pluggable impls: haproxy, lvs, ...) +7. [ ] tcp-proxy: add `accept-proxy` support +8. [ ] tcp-proxy: add `proxy-protocol` support +9. [ ] implement HTTP(S) gateway support XXX Changes: @@ -222,6 +218,7 @@ func (mmsd *mmsdService) setupEventBusListener() { switch event.TaskStatus { case marathon.TaskFinished, marathon.TaskFailed, marathon.TaskKilled, marathon.TaskLost: + log.Printf("App %v task %v on %v changed status. %v.\n", event.AppId, event.TaskId, event.Host, event.TaskStatus) mmsd.Update(event.AppId, event.TaskId, false) case marathon.TaskRunning: // XXX we require our apps to always have health checks set, @@ -235,15 +232,52 @@ func (mmsd *mmsdService) setupEventBusListener() { sse.AddEventListener("health_status_changed_event", func(data string) { var event marathon.HealthStatusChangedEvent json.Unmarshal([]byte(data), &event) + app, err := mmsd.getMarathonApp(event.AppId) + if err != nil { + log.Printf("Failed to fetch Marathon app. %+v. %v\n", event, err) + return + } + if app == nil { + log.Printf("App %v not found anymore.\n", event.AppId) + return + } + + task := app.GetTaskById(event.TaskId) + if task == nil { + log.Printf("App %v task %v not found anymore.\n", event.AppId, event.TaskId) + return + } + + // app & task definitely do exist, so propagate health change event + + if event.Alive { + log.Printf("App %v task %v on %v is healthy.\n", event.AppId, event.TaskId, task.Host) + } else { + log.Printf("App %v task %v on %v is unhealthy.\n", event.AppId, event.TaskId, task.Host) + } + mmsd.Update(event.AppId, event.TaskId, event.Alive) }) go sse.RunForever() } +func (mmsd *mmsdService) getMarathonApp(appID string) (*marathon.App, error) { + m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) + if err != nil { + return nil, err + } + + app, err := m.GetApp(appID) + if err != nil { + return nil, err + } + + return app, nil +} + // enable/disable given app:task func (mmsd *mmsdService) Update(appID string, taskID string, alive bool) { - log.Printf("Update %v: %v (%v)\n", appID, taskID, alive) m, err := marathon.NewService(mmsd.MarathonIP, mmsd.MarathonPort) if err != nil { log.Printf("Update: NewService(%q, %v) failed. %v\n", mmsd.MarathonIP, mmsd.MarathonPort, err)