Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul check ttl #143

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 54 additions & 39 deletions v3/registry/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (c *consulRegistry) Deregister(s *registry.Service, opts ...registry.Deregi
c.Unlock()

node := s.Nodes[0]

return c.Client().Agent().ServiceDeregister(node.Id)
}

Expand All @@ -176,12 +177,13 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
return errors.New("Require at least one node")
}

var regTCPCheck bool
var regInterval time.Duration
var regHTTPCheck bool
var httpCheckConfig consul.AgentServiceCheck

var options registry.RegisterOptions
var (
regTCPCheck bool
regInterval time.Duration
regHTTPCheck bool
httpCheckConfig consul.AgentServiceCheck
options registry.RegisterOptions
)
for _, o := range opts {
o(&options)
}
Expand All @@ -191,9 +193,9 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
regTCPCheck = true
regInterval = tcpCheckInterval
}
var ok bool
if httpCheckConfig, ok = c.opts.Context.Value("consul_http_check_config").(consul.AgentServiceCheck); ok {
if conf, ok := c.opts.Context.Value("consul_http_check_config").(consul.AgentServiceCheck); ok {
regHTTPCheck = true
httpCheckConfig = conf
}
}

Expand All @@ -212,37 +214,8 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
lastChecked := c.lastChecked[s.Name]
c.Unlock()

// if it's already registered and matches then just pass the check
if ok && v == h {
if options.TTL == time.Duration(0) {
// ensure that our service hasn't been deregistered by Consul
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
return nil
}
services, _, err := c.Client().Health().Checks(s.Name, c.queryOptions)
if err == nil {
for _, v := range services {
if v.ServiceID == node.Id {
return nil
}
}
}
} else {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client().Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
}

// encode the tags
tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)

var check *consul.AgentServiceCheck

checkTTL := regInterval
if regTCPCheck {
deregTTL := getDeregisterTTL(regInterval)

Expand All @@ -255,6 +228,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
} else if regHTTPCheck {
interval, _ := time.ParseDuration(httpCheckConfig.Interval)
deregTTL := getDeregisterTTL(interval)
checkTTL = interval

host, _, _ := net.SplitHostPort(node.Address)
healthCheckURI := strings.Replace(httpCheckConfig.HTTP, "{host}", host, 1)
Expand All @@ -269,12 +243,53 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
// if the TTL is greater than 0 create an associated check
} else if options.TTL > time.Duration(0) {
deregTTL := getDeregisterTTL(options.TTL)
checkTTL = options.TTL

check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
}
if c.opts.Context != nil {
if ttl, ok := c.opts.Context.Value("consul_check_ttl").(time.Duration); ok {
checkTTL = ttl
}
}

// if it's already registered and matches then just pass the check
if ok && v == h {
passing := false
if time.Since(lastChecked) > checkTTL {
services, _, _ := c.Client().Health().Checks(s.Name, c.queryOptions)
for _, service := range services {
if service.ServiceID == node.Id && service.Status == "passing" {
passing = true
c.Lock()
c.lastChecked[s.Name] = time.Now()
c.Unlock()
break
}
}
} else {
passing = true
}
if passing {
if options.TTL == time.Duration(0) {
return nil
}
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client().Agent().UpdateTTL("service:"+node.Id, "", "pass"); err == nil {
return nil
}
}
c.Deregister(s)
}

// encode the tags
tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)

host, pt, _ := net.SplitHostPort(node.Address)
if host == "" {
Expand Down Expand Up @@ -315,7 +330,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
}

// pass the healthcheck
return c.Client().Agent().PassTTL("service:"+node.Id, "")
return c.Client().Agent().UpdateTTL("service:"+node.Id, "", "pass")
}

func (c *consulRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
Expand Down
15 changes: 11 additions & 4 deletions v3/registry/consul/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,24 @@ func Config(c *consul.Config) registry.Option {
}
}

// CheckTTL allows you to periodically check the registration of the service to ensure
// that the registration actually exists in the consul
func CheckTTL(t time.Duration) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_check_ttl", t)
}
}

