-
Notifications
You must be signed in to change notification settings - Fork 28
/
sample.py
84 lines (58 loc) · 2.17 KB
/
sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import pyRTOS
# User defined message types start at 128
# (presumably lower values are reserved for pyRTOS's own message types,
# such as pyRTOS.QUIT -- confirm against the pyRTOS documentation).
# REQUEST_DATA: handled in sample_task, which answers the sender with a
# SENT_DATA message.
REQUEST_DATA = 128
# SENT_DATA: type of the reply message sent in response to REQUEST_DATA.
SENT_DATA = 129
# self is the task (thread) object this generator runs in
def sample_task(self):
    """Template task: answer mailbox messages, then ask the peer to quit.

    The task is a generator; each ``yield`` passes control back to the
    RTOS.  After the initial setup yield, every pass through the loop:

    1. reads the messages returned by ``self.recv()``; a ``pyRTOS.QUIT``
       message ends the task, a ``REQUEST_DATA`` message is answered
       with a ``SENT_DATA`` message addressed to the requester;
    2. yields on ``pyRTOS.timeout(0.5)`` (presumably seconds -- confirm
       against the pyRTOS documentation);
    3. sends ``pyRTOS.QUIT`` to the other sample task ("task1" targets
       "task2", anything else targets "task1") and to "task3", which
       is not registered by this script;
    4. yields on ``pyRTOS.wait_for_message(self)``.
    """

    ### Setup code here

    ### End Setup code

    # Setup is done: pass control back to the RTOS
    yield

    # Thread loop
    while True:

        # Check messages
        for message in self.recv():

            ### Handle messages by adding further checks to this chain

            # QUIT allows this thread to be terminated.  This check may
            # be removed if the thread should never terminate.
            if message.type == pyRTOS.QUIT:

                ### Tear down code here
                print("Terminating task:", self.name)
                print("Terminated by:", message.source)
                ### End of Tear down code

                return

            # Example message, using user message types: reply to the
            # task that asked for data.
            if message.type == REQUEST_DATA:
                reply = pyRTOS.Message(SENT_DATA, self, message.source,
                                       "This is data")
                self.send(reply)

            ### End Message Handler

        ### Work code here
        # If there is significant code here, yield periodically
        # between instructions that are not timing dependent.
        # It is also generally a good idea to yield after I/O
        # commands that return instantly but need some time to
        # complete (like I2C data requests).
        #
        # Each task must yield at least once per iteration, or it
        # will hog all of the CPU and prevent any other task from
        # running.
        ### End Work code

        yield [pyRTOS.timeout(0.5)]

        # Pick the other sample task as the recipient of the quit message
        target = "task2" if self.name == "task1" else "task1"

        print(self.name, "sending quit message to:", target)
        quit_for_peer = pyRTOS.Message(pyRTOS.QUIT, self, target)
        self.send(quit_for_peer)

        # Testing message passing system with an unknown recipient
        print(self.name, "sending quit message to:", "task3 (does not exist)")
        print("This should silently fail")
        quit_for_missing = pyRTOS.Message(pyRTOS.QUIT, self, "task3")
        self.send(quit_for_missing)

        yield [pyRTOS.wait_for_message(self)]
# Register two instances of the sample task, each created with a mailbox
# (mailbox=True) so that the messages they address to each other by name
# can be delivered.
for task_name in ("task1", "task2"):
    pyRTOS.add_task(pyRTOS.Task(sample_task, name=task_name, mailbox=True))

# Register a service routine with the RTOS; this one only prints a marker.
pyRTOS.add_service_routine(lambda: print("Service Routine Executing"))

# Start the RTOS with the tasks registered above.
pyRTOS.start()