Skip to content

Commit

Permalink
fix: properly return 503 status when shutdown was initiated
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Jun 17, 2024
1 parent 593f2a5 commit e0ea40c
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: weekly
interval: daily
reviewers:
- "rustatian"
assignees:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ jobs:
go-version: stable

- name: Run linter
uses: golangci/golangci-lint-action@v3.7.0 # Action page: <https://github.com/golangci/golangci-lint-action>
uses: golangci/golangci-lint-action@v6.0.1 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.54 # without patch version
version: v1.59 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race ./...
13 changes: 3 additions & 10 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,11 @@

run:
timeout: 1m
skip-dirs:
- .github
- .git
allow-parallel-runners: true

output:
format: colored-line-number # colored-line-number|line-number|json|tab|checkstyle|code-climate

linters-settings:
wsl:
allow-assign-and-anything: true
govet:
check-shadowing: true
golint:
min-confidence: 0.1
gocyclo:
min-complexity: 15
godot:
Expand Down Expand Up @@ -78,6 +68,9 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- whitespace # Tool for detection of leading and trailing whitespace

issues:
exclude-dirs:
- .github
- .git
exclude-rules:
- path: _test\.go
linters:
Expand Down
52 changes: 52 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"html"
"net/http"
"sync/atomic"

"go.uber.org/zap"
)
Expand All @@ -12,17 +13,24 @@ type Health struct {
log *zap.Logger
unavailableStatusCode int
statusRegistry map[string]Checker
shutdownInitiated *atomic.Pointer[bool]
}

