-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathconsistent_hashing.h
1307 lines (1114 loc) · 65 KB
/
consistent_hashing.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// In Consistent Hashing, a Ring is represented as an array of tokens sorted in ascending order, and each of those
// tokens identifies a segment in the ring.
//
// The key property is that every segment owns the ring-space defined by the range:
// (prev_segment.token, segment.token]
// that is, starting after (excluding) the token of the previous segment in the ring, up to and including the token of the segment.
//
// The two exceptions are for tokens that are <= the first token in the ring, or > the last token in the ring (ring semantics);
// both are owned by the first segment, i.e. (last token, first token]:
// -- For the last segment in the array, its next segment is the first segment in the array
// -- For the first segment in the array, its previous segment is the last segment in the array
//
//
// You should operate on ring segments. In a typical distributed service, each segment will be owned by a primary replica and, based on
// the replication strategy and factor, more replicas (usually the owners of the successor segments) will also hold the same segment's data.
#pragma once
#ifdef HAVE_SWITCH
#include "switch.h"
#include "switch_vector.h"
#include <experimental/optional>
#else
#include <algorithm>
#include <experimental/optional>
#include <limits>
#include <stdint.h>
#include <string.h>
#include <vector>
#endif
#include <type_traits>
#include <unordered_map>
#include <utility>
namespace ConsistentHashing
{
// Returns the lowest index where token <= tokens[index], assuming `tokens` holds `cnt` distinct tokens sorted in
// ascending order (compared with TrivialCmp(), which must return <0, 0 or >0).
// If it returns cnt (token is greater than every token in the ring), use 0, because tokens[0] owns ( tokens[cnt - 1], tokens[0] ]
// NOTE(review): with duplicate tokens, any index holding an equal token may be returned.
template <typename T>
static uint32_t search(const T *const tokens, const uint32_t cnt, const T token)
{
    // Candidates are the half-open range [l, h), kept in unsigned 32-bit arithmetic.
    // Since 0 <= l < h <= cnt, the midpoint l + (h - l) / 2 cannot overflow, and unlike
    // signed int32_t indices, this works for every cnt representable in a uint32_t.
    uint32_t l{0}, h{cnt};
    while (l < h)
    {
        const uint32_t m = l + (h - l) / 2;
        const auto r = TrivialCmp(token, tokens[m]);
        if (!r)
            return m;
        else if (r < 0)
            h = m;
        else
            l = m + 1;
    }
    // every token in [0, l) is < token, every token in [l, cnt) is > token
    return l;
}
// A 128-bit token, stored as two 64-bit halves: `ms` (most significant) and `ls` (least significant).
// Tokens are ordered like unsigned 128-bit integers, i.e. lexicographically by (ms, ls).
// The all-zero token is the minimum, and is reported as not valid by is_valid() / operator bool.
// You should probably use 128 or more bits for the tokens space.
struct token128
{
    uint64_t ms;
    uint64_t ls;

    constexpr token128()
        : token128(0, 0)
    {
    }

    constexpr token128(const uint64_t m, const uint64_t l)
        : ms{m}, ls{l}
    {
    }

    // true iff both halves are zero
    constexpr bool is_minimum() const noexcept
    {
        return !(ms | ls);
    }

    // same as is_valid()
    constexpr operator bool() const noexcept
    {
        return is_valid();
    }

    // true iff this is not the all-zero (minimum) token
    constexpr bool is_valid() const noexcept
    {
        return !is_minimum();
    }

    constexpr bool operator==(const token128 &o) const noexcept
    {
        return ls == o.ls && ms == o.ms;
    }

    constexpr bool operator!=(const token128 &o) const noexcept
    {
        return !(*this == o);
    }

    constexpr bool operator>(const token128 &o) const noexcept
    {
        return o < *this;
    }

    // The most significant half decides, unless it is equal in which case the least significant half does
    constexpr bool operator<(const token128 &o) const noexcept
    {
        return ms != o.ms ? ms < o.ms : ls < o.ls;
    }

    constexpr bool operator>=(const token128 &o) const noexcept
    {
        return !(*this < o);
    }

    constexpr bool operator<=(const token128 &o) const noexcept
    {
        return !(o < *this);
    }

    constexpr auto &operator=(const token128 &o) noexcept
    {
        ls = o.ls;
        ms = o.ms;
        return *this;
    }

