remove addresses from node class hash #24942

Open — wants to merge 2 commits into base: main
7 changes: 7 additions & 0 deletions .changelog/24942.txt
@@ -0,0 +1,7 @@
```release-note:bug
scheduler: Fixed a bug where node class hashes included unique attributes, making scheduling more costly
```

```release-note:breaking-change
node: The node attribute `consul.dns.addr` has been changed to `unique.consul.dns.addr`. The node attribute `nomad.advertise.address` has been changed to `unique.advertise.address`.
```
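For context on the release notes above: a node's computed node class is a hash over its schedulable properties, and attributes in the `unique.` namespace are excluded from that hash so that per-node values (such as the addresses renamed in this PR) cannot split otherwise-identical nodes into separate classes. The sketch below only illustrates that idea; it is not Nomad's actual `ComputeClass` implementation, and the attribute values are made up.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
	"strings"
)

// classHash is an illustrative stand-in for a computed node class: it hashes
// only attributes outside the "unique." namespace, so per-node values such as
// addresses cannot force every node into its own class.
func classHash(attrs map[string]string) string {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		if strings.HasPrefix(k, "unique.") {
			continue // unique attributes are excluded from the hash
		}
		keys = append(keys, k)
	}
	sort.Strings(keys) // stable ordering so equal attribute sets hash equally

	h := sha256.New()
	for _, k := range keys {
		fmt.Fprintf(h, "%s=%s;", k, attrs[k])
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	nodeA := map[string]string{
		"consul.version":         "1.9.5",
		"unique.consul.dns.addr": "192.168.1.117",
	}
	nodeB := map[string]string{
		"consul.version":         "1.9.5",
		"unique.consul.dns.addr": "10.0.0.5", // differs, but is unique-namespaced
	}
	fmt.Println(classHash(nodeA) == classHash(nodeB)) // true: both nodes share a class
}
```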
4 changes: 2 additions & 2 deletions client/allocrunner/networking_cni.go
@@ -413,10 +413,10 @@ func (c *cniNetworkConfigurator) setupTransparentProxyArgs(alloc *structs.Alloca
func (c *cniNetworkConfigurator) dnsFromAttrs(cluster string) (string, int) {
var dnsAddrAttr, dnsPortAttr string
if cluster == structs.ConsulDefaultCluster || cluster == "" {
dnsAddrAttr = "consul.dns.addr"
dnsAddrAttr = "unique.consul.dns.addr"
dnsPortAttr = "consul.dns.port"
} else {
dnsAddrAttr = "consul." + cluster + ".dns.addr"
dnsAddrAttr = "unique.consul." + cluster + ".dns.addr"
dnsPortAttr = "consul." + cluster + ".dns.port"
}

20 changes: 10 additions & 10 deletions client/allocrunner/networking_cni_test.go
@@ -257,8 +257,8 @@ func TestSetup(t *testing.T) {
}

nodeAddrs := map[string]string{
"consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
"unique.consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
}
nodeMeta := map[string]string{
"connect.transparent_proxy.default_outbound_port": "15001",
@@ -554,8 +554,8 @@ func TestCNI_setupTproxyArgs(t *testing.T) {
}

nodeAttrs := map[string]string{
"consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
"unique.consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
}

alloc := mock.ConnectAlloc()
@@ -716,8 +716,8 @@ func TestCNI_setupTproxyArgs(t *testing.T) {
{
name: "tproxy with consul dns disabled",
nodeAttrs: map[string]string{
"consul.dns.port": "-1",
"consul.dns.addr": "192.168.1.117",
"consul.dns.port": "-1",
"unique.consul.dns.addr": "192.168.1.117",
},
tproxySpec: &structs.ConsulTransparentProxy{},
expectIPConfig: &iptables.Config{
@@ -732,10 +732,10 @@ func TestCNI_setupTproxyArgs(t *testing.T) {
name: "tproxy for other cluster with default consul dns disabled",
cluster: "infra",
nodeAttrs: map[string]string{
"consul.dns.port": "-1",
"consul.dns.addr": "192.168.1.110",
"consul.infra.dns.port": "8600",
"consul.infra.dns.addr": "192.168.1.117",
"consul.dns.port": "-1",
"unique.consul.dns.addr": "192.168.1.110",
"consul.infra.dns.port": "8600",
"unique.consul.infra.dns.addr": "192.168.1.117",
},
tproxySpec: &structs.ConsulTransparentProxy{},
expectIPConfig: &iptables.Config{
50 changes: 25 additions & 25 deletions client/fingerprint/consul.go
@@ -186,34 +186,34 @@ func (cfs *consulState) initialize(cfg *config.ConsulConfig, logger hclog.Logger

if cfg.Name == structs.ConsulDefaultCluster {
cfs.readers = map[string]valueReader{
"consul.server": cfs.server,
"consul.version": cfs.version,
"consul.sku": cfs.sku,
"consul.revision": cfs.revision,
"unique.consul.name": cfs.name, // note: won't have this for non-default clusters
"consul.datacenter": cfs.dc,
"consul.segment": cfs.segment,
"consul.connect": cfs.connect,
"consul.grpc": cfs.grpc(consulConfig.Scheme, logger),
"consul.ft.namespaces": cfs.namespaces,
"consul.partition": cfs.partition,
"consul.dns.port": cfs.dnsPort,
"consul.dns.addr": cfs.dnsAddr(logger),
"consul.server": cfs.server,
"consul.version": cfs.version,
"consul.sku": cfs.sku,
"consul.revision": cfs.revision,
"unique.consul.name": cfs.name, // note: won't have this for non-default clusters
"consul.datacenter": cfs.dc,
"consul.segment": cfs.segment,
"consul.connect": cfs.connect,
"consul.grpc": cfs.grpc(consulConfig.Scheme, logger),
"consul.ft.namespaces": cfs.namespaces,
"consul.partition": cfs.partition,
"consul.dns.port": cfs.dnsPort,
"unique.consul.dns.addr": cfs.dnsAddr(logger),
}
} else {
cfs.readers = map[string]valueReader{
fmt.Sprintf("consul.%s.server", cfg.Name): cfs.server,
fmt.Sprintf("consul.%s.version", cfg.Name): cfs.version,
fmt.Sprintf("consul.%s.sku", cfg.Name): cfs.sku,
fmt.Sprintf("consul.%s.revision", cfg.Name): cfs.revision,
fmt.Sprintf("consul.%s.datacenter", cfg.Name): cfs.dc,
fmt.Sprintf("consul.%s.segment", cfg.Name): cfs.segment,
fmt.Sprintf("consul.%s.connect", cfg.Name): cfs.connect,
fmt.Sprintf("consul.%s.grpc", cfg.Name): cfs.grpc(consulConfig.Scheme, logger),
fmt.Sprintf("consul.%s.ft.namespaces", cfg.Name): cfs.namespaces,
fmt.Sprintf("consul.%s.partition", cfg.Name): cfs.partition,
fmt.Sprintf("consul.%s.dns.port", cfg.Name): cfs.dnsPort,
fmt.Sprintf("consul.%s.dns.addr", cfg.Name): cfs.dnsAddr(logger),
fmt.Sprintf("consul.%s.server", cfg.Name): cfs.server,
fmt.Sprintf("consul.%s.version", cfg.Name): cfs.version,
fmt.Sprintf("consul.%s.sku", cfg.Name): cfs.sku,
fmt.Sprintf("consul.%s.revision", cfg.Name): cfs.revision,
fmt.Sprintf("consul.%s.datacenter", cfg.Name): cfs.dc,
fmt.Sprintf("consul.%s.segment", cfg.Name): cfs.segment,
fmt.Sprintf("consul.%s.connect", cfg.Name): cfs.connect,
fmt.Sprintf("consul.%s.grpc", cfg.Name): cfs.grpc(consulConfig.Scheme, logger),
fmt.Sprintf("consul.%s.ft.namespaces", cfg.Name): cfs.namespaces,
fmt.Sprintf("consul.%s.partition", cfg.Name): cfs.partition,
fmt.Sprintf("consul.%s.dns.port", cfg.Name): cfs.dnsPort,
fmt.Sprintf("unique.consul.%s.dns.addr", cfg.Name): cfs.dnsAddr(logger),
}
}

52 changes: 26 additions & 26 deletions client/fingerprint/consul_test.go
@@ -696,19 +696,19 @@ func TestConsulFingerprint_Fingerprint_ent(t *testing.T) {
err := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp)
must.NoError(t, err)
must.Eq(t, map[string]string{
"consul.datacenter": "dc1",
"consul.revision": "22ce6c6ad",
"consul.segment": "seg1",
"consul.server": "true",
"consul.sku": "ent",
"consul.version": "1.9.5+ent",
"consul.ft.namespaces": "true",
"consul.connect": "true",
"consul.grpc": "8502",
"consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
"consul.partition": "default",
"unique.consul.name": "HAL9000",
"consul.datacenter": "dc1",
"consul.revision": "22ce6c6ad",
"consul.segment": "seg1",
"consul.server": "true",
"consul.sku": "ent",
"consul.version": "1.9.5+ent",
"consul.ft.namespaces": "true",
"consul.connect": "true",
"consul.grpc": "8502",
"unique.consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
"consul.partition": "default",
"unique.consul.name": "HAL9000",
}, resp.Attributes)
must.True(t, resp.Detected)

@@ -752,19 +752,19 @@ func TestConsulFingerprint_Fingerprint_ent(t *testing.T) {
err4 := cf.Fingerprint(&FingerprintRequest{Config: cfg, Node: node}, &resp4)
must.NoError(t, err4)
must.Eq(t, map[string]string{
"consul.datacenter": "dc1",
"consul.revision": "22ce6c6ad",
"consul.segment": "seg1",
"consul.server": "true",
"consul.sku": "ent",
"consul.version": "1.9.5+ent",
"consul.ft.namespaces": "true",
"consul.connect": "true",
"consul.grpc": "8502",
"consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
"consul.partition": "default",
"unique.consul.name": "HAL9000",
"consul.datacenter": "dc1",
"consul.revision": "22ce6c6ad",
"consul.segment": "seg1",
"consul.server": "true",
"consul.sku": "ent",
"consul.version": "1.9.5+ent",
"consul.ft.namespaces": "true",
"consul.connect": "true",
"consul.grpc": "8502",
"unique.consul.dns.addr": "192.168.1.117",
"consul.dns.port": "8600",
"consul.partition": "default",
"unique.consul.name": "HAL9000",
}, resp4.Attributes)

// consul now available again
2 changes: 1 addition & 1 deletion client/fingerprint/nomad.go
@@ -22,7 +22,7 @@ func NewNomadFingerprint(logger log.Logger) Fingerprint {
}

func (f *NomadFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error {
resp.AddAttribute("nomad.advertise.address", req.Node.HTTPAddr)
resp.AddAttribute("unique.advertise.address", req.Node.HTTPAddr)
resp.AddAttribute("nomad.version", req.Config.Version.VersionNumber())
resp.AddAttribute("nomad.revision", req.Config.Version.Revision)
resp.AddAttribute("nomad.service_discovery", strconv.FormatBool(req.Config.NomadServiceDiscovery))
2 changes: 1 addition & 1 deletion client/fingerprint/nomad_test.go
@@ -57,7 +57,7 @@ func TestNomadFingerprint(t *testing.T) {
t.Fatalf("incorrect revision")
}

if response.Attributes["nomad.advertise.address"] != h {
if response.Attributes["unique.advertise.address"] != h {
t.Fatalf("incorrect advertise address")
}

2 changes: 2 additions & 0 deletions nomad/structs/node_class.go
@@ -123,6 +123,8 @@ func EscapedConstraints(constraints []*Constraint) []*Constraint {
// computed node class optimization.
func constraintTargetEscapes(target string) bool {
switch {
case strings.HasPrefix(target, "${unique."):
return true
Comment from the author on lines +126 to +127:

I'm not sure there are actually values that fall into this case (the unique attributes from fingerprinting would be under `attr.unique.`), but it was one of the test cases.

case strings.HasPrefix(target, "${node.unique."):
return true
case strings.HasPrefix(target, "${attr.unique."):
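To make the escaping rule concrete, here is a small self-contained sketch — not the Nomad implementation itself — of the prefix checks used by `constraintTargetEscapes`, applied to a few illustrative constraint targets:

```go
package main

import (
	"fmt"
	"strings"
)

// escapes mirrors the prefix checks in constraintTargetEscapes: a constraint
// whose target interpolates a unique, per-node value cannot participate in
// the computed node class optimization.
func escapes(target string) bool {
	switch {
	case strings.HasPrefix(target, "${unique."):
		return true
	case strings.HasPrefix(target, "${node.unique."):
		return true
	case strings.HasPrefix(target, "${attr.unique."):
		return true
	default:
		return false
	}
}

func main() {
	for _, target := range []string{
		"${attr.unique.advertise.address}", // fingerprinted unique attribute
		"${node.unique.name}",              // unique node property
		"${attr.kernel.name}",              // shared attribute, does not escape
	} {
		fmt.Printf("%-36s escapes=%v\n", target, escapes(target))
	}
}
```

Constraints that escape in this way have to be evaluated against each node individually rather than answered once per computed node class, which is why keeping unique values out of the class hash matters for scheduling cost.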
8 changes: 3 additions & 5 deletions nomad/structs/node_class_test.go
@@ -4,7 +4,6 @@
package structs

import (
"reflect"
"testing"

"github.com/hashicorp/nomad/ci"
@@ -276,8 +275,7 @@ func TestNode_EscapedConstraints(t *testing.T) {
Operand: "!=",
}
constraints := []*Constraint{ne1, ne2, ne3, e1, e2, e3}
- expected := []*Constraint{ne1, ne2, ne3}
- if act := EscapedConstraints(constraints); reflect.DeepEqual(act, expected) {
- t.Fatalf("EscapedConstraints(%v) returned %v; want %v", constraints, act, expected)
- }
+ expected := []*Constraint{e1, e2, e3}
+ must.Eq(t, expected, EscapedConstraints(constraints),
+ must.Sprintf("expected unique fields to escape constraints"))
}
57 changes: 38 additions & 19 deletions scheduler/benchmarks/benchmarks_test.go
@@ -5,9 +5,10 @@ package benchmarks

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"
"github.com/shoenig/test/must"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@@ -38,7 +39,7 @@ func BenchmarkSchedulerExample(b *testing.B) {
upsertNodes(h, 5000, 100)

iter, err := h.State.Nodes(nil)
- require.NoError(b, err)
+ must.NoError(b, err)
nodes := 0
for {
raw := iter.Next()
@@ -47,8 +48,8 @@
}
nodes++
}
- require.Equal(b, 5000, nodes)
- job := generateJob(true, 600)
+ must.Eq(b, 5000, nodes)
+ job := generateJob(true, 600, 100)
eval = upsertJob(h, job)
}

@@ -58,24 +59,24 @@
// benchmarking a successful run and not a failed plan.
{
err := h.Process(scheduler.NewServiceScheduler, eval)
- require.NoError(b, err)
- require.Len(b, h.Plans, 1)
- require.False(b, h.Plans[0].IsNoOp())
+ must.NoError(b, err)
+ must.Len(b, 1, h.Plans)
+ must.False(b, h.Plans[0].IsNoOp())
}

for i := 0; i < b.N; i++ {
err := h.Process(scheduler.NewServiceScheduler, eval)
- require.NoError(b, err)
+ must.NoError(b, err)
}
}

// BenchmarkServiceScheduler exercises the service scheduler at a
// variety of cluster sizes, with both spread and non-spread jobs
func BenchmarkServiceScheduler(b *testing.B) {

- clusterSizes := []int{1000, 5000, 10000}
- rackSets := []int{10, 25, 50, 75}
- jobSizes := []int{300, 600, 900, 1200}
+ clusterSizes := []int{500, 1000, 5000, 10000}
+ rackSets := []int{25, 50, 75}
+ jobSizes := []int{50, 300, 600, 900, 1200}

type benchmark struct {
name string
@@ -112,18 +113,21 @@
}

for _, bm := range benchmarks {
+ job := generateJob(bm.withSpread, bm.jobSize, bm.racks)
+ h := scheduler.NewHarness(b)
+ h.SetNoSubmit()
+ upsertNodes(h, bm.clusterSize, bm.racks)
+ eval := upsertJob(h, job)
+ b.ResetTimer()

b.Run(bm.name, func(b *testing.B) {
- h := scheduler.NewHarness(b)
- upsertNodes(h, bm.clusterSize, bm.racks)
- job := generateJob(bm.withSpread, bm.jobSize)
- eval := upsertJob(h, job)
- b.ResetTimer()
for i := 0; i < b.N; i++ {
err := h.Process(scheduler.NewServiceScheduler, eval)
- require.NoError(b, err)
+ must.NoError(b, err)
}
})
}

}

func upsertJob(h *scheduler.Harness, job *structs.Job) *structs.Evaluation {
@@ -147,13 +151,26 @@ func upsertJob(h *scheduler.Harness, job *structs.Job) *structs.Evaluation {
return eval
}

- func generateJob(withSpread bool, jobSize int) *structs.Job {
+ func generateJob(withSpread bool, jobSize int, racks int) *structs.Job {
job := mock.Job()
job.Datacenters = []string{"dc-1", "dc-2"}
if withSpread {
job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
}
- job.Constraints = []*structs.Constraint{}
+
+ // only half the racks will be considered eligible
+ rackTargets := []string{}
+ for i := range racks / 2 {
+ rackTargets = append(rackTargets, fmt.Sprintf("r%d", i))
+ }
+ rackTarget := strings.Join(rackTargets, ",")
+ job.Constraints = []*structs.Constraint{
+ {
+ LTarget: "${meta.rack}",
+ RTarget: rackTarget,
+ Operand: "set_contains_any",
+ },
+ }
job.TaskGroups[0].Count = jobSize
job.TaskGroups[0].Networks = nil
job.TaskGroups[0].Services = []*structs.Service{}
@@ -173,6 +190,7 @@ func upsertNodes(h *scheduler.Harness, count, racks int) {
node.Datacenter = datacenters[i%2]
node.Meta = map[string]string{}
node.Meta["rack"] = fmt.Sprintf("r%d", i%racks)
node.Attributes["unique.advertise.address"] = fmt.Sprintf("192.168.%d.%d", i%10, i%120)
memoryMB := 32000
diskMB := 100 * 1024

@@ -196,6 +214,7 @@
},
}
node.NodeResources.Compatibility()
+ node.ComputeClass()

err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
if err != nil {