diff --git a/agent.go b/agent.go index 896bad1..40a4138 100644 --- a/agent.go +++ b/agent.go @@ -3,7 +3,6 @@ package main import ( "fmt" "math/rand" - "net/http" "os" "os/signal" "syscall" @@ -85,12 +84,8 @@ func serve(c *cli.Context) error { errChan <- nodeManager.Run(ctx) }() - additionalHandler := map[string]map[string]func(http.ResponseWriter, *http.Request){ - "GET": { - "/log/": workloadManager.LogHandler(), - }, - } - go api.Serve(config.API.Addr, additionalHandler) + apiHandler := api.NewHandler(config, workloadManager) + go apiHandler.Serve() select { case err := <-errChan: diff --git a/api/http.go b/api/http.go index 63633af..56c2f4f 100644 --- a/api/http.go +++ b/api/http.go @@ -7,6 +7,8 @@ import ( // enable api _ "net/http/pprof" // nolint + "github.com/projecteru2/agent/manager/workload" + "github.com/projecteru2/agent/types" "github.com/projecteru2/agent/version" "github.com/bmizerany/pat" @@ -19,6 +21,8 @@ type JSON map[string]interface{} // Handler define handler type Handler struct { + config *types.Config + workloadManager *workload.Manager } // URL /version/ @@ -39,20 +43,47 @@ func (h *Handler) profile(w http.ResponseWriter, _ *http.Request) { _ = json.NewEncoder(w).Encode(r) } +// URL /log/ +func (h *Handler) log(w http.ResponseWriter, req *http.Request) { + app := req.URL.Query().Get("app") + if app == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + // fuck httpie + w.WriteHeader(http.StatusOK) + if hijack, ok := w.(http.Hijacker); ok { + conn, buf, err := hijack.Hijack() + if err != nil { + log.Errorf("[apiLog] connect failed %v", err) + return + } + defer conn.Close() + h.workloadManager.Subscribe(app, buf) + } +} + +func NewHandler(config *types.Config, workloadManager *workload.Manager) *Handler { + return &Handler{ + config: config, + workloadManager: workloadManager, + } +} + // Serve start a api service // blocks by http.ListenAndServe // run this in a separated goroutine -func Serve(addr string, additionalHandlers map[string]map[string]func(http.ResponseWriter, *http.Request)) { - if addr == "" { +func (h *Handler) Serve() { + if h.config.API.Addr == "" { return } - h := &Handler{} restfulAPIServer := pat.New() handlers := map[string]map[string]func(http.ResponseWriter, *http.Request){ "GET": { "/profile/": h.profile, "/version/": h.version, + "/log/": h.log, }, } @@ -62,17 +93,11 @@ func Serve(addr string, additionalHandlers map[string]map[string]func(http.Respo } } - for method, routes := range additionalHandlers { - for route, handler := range routes { - restfulAPIServer.Add(method, route, http.HandlerFunc(handler)) - } - } - http.Handle("/", restfulAPIServer) http.Handle("/metrics", promhttp.Handler()) - log.Infof("[apiServe] http api started %s", addr) + log.Infof("[apiServe] http api started %s", h.config.API.Addr) - err := http.ListenAndServe(addr, nil) + err := http.ListenAndServe(h.config.API.Addr, nil) if err != nil { log.Panicf("http api failed %s", err) } diff --git a/manager/workload/log.go b/manager/workload/log.go index af0642e..24906f9 100644 --- a/manager/workload/log.go +++ b/manager/workload/log.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "github.com/projecteru2/agent/types" coreutils "github.com/projecteru2/core/utils" @@ -83,22 +82,3 @@ func (l *logBroadcaster) run(ctx context.Context) { } } } - -func (l *logBroadcaster) handler(w http.ResponseWriter, req *http.Request) { - app := req.URL.Query().Get("app") - if app == "" { - w.WriteHeader(http.StatusBadRequest) - return - } - // fuck httpie - w.WriteHeader(http.StatusOK) - if hijack, ok := w.(http.Hijacker); ok { - conn, buf, err := hijack.Hijack() - if err != nil { - logrus.Errorf("[apiLog] connect failed %v", err) - return - } - defer conn.Close() - l.subscribe(app, buf) - } -} diff --git a/manager/workload/log_test.go b/manager/workload/log_test.go index 49b99b0..5a77f6e 100644 --- a/manager/workload/log_test.go +++ b/manager/workload/log_test.go @@ -10,15 +10,35 @@ import ( "github.com/projecteru2/agent/types" "github.com/bmizerany/pat" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) func TestLogBroadcaster(t *testing.T) { l := newLogBroadcaster() + handler := func(w http.ResponseWriter, req *http.Request) { + app := req.URL.Query().Get("app") + if app == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + // fuck httpie + w.WriteHeader(http.StatusOK) + if hijack, ok := w.(http.Hijacker); ok { + conn, buf, err := hijack.Hijack() + if err != nil { + logrus.Errorf("[apiLog] connect failed %v", err) + return + } + defer conn.Close() + l.subscribe(app, buf) + } + } + go func() { restfulAPIServer := pat.New() - restfulAPIServer.Add("GET", "/log/", http.HandlerFunc(l.handler)) + restfulAPIServer.Add("GET", "/log/", http.HandlerFunc(handler)) http.Handle("/", restfulAPIServer) http.ListenAndServe(":12310", nil) }() @@ -30,7 +50,6 @@ func TestLogBroadcaster(t *testing.T) { Name: "nerv", Type: "stdout", EntryPoint: "eva0", - Ident: "", Data: "data0", } l.logC <- &types.Log{ @@ -38,7 +57,6 @@ func TestLogBroadcaster(t *testing.T) { Name: "nerv", Type: "stdout", EntryPoint: "eva0", - Ident: "", Data: "data1", } }() diff --git a/manager/workload/manager.go b/manager/workload/manager.go index eb8f074..9a98327 100644 --- a/manager/workload/manager.go +++ b/manager/workload/manager.go @@ -1,8 +1,8 @@ package workload import ( + "bufio" "context" - "net/http" "github.com/projecteru2/agent/common" "github.com/projecteru2/agent/runtime" @@ -118,7 +118,6 @@ func (m *Manager) Run(ctx context.Context) error { } } -// LogHandler returns the http handler for /log/ -func (m *Manager) LogHandler() func(w http.ResponseWriter, req *http.Request) { - return m.logBroadcaster.handler +func (m *Manager) Subscribe(app string, buf *bufio.ReadWriter) { + m.logBroadcaster.subscribe(app, buf) }