-
Notifications
You must be signed in to change notification settings - Fork 0
/
actors.ts
105 lines (90 loc) · 3.06 KB
/
actors.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import { Subject, observeOn, asyncScheduler } from 'rxjs'
// Payloads exchanged between the actors in this file.
type Message = 'Ping' | 'Pong'
// Routing target carried with every broadcast: a specific actor name, or 'Any'
// to reach every subscriber (see the check in Actor.subscribe).
// Note: 'Any' is itself a string, so for the type checker this union is the
// same as plain `string`; the literal only documents the wildcard value.
type Destination = string | 'Any'
/**
 * Contract shared by every actor in this file.
 *
 * Note: the type parameter is named `Message`, which shadows the file-level
 * `Message` alias inside this interface; here it stands for whatever payload
 * type the implementer chooses.
 */
interface IActor<Message> {
// Stream of envelopes published by this actor: the sending actor, a routing
// destination and the payload.
readonly eventBus: Subject<{ sender: IActor<Message>, destination: Destination, message: Message }>
// Identifier of the actor; Actor.subscribe compares it against an envelope's destination.
readonly name: string
// Mutable counter; Actor.handleMessage increments it once per handled message.
eventId: number
// Processes one message received from `sender`.
handleMessage: (sender: IActor<Message>, message: Message) => Promise<void>
// Sends `message` towards `destination` (Actor publishes it on its own eventBus).
broadcast: (message: Message, destination: Destination) => void
// Starts listening to `actor`'s eventBus; in Actor, matching envelopes are
// forwarded to `self.handleMessage`.
subscribe: (actor: IActor<Message>, self: IActor<Message>) => void
}
/**
 * Base actor: owns an event bus, publishes envelopes on it, and can listen to
 * another actor's bus, handling the envelopes addressed to it.
 */
class Actor<M> implements IActor<M> {
  readonly name: string
  readonly eventBus: Subject<{ sender: IActor<M>, destination: Destination, message: M }>
  /** Number of messages handled so far. */
  eventId: number = 0

  /**
   * @param name Identifier used both for display and for destination matching.
   */
  constructor(name: string) {
    this.name = name
    this.eventBus = new Subject<{ sender: IActor<M>, message: M, destination: Destination }>()
  }

  /**
   * Counts the message and rewrites the current terminal line with a status
   * entry (`\r` returns the cursor to the start of the line).
   *
   * @param sender Actor that published the message.
   * @param message Payload that was delivered.
   */
  async handleMessage(sender: IActor<M>, message: M) {
    this.eventId += 1
    const statusLine = `\r${this.name} ${this.eventId}: ${sender.name} ${message}`
    process.stdout.write(statusLine)
  }

  /**
   * Publishes an envelope on this actor's own event bus.
   *
   * @param message Payload to send.
   * @param destination Name of the intended receiver; defaults to 'Any',
   *   which every subscriber accepts.
   */
  broadcast(message: M, destination = 'Any'): void {
    const envelope = { sender: this, message, destination }
    this.eventBus.next(envelope)
  }

  /**
   * Listens to another actor's event bus and forwards matching envelopes to
   * `self.handleMessage`.
   *
   * @param actor Actor whose bus is observed.
   * @param self Handler that receives the matching messages; defaults to this
   *   actor. Matching always uses this actor's own name.
   */
  subscribe(actor: IActor<M>, self: IActor<M> = this): void {
    // observeOn(asyncScheduler) re-emits each envelope from the scheduler, so
    // delivery does not happen inside the publisher's next() call.
    const deferredEvents = actor.eventBus.pipe(observeOn(asyncScheduler))
    deferredEvents.subscribe(async (envelope) => {
      const { sender, message, destination } = envelope
      const addressedHere = destination === 'Any' || destination === this.name
      if (!addressedHere) {
        return
      }
      await self.handleMessage(sender, message)
    })
  }
}
/**
 * Actor that answers every 'Ping' with a 'Pong' and every 'Pong' with a
 * 'Ping', addressing the reply to the sender by name.
 */
class PingPong extends Actor<Message> {
  /**
   * Records the message through the base handler, then broadcasts the
   * opposite message back to the sender.
   *
   * @param sender Actor that published the message; its name becomes the
   *   destination of the reply.
   * @param message The received payload.
   */
  async handleMessage(sender: IActor<Message>, message: Message): Promise<void> {
    // Await the base handler so a failure there rejects this call instead of
    // being left as a floating promise.
    await super.handleMessage(sender, message)
    switch (message) {
      case 'Ping':
        this.broadcast('Pong', sender.name)
        break
      case 'Pong':
        this.broadcast('Ping', sender.name)
        break
    }
  }
}
/**
 * Decorator around an actor that counts handled messages and, at most once
 * per second, logs the average number of events per millisecond.
 *
 * It shares the wrapped actor's event bus and name, so other actors address
 * it exactly as they would address the inner actor.
 */
class ThroughputPerSecond<T extends IActor<Message>> implements IActor<Message> {
  eventBus: Subject<{ sender: IActor<Message>, destination: Destination, message: Message }>
  name: string
  // Copy of the inner actor's counter taken at construction time; this class
  // does not update it afterwards.
  eventId: number
  // Timestamp (ms) at which the current measurement window started.
  private tic: number
  // Messages handled since the current window started.
  private count = 0
  private readonly t: T

  /**
   * @param innerActor Actor whose bus and name are reused and to which all
   *   work is delegated.
   */
  constructor(innerActor: T) {
    this.eventBus = innerActor.eventBus
    this.name = innerActor.name
    this.eventId = innerActor.eventId
    this.t = innerActor
    this.tic = Date.now()
  }

  /** Delegates to the inner actor's `broadcast`. */
  broadcast(message: Message, destination: string): void {
    this.t.broadcast(message, destination)
  }

  /**
   * Delegates to the inner actor's `subscribe`, passing this wrapper as the
   * handler by default so that delivered messages are counted here.
   */
  subscribe(actor: IActor<Message>, self: IActor<Message> = this): void {
    this.t.subscribe(actor, self)
  }

  /**
   * Lets the inner actor handle the message, counts it, and reports the
   * throughput once at least 1000 ms have elapsed in the current window.
   *
   * @param sender Actor that published the message.
   * @param message The received payload.
   */
  async handleMessage(sender: IActor<Message>, message: Message): Promise<void> {
    // Await the inner handler so its failures propagate to the caller.
    await this.t.handleMessage(sender, message)
    this.count++
    const toc = Date.now()
    const delta = toc - this.tic
    if (delta >= 1000) {
      const avgThroughput = this.count / delta
      // The original read an undeclared `avg_throughput` here and threw a
      // ReferenceError on the first report.
      console.log(`\n${this.name} Events ${avgThroughput} per ms`)
      this.count = 0
      this.tic = Date.now()
    }
  }
}
// Central actor 'C1', wrapped so that its message throughput is reported.
const avgPing = new ThroughputPerSecond(new PingPong('C1'))
// Number of peer actors wired to the central one; raise it to add more
// concurrent ping/pong exchanges and watch the reported throughput change.
const PEER_COUNT = 8
for (let peerIndex = 0; peerIndex < PEER_COUNT; peerIndex += 1) {
  const peer = new PingPong(`P${peerIndex + 1}`)
  // Wire both directions before the first message is sent.
  peer.subscribe(avgPing)
  avgPing.subscribe(peer)
  // Kick off the exchange for this peer.
  peer.broadcast('Ping')
}