-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster.ts
195 lines (164 loc) · 6 KB
/
cluster.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import {
Optional, Address, Identifier, Key, Value,
randomlyOccurs, randomInteger, secureRandomUint16
} from './common.ts'
import Peers from './peers.ts'
import { Node, SelfNode, PeerNode, Digest, Diff } from './node.ts'
import { Transport, Message, MessageType } from './transport.ts'
/**
 * One entry of the diff list exchanged in an ACK message:
 * the node's digest, the updates for that node, and, optionally, the node's
 * address (attached when a full copy of the node is being sent so the
 * receiver can create a peer entry for it).
 */
type NodeDiff = [digest: Digest, updates: Diff[], address?: Address]

/** Period, in milliseconds, of the timer that prunes peers and refreshes the heartbeat. */
const HEARTBEAT_INTERVAL = 1000

/** Key under which the local node stores its latest heartbeat timestamp. */
const HEARTBEAT_KEY = '\x01h' as Key

/**
 * Value handed to `randomlyOccurs` when deciding whether a sync round also
 * targets a root address (presumably a probability in [0, 1]; confirm against
 * `randomlyOccurs`).
 */
const SYNC_WITH_ROOT_FREQUENCY = 0.2

/**
 * Value handed to `randomlyOccurs` when deciding whether a sync round also
 * targets an inactive peer (presumably a probability in [0, 1]; confirm
 * against `randomlyOccurs`).
 */
const SYNC_WITH_INACTIVE_FREQUENCY = 0.1

/** Target sample size that a sync round is topped up to with random active peers. */
const SYNC_WITH_COUNT = 4
/**
 * The local member of a cluster.
 *
 * Owns the local node's state (`node`), tracks the other known nodes
 * (`peers`), and keeps them in sync over the supplied transport by exchanging
 * SYN messages (digests only) and ACK messages (requests plus diffs).
 */
export default class Cluster {
  /** `<clusterName>/<nodeName>`, as passed to the constructor. */
  public name: string
  public node: SelfNode // @todo: make private
  public peers = new Peers() // @todo: make private
  /** Handle of the heartbeat timer; `undefined` while the cluster is not started. */
  private interval: Optional<number>

  /**
   * @param transport   used to send and receive messages, and to obtain the
   *                    local address and the root addresses
   * @param clusterName first half of `name`
   * @param nodeName    second half of `name`
   */
  constructor(
    private transport: Transport,
    clusterName: string,
    nodeName: string
  ) {
    // the identifier is the name plus a base-36 creation timestamp and a
    // 4-hex-digit nonce, so each instance gets a distinct identifier
    const timestamp = Date.now().toString(36)
    const nonce = secureRandomUint16().toString(16).padStart(4, '0')
    this.name = `${clusterName}/${nodeName}`
    const identifier = `${this.name}:${timestamp}-${nonce}`
    this.node = new SelfNode(identifier as Identifier, this.transport.local())
  }

  get identifier() { return this.node.identifier }
  get sequence() { return this.node.sequence }
  get peerCount() { return this.peers.count }

  /**
   * Starts the heartbeat timer, publishes an initial heartbeat, then processes
   * incoming messages until the transport's receive stream ends.
   *
   * A message that fails to process is logged and skipped; it does not end
   * the receive loop.
   */
  public async start(): Promise<void> {
    // a repeated start() must not orphan the previous timer, which stop()
    // would no longer be able to clear
    if (this.interval !== undefined) clearInterval(this.interval)
    this.interval = setInterval(() => {
      this.peers.prune()
      this.define(HEARTBEAT_KEY, Date.now())
    }, HEARTBEAT_INTERVAL)
    // must come after the timer is set: define() only syncs while started
    this.define(HEARTBEAT_KEY, Date.now())
    for await (const [from, data] of this.transport.recv()) {
      try {
        this.process(from, data)
      } catch (error) {
        // messages come from the network; one bad message must not stop the
        // node from listening while its heartbeat timer keeps running
        console.error('failed to process message', from, error)
      }
    }
  }

  /** Stops the transport and cancels the heartbeat timer. */
  public stop(): void {
    this.transport.stop()
    if (this.interval !== undefined) clearInterval(this.interval)
    this.interval = undefined
  }

  /**
   * Sets `key` to `value` on the local node and, if the cluster has been
   * started, immediately begins a sync round.
   */
  public define(key: Key, value: Value): void {
    this.node.set(key, value)
    if (this.interval !== undefined) this.sync()
  }

  /** Sends a SYN message carrying the current digest to every selected target. */
  private sync(): void {
    const message: Message = [MessageType.SYN, this.digest(), []]
    for (const target of this.targets()) this.transport.send(target, message)
  }

  /**
   * Handles one incoming message.
   *
   * - SYN: compares the sender's digest with local state.
   * - ACK: applies the sender's diffs, then answers its requests.
   *
   * An ACK is sent back to `from` only when there is something to request or
   * to send; unknown message types are logged and otherwise ignored.
   */
  private process(from: Address, [type, digests, diffs]: Message): void {
    let requests: Digest[] = []
    let responses: NodeDiff[] = []
    if (type === MessageType.SYN) {
      [requests, responses] = this.processDigest(digests)
    } else if (type === MessageType.ACK) {
      this.processDiffs(diffs)
      responses = this.processRequests(digests)
    } else {
      console.error(`unknown message type: ${type}`)
    }
    if (requests.length > 0 || responses.length > 0) {
      this.transport.send(from, [MessageType.ACK, requests, responses])
    }
  }

