Skip to content

Commit

Permalink
Merge pull request #15 from BlockMatrixNetwork/master
Browse files Browse the repository at this point in the history
Added support for single flight checks
  • Loading branch information
autero1 authored Apr 10, 2019
2 parents 7724958 + b17056f commit 327ef58
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 37 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@
[[constraint]]
name = "github.com/urfave/cli"
version = "1.20.0"

[[constraint]]
branch = "master"
name = "golang.org/x/sync"
34 changes: 17 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
# health-checker

A simple HTTP server that will return `200 OK` if the configured checks are all successful. If any of the checks fail,
it will return `HTTP 504 Gateway Not Found`.
it will return `HTTP 504 Gateway Not Found`.

## Motivation

We were setting up an AWS [Auto Scaling Group](http://docs.aws.amazon.com/autoscaling/latest/userguide/AutoScalingGroup.html)
(ASG) fronted by a [Load Balancer](https://aws.amazon.com/documentation/elastic-load-balancing/) that used a
(ASG) fronted by a [Load Balancer](https://aws.amazon.com/documentation/elastic-load-balancing/) that used a
[Health Check](http://docs.aws.amazon.com/elasticloadbalancing/latest/network/target-group-health-checks.html#) to
determine if the server is healthy. Each server in the ASG runs two services, which means that a server is "healthy" if
the TCP Listeners of both services are successfully accepting connections. But the Load Balancer Health Check is limited to
the TCP Listeners of both services are successfully accepting connections. But the Load Balancer Health Check is limited to
a single TCP port, or an HTTP(S) endpoint. As a result, our use case just isn't supported natively by AWS.

We wrote health-checker so that we could run a daemon on the server that reports the true health of the server by
attempting to open a TCP connection to more than one port when it receives an inbound HTTP request on the given listener.

Using the `--script` -option, the `health-checker` can be extended to check many other targets. One concrete example is monitoring
`ZooKeeper` node status during rolling deployment. Just polling the `ZooKeeper`'s TCP client port doesn't necessarily guarantee
that the node has (re-)joined the cluster. Using the `health-check` with a custom script target, we can
[monitor ZooKeeper](https://zookeeper.apache.org/doc/r3.4.8/zookeeperAdmin.html#sc_monitoring) using the
[4 letter words](https://zookeeper.apache.org/doc/r3.4.8/zookeeperAdmin.html#sc_zkCommands), ensuring we report health back to the
that the node has (re-)joined the cluster. Using the `health-check` with a custom script target, we can
[monitor ZooKeeper](https://zookeeper.apache.org/doc/r3.4.8/zookeeperAdmin.html#sc_monitoring) using the
[4 letter words](https://zookeeper.apache.org/doc/r3.4.8/zookeeperAdmin.html#sc_zkCommands), ensuring we report health back to the
[Load Balancer](https://aws.amazon.com/documentation/elastic-load-balancing/) correctly.

## How It Works

When health-checker is started, it will listen for inbound HTTP requests for any URL on the IP address and port specified
by `--listener`. When it receives a request, it will attempt to open TCP connections to each of the ports specified by
an instance of `--port` and/or execute the script target specified by `--script`. If all configured checks - all TCP
connections and zero exit status for the script - succeed, it will return `HTTP 200 OK`. If any of the checks fail,
it will return `HTTP 504 Gateway Not Found`.
an instance of `--port` and/or execute the script target specified by `--script`. If all configured checks - all TCP
connections and zero exit status for the script - succeed, it will return `HTTP 200 OK`. If any of the checks fail,
it will return `HTTP 504 Gateway Not Found`.

Configure your AWS Health Check to only pass the Health Check on `HTTP 200 OK`. Now when an HTTP Health Check request
comes in, all desired TCP ports will be checked and the script target executed.
Expand All @@ -49,15 +49,16 @@ health-checker [options]

#### Options

| Option | Description | Default
| Option | Description | Default
| ------ | ----------- | -------
| `--port` | The port number on which a TCP connection will be attempted. Specify one or more times. | |
| `--port` | The port number on which a TCP connection will be attempted. Specify one or more times. | |
| `--listener` | The IP address and port on which inbound HTTP connections will be accepted. | `0.0.0.0:5000`
| `--log-level` | Set the log level to LEVEL. Must be one of: `panic`, `fatal`, `error,` `warning`, `info`, or `debug` | `info`
| `--help` | Show the help screen | |
| `--script` | Path to script to run - will pass if it completes within configured timeout with a zero exit status. Specify one or more times. | |
| `--script-timeout` | Timeout, in seconds, to wait for the scripts to exit. Applies to all configured script targets. | `5` |
| `--version` | Show the program's version | |
| `--log-level` | Set the log level to LEVEL. Must be one of: `panic`, `fatal`, `error,` `warning`, `info`, or `debug` | `info`
| `--help` | Show the help screen | |
| `--script` | Path to script to run - will pass if it completes within configured timeout with a zero exit status. Specify one or more times. | |
| `--script-timeout` | Timeout, in seconds, to wait for the scripts to exit. Applies to all configured script targets. | `5` |
| `--singleflight` | Enables single flight mode, which allows concurrent health check requests to share the results of a single check. | |
| `--version` | Show the program's version | |

If you execute a shell script, ensure you have a `shebang` line in your script, otherwise the script will fail with an `exec format error`.

Expand Down Expand Up @@ -90,4 +91,3 @@ attempt to run the configured scripts. If both return exit code zero, return `HT
```
health-checker --listener "0.0.0.0:6000" --script "/usr/local/bin/exhibitor-health-check.sh --exhibitor-port 8080" --script "/usr/local/bin/zk-health-check.sh --zk-port 2191"
```

14 changes: 12 additions & 2 deletions commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package commands

import (
"fmt"
"os"
"strings"

"github.com/gruntwork-io/gruntwork-cli/logging"
"github.com/gruntwork-io/health-checker/options"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
"os"
"strings"
)

const DEFAULT_LISTENER_IP_ADDRESS = "0.0.0.0"
Expand All @@ -31,6 +32,11 @@ var scriptTimeoutFlag = cli.IntFlag{
Value: DEFAULT_SCRIPT_TIMEOUT_SEC,
}

var singleflightFlag = cli.BoolFlag{
Name: "singleflight",
Usage: fmt.Sprintf("[Optional] Enable singleflight mode, which makes concurrent requests share the same check."),
}

var listenerFlag = cli.StringFlag{
Name: "listener",
Usage: fmt.Sprintf("[Optional] The IP address and port on which inbound HTTP connections will be accepted."),
Expand All @@ -47,6 +53,7 @@ var defaultFlags = []cli.Flag{
portFlag,
scriptFlag,
scriptTimeoutFlag,
singleflightFlag,
listenerFlag,
logLevelFlag,
}
Expand Down Expand Up @@ -80,6 +87,8 @@ func parseOptions(cliContext *cli.Context) (*options.Options, error) {
return nil, OneOfParamsRequired{portFlag.Name, scriptFlag.Name}
}

singleflight := cliContext.Bool("singleflight")

scriptTimeout := cliContext.Int("script-timeout")

listener := cliContext.String("listener")
Expand All @@ -91,6 +100,7 @@ func parseOptions(cliContext *cli.Context) (*options.Options, error) {
Ports: ports,
Scripts: scripts,
ScriptTimeout: scriptTimeout,
Singleflight: singleflight,
Listener: listener,
Logger: logger,
}, nil
Expand Down
4 changes: 3 additions & 1 deletion options/options.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package options

import (
"github.com/sirupsen/logrus"
"strings"

"github.com/sirupsen/logrus"
)

// The options accepted by this CLI tool
type Options struct {
Ports []int
Scripts []Script
ScriptTimeout int
Singleflight bool
Listener string
Logger *logrus.Logger
}
Expand Down
54 changes: 42 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package server
import (
"context"
"fmt"
"github.com/gruntwork-io/gruntwork-cli/errors"
"github.com/gruntwork-io/health-checker/options"
"net"
"net/http"
"os/exec"
"sync"
"time"

"github.com/gruntwork-io/gruntwork-cli/errors"
"github.com/gruntwork-io/health-checker/options"
"golang.org/x/sync/singleflight"
)

type httpResponse struct {
Expand All @@ -18,14 +20,8 @@ type httpResponse struct {
}

func StartHttpServer(opts *options.Options) error {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
resp := runChecks(opts)
err := writeHttpResponse(w, resp)
if err != nil {
opts.Logger.Error("Failed to send HTTP response. Exiting.")
panic(err)
}
})
http.HandleFunc("/", httpHandler(opts))

err := http.ListenAndServe(opts.Listener, nil)
if err != nil {
return err
Expand All @@ -34,11 +30,45 @@ func StartHttpServer(opts *options.Options) error {
return nil
}

func httpHandler(opts *options.Options) http.HandlerFunc {
var group singleflight.Group

return func(w http.ResponseWriter, r *http.Request) {
var resp *httpResponse
logger := opts.Logger

// In Singleflight mode only one runChecks pass will be performed
// at any given time, with the result being shared across concurrent
// inbound requests
if opts.Singleflight {
logger.Infof("Received inbound request. Performing singleflight health checks...")

result, _, shared := group.Do("check", func() (interface{}, error) {
logger.Infof("Beginning health checks...")
return runChecks(opts), nil
})

if shared {
logger.Infof("Singleflight health check response was shared between multiple requests.")
}

resp = result.(*httpResponse)
} else {
logger.Infof("Received inbound request. Beginning health checks...")
resp = runChecks(opts)
}

err := writeHttpResponse(w, resp)
if err != nil {
opts.Logger.Error("Failed to send HTTP response. Exiting.")
panic(err)
}
}
}

// Check that we can open a TPC connection to all the ports in opts.Ports
func runChecks(opts *options.Options) *httpResponse {
logger := opts.Logger
logger.Infof("Received inbound request. Beginning health checks...")

allChecksOk := true

var waitGroup = sync.WaitGroup{}
Expand Down
92 changes: 87 additions & 5 deletions server/server_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package server

import (
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"sync"
"sync/atomic"
"testing"

"github.com/gruntwork-io/gruntwork-cli/logging"
"github.com/gruntwork-io/health-checker/options"
"github.com/gruntwork-io/health-checker/test"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"net"
"os"
"testing"
)

func TestParseChecksFromConfig(t *testing.T) {
Expand Down Expand Up @@ -127,7 +133,7 @@ func TestParseChecksFromConfig(t *testing.T) {
listeners = append(listeners, l)

// Separate goroutine for the tcp listeners
go handleRequests(t, l)
go handleRequests(t, l, nil)
}

defer closeListeners(t, listeners)
Expand All @@ -139,6 +145,78 @@ func TestParseChecksFromConfig(t *testing.T) {
assert.True(t, testCase.expectedStatus == response.StatusCode, "Got expected status code")
})
}
}

func TestSingleflight(t *testing.T) {

testCases := []struct {
name string
singleflight bool
expectedRequestCount int32
}{
{
"singleflight disabled",
false,
10,
},
{
"singleflight enabled",
true,
1,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
requestCount := int32(0)

ports, err := test.GetFreePorts(1)
if err != nil {
assert.FailNow(t, "Failed to get free ports: %v", err.Error())
}

port := ports[0]
t.Logf("Creating listener for port %d", port)
l, err := net.Listen("tcp", test.ListenerString(test.DEFAULT_LISTENER_ADDRESS, port))
if err != nil {
t.Logf("Error creating listener for port %d: %s", port, err.Error())
assert.FailNow(t, "Failed to start listening: %s", err.Error())
}

// Accept incoming connections, and count how many we receive
go handleRequests(t, l, &requestCount)
defer l.Close()

// Fire the request off to /bin/sleep to ensure it takes a while
opts := createOptionsForTest(t, 10, []string{"/bin/sleep 1"}, test.DEFAULT_LISTENER_ADDRESS, []int{port})
opts.Singleflight = testCase.singleflight

handler := httpHandler(opts)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler.ServeHTTP(w, r)
}))
defer ts.Close()

// Fire off 10 concurrent requests. In Singleflight mode only one
// underyling check should be performed.
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
resp, err := http.Get(ts.URL)
if err != nil {
assert.FailNow(t, "failed to perform HTTP request: %v", err)
}

ioutil.ReadAll(resp.Body)
wg.Done()
}()
}
wg.Wait()

assert.Equal(t, testCase.expectedRequestCount, requestCount)
})
}

}

Expand All @@ -151,7 +229,7 @@ func closeListeners(t *testing.T, listeners []net.Listener) {
}
}

func handleRequests(t *testing.T, l net.Listener) {
func handleRequests(t *testing.T, l net.Listener, counter *int32) {
for {
// Listen for an incoming connection.
l.Accept()
Expand All @@ -162,6 +240,10 @@ func handleRequests(t *testing.T, l net.Listener) {
//if err != nil {
// t.Logf("Error accepting: %s", err.Error())
//}

if counter != nil {
atomic.AddInt32(counter, 1)
}
}
}

Expand Down

0 comments on commit 327ef58

Please sign in to comment.