    // back to the all-zero (minimum) token
    constexpr void reset() noexcept
    {
        ms = ls = 0;
    }
};
// A segment in a ring. The segment is responsible for (owns) the tokens range
// (left, right], i.e. left exclusive, right inclusive,
// where left is the token of the predecessor segment and right is the token of this segment.
// A segment with left >= right wraps around the ring (see tokens_wrap_around()), and a segment with
// left == right spans the whole ring.
// See also: https://en.wikipedia.org/wiki/Circular_segment
template <typename token_t>
struct ring_segment
{
    token_t left;
    token_t right;

    // Distance covered by the segment: right - left, or (max token - left) + right when it wraps.
    // NOTE(review): relies on std::numeric_limits<token_t>::max() being meaningful for token_t, and on
    // `require` being an assertion helper provided elsewhere in the project -- confirm.
    constexpr uint64_t span() const noexcept
    {
        if (wraps())
        {
            require(left >= right);
            return uint64_t(std::numeric_limits<token_t>::max()) - left + right;
        }
        else
        {
            require(right >= left);
            return right - left;
        }
    }

    // Both bounds are value-initialized (e.g. 0 for integral tokens): a constexpr constructor must
    // initialize every member, and a default-constructed segment should never hold indeterminate values.
    constexpr ring_segment()
        : left{}, right{}
    {
    }

    constexpr ring_segment(const token_t l, const token_t r)
        : left{l}, right{r}
    {
    }

    constexpr void set(const token_t l, const token_t r)
    {
        left = l;
        right = r;
    }

    // this segment's token
    constexpr auto token() const noexcept
    {
        return right;
    }

    constexpr bool operator==(const ring_segment &o) const noexcept
    {
        return left == o.left && right == o.right;
    }

    constexpr bool operator!=(const ring_segment &o) const noexcept
    {
        return left != o.left || right != o.right;
    }

    // lexicographic ordering by (left, right)
    constexpr bool operator<(const ring_segment &o) const noexcept
    {
        return left < o.left || (left == o.left && right < o.right);
    }

    constexpr bool operator>(const ring_segment &o) const noexcept
    {
        return left > o.left || (left == o.left && right > o.right);
    }

    // Orders segments by their token (right), except that a wrapping segment always orders first
    // (-1 if this one wraps, otherwise 1 if rhs wraps).
    constexpr int8_t cmp(const ring_segment &rhs) const noexcept
    {
        if (tokens_wrap_around(left, right))
        {
            // there is only one segment that wraps around in the ring
            return -1;
        }
        else if (tokens_wrap_around(rhs.left, rhs.right))
        {
            // there is only one segment that wraps around in the ring
            return 1;
        }
        else
        {
            if (right == rhs.right)
                return 0;
            else if (right > rhs.right)
                return 1;
            else
                return -1;
        }
    }

    // true iff (l, r] extends from the last to the first ring segment (this includes l == r, the full ring)
    static constexpr bool tokens_wrap_around(const token_t &l, const token_t &r) noexcept
    {
        return l >= r;
    }

    // true iff every token owned by `that` is also owned by this segment
    bool contains(const ring_segment &that) const noexcept
    {
        if (left == right)
        {
            // Full ring always contains all other ranges
            return true;
        }
        const bool thisWraps = tokens_wrap_around(left, right);
        const bool thatWraps = tokens_wrap_around(that.left, that.right);
        if (thisWraps == thatWraps)
            return left <= that.left && that.right <= right;
        else if (thisWraps)
        {
            // wrapping might contain non-wrapping that is contained if both its tokens are in one of our wrap segments
            return left <= that.left || that.right <= right;
        }
        else
        {
            // non-wrapping cannot contain wrapping
            return false;
        }
    }

    // Removes (masks) the part of this segment that is covered by `mask`.
    //
    // It is very important that we get this right, otherwise other methods that depend on it will produce crap.
    // Returns a pair, where the first is true if the segment was intersected by the mask, false otherwise, and the second
    // is the number of segments (0, 1 or 2) written to `out` that remain after masking (0 if it was completely masked).
    // `out` must have room for 2 segments.
    std::pair<bool, uint8_t> mask(const ring_segment mask, ring_segment *const out) const noexcept
    {
        if (false == intersects(mask))
            return {false, 0};
        else if (mask.contains(*this))
        {
            // completely masked
            return {true, 0};
        }
        else
        {
            // partially masked
            uint8_t n{0};
            if (mask.wraps() || wraps())
                n = mask.difference(*this, out);
            else if (mask.right > left)
            {
                if (mask.left < right && mask.left > left)
                    out[n++] = {left, mask.left};
                if (mask.right < right)
                    out[n++] = {mask.right, right};
            }
            return {true, n};
        }
    }