  /** Digest of the local node followed by the digests reported by `peers`. */
  private digest() {
    return [this.node.digest, ...this.peers.digest()]
  }

  /** Looks up a node by identifier: the local node, or else a known peer. */
  private nodeFor(identifier: Identifier): Optional<Node> {
    if (identifier === this.node.identifier) return this.node
    return this.peers.get(identifier)
  }

  /**
   * Compares a received digest with local knowledge.
   *
   * @returns a pair `[requests, diffs]`: `requests` lists the nodes the
   *          sender knows better than we do (or that we do not know at all),
   *          `diffs` carries updates for nodes we know better than the sender,
   *          including active nodes the sender did not mention.
   */
  private processDigest(digest: Digest[]): [Digest[], NodeDiff[]] {
    const requests: Digest[] = []
    const diffs: NodeDiff[] = []
    // NOTE(review): this set is mutated below, so peers.actives() is assumed
    // to return a fresh Set rather than internal state - confirm in peers.ts
    const actives = this.peers.actives()
    actives.add(this.node)
    for (const [identifier, sequence] of digest) {
      const node = this.nodeFor(identifier)
      if (!node) {
        // unknown node, so request all info on it.
        requests.push([identifier, 0])
        continue
      }
      // the sender already knows this node; it does not need a full copy
      actives.delete(node)
      if (node.sequence < sequence) { // peer's info is newer?
        if (this.node === node) {
          // we received a digest claiming to have a newer version of ourself. this should not happen.
          console.error(`received digest for ourself with a higher sequence (${identifier} @ ${sequence} > ${node.sequence})`)
          // @note: should we shutdown? something else? jump sequence and send update with current state?
          continue
        }
        requests.push([identifier, node.sequence])
      } else if (node.sequence > sequence) { // peer's info is older
        diffs.push([node.digest, node.diff(sequence)])
      }
    }
    // add diffs for any active nodes we have that are not in the digest;
    // the address is included so the receiver can create the peer
    for (const node of actives) {
      diffs.push([node.digest, node.diff(0), node.address])
    }
    return [requests, diffs]
  }

  /**
   * Applies received diffs to the matching peers.
   *
   * A peer is created when the node is unknown and the diff carries an
   * address; a diff for an unknown node without an address is ignored, and a
   * diff targeting the local node is logged and skipped.
   */
  private processDiffs(diffs: NodeDiff[]): void {
    for (const [[identifier, sequence], updates, address] of diffs) {
      let node = this.nodeFor(identifier)
      if (this.node === node) {
        // we received an update for ourself. this should not happen.
        console.error(`received diffs to update ourself (${identifier} @ ${sequence})`)
        // @note: should we shutdown? something else?
        continue
      }
      if (!node && address) {
        this.peers.add(node = new PeerNode(identifier, address))
      }
      if (node) {
        // the local node was excluded above, so this can only be a peer
        (node as PeerNode).apply(sequence, updates)
      }
    }
  }

  /**
   * Builds the diffs answering a list of requests.
   *
   * A request is answered only if the node is known here with a higher
   * sequence than requested. A request at sequence 0 (a full copy) also gets
   * the node's address.
   */
  private processRequests(requests: Digest[]): NodeDiff[] {
    const diffs: NodeDiff[] = []
    for (const [identifier, sequence] of requests) {
      const node = this.nodeFor(identifier)
      if (node && node.sequence > sequence) {
        const diff: NodeDiff = [node.digest, node.diff(sequence)]
        if (sequence === 0) diff.push(node.address)
        diffs.push(diff)
      }
    }
    return diffs
  }

  /** Picks one of the transport's root addresses; `undefined` if there are none. */
  private randomRoot(): Optional<Address> {
    const roots = this.transport.roots()
    return roots[randomInteger(roots.length)]
  }

  /**
   * Chooses the addresses for the next sync round.
   *
   * With no peers, all roots are targeted. Otherwise the sample holds the next
   * peer on the ring, occasionally a root and/or an inactive peer, and is then
   * topped up towards `SYNC_WITH_COUNT` with random active peers.
   */
  private targets(): Set<Address> {
    // if we have no peers yet, target the roots
    if (this.peers.count === 0) return new Set(this.transport.roots())
    const sample = new Set<Address>()
    // add the next node on peer ring
    const node = this.peers.next()
    if (node) sample.add(node.address)
    // sometimes, add a root
    if (randomlyOccurs(SYNC_WITH_ROOT_FREQUENCY)) {
      const address = this.randomRoot()
      if (address) sample.add(address)
    }
    // sometimes, add an inactive
    if (randomlyOccurs(SYNC_WITH_INACTIVE_FREQUENCY)) {
      const node = this.peers.randomInactive()
      if (node) sample.add(node.address)
    }
    // add random actives to fill
    const actives = this.peers.randomActives(SYNC_WITH_COUNT - sample.size)
    for (const node of actives) sample.add(node.address)
    return sample
  }
}