-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDisplayServer.py
61 lines (50 loc) · 1.79 KB
/
DisplayServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import threading
from queue import Queue
from time import sleep
from rich.layout import Layout
from rich.console import Console
from rich.live import Live
from rich.table import Table
from Widgets.SentReceivedWIdget import SentReceivedWidget
from Widgets.StatusHeaderWidget import StatusHeaderWidget, StatusModel
class DisplayServer:
    """Render a two-part terminal layout (header + body) until an event is set.

    The header is drawn by a ``StatusHeaderWidget`` and the body by a
    ``SentReceivedWidget``; both are attached to a ``rich`` ``Layout`` in
    :meth:`start` and redrawn in a loop by :meth:`run`.
    """

    def __init__(
        self,
        event: threading.Event,
        sent_messages_queue: Queue,
        received_messages_queue: Queue,
        display_server_status: StatusModel,
    ) -> None:
        """Create the widgets and the rich objects used for rendering.

        Args:
            event: Stop signal; :meth:`run` loops until it is set.
            sent_messages_queue: Forwarded as the first argument of
                ``SentReceivedWidget``.
            received_messages_queue: Forwarded as the second argument of
                ``SentReceivedWidget``.
            display_server_status: Forwarded to ``StatusHeaderWidget``.
        """
        self.event = event
        self.header_widget = StatusHeaderWidget(display_server_status)
        self.sent_received_widget = SentReceivedWidget(
            sent_messages_queue,
            received_messages_queue,
        )
        # NOTE(review): main_bar is not used anywhere in this class; kept in
        # case external code reads the attribute.
        self.main_bar = Table()
        self.console = Console()
        # Placeholder; replaced by the real layout in start().
        self.layout = Layout()

    def start(self) -> None:
        """Build the "main" layout and hand its two regions to the widgets.

        The layout is split into a column holding a "header" region with a
        fixed size of 6 and a "body" region below it.
        """
        header = Layout(name="header", size=6)
        body = Layout(name="body")

        self.layout = Layout(name="main")
        self.layout.split_column(header, body)

        # Regions are looked up by name on the assembled layout.
        self.header_widget.setup(self.layout["main"]["header"])
        self.sent_received_widget.setup(self.layout["main"]["body"])

    def run(self) -> None:
        """Redraw the layout in a live display until ``self.event`` is set.

        Each iteration reads the message stats from the body widget, passes
        them to the header widget, renders both widgets and pushes the layout
        to the live display.
        """
        with Live(console=self.console, screen=False, refresh_per_second=10) as live:
            while not self.event.is_set():
                messages_sent, messages_received = (
                    self.sent_received_widget.get_messages_stats()
                )
                self.header_widget.render(
                    messages_received=messages_received,
                    messages_sent=messages_sent,
                )
                self.sent_received_widget.render()
                live.update(self.layout)
                # Wait on the stop event rather than sleeping: same 0.1 s
                # pacing while idle, but returns immediately once the event
                # is set so shutdown is not delayed by a pending sleep.
                self.event.wait(0.1)