    // Appends to `out` what is left of each segment in [it, end) after masking every segment of `toExclude`.
    // Pieces produced by a partial mask are masked again (recursively) against all of `toExclude`.
    static void mask_segments_impl(const ring_segment *it, const ring_segment *const end, const std::vector<ring_segment> &toExclude, std::vector<ring_segment> *const out)
    {
        ring_segment list[2];
        for (auto i{it}; i != end; ++i)
        {
            const auto in = *i;
            for (const auto mask : toExclude)
            {
                if (const auto res = in.mask(mask, list); res.first)
                {
                    // OK, either completely or partially masked
                    if (res.second)
                        mask_segments_impl(list, list + res.second, toExclude, out);
                    goto next;
                }
            }
            // not touched by any mask
            out->push_back(in);
        next:;
        }
    }

    // Appends to `out` the segments [it, end) with every segment of `toExclude` masked out.
    // When something is masked, the whole of `out` is sorted and de-overlapped afterwards.
    static void mask_segments(const ring_segment *it, const ring_segment *const end, const std::vector<ring_segment> &toExclude, std::vector<ring_segment> *const out)
    {
        if (toExclude.size())
        {
            mask_segments_impl(it, end, toExclude, out);
            // Just in case (this is cheap)
            sort_and_deoverlap(out);
        }
        else
            out->insert(out->end(), it, end);
    }

    static void mask_segments(const std::vector<ring_segment> &in, const std::vector<ring_segment> &toExclude, std::vector<ring_segment> *const out)
    {
        mask_segments(in.data(), in.data() + in.size(), toExclude, out);
    }

    // Returns the segments of `in` with every segment of `toExclude` masked out
    static auto mask_segments(const std::vector<ring_segment> &in, const std::vector<ring_segment> &toExclude)
    {
        std::vector<ring_segment> out;
        // forward both the input and the exclusions (the pointer overload needs pointers, not iterators)
        mask_segments(in, toExclude, &out);
        return out;
    }

    // For a list of (possibly wrapping) segments sorted by (left, right) ascending, process the list to produce
    // an equivalent set of ranges, sans the overlapping ranges;
    // it will also merge together overlapping ranges (segments that are merely adjacent, e.g (8, 10],(10, 15], are kept apart)
    // i.e [(8, 10],(8, 15],(14, 18],(17, 18]] => [ (8, 18] ]
    //
    // On output, the non-wrapping segments come first, sorted and disjoint, followed by at most one wrapping segment.
    // If the segments cover the whole ring, the output is the single segment (min, min].
    //
    // this will only work if the segments are properly sorted. see sort_and_deoverlap()
    // This utility method deals with invalid segments as well (e.g you can't really have more than one segment that wraps)
    static void deoverlap(std::vector<ring_segment> *const segments)
    {
        std::vector<ring_segment> out;
        ring_segment wrapping;
        bool haveWrapping{false};

        out.reserve(segments->size());
        for (const auto &s : *segments)
        {
            if (s.wraps())
            {
                if (!haveWrapping)
                {
                    wrapping = s;
                    haveWrapping = true;
                }
                else if (s.right > wrapping.right)
                {
                    // deal with e.g [30, 4], [35, 8], [40, 2] => [30, 8]
                    // s.left >= wrapping.left (sorted input), so only the [min, s.right] part of s can extend it
                    wrapping.right = s.right;
                }
            }
            else if (haveWrapping)
            {
                // s.left >= wrapping.left (sorted input): s lies within the (wrapping.left, max] part; drop it
            }
            else if (!out.empty() && s.left < out.back().right)
            {
                // overlaps the previous segment; merge without ever shrinking it
                // deal with (8, 30],(9, 18] => (8, 30]
                if (s.right > out.back().right)
                    out.back().right = s.right;
            }
            else
                out.push_back(s);
        }

        if (haveWrapping)
        {
            // Segments in `out` all start before wrapping.left and are sorted and disjoint, so only the trailing
            // ones can reach into the (wrapping.left, max] part: absorb them, extending wrapping.left.
            while (!out.empty() && out.back().right > wrapping.left)
            {
                if (out.back().left < wrapping.left)
                    wrapping.left = out.back().left;
                out.pop_back();
            }

            // Likewise, only the leading ones can overlap the [min, wrapping.right] part: absorb them, extending wrapping.right.
            std::size_t absorbed{0};
            while (absorbed != out.size() && out[absorbed].left < wrapping.right)
            {
                if (out[absorbed].right > wrapping.right)
                    wrapping.right = out[absorbed].right;
                ++absorbed;
            }
            out.erase(out.begin(), out.begin() + absorbed);

            if (wrapping.right >= wrapping.left)
            {
                // (left, max] and [min, right] meet or overlap: spans the whole ring
                const auto MinTokenValue = std::numeric_limits<token_t>::min();
                out.clear();
                out.push_back({MinTokenValue, MinTokenValue});
            }
            else
            {
                // there can only be one segment that wraps, and it goes last (see segments_contain())
                out.push_back(wrapping);
            }
        }

        segments->swap(out);
    }

