Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

*: introduce an option UseLeaseTTL for lease manager's TTL #1692

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
UnitsDirectory string
SystemdUser bool
AuthorizedKeysFile string
UseLeaseTTL bool
}

func (c *Config) Capabilities() machine.Capabilities {
Expand Down
7 changes: 0 additions & 7 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,6 @@ func New(reg CompleteRegistry, lManager lease.Manager, rStream pkg.EventStream,

func (e *Engine) Run(ival time.Duration, stop <-chan struct{}) {
leaseTTL := ival * 5
if e.machine.State().Capabilities.Has(machine.CapGRPC) {
// With grpc it doesn't make sense to set to 5secs the TTL of the etcd key.
// This has a special impact whenever we have high worload in the cluster, cause
// it'd provoke constant leader re-elections.
// TODO: IMHO, this should be configurable via a flag to disable the TTL.
leaseTTL = ival * 500000
}
machID := e.machine.State().ID

reconcile := func() {
Expand Down
2 changes: 1 addition & 1 deletion fleet.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
# agent_ttl="30s"

# Interval at which the engine should reconcile the cluster schedule in etcd.
# engine_reconcile_interval=2
# engine_reconcile_interval=5
4 changes: 3 additions & 1 deletion fleetd/fleetd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Main() {
cfgset.String("etcd_cafile", "", "SSL Certificate Authority file used to secure etcd communication")
cfgset.String("etcd_key_prefix", registry.DefaultKeyPrefix, "Keyspace for fleet data in etcd")
cfgset.Float64("etcd_request_timeout", 1.0, "Amount of time in seconds to allow a single etcd request before considering it failed.")
cfgset.Float64("engine_reconcile_interval", 2.0, "Interval at which the engine should reconcile the cluster schedule in etcd.")
cfgset.Float64("engine_reconcile_interval", 5.0, "Interval at which the engine should reconcile the cluster schedule in etcd.")
cfgset.String("public_ip", "", "IP address that fleet machine should publish")
cfgset.String("metadata", "", "List of key-value metadata to assign to the fleet machine")
cfgset.String("agent_ttl", agent.DefaultTTL, "TTL in seconds of fleet machine state in etcd")
Expand All @@ -96,6 +96,7 @@ func Main() {
cfgset.Bool("disable_watches", false, "Disable the use of etcd watches. Increases scheduling latency")
cfgset.Bool("verify_units", false, "DEPRECATED - This option is ignored")
cfgset.String("authorized_keys_file", "", "DEPRECATED - This option is ignored")
cfgset.Bool("use_lease_ttl", false, "Enable the usage of TTL option when creating a lease key in etcd")

globalconf.Register("", cfgset)
cfg, err := getConfig(cfgset, *cfgPath)
Expand Down Expand Up @@ -237,6 +238,7 @@ func getConfig(flagset *flag.FlagSet, userCfgFile string) (*config.Config, error
SystemdUser: (*flagset.Lookup("systemd_user")).Value.(flag.Getter).Get().(bool),
TokenLimit: (*flagset.Lookup("token_limit")).Value.(flag.Getter).Get().(int),
AuthorizedKeysFile: (*flagset.Lookup("authorized_keys_file")).Value.(flag.Getter).Get().(string),
UseLeaseTTL: (*flagset.Lookup("use_lease_ttl")).Value.(flag.Getter).Get().(bool),
}

if cfg.VerifyUnits {
Expand Down
32 changes: 25 additions & 7 deletions pkg/lease/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

etcd "github.com/coreos/etcd/client"
"github.com/coreos/fleet/log"

"golang.org/x/net/context"
)

Expand Down Expand Up @@ -53,8 +55,14 @@ func (l *etcdLease) Renew(period time.Duration) error {
val, err := serializeLeaseMetadata(l.meta.MachineID, l.meta.Version)
opts := &etcd.SetOptions{
PrevIndex: l.idx,
TTL: period,
}

log.Debugf("Renew %v", l.mgr.UseLeaseTTL)

if l.mgr.UseLeaseTTL {
opts.TTL = period
}

resp, err := l.mgr.kAPI.Set(l.mgr.ctx(), l.key, val, opts)
if err != nil {
return err
Expand Down Expand Up @@ -97,12 +105,13 @@ func serializeLeaseMetadata(machID string, ver int) (string, error) {
}

type etcdLeaseManager struct {
kAPI etcd.KeysAPI
keyPrefix string
kAPI etcd.KeysAPI
keyPrefix string
UseLeaseTTL bool
}

func NewEtcdLeaseManager(kAPI etcd.KeysAPI, keyPrefix string) *etcdLeaseManager {
return &etcdLeaseManager{kAPI: kAPI, keyPrefix: keyPrefix}
func NewEtcdLeaseManager(kAPI etcd.KeysAPI, keyPrefix string, useLeaseTTL bool) *etcdLeaseManager {
return &etcdLeaseManager{kAPI: kAPI, keyPrefix: keyPrefix, UseLeaseTTL: useLeaseTTL}
}

func (r *etcdLeaseManager) ctx() context.Context {
Expand Down Expand Up @@ -136,7 +145,12 @@ func (r *etcdLeaseManager) StealLease(name, machID string, ver int, period time.
key := r.leasePath(name)
opts := &etcd.SetOptions{
PrevIndex: idx,
TTL: period,
}

log.Debugf("StealLease %v", r.UseLeaseTTL)

if r.UseLeaseTTL {
opts.TTL = period
}
resp, err := r.kAPI.Set(r.ctx(), key, val, opts)
if err != nil {
Expand All @@ -158,9 +172,13 @@ func (r *etcdLeaseManager) AcquireLease(name string, machID string, ver int, per

key := r.leasePath(name)
opts := &etcd.SetOptions{
TTL: period,
PrevExist: etcd.PrevNoExist,
}
log.Debugf("AcquireLease %v", r.UseLeaseTTL)

if r.UseLeaseTTL {
opts.TTL = period
}

resp, err := r.kAPI.Set(r.ctx(), key, val, opts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion registry/rpc/registrymux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestRegistryMuxUnitManagement(t *testing.T) {
e := &testEtcdKeysAPI{}
etcdReg := registry.NewEtcdRegistry(e, "/fleet/")

lManager := lease.NewEtcdLeaseManager(e, "/fleet/")
lManager := lease.NewEtcdLeaseManager(e, "/fleet/", false)
reg := NewRegistryMux(etcdReg, mach, lManager)

contents := `
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func New(cfg config.Config, listeners []net.Listener) (*Server, error) {
reg engine.CompleteRegistry
genericReg interface{}
)
lManager := lease.NewEtcdLeaseManager(kAPI, cfg.EtcdKeyPrefix)
lManager := lease.NewEtcdLeaseManager(kAPI, cfg.EtcdKeyPrefix, cfg.UseLeaseTTL)

if !cfg.EnableGRPC {
genericReg = registry.NewEtcdRegistry(kAPI, cfg.EtcdKeyPrefix)
Expand Down