func NewHealthHandler(sr map[string]Checker, log *zap.Logger, usc int) *Health {
func NewHealthHandler(sr map[string]Checker, shutdownInitiated *atomic.Pointer[bool], log *zap.Logger, usc int) *Health {
return &Health{
statusRegistry: sr,
unavailableStatusCode: usc,
log: log,
shutdownInitiated: shutdownInitiated,
}
}

func (rd *Health) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if rd.shutdownInitiated != nil && *rd.shutdownInitiated.Load() {
http.Error(w, "service is shutting down", http.StatusServiceUnavailable)
return
}

if r == nil || r.URL == nil || r.URL.Query() == nil {
http.Error(
w,
Expand Down
10 changes: 9 additions & 1 deletion jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"html"
"net/http"
"sync/atomic"

"go.uber.org/zap"
)
Expand All @@ -17,17 +18,24 @@ type Jobs struct {
statusJobsRegistry JobsChecker
unavailableStatusCode int
log *zap.Logger
shutdownInitiated *atomic.Pointer[bool]
}

func NewJobsHandler(jc JobsChecker, log *zap.Logger, usc int) *Jobs {
func NewJobsHandler(jc JobsChecker, shutdownInitiated *atomic.Pointer[bool], log *zap.Logger, usc int) *Jobs {
return &Jobs{
statusJobsRegistry: jc,
unavailableStatusCode: usc,
log: log,
shutdownInitiated: shutdownInitiated,
}
}

func (jb *Jobs) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
if jb.shutdownInitiated != nil && *jb.shutdownInitiated.Load() {
http.Error(w, "service is shutting down", http.StatusServiceUnavailable)
return
}

if jb.statusJobsRegistry == nil {
http.Error(w, "jobs plugin not found", jb.unavailableStatusCode)
}
Expand Down
49 changes: 28 additions & 21 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
stderr "errors"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/roadrunner-server/api/v4/plugins/v1/status"
Expand All @@ -24,15 +25,15 @@ const (
type Configurer interface {
// UnmarshalKey takes a single key and unmarshal it into a Struct.
UnmarshalKey(name string, out any) error
// Has checks if config section exists.
// Has checks if a config section exists.
Has(name string) bool
}

type Logger interface {
NamedLogger(name string) *zap.Logger
}

// Checker interface used to get latest status from plugin
// Checker interface used to get the latest status from plugin
type Checker interface {
Status() (*status.Status, error)
Name() string
Expand All @@ -44,7 +45,7 @@ type JobsChecker interface {
}

// Readiness interface used to get readiness status from the plugin
// that means, that worker poll inside the plugin has 1+ plugins which are ready to work
// that means that a worker pool inside the plugin has 1+ plugins which are ready to work
// at the particular moment
type Readiness interface {
Ready() (*status.Status, error)
Expand All @@ -53,15 +54,17 @@ type Readiness interface {

type Plugin struct {
mu sync.Mutex
// plugins which needs to be checked just as Status
// plugins that need to be checked just as Status
statusRegistry map[string]Checker
// plugins which needs to send Readiness status
// plugins that need to send Readiness status
readyRegistry map[string]Readiness
// jobs plugin checker
statusJobsRegistry JobsChecker
server *http.Server
log *zap.Logger
cfg *Config
// shared pointer
shutdownInitiated atomic.Pointer[bool]
server *http.Server
log *zap.Logger
cfg *Config
}

func (c *Plugin) Init(cfg Configurer, log Logger) error {
Expand All @@ -79,6 +82,7 @@ func (c *Plugin) Init(cfg Configurer, log Logger) error {

c.readyRegistry = make(map[string]Readiness)
c.statusRegistry = make(map[string]Checker)
c.shutdownInitiated.Store(toPtr(false))

c.log = log.NamedLogger(PluginName)

Expand All @@ -89,9 +93,9 @@ func (c *Plugin) Serve() chan error {
errCh := make(chan error, 1)

mux := http.NewServeMux()
mux.Handle("/health", NewHealthHandler(c.statusRegistry, c.log, c.cfg.UnavailableStatusCode))
mux.Handle("/ready", NewReadyHandler(c.readyRegistry, c.log, c.cfg.UnavailableStatusCode))
mux.Handle("/jobs", NewJobsHandler(c.statusJobsRegistry, c.log, c.cfg.UnavailableStatusCode))
mux.Handle("/health", NewHealthHandler(c.statusRegistry, &c.shutdownInitiated, c.log, c.cfg.UnavailableStatusCode))
mux.Handle("/ready", NewReadyHandler(c.readyRegistry, &c.shutdownInitiated, c.log, c.cfg.UnavailableStatusCode))
mux.Handle("/jobs", NewJobsHandler(c.statusJobsRegistry, &c.shutdownInitiated, c.log, c.cfg.UnavailableStatusCode))

c.mu.Lock()
c.server = &http.Server{
Expand Down Expand Up @@ -119,20 +123,19 @@ func (c *Plugin) Serve() chan error {
return errCh
}

func (c *Plugin) Stop(ctx context.Context) error {
const op = errors.Op("checker_plugin_stop")
func (c *Plugin) Stop(_ context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()

err := c.server.Shutdown(ctx)
if err != nil {
return errors.E(op, err)
}
// set shutdown to true, thus all endpoints will return 503
c.shutdownInitiated.Store(toPtr(true))

return nil
}

// status returns a Checker interface implementation
// Reset named service. This is not an Status interface implementation
// Status returns a Checker interface implementation
// Reset named service.
// This is not a Status interface implementation
func (c *Plugin) status(name string) (*status.Status, error) {
const op = errors.Op("checker_plugin_status")
svc, ok := c.statusRegistry[name]
Expand All @@ -143,7 +146,7 @@ func (c *Plugin) status(name string) (*status.Status, error) {
return svc.Status()
}

// ready used to provide a readiness check for the plugin
// ready is used to provide a readiness check for the plugin
func (c *Plugin) ready(name string) (*status.Status, error) {
const op = errors.Op("checker_plugin_ready")
svc, ok := c.readyRegistry[name]
Expand All @@ -154,7 +157,7 @@ func (c *Plugin) ready(name string) (*status.Status, error) {
return svc.Ready()
}

// Collects declares services to be collected.
// Collects declare services to be collected.
func (c *Plugin) Collects() []*dep.In {
return []*dep.In{
dep.Fits(func(p any) {
Expand All @@ -180,3 +183,7 @@ func (c *Plugin) Name() string {
func (c *Plugin) RPC() any {
return &rpc{srv: c, log: c.log}
}

func toPtr[T any](v T) *T {
return &v
}
16 changes: 12 additions & 4 deletions ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,36 @@ import (
"fmt"
"html"
"net/http"
"sync/atomic"

"go.uber.org/zap"
)

// readinessHandler return 200OK if all plugins are ready to serve
// if one of the plugins return status from the 5xx range, the status for all query will be 503
// if one of the plugins returns status from the 5xx range, the status for all queries will be 503

type Ready struct {
log *zap.Logger
unavailableStatusCode int
statusRegistry map[string]Readiness
shutdownInitiated *atomic.Pointer[bool]
}

func NewReadyHandler(sr map[string]Readiness, log *zap.Logger, usc int) *Ready {
func NewReadyHandler(sr map[string]Readiness, shutdownInitiated *atomic.Pointer[bool], log *zap.Logger, usc int) *Ready {
return &Ready{
log: log,
statusRegistry: sr,
unavailableStatusCode: usc,
shutdownInitiated: shutdownInitiated,
}
}

func (rd *Ready) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if rd.shutdownInitiated != nil && *rd.shutdownInitiated.Load() {
http.Error(w, "service is shutting down", http.StatusServiceUnavailable)
return
}

if r == nil || r.URL == nil || r.URL.Query() == nil {
http.Error(
w,
Expand Down Expand Up @@ -58,14 +66,14 @@ func (rd *Ready) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if st == nil {
// nil can be only if the service unavailable
// nil can be only if the service is unavailable
w.WriteHeader(rd.unavailableStatusCode)
_, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), rd.unavailableStatusCode)))
return
}

if st.Code >= 500 {
// if there is 500 or 503 status code return immediately
// if there are 500 or 503 status codes return immediately
w.WriteHeader(rd.unavailableStatusCode)
_, _ = w.Write([]byte(fmt.Sprintf(template, html.EscapeString(pl[i]), rd.unavailableStatusCode)))
return
Expand Down
21 changes: 21 additions & 0 deletions tests/configs/.rr-status-503.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version: '3'

rpc:
listen: tcp://127.0.0.1:6005

server:
command: "php php_test_files/sleep.php"
relay: "pipes"
relay_timeout: "20s"

status:
address: "127.0.0.1:34711"

logs:
mode: development
level: debug

http:
address: 127.0.0.1:11934
pool:
num_workers: 1
Loading

0 comments on commit e0ea40c

Please sign in to comment.