    // utility method; sorts segments by (left, right) so that deoverlap() can process them
    static void sort_and_deoverlap(std::vector<ring_segment> *const segments)
    {
        std::sort(segments->begin(), segments->end(), [](const auto &a, const auto &b) { return a.left < b.left || (a.left == b.left && a.right < b.right); });
        deoverlap(segments);
    }

    // Appends to `out` the input list with all segments unwrapped (see unwrap()), then sorts and de-overlaps the whole of `out`
    static void normalize(const ring_segment *const segments, const uint32_t segmentsCnt, std::vector<ring_segment> *const out)
    {
        ring_segment res[2];
        for (uint32_t i{0}; i != segmentsCnt; ++i)
        {
            if (const uint8_t n = segments[i].unwrap(res))
                out->insert(out->end(), res, res + n);
        }
        sort_and_deoverlap(out);
    }

    static auto normalize(const ring_segment *const segments, const uint32_t segmentsCnt)
    {
        std::vector<ring_segment> res;
        normalize(segments, segmentsCnt, &res);
        return res;
    }

    // true iff segment contains the token
    bool contains(const token_t &token) const noexcept
    {
        if (wraps())
        {
            // We are wrapping around. The interval is (a, b] where a >= b
            // then we have 3 cases which hold for any given token k, and we should return true for the first two:
            // 1. a < k
            // 2. k <= b
            // 3. b < k <= a (not contained)
            return token > left || right >= token;
        }
        else
        {
            // Range (a, b], a < b
            return token > left && right >= token;
        }
    }

    constexpr bool wraps() const noexcept
    {
        return tokens_wrap_around(left, right);
    }

    // true iff this segment and `that` share at least one token
    inline bool intersects(const ring_segment that) const noexcept
    {
        ring_segment out[2];
        return intersection(that, out);
    }

    // intersection of two wrapping segments, where first.left < that.left and neither contains the other
    static uint8_t _intersection_of_two_wrapping_segments(const ring_segment &first, const ring_segment &that, ring_segment *intersection) noexcept
    {
        if (that.right > first.left)
        {
            intersection[0] = ring_segment(first.left, that.right);
            intersection[1] = ring_segment(that.left, first.right);
            return 2;
        }
        else
        {
            intersection[0] = ring_segment(that.left, first.right);
            return 1;
        }
    }

    // intersection of a wrapping and a non-wrapping segment, where neither contains the other
    static uint8_t _intersection_of_single_wrapping_segment(const ring_segment &wrapping, const ring_segment &other, ring_segment *intersection) noexcept
    {
        uint8_t size{0};
        if (other.contains(wrapping.right))
            intersection[size++] = ring_segment(other.left, wrapping.right);
        // the extra comparison is needed because wrapping.left is not contained by the wrapping segment
        if (other.contains(wrapping.left) && wrapping.left < other.right)
            intersection[size++] = ring_segment(wrapping.left, other.right);
        return size;
    }

