allocator: scale member ItemLimit to the relative share of ItemSlots
Item replication may not be evenly distributed.

Previously, we were not accounting for this within each allocation
sub-problem, and scaled member ItemLimits by the relative ratio of
items in each sub-problem rather than the relative ratio of item *slots*.

This caused excessive re-assignment of items in solved maximum
assignment flows, because push/relabel must spend more time backtracking
once it overflows the initial constraints on item limits.

Introduce a new BenchmarkChangingReplication which reproduces this
effect.

Then, solve it by tweaking sparse flow network construction to scale
a member ItemLimit by the relative portion of assignment slots that
must be allocated within each sub-problem (rather than just the
relative number of items).
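
As a rough sketch of the change: a member ItemLimit is now scaled by the
sub-problem's share of replication slots rather than by its share of items.
The scaleAndRound helper below stands in for the allocator's internal helper
of the same name; its rounding behavior and the example numbers are
assumptions made only for illustration.

    package main

    import "fmt"

    // scaleAndRound mirrors the allocator's internal helper of the same name;
    // the round-up behavior is an assumption.
    func scaleAndRound(c, num, den int) int {
        return (c*num + den - 1) / den
    }

    func main() {
        // Hypothetical numbers: a member ItemLimit of 100, and a sub-problem
        // holding 500 of 1000 items but 2000 of 2750 total replication slots.
        const itemLimit, myItems, items = 100, 500, 1000
        const myItemSlots, itemSlots = 2000, 2750

        before := scaleAndRound(itemLimit, myItems, items)        // old: share of items -> 50
        after := scaleAndRound(itemLimit, myItemSlots, itemSlots) // new: share of slots -> 73
        fmt.Println(before, after)
    }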

BenchmarkChangingReplication confirms this heuristic update dramatically
reduces the degree of assignment churn (note `run.ratio`, which is a
measure of average churn per item replication change):

Before:
    final metrics    adds=142806 packs=90617 removes=83044 run.ratio=16.58809060161935 run.total=9757
After:
    final metrics    adds=69854 packs=4457 removes=10092 run.ratio=1.6343138259710976 run.total=9757
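
Here run.ratio is derived from the benchmark's counters as sketched below.
This mirrors the expression used in BenchmarkChangingReplication further down;
the standalone helper name is illustrative only.

    // churnRatio: assignment adds beyond the initial 2*NItems baseline
    // (every item starts with R=2), plus removes, averaged over the total
    // number of item replication changes.
    func churnRatio(adds, removes float64, nItems, totalChanges int) float64 {
        return (adds - 2*float64(nItems) + removes) / float64(totalChanges)
    }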
jgraettinger committed Jan 18, 2025
1 parent da22df6 commit f510dce
Showing 3 changed files with 178 additions and 16 deletions.
64 changes: 51 additions & 13 deletions allocator/allocator.go
@@ -68,22 +68,27 @@ func Allocate(args AllocateArgs) error {
// Do we need to re-solve for a maximum assignment?
if state.NetworkHash != lastNetworkHash {
var startTime = time.Now()
desired = solveDesiredAssignments(state, desired[:0])
desired = SolveDesiredAssignments(state, desired[:0])
var dur = time.Since(startTime)
allocatorMaxFlowRuntimeSeconds.Observe(dur.Seconds())

var added, removed, unchanged = ChangeSummary(state.Assignments, desired)

log.WithFields(log.Fields{
"dur": dur,
"hash": state.NetworkHash,
"itemSlots": state.ItemSlots,
"items": len(state.Items),
"lastHash": lastNetworkHash,
"memberSlots": state.MemberSlots,
"members": len(state.Members),
"nextAssignments": len(desired),
"prevAssignments": len(state.Assignments),
"rev": state.KS.Header.Revision,
"root": state.KS.Root,
"asn.last": len(state.Assignments),
"asn.next": len(desired),
"asn.next.add": added,
"asn.next.rem": removed,
"asn.next.same": unchanged,
"dur": dur,
"hash.last": lastNetworkHash,
"hash.next": state.NetworkHash,
"item.slots": state.ItemSlots,
"item.total": len(state.Items),
"mem.slots": state.MemberSlots,
"mem.total": len(state.Members),
"rev": state.KS.Header.Revision,
"root": state.KS.Root,
}).Info("solved for maximum assignment")

if len(desired) < state.ItemSlots {
@@ -205,7 +210,7 @@ func removeDeadAssignments(txn checkpointTxn, ks *keyspace.KeySpace, asn keyspac
return nil
}

func solveDesiredAssignments(s *State, desired []Assignment) []Assignment {
func SolveDesiredAssignments(s *State, desired []Assignment) []Assignment {
// Number of items to lump into each invocation of push/relabel.
// This is an arbitrary number which is empirically fast to solve,
// but is large enough that we're unlikely to see further improvements
@@ -230,6 +235,39 @@ func solveDesiredAssignments(s *State, desired []Assignment) []Assignment {
return desired
}

// ChangeSummary computes the total number of additions, removals, and unchanged
// assignments if `current` assignments are shifted to `desired`.
func ChangeSummary(current keyspace.KeyValues, desired []Assignment) (added, removed, unchanged int) {
for lhs, rhs := current, desired; len(lhs) != 0 || len(rhs) != 0; {
var cmp int

if len(lhs) == 0 {
cmp = 1
} else if len(rhs) == 0 {
cmp = -1
} else if lh, rh := lhs[0].Decoded.(Assignment), rhs[0]; lh.ItemID != rh.ItemID {
cmp = strings.Compare(lh.ItemID, rh.ItemID)
} else if lh.MemberZone != rh.MemberZone {
cmp = strings.Compare(lh.MemberZone, rh.MemberZone)
} else {
cmp = strings.Compare(lh.MemberSuffix, rh.MemberSuffix)
}

switch cmp {
case -1:
removed += 1
lhs = lhs[1:]
case 1:
added += 1
rhs = rhs[1:]
case 0:
unchanged += 1
lhs, rhs = lhs[1:], rhs[1:]
}
}
return
}

// modRevisionUnchanged returns a Cmp which verifies the key has not changed
// from the current KeyValue.
func modRevisionUnchanged(kv keyspace.KeyValue) clientv3.Cmp {
118 changes: 118 additions & 0 deletions allocator/benchmark_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"math/rand/v2"
"testing"

"github.com/prometheus/client_golang/prometheus"
@@ -28,6 +29,7 @@ func (s *BenchmarkHealthSuite) TestBenchmarkHealth(c *gc.C) {
var fakeB = testing.B{N: 1}

benchmarkSimulatedDeploy(&fakeB)
BenchmarkChangingReplication(&fakeB)
}

var _ = gc.Suite(&BenchmarkHealthSuite{})
@@ -149,6 +151,122 @@ func benchmarkSimulatedDeploy(b *testing.B) {
}).Info("final metrics")
}

func BenchmarkChangingReplication(b *testing.B) {
var client = etcdtest.TestClient()
defer etcdtest.Cleanup()

var ctx = context.Background()
var ks = NewAllocatorKeySpace("/root", testAllocDecoder{})
var state = NewObservedState(ks, MemberKey(ks, "zone", "leader"), isConsistent)

var NMembers = 10
var NItems = 323 // Or: 32303
var NMaxRun = 200
var rng = rand.NewPCG(8675, 309)

b.Logf("Benchmarking with %d items, %d members", NItems, NMembers)

// fill inserts (if `asInsert`) or updates the keys/values produced by `kvcb` over the range [begin, end).
var fill = func(begin, end int, asInsert bool, kvcb func(i int) (string, string)) {
var kv = make([]string, 0, 2*(end-begin))

for i := begin; i != end; i++ {
var k, v = kvcb(i)
kv = append(kv, k)
kv = append(kv, v)
}
if asInsert {
require.NoError(b, insert(ctx, client, kv...))
} else {
require.NoError(b, update(ctx, client, kv...))
}
}

// Insert a Member key which will act as the leader.
require.NoError(b, insert(ctx, client, state.LocalKey, `{"R": 1}`))

// Announce all Members.
fill(0, NMembers, true, func(i int) (string, string) {
return MemberKey(ks, "zone-a", fmt.Sprintf("m%05d", i)), `{"R": 10000}`
})
// Announce all Items with full replication.
fill(0, NItems, true, func(i int) (string, string) {
return ItemKey(ks, fmt.Sprintf("i%05d", i)), `{"R":2}`
})

var testState = struct {
step int
total int
}{step: 0, total: 0}

var testHook = func(round int, idle bool) {
if !idle {
return
} else if err := markAllConsistent(ctx, client, ks, ""); err == nil {
return
} else if err == io.ErrNoProgress {
// Continue the next test step below.
} else {
log.WithField("err", err).Warn("failed to mark all consistent (will retry)")
return
}

// Pick a run of contiguous items, and update each to a random replication.
// This strategy is designed to exercise imbalances in the number of
// replication slots across allocation sub-problems.
var r = rng.Uint64() % 3
var run = 1 + int(rng.Uint64()%(uint64(NMaxRun)-1))
var start = int(rng.Uint64() % uint64((NItems - run)))

if r == 2 {
r = 3 // All items begin with R=2.
}

log.WithFields(log.Fields{
"r": r,
"run": run,
"start": start,
"step": testState.step,
}).Info("next test step")

if testState.step == b.N {
// Begin a graceful exit.
update(ctx, client, state.LocalKey, `{"R": 0}`)
return
}
testState.step += 1
testState.total += run

var value = fmt.Sprintf(`{"R":%d}`, r)
fill(start, start+run, false, func(i int) (string, string) {
return ItemKey(ks, fmt.Sprintf("i%05d", i)), value
})
}

require.NoError(b, ks.Load(ctx, client, 0))
go ks.Watch(ctx, client)

require.NoError(b, Allocate(AllocateArgs{
Context: ctx,
Etcd: client,
State: state,
TestHook: testHook,
}))

var adds = counterVal(allocatorAssignmentAddedTotal)
var packs = counterVal(allocatorAssignmentPackedTotal)
var removes = counterVal(allocatorAssignmentRemovedTotal)
var ratio = (float64(adds-2*float64(NItems)) + float64(removes)) / float64(testState.total)

log.WithFields(log.Fields{
"adds": adds,
"packs": packs,
"removes": removes,
"run.ratio": ratio,
"run.total": testState.total,
}).Info("final metrics")
}

func benchMemberKey(ks *keyspace.KeySpace, i int) string {
var zone string

12 changes: 9 additions & 3 deletions allocator/sparse_flow_network.go
@@ -61,7 +61,8 @@ import (
// relaxed until a maximal assignment is achieved.
type sparseFlowNetwork struct {
*State
myItems keyspace.KeyValues // Slice of State.Items included in this network.
myItems keyspace.KeyValues // Slice of State.Items included in this network.
myItemSlots int // Sum of replication slots attributable just to myItems.

firstItemNodeID pr.NodeID // First Item NodeID in the graph.
firstZoneItemNodeID pr.NodeID // First Zone-Item NodeID in the graph.
@@ -141,6 +142,7 @@ func newSparseFlowNetwork(s *State, myItems keyspace.KeyValues) *sparseFlowNetwo
// Accelerate our left-join by skipping to the first assignment of `myItems` via binary search.
var pivot, _ = s.Assignments.Search(ItemAssignmentsPrefix(s.KS, itemAt(myItems, 0).ID))
var myAssignments = s.Assignments[pivot:]
var myItemSlots int

var it = LeftJoin{
LenL: len(myItems),
@@ -152,6 +154,7 @@ func newSparseFlowNetwork(s *State, myItems keyspace.KeyValues) *sparseFlowNetwo
for cur, ok := it.Next(); ok; cur, ok = it.Next() {
var item = cur.Left
var assignments = myAssignments[cur.RightBegin:cur.RightEnd]
myItemSlots += myItems[item].Decoded.(Item).DesiredReplication()

// Left-join zones with |assignments| of this |item|.
var it2 = LeftJoin{
@@ -196,6 +199,7 @@ func newSparseFlowNetwork(s *State, myItems keyspace.KeyValues) *sparseFlowNetwo
var fs = &sparseFlowNetwork{
State: s,
myItems: myItems,
myItemSlots: myItemSlots,
firstItemNodeID: firstItemNodeID,
firstZoneItemNodeID: firstZoneItemNodeID,
firstMemberNodeID: firstMemberNodeID,
@@ -348,8 +352,10 @@ func (fs *sparseFlowNetwork) buildCurrentItemArcs(item int, bound int) []pr.Arc
// buildMemberArc from member `member` to the sink.
func (fs *sparseFlowNetwork) buildMemberArc(mf *pr.MaxFlow, id pr.NodeID, member int) []pr.Arc {
var c = memberAt(fs.Members, member).ItemLimit()
// Constrain to the scaled ItemLimit for our portion of the global assignment problem.
c = scaleAndRound(c, len(fs.myItems), len(fs.Items))

// Scale ItemLimit by the relative share of ItemSlots within
// our subset of the global assignment problem.
c = scaleAndRound(c, fs.myItemSlots, fs.ItemSlots)

if mf.RelativeHeight(id) < memberOverflowThreshold {
// Further scale to our relative "fair share" items.
