diff --git a/.golangci.yml b/.golangci.yml index c8985dc..4658df5 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -40,10 +40,8 @@ linters: # All available linters list: = 500: + w.WriteHeader(rd.unavailableStatusCode) + + report = append(report, &Report{ + PluginName: k, + ErrorMessage: "internal server error, see logs", + StatusCode: rd.unavailableStatusCode, + }) + case st.Code >= 100 && st.Code <= 400: + report = append(report, &Report{ + PluginName: k, + StatusCode: st.Code, + }) + default: + report = append(report, &Report{ + PluginName: k, + ErrorMessage: "unexpected status code", + StatusCode: st.Code, + }) + } + } + + data, err := json.Marshal(report) + if err != nil { + // TODO do we need to write this error to the ResponseWriter? + rd.log.Error("failed to marshal response", zap.Error(err)) + return + } + // write the response + _, _ = w.Write(data) + return } - // iterate over all provided plugins + // iterate over all provided Plugins for i := 0; i < len(pl); i++ { - switch { - // check workers for the plugin - case rd.statusRegistry[pl[i]] != nil: - st, errS := rd.statusRegistry[pl[i]].Status() - if errS != nil { - http.Error(w, errS.Error(), rd.unavailableStatusCode) - return + if svc, ok := rd.statusRegistry[pl[i]]; ok { + if svc == nil { + continue + } + + st, err := rd.statusRegistry[pl[i]].Status() + if err != nil { + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: err.Error(), + StatusCode: http.StatusInternalServerError, + }) + + continue } if st == nil { - w.WriteHeader(rd.unavailableStatusCode) - // nil can be only if the service unavailable - _, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), rd.unavailableStatusCode))) - return + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: "plugin is not available", + StatusCode: rd.unavailableStatusCode, + }) + + continue } switch { case st.Code >= 500: + // on >=500, write header, because it'll be written on Write (200) w.WriteHeader(rd.unavailableStatusCode) - // if there is 500 or 503 status code return immediately - _, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), rd.unavailableStatusCode))) - return + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: "internal server error, see logs", + StatusCode: rd.unavailableStatusCode, + }) case st.Code >= 100 && st.Code <= 400: - _, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), st.Code))) - continue + report = append(report, &Report{ + PluginName: pl[i], + StatusCode: st.Code, + }) default: - _, _ = w.Write([]byte(fmt.Sprintf("plugin: %s not found", html.EscapeString(pl[i])))) + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: "unexpected status code", + StatusCode: st.Code, + }) } - default: - _, _ = w.Write([]byte(fmt.Sprintf("plugin: %s not found", html.EscapeString(pl[i])))) + } else { + rd.log.Info("plugin does not support health checks", zap.String("plugin", pl[i])) } } + + data, err := json.Marshal(report) + if err != nil { + rd.log.Error("failed to marshal response", zap.Error(err)) + } + + // write the response + _, _ = w.Write(data) } diff --git a/jobs.go b/jobs.go index f17e792..490a986 100644 --- a/jobs.go +++ b/jobs.go @@ -2,18 +2,13 @@ package status import ( "context" - "fmt" - "html" + "encoding/json" "net/http" "sync/atomic" "go.uber.org/zap" ) -const ( - jobsTemplate string = "plugin: %s: pipeline: %s | priority: %d | ready: %t | queue: %s | active: %d | delayed: %d | reserved: %d | driver: %s | error: %s \n" -) - type Jobs struct { statusJobsRegistry JobsChecker unavailableStatusCode int @@ -47,19 +42,28 @@ func (jb *Jobs) ServeHTTP(w http.ResponseWriter, _ *http.Request) { return } + report := make([]*JobsReport, 0, 2) + // write info about underlying drivers for i := 0; i < len(jobStates); i++ { - _, _ = w.Write([]byte(html.EscapeString(fmt.Sprintf(jobsTemplate, - "jobs", // only JOBS plugin - jobStates[i].Pipeline, - jobStates[i].Priority, - jobStates[i].Ready, - jobStates[i].Queue, - jobStates[i].Active, - jobStates[i].Delayed, - jobStates[i].Reserved, - jobStates[i].Driver, - jobStates[i].ErrorMessage, - )))) + report = append(report, &JobsReport{ + Pipeline: jobStates[i].Pipeline, + Priority: jobStates[i].Priority, + Ready: jobStates[i].Ready, + Queue: jobStates[i].Queue, + Active: jobStates[i].Active, + Delayed: jobStates[i].Delayed, + Reserved: jobStates[i].Reserved, + Driver: jobStates[i].Driver, + ErrorMessage: jobStates[i].ErrorMessage, + }) + } + + data, err := json.Marshal(report) + if err != nil { + jb.log.Error("failed to marshal jobs state report", zap.Error(err)) + return } + + _, _ = w.Write(data) } diff --git a/plugin.go b/plugin.go index d263790..4e6c6a3 100644 --- a/plugin.go +++ b/plugin.go @@ -19,7 +19,6 @@ const ( // PluginName declares public plugin name. PluginName = "status" pluginsQuery string = "plugin" - template string = "plugin: %s, status: %d\n" ) type Configurer interface { diff --git a/ready.go b/ready.go index 0edc1fa..f9b5041 100644 --- a/ready.go +++ b/ready.go @@ -1,16 +1,15 @@ package status import ( - "fmt" - "html" + "encoding/json" "net/http" "sync/atomic" "go.uber.org/zap" ) -// readinessHandler return 200OK if all plugins are ready to serve -// if one of the plugins returns status from the 5xx range, the status for all queries will be 503 +// readiness Handler return 200OK if all Plugins are ready to serve +// if one of the Plugins returns status from the 5xx range, the status for all queries will be 503 type Ready struct { log *zap.Logger @@ -34,59 +33,139 @@ func (rd *Ready) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if r == nil || r.URL == nil || r.URL.Query() == nil { - http.Error( - w, - "No plugins provided in query. Query should be in form of: ready?plugin=plugin1&plugin=plugin2", - http.StatusBadRequest, - ) - return - } + // report will be used either for all plugin or for the Plugins in the query + report := make([]*Report, 0, 2) pl := r.URL.Query()[pluginsQuery] - + // if no Plugins provided, check them all if len(pl) == 0 { - http.Error( - w, - "No plugins provided in query. Query should be in form of: ready?plugin=plugin1&plugin=plugin2", - http.StatusBadRequest, - ) + for k, pl := range rd.statusRegistry { + if pl == nil { + report = append(report, &Report{ + PluginName: k, + ErrorMessage: "plugin is nil or not initialized", + StatusCode: http.StatusNotFound, + }) + + rd.log.Info("plugin is nil or not initialized", zap.String("plugin", k)) + continue + } + + st, err := pl.Ready() + if err != nil { + report = append(report, &Report{ + PluginName: k, + ErrorMessage: err.Error(), + StatusCode: http.StatusInternalServerError, + }) + + continue + } + + if st == nil { + report = append(report, &Report{ + PluginName: k, + ErrorMessage: "plugin is not available", + StatusCode: rd.unavailableStatusCode, + }) + + continue + } + + switch { + case st.Code >= 500: + w.WriteHeader(rd.unavailableStatusCode) + + report = append(report, &Report{ + PluginName: k, + ErrorMessage: "internal server error, see logs", + StatusCode: rd.unavailableStatusCode, + }) + case st.Code >= 100 && st.Code <= 400: + report = append(report, &Report{ + PluginName: k, + StatusCode: st.Code, + }) + default: + report = append(report, &Report{ + PluginName: k, + ErrorMessage: "unexpected status code", + StatusCode: st.Code, + }) + } + } + + data, err := json.Marshal(report) + if err != nil { + // TODO do we need to write this error to the ResponseWriter? + rd.log.Error("failed to marshal response", zap.Error(err)) + return + } + + // write the response + _, _ = w.Write(data) + return } - // iterate over all provided plugins + // iterate over all provided Plugins for i := 0; i < len(pl); i++ { - switch { - // check workers for the plugin - case rd.statusRegistry[pl[i]] != nil: - st, errS := rd.statusRegistry[pl[i]].Ready() - if errS != nil { - http.Error(w, errS.Error(), rd.unavailableStatusCode) - return + if svc, ok := rd.statusRegistry[pl[i]]; ok { + if svc == nil { + continue } - if st == nil { - // nil can be only if the service is unavailable + st, err := rd.statusRegistry[pl[i]].Ready() + if err != nil { w.WriteHeader(rd.unavailableStatusCode) - _, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), rd.unavailableStatusCode))) - return + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: err.Error(), + StatusCode: http.StatusInternalServerError, + }) + continue } - if st.Code >= 500 { - // if there are 500 or 503 status codes return immediately - w.WriteHeader(rd.unavailableStatusCode) - _, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), rd.unavailableStatusCode))) - return - } else if st.Code >= 100 && st.Code <= 400 { - _, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), st.Code))) + if st == nil { + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: "plugin is not available", + StatusCode: rd.unavailableStatusCode, + }) continue } - _, _ = w.Write([]byte(fmt.Sprintf("plugin: %s not found", html.EscapeString(pl[i])))) - // check job drivers statuses - // map is plugin -> states - default: - _, _ = w.Write([]byte(fmt.Sprintf("plugin: %s not found", html.EscapeString(pl[i])))) + switch { + case st.Code >= 500: + // on >=500, write header, because it'll be written on Write (200) + w.WriteHeader(rd.unavailableStatusCode) + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: "internal server error, see logs", + StatusCode: rd.unavailableStatusCode, + }) + case st.Code >= 100 && st.Code <= 400: + report = append(report, &Report{ + PluginName: pl[i], + StatusCode: st.Code, + }) + default: + report = append(report, &Report{ + PluginName: pl[i], + ErrorMessage: "unexpected status code", + StatusCode: st.Code, + }) + } + } else { + rd.log.Info("plugin does not support readiness checks", zap.String("plugin", pl[i])) } } + + data, err := json.Marshal(report) + if err != nil { + rd.log.Error("failed to marshal response", zap.Error(err)) + } + + // write the response + _, _ = w.Write(data) } diff --git a/tests/configs/.rr-jobs-status.yaml b/tests/configs/.rr-jobs-status.yaml index 6346da0..86bd41e 100644 --- a/tests/configs/.rr-jobs-status.yaml +++ b/tests/configs/.rr-jobs-status.yaml @@ -11,7 +11,7 @@ status: address: "127.0.0.1:35544" jobs: - num_pollers: 1 + num_pollers: 2 pipeline_size: 100000 pool: num_workers: 1 diff --git a/tests/configs/.rr-status-503.yaml b/tests/configs/.rr-status-503.yaml index 39d0217..9c33847 100644 --- a/tests/configs/.rr-status-503.yaml +++ b/tests/configs/.rr-status-503.yaml @@ -11,6 +11,14 @@ server: status: address: "127.0.0.1:34711" +jobs: + num_pollers: 1 + pipeline_size: 100000 + pool: + num_workers: 1 + allocate_timeout: 60s + destroy_timeout: 60s + logs: mode: development level: debug @@ -21,4 +29,4 @@ http: num_workers: 1 endure: - grace_period: 10s \ No newline at end of file + grace_period: 10s diff --git a/tests/go.sum b/tests/go.sum index 482b0dc..be17e1b 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -102,8 +102,6 @@ github.com/roadrunner-server/goridge/v3 v3.8.3 h1:XmjrOFnI6ZbQTPaP39DEk8KwLUNTgj github.com/roadrunner-server/goridge/v3 v3.8.3/go.mod h1:4TZU8zgkKIZCsH51qwGMpvyXCT59u/8z6q8sCe4ZGAQ= github.com/roadrunner-server/http/v5 v5.0.3 h1:RCzYWQx7f0+WCoq9Gj357YAxTyMQBZ5fWp8/Z2OtcOE= github.com/roadrunner-server/http/v5 v5.0.3/go.mod h1:n/5uvzU+0Gxkk7LwwxcSD0yLEU6tfQ9xT4wYC+Faz80= -github.com/roadrunner-server/jobs/v5 v5.0.4 h1:bRJpbVfTlkJraFattpmAu3Xg1hHwkcrDpgdCt91jAZc= -github.com/roadrunner-server/jobs/v5 v5.0.4/go.mod h1:me+Aaf45VQFg0GhuUQjZYUIY9MqhGB2MLy2A87WQ1lo= github.com/roadrunner-server/logger/v5 v5.0.3 h1:uJF9/3KEYAvf+AAavbTMfW1kpOdnTybOlTLmOL7vgwo= github.com/roadrunner-server/logger/v5 v5.0.3/go.mod h1:w079LabG2e17ba9bjdRGVziS7/kww0/5tjvplfMIUs0= github.com/roadrunner-server/memory/v5 v5.0.3 h1:WYyCfyQfY4jEkVNSZHFjKMzH+vlkfNRgVRwpAEkruWE= diff --git a/tests/plugin_test.go b/tests/plugin_test.go index ababe33..3f795b1 100644 --- a/tests/plugin_test.go +++ b/tests/plugin_test.go @@ -2,6 +2,7 @@ package status import ( "context" + "encoding/json" "io" "log/slog" "net" @@ -29,9 +30,6 @@ import ( "github.com/stretchr/testify/require" ) -const resp = `plugin: http, status: 200 -plugin: rpc not found` - func TestStatusHttp(t *testing.T) { cont := endure.New(slog.LevelDebug, endure.GracefulShutdownTimeout(time.Minute)) @@ -95,6 +93,7 @@ func TestStatusHttp(t *testing.T) { time.Sleep(time.Second) t.Run("CheckerGetStatus", checkHTTPStatus) + t.Run("CheckerGetStatusAll", checkHTTPStatusAll) stopCh <- struct{}{} wg.Wait() @@ -241,6 +240,7 @@ func TestReadyHttp(t *testing.T) { time.Sleep(time.Second) t.Run("CheckerGetReadiness", checkHTTPReadiness) + t.Run("CheckerGetReadinessAll", checkHTTPReadinessAll) stopCh <- struct{}{} wg.Wait() @@ -345,9 +345,7 @@ func TestJobsStatus(t *testing.T) { assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -386,7 +384,7 @@ func TestJobsStatus(t *testing.T) { time.Sleep(time.Second) - req, err := http.NewRequest("GET", "http://127.0.0.1:35544/jobs", nil) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://127.0.0.1:35544/jobs", nil) assert.NoError(t, err) r, err := http.DefaultClient.Do(req) @@ -397,8 +395,27 @@ func TestJobsStatus(t *testing.T) { assert.NoError(t, err) assert.Equal(t, http.StatusOK, r.StatusCode) - assert.Contains(t, string(b), "test-1") - assert.Contains(t, string(b), "test-2") + jr := make([]*status.JobsReport, 0, 2) + + err = json.Unmarshal(b, &jr) + require.NoError(t, err) + + require.Len(t, jr, 2) + require.Equal(t, jr[0].Priority, uint64(13)) + require.Equal(t, jr[0].Ready, true) + require.Equal(t, jr[0].Active, int64(0)) + require.Equal(t, jr[0].Delayed, int64(0)) + require.Equal(t, jr[0].Reserved, int64(0)) + require.Equal(t, jr[0].Driver, "memory") + require.Equal(t, jr[0].ErrorMessage, "") + + require.Equal(t, jr[1].Priority, uint64(13)) + require.Equal(t, jr[1].Ready, true) + require.Equal(t, jr[1].Active, int64(0)) + require.Equal(t, jr[1].Delayed, int64(0)) + require.Equal(t, jr[1].Reserved, int64(0)) + require.Equal(t, jr[1].Driver, "memory") + require.Equal(t, jr[1].ErrorMessage, "") err = r.Body.Close() assert.NoError(t, err) @@ -427,13 +444,12 @@ func TestJobsReadiness(t *testing.T) { &server.Plugin{}, sp, &jobs.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) err = cont.Init() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) ch, err := cont.Serve() assert.NoError(t, err) @@ -553,8 +569,12 @@ func TestShutdown503(t *testing.T) { req, err2 := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://127.0.0.1:11934", nil) assert.NoError(t, err2) - _, _ = httpClient.Do(req) + rsp, _ := httpClient.Do(req) + if rsp != nil { + _ = rsp.Body.Close() + } }() + time.Sleep(time.Second) stopCh <- struct{}{} @@ -583,6 +603,10 @@ func TestShutdown503(t *testing.T) { wg.Wait() t.Cleanup(func() { + if rsp != nil { + _ = rsp.Body.Close() + } + sp.StopHTTPServer() }) } @@ -595,8 +619,17 @@ func checkJobsReadiness(t *testing.T) { assert.NoError(t, err) b, err := io.ReadAll(r.Body) assert.NoError(t, err) + + rep := make([]*status.Report, 0, 2) + err = json.Unmarshal(b, &rep) + require.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "plugin: jobs, status: 200\n", string(b)) + + assert.Len(t, rep, 1) + assert.Equal(t, "jobs", rep[0].PluginName) + assert.Equal(t, "", rep[0].ErrorMessage) + assert.Equal(t, 200, rep[0].StatusCode) err = r.Body.Close() assert.NoError(t, err) @@ -611,7 +644,38 @@ func checkHTTPStatus(t *testing.T) { b, err := io.ReadAll(r.Body) assert.NoError(t, err) assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, resp, string(b)) + + rep := make([]*status.Report, 0, 2) + err = json.Unmarshal(b, &rep) + require.NoError(t, err) + + assert.Len(t, rep, 1) + assert.Equal(t, "http", rep[0].PluginName) + assert.Equal(t, "", rep[0].ErrorMessage) + assert.Equal(t, 200, rep[0].StatusCode) + + err = r.Body.Close() + assert.NoError(t, err) +} + +func checkHTTPStatusAll(t *testing.T) { + req, err := http.NewRequest("GET", "http://127.0.0.1:34333/health", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := io.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + rep := make([]*status.Report, 0, 2) + err = json.Unmarshal(b, &rep) + require.NoError(t, err) + + assert.Len(t, rep, 1) + assert.Equal(t, "http", rep[0].PluginName) + assert.Equal(t, "", rep[0].ErrorMessage) + assert.Equal(t, 200, rep[0].StatusCode) err = r.Body.Close() assert.NoError(t, err) @@ -622,7 +686,7 @@ func doHTTPReq(t *testing.T) { client := &http.Client{ Timeout: time.Second * 10, } - req, err := http.NewRequest("GET", "http://127.0.0.1:11933", nil) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://127.0.0.1:11933", nil) assert.NoError(t, err) _, err = client.Do(req) //nolint:bodyclose @@ -637,9 +701,33 @@ func checkHTTPReadiness2(t *testing.T) { r, err := http.DefaultClient.Do(req) assert.NoError(t, err) b, err := io.ReadAll(r.Body) - assert.NoError(t, err) + require.NoError(t, err) + + res := make([]*status.Report, 0, 2) + err = json.Unmarshal(b, &res) + require.NoError(t, err) + assert.Len(t, res, 1) + assert.Equal(t, "http", res[0].PluginName) + assert.Equal(t, "internal server error, see logs", res[0].ErrorMessage) + assert.Equal(t, 503, res[0].StatusCode) assert.Equal(t, 503, r.StatusCode) - assert.Equal(t, "plugin: http, status: 503\n", string(b)) + + err = r.Body.Close() + assert.NoError(t, err) +} + +func checkHTTPReadinessAll(t *testing.T) { + req, err := http.NewRequest("GET", "http://127.0.0.1:34333/ready", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := io.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + res := make([]*status.Report, 0, 2) + err = json.Unmarshal(b, &res) + assert.NoError(t, err) err = r.Body.Close() assert.NoError(t, err) @@ -654,7 +742,9 @@ func checkHTTPReadiness(t *testing.T) { b, err := io.ReadAll(r.Body) assert.NoError(t, err) assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, resp, string(b)) + res := make([]*status.Report, 0, 2) + err = json.Unmarshal(b, &res) + assert.NoError(t, err) err = r.Body.Close() assert.NoError(t, err)