    // Returns the intersection of two segments. That can be two disjoint ranges if one is wrapping and the other is not.
    // e.g for two nodes G and M, and a query range (D, T]; the intersection is (M-T] and (D-G]
    // If there is no intersection, 0 is returned (out holds up to 2 segments; the return value is how many were written)
    //
    // (12,7)^(5,20) => [(5,7), (12, 20)]
    // ring_segment(10, 100).intersection(50, 120) => [ ring_segment(50, 100) ]
    // see also mask()
    //
    // this is the result of the logical operation: ((*this) & that)
    uint8_t intersection(const ring_segment &that, ring_segment *out) const noexcept
    {
        if (that.contains(*this))
        {
            *out = *this;
            return 1;
        }
        else if (contains(that))
        {
            *out = that;
            return 1;
        }
        else
        {
            const bool thisWraps = tokens_wrap_around(left, right);
            const bool thatWraps = tokens_wrap_around(that.left, that.right);
            if (!thisWraps && !thatWraps)
            {
                // Neither wraps; fast path
                if (!(left < that.right && that.left < right))
                    return 0;
                *out = ring_segment(std::max<token_t>(left, that.left), std::min<token_t>(right, that.right));
                return 1;
            }
            else if (thisWraps && thatWraps)
            {
                // Two wrapping ranges always intersect.
                // We have determined that neither this or that contains the other, we are left
                // with two possibilities and mirror images of each such case:
                // 1. both of that's (1,2] endpoints lie in this's (A, B] right segment
                // 2. only that's start endpoint lies in this's right segment
                if (left < that.left)
                    return _intersection_of_two_wrapping_segments(*this, that, out);
                else
                    return _intersection_of_two_wrapping_segments(that, *this, out);
            }
            else if (thisWraps && !thatWraps)
                return _intersection_of_single_wrapping_segment(*this, that, out);
            else
                return _intersection_of_single_wrapping_segment(that, *this, out);
        }
    }

    // Subtracts a portion of this range
    // @contained : The range to subtract from `this`: must be totally contained by this range
    // @out: List of ranges left after subtracting contained from `this` (@return value is size of @out, up to 2)
    //
    // i.e ring_segment(10, 100).subdivide(ring_segment(50, 55)) => [ ring_segment(10, 50), ring_segment(55, 100) ]
    //
    // You may want to use mask() instead, which is more powerful and covers wrapping cases, etc
    uint8_t subdivide(const ring_segment &contained, ring_segment *const out) const noexcept
    {
        if (contained.contains(*this))
        {
            // contained actually contains this segment
            return 0;
        }
        uint8_t size{0};
        if (left != contained.left)
            out[size++] = ring_segment(left, contained.left);
        if (right != contained.right)
            out[size++] = ring_segment(contained.right, right);
        return size;
    }

    // if this segment wraps (and does not already end at the minimum token), it will return two segments
    // 1. (left, std::numeric_limits<token_t>::min())
    // 2. (std::numeric_limits<token_t>::min(), right)
    // otherwise, it will return itself
    uint8_t unwrap(ring_segment *const out) const noexcept
    {
        const auto MinTokenValue = std::numeric_limits<token_t>::min();
        if (false == wraps() || right == MinTokenValue)
        {
            *out = *this;
            return 1;
        }
        else
        {
            out[0] = ring_segment(left, MinTokenValue);
            out[1] = ring_segment(MinTokenValue, right);
            return 2;
        }
    }

    // Compute difference between two ring segments
    // This is very handy for computing, e.g the segments a node will need to fetch, when moving to a given token
    // e.g segment(5, 20).difference(segment(2, 25)) => [ (2, 5), (20, 25) ]
    // e.g segment(18, 25).difference(segment(5,20)) => [ (5, 18) ]
    //
    // In other words, compute the segments (ranges) of rhs that (*this) does not cover; `result` receives up to 2
    // segments and the return value is how many were written.
    // There is an opposite operation, mask()
    //
    // This is the result of the logical operation: (rhs & (~(rhs & (*this))) )
    uint8_t difference(const ring_segment &rhs, ring_segment *const result) const
    {
        ring_segment intersectionSet[2];
        switch (intersection(rhs, intersectionSet))
        {
        case 0:
            // not intersected
            *result = rhs;
            return 1;
        case 1:
            // compute missing sub-segments
            return rhs.subdivide(intersectionSet[0], result);
        default:
        {
            ring_segment tmp[2];
            // two intersections; subtracting only one of them will yield a single segment
            if (!rhs.subdivide(intersectionSet[0], tmp))
            {
                // nothing left to subtract from (never read an unset tmp[0])
                return 0;
            }
            return tmp[0].subdivide(intersectionSet[1], result);
        }
        }
    }