// AllowStale sets whether any Consul server (non-leader) can service
// a read. This allows for lower latency and higher throughput
// at the cost of potentially stale data.
// Works similar to Consul DNS Config option [1].
// Defaults to true.
//
// [1] https://www.consul.io/docs/agent/options.html#allow_stale
//
func AllowStale(v bool) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
Expand All @@ -49,7 +59,6 @@ func AllowStale(v bool) registry.Option {
// Consul. See `Consul API` for more information [1].
//
// [1] https://godoc.org/github.com/hashicorp/consul/api#QueryOptions
//
func QueryOptions(q *consul.QueryOptions) registry.Option {
return func(o *registry.Options) {
if q == nil {
Expand All @@ -62,13 +71,11 @@ func QueryOptions(q *consul.QueryOptions) registry.Option {
}
}

//
// TCPCheck will tell the service provider to check the service address
// and port every `t` interval. It will enabled only if `t` is greater than 0.
// See `TCP + Interval` for more information [1].
//
// [1] https://www.consul.io/docs/agent/checks.html
//
func TCPCheck(t time.Duration) registry.Option {
return func(o *registry.Options) {
if t <= time.Duration(0) {
Expand Down
93 changes: 54 additions & 39 deletions v4/registry/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (c *consulRegistry) Deregister(s *registry.Service, opts ...registry.Deregi
c.Unlock()

node := s.Nodes[0]

return c.Client().Agent().ServiceDeregister(node.Id)
}

Expand All @@ -176,12 +177,13 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
return errors.New("Require at least one node")
}

var regTCPCheck bool
var regInterval time.Duration
var regHTTPCheck bool
var httpCheckConfig consul.AgentServiceCheck

var options registry.RegisterOptions
var (
regTCPCheck bool
regInterval time.Duration
regHTTPCheck bool
httpCheckConfig consul.AgentServiceCheck
options registry.RegisterOptions
)
for _, o := range opts {
o(&options)
}
Expand All @@ -191,9 +193,9 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
regTCPCheck = true
regInterval = tcpCheckInterval
}
var ok bool
if httpCheckConfig, ok = c.opts.Context.Value("consul_http_check_config").(consul.AgentServiceCheck); ok {
if conf, ok := c.opts.Context.Value("consul_http_check_config").(consul.AgentServiceCheck); ok {
regHTTPCheck = true
httpCheckConfig = conf
}
}

Expand All @@ -212,37 +214,8 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
lastChecked := c.lastChecked[s.Name]
c.Unlock()

// if it's already registered and matches then just pass the check
if ok && v == h {
if options.TTL == time.Duration(0) {
// ensure that our service hasn't been deregistered by Consul
if time.Since(lastChecked) <= getDeregisterTTL(regInterval) {
return nil
}
services, _, err := c.Client().Health().Checks(s.Name, c.queryOptions)
if err == nil {
for _, v := range services {
if v.ServiceID == node.Id {
return nil
}
}
}
} else {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client().Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
}

// encode the tags
tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)

var check *consul.AgentServiceCheck

checkTTL := regInterval
if regTCPCheck {
deregTTL := getDeregisterTTL(regInterval)

Expand All @@ -255,6 +228,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
} else if regHTTPCheck {
interval, _ := time.ParseDuration(httpCheckConfig.Interval)
deregTTL := getDeregisterTTL(interval)
checkTTL = interval

host, _, _ := net.SplitHostPort(node.Address)
healthCheckURI := strings.Replace(httpCheckConfig.HTTP, "{host}", host, 1)
Expand All @@ -269,12 +243,53 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
// if the TTL is greater than 0 create an associated check
} else if options.TTL > time.Duration(0) {
deregTTL := getDeregisterTTL(options.TTL)
checkTTL = options.TTL

check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
}
if c.opts.Context != nil {
if ttl, ok := c.opts.Context.Value("consul_check_ttl").(time.Duration); ok {
checkTTL = ttl
}
}

// if it's already registered and matches then just pass the check
if ok && v == h {
passing := false
if time.Since(lastChecked) > checkTTL {
services, _, _ := c.Client().Health().Checks(s.Name, c.queryOptions)
for _, service := range services {
if service.ServiceID == node.Id && service.Status == "passing" {
passing = true
c.Lock()
c.lastChecked[s.Name] = time.Now()
c.Unlock()
break
}
}
} else {
passing = true
}
if passing {
if options.TTL == time.Duration(0) {
return nil
}
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client().Agent().UpdateTTL("service:"+node.Id, "", "pass"); err == nil {
return nil
}
}
c.Deregister(s)
}

// encode the tags
tags := encodeMetadata(node.Metadata)
tags = append(tags, encodeEndpoints(s.Endpoints)...)
tags = append(tags, encodeVersion(s.Version)...)

host, pt, _ := net.SplitHostPort(node.Address)
if host == "" {
Expand Down Expand Up @@ -316,7 +331,7 @@ func (c *consulRegistry) Register(s *registry.Service, opts ...registry.Register
}

// pass the healthcheck
return c.Client().Agent().PassTTL("service:"+node.Id, "")
return c.Client().Agent().UpdateTTL("service:"+node.Id, "", "pass")
}

func (c *consulRegistry) GetService(name string, opts ...registry.GetOption) ([]*registry.Service, error) {
Expand Down
11 changes: 11 additions & 0 deletions v4/registry/consul/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ func Config(c *consul.Config) registry.Option {
}
}

// CheckTTL allows you to periodically check the registration of the service to ensure
// that the registration actually exists in the consul
func CheckTTL(t time.Duration) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_check_ttl", t)
}
}

// AllowStale sets whether any Consul server (non-leader) can service
// a read. This allows for lower latency and higher throughput
// at the cost of potentially stale data.
Expand Down
Loading