    // split the segment in two, halved at segmentToken value (if segmentToken is contained in segment and is not one of its bounds)
    //
    // i.e ring_segment(10, 20).split(18) => ( ring_segment(10, 18), ring_segment(18, 20) )
    std::experimental::optional<std::pair<ring_segment, ring_segment>> split(const token_t segmentToken) const noexcept
    {
        if (left == segmentToken || right == segmentToken || !contains(segmentToken))
            return {};
        return {{ring_segment(left, segmentToken), ring_segment(segmentToken, right)}};
    }

#ifdef HAVE_SWITCH
    void serialize(IOBuffer *const b) const
    {
        b->Serialize(left);
        b->Serialize(right);
    }

    // not const: reads the bounds into this segment
    void deserialize(ISerializer *const b)
    {
        b->Unserialize<token_t>(&left);
        b->Unserialize<token_t>(&right);
    }
#endif

    // true iff any of the `cnt` segments contains `token`.
    // Make sure segments is properly ordered and deoverlapped
    // see sort_and_deoverlap()
    static bool segments_contain(const token_t token, const ring_segment *const segments, const uint32_t cnt)
    {
        if (!cnt)
            return false;
        int32_t h = cnt - 1;
        if (segments[h].wraps())
        {
            if (segments[h--].contains(token))
            {
                // there can only be one segment that wraps, and that should be the last one (see sort_and_deoverlap() impl.)
                return true;
            }
        }
        // binary search over the sorted, disjoint, non-wrapping segments
        for (int32_t l{0}; l <= h;)
        {
            const auto m = (l & h) + ((l ^ h) >> 1);
            const auto segment = segments[m];
            if (segment.contains(token))
                return true;
            else if (token <= segment.left)
                h = m - 1;
            else
                l = m + 1;
        }
        return false;
    }
};
// A Ring of tokens
template <typename T>
struct Ring
{
using token_t = T;
using segment_t = ring_segment<T>;
const T *const tokens;
const uint32_t cnt;
// Wraps an existing array of `n` tokens, sorted in ascending order.
// The tokens are not copied: `v` must outlive the ring.
constexpr Ring(const T *const v, const uint32_t n)
    : tokens{v}, cnt{n}
{
}
// Wraps the tokens stored in `v` (sorted in ascending order). The ring keeps a pointer to v's storage,
// so `v` must outlive the ring and must not be modified (e.g. resized) while the ring is in use.
// Parentheses plus an explicit cast are used because a braced delegation, Ring{v.data(), v.size()},
// is an ill-formed narrowing conversion from size_t to uint32_t.
constexpr Ring(const std::vector<T> &v)
    : Ring(v.data(), static_cast<uint32_t>(v.size()))
{
}
// Number of tokens (and therefore segments) in the ring
constexpr auto size() const noexcept
{
    return cnt;
}
uint32_t index_of(const T token) const noexcept
{
for (int32_t h = cnt - 1, l{0}; l <= h;)
{
const auto m = (l & h) + ((l ^ h) >> 1);
const auto v = tokens[m];
const auto r = TrivialCmp(token, v);
if (!r)
return m;
else if (r < 0)
h = m - 1;
else
l = m + 1;
}
return UINT32_MAX;
}
// true iff `token` is one of the ring's tokens
inline bool is_set(const T token) const noexcept
{
    return !(index_of(token) == UINT32_MAX);
}
// Lowest index whose token is >= `token`, or cnt if every ring token is smaller (see ConsistentHashing::search())
inline uint32_t search(const T token) const noexcept
{
    const uint32_t idx = ConsistentHashing::search(tokens, cnt, token);
    return idx;
}
// Index of the segment that owns `token`, based on the (prev segment.token, segment.token] ownership rule;
// tokens greater than the last ring token belong to the first segment (index 0).
// In a distributed system, you should map the token to a node (or the segment index returned by this method).
// For an empty ring this returns 0, which is not a valid index.
inline uint32_t index_owner_of(const T token) const noexcept
{
    // search() returns a value in [0, cnt]; cnt means "past the last token", which wraps to the first segment.
    // A comparison (typically compiled branchless) instead of `% cnt`, which is slower and divides by zero for an empty ring.
    const uint32_t idx = search(token);
    return idx == cnt ? 0 : idx;
}
// Token of the segment that owns `token` (see index_owner_of())
inline auto token_owner_of(const T token) const noexcept
{
    const auto owner = index_owner_of(token);
    return tokens[owner];
}
// Token preceding the one at `idx` (idx < cnt), walking the ring backwards: index 0 wraps to cnt - 1.
// cnt must not be 0.
constexpr const T &token_predecessor_by_index(const uint32_t idx) const noexcept
{
    const uint32_t prev = (idx + (cnt - 1)) % cnt;
    return tokens[prev];
}
// Token preceding `token` in the ring.
// `token` should be one of the ring's tokens: otherwise index_of() yields UINT32_MAX and the result is meaningless.
constexpr const T &token_predecessor(const T token) const noexcept
{
    const auto idx = index_of(token);
    return token_predecessor_by_index(idx);
}
// Token following the one at `idx` (idx < cnt), walking the ring forwards: index cnt - 1 wraps to 0.
// cnt must not be 0.
constexpr const T &token_successor_by_index(const uint32_t idx) const noexcept
{
    const uint32_t next = (idx + 1) % cnt;
    return tokens[next];
}
// Token following `token` in the ring.
// `token` should be one of the ring's tokens: otherwise index_of() yields UINT32_MAX and the result is meaningless.
constexpr const T &token_successor(const T token) const noexcept
{
    const auto idx = index_of(token);
    return token_successor_by_index(idx);
}
constexpr auto index_segment(const uint32_t idx) const noexcept
{
return ring_segment<T>(tokens[(idx + (cnt - 1)) % cnt], tokens[idx]);
}
// segment in the ring that owns this token
// based on the (prev segment.token, this segment.token] ownership rule
constexpr auto token_segment(const T token) const noexcept
{
return index_segment(index_of(token));
}
// Appends all the ring's segments to `res`: (tokens[i - 1], tokens[i]] for i in [1, cnt), followed by the
// wrapping segment (tokens[cnt - 1], tokens[0]] (which spans the whole ring if there is a single token).
// Appends nothing for an empty ring.
// see also sort_and_deoverlap()
void segments(std::vector<ring_segment<T>> *const res) const
{
    if (!cnt)
        return;
    // reserve relative to what `res` already holds, so appending to a non-empty vector cannot reallocate
    // repeatedly (keeping the original +2 slack)
    res->reserve(res->size() + cnt + 2);
    for (uint32_t i{1}; i != cnt; ++i)
        res->push_back({tokens[i - 1], tokens[i]});
    res->push_back({tokens[cnt - 1], tokens[0]});
}
// Returns all the ring's segments (see segments(std::vector *))
auto segments() const
{
    std::vector<ring_segment<T>> res;
    segments(&res);
    return res;
}
auto tokens_segments(const std::vector<token_t> &t) const
{
std::vector<segment_t> res;
res.reserve(t.size());
for (const auto token : t)
{
const auto idx = index_owner_of(token);
res.push_back({token_predecessor_by_index(idx), token});
}
std::sort(res.begin(), res.end(), [](const auto &a, const auto &b) { return a.left < b.left; });
return res;
}
// Assuming a node is a replica for tokens in segments `current`, and then it assumes ownership of a different
// set of segments, `updated`
//
// This handy utility method will generate a pair of segments list:
// 1. The first is segments the node will need to *fetch* from other nodes in the ring, because it will now be also responsible
// for those segments, but it does not have the data, based on the current owned segments.
// 2. The second is segments the node will need to *stream* to other nodes in the ring, because it will no longer hold data for them.
//
// Obviously, if a node is just introduced to a ring (i.e have only updated and no current segments ), it should
// just fetch data for all the current segments. Conversely, if the node is exiting the ring, it should
// consider streaming all the data it has to other nodes if needed, and not fetch any data to itself.
//
// make sure that current and updated are in order e.g std::sort(start, end, [](const auto &a, const auto &b) { return a.left < b.left; });
//
// Because the output will be an array of segments (_not_ tokens), you will need to determine the segments of the ring that intersect it
// in order to figure out which nodes have which parts of the segments.
//
// This is a fairly expensive method (although it should be easy to optimize it if necessary), but given how rare it should be used, that's not a real concern
//
// Example: current segment [10, 20), updated segment [10, 25)
// Example: current segment [10, 20), updated segment [8, 30)
static auto compute_segments_ownership_updates(const std::vector<segment_t> ¤tSegmentsInput, const std::vector<segment_t> &updatedSegmentsInput)
{
std::vector<segment_t> toFetch, toStream, current, updated, toFetchFinal, toStreamFinal;
segment_t segmentsList[2];
// We need to work on normalized lists of segments
current = currentSegmentsInput;
ring_segment<T>::sort_and_deoverlap(¤t);
updated = updatedSegmentsInput;
ring_segment<T>::sort_and_deoverlap(&updated);
for (const auto curSegment : current)
{
const auto n = toStream.size();
for (const auto updatedSegment : updated)
{
if (curSegment.intersects(updatedSegment))
toStream.insert(toStream.end(), segmentsList, segmentsList + updatedSegment.difference(curSegment, segmentsList));
}
if (toStream.size() == n)
{
// no intersection; accept whole segment
toStream.push_back(curSegment);
}
}
for (const auto updatedSegment : updated)
{
const auto n = toFetch.size();
for (const auto curSegment : current)
{
if (updatedSegment.intersects(curSegment))
toFetch.insert(toFetch.end(), segmentsList, segmentsList + curSegment.difference(updatedSegment, segmentsList));
}
if (toFetch.size() == n)
{
// no intersection; accept whole segment
toFetch.push_back(updatedSegment);
}
}
// normalize output
ring_segment<T>::sort_and_deoverlap(&toFetch);
ring_segment<T>::sort_and_deoverlap(&toStream);
// mask segments:
// from segments to fetch, mask currently owned segments
// from segments to stream, mask segments we will own (updated segments)
ring_segment<T>::mask_segments(toFetch, current, &toFetchFinal);
ring_segment<T>::mask_segments(toStream, updated, &toStreamFinal);
return std::make_pair(toFetchFinal, toStreamFinal);
}
// When a node acquires ring tokens(joins a cluster), it only disupts segments its token(s) fall into
// Assuming a ring of tokens: (10, 100, 150, 180, 200)
// and a node joins a cluster, and acquires token 120
// then it will only affect requests for (100, 120]
// so it will need to fetch content for (100, 120] from somewhere. Where? well, from whichever owned (100, 150]
// which is just the successor node, which we can find using index_owner_of()
// This is a simple replication strategy implementation; we 'll just walk the ring clockwise and collect nodes that own
// the tokens, skipping already collected nodes
//
// EXAMPLE: This is an illustrative example; you shouldn't really use this in production as is
template <typename L>
auto token_replicas_basic(const token_t token, const uint8_t replicationFactor, L &&endpoint_token) const
{
using node_t = typename std::result_of<L(uint32_t)>::type;
std::vector<node_t> nodes;
const auto base = index_owner_of(token);
auto idx = base;
do
{
const auto node = endpoint_token(idx);
if (std::find(nodes.begin(), nodes.end(), node) == nodes.end())
{
nodes.push_back(node);
if (nodes.size() == replicationFactor)
break;
}
idx = (idx + 1) % size();
} while (idx != base);
return nodes;
}
// This generates the lists of tokens and matching nodes that own them based on a new ownership state that results
// from applying the changes in ringTokensNodes
// Specifically, in the resulted topology, current nodes tokens are replaced with their updated set in ringTokensNodes
template <typename node_t>
std::pair<std::vector<token_t>, std::vector<node_t>> new_topology(const node_t *const ringTokensNodes,
const std::unordered_map<node_t, std::vector<token_t>> &futureNodesTokens) const
{
std::vector<token_t> transientRingTokens;
std::vector<node_t> transientRingTokensNodes;
std::unordered_map<token_t, node_t> map;
for (uint32_t i{0}; i != cnt; ++i)
{
const auto token = tokens[i];
if (futureNodesTokens.find(ringTokensNodes[i]) == futureNodesTokens.end())
{
transientRingTokens.push_back(tokens[i]);
map.insert({tokens[i], ringTokensNodes[i]});
}
}
for (const auto &it : futureNodesTokens)
{
const auto node = it.first;
transientRingTokens.insert(transientRingTokens.end(), it.second.data(), it.second.data() + it.second.size());
for (const auto token : it.second)
map.insert({token, node});
}
std::sort(transientRingTokens.begin(), transientRingTokens.end());
// The associated nodes for each token in the transient ring
transientRingTokensNodes.reserve(transientRingTokens.size());
for (const auto token : transientRingTokens)
transientRingTokensNodes.push_back(map[token]);
return {std::move(transientRingTokens), std::move(transientRingTokensNodes)};
}
template <typename node_t, typename L>
static node_t *filter_by_distance(node_t *const nodes, const node_t *const end, L &&l)
{
using dist_t = typename std::result_of<L(node_t)>::type;
dist_t min;
uint32_t out{0};
for (auto it = nodes; it != end; ++it)
{
if (!out)
{
min = l(*it);
nodes[out++] = *it;