-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvoice_controller.py
100 lines (80 loc) · 3.17 KB
/
voice_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import threading
from time import sleep, time
from typing import Callable, Dict, List
from basic_speech_to_text import speech_to_text, is_keyword_said, is_wake_up_word_said
from plant_intent_recognizer.detect_intent import RasaIntent, Intent
# Registry of intent callbacks: maps each Intent to the list of zero-argument
# functions added through register_function_for_intent, in registration order.
# It is read by _trigger_function_on_intent when an intent is detected.
CALLBACK_INTENTS: Dict[Intent, List[Callable[[], None]]] = {}
def register_function_for_intent(intent: Intent):
    """Register a function to be called every time an intent is detected by VoiceController.

    Used as a decorator factory: ``@register_function_for_intent(intent=...)``.
    When the decorator is applied, the undecorated function is appended to
    ``CALLBACK_INTENTS[intent]``; several functions may be registered for the
    same intent and they are kept in registration order.

    :param intent: intent the decorated function should react to
    :return: a decorator that registers the function and returns a
        pass-through wrapper carrying the function's name and docstring
    """
    # Imported locally so the block does not rely on the file's import section.
    from functools import wraps

    def inner_decorator(f):
        @wraps(f)  # keep f's __name__/__doc__ visible on the returned wrapper
        def wrapped(*args, **kwargs):
            return f(*args, **kwargs)

        print(f"Registering {f} for intent: {intent.value}")
        # The raw function (not the wrapper) is what gets stored and later called.
        CALLBACK_INTENTS.setdefault(intent, []).append(f)
        return wrapped

    return inner_decorator
def _trigger_function_on_intent(intent: Intent):
    """Call, in registration order, every function registered for this intent.

    Nothing happens when no function was registered for ``intent``.
    """
    for callback in CALLBACK_INTENTS.get(intent, ()):
        callback()
class VoiceController:
    """Listen for voice commands on a background thread and dispatch intents.

    The controller alternates between two modes:

    * sleep mode: only ``is_wake_up_word_said()`` is polled;
    * active mode: every non-empty result of ``speech_to_text()`` is passed to
      the intent recognizer and the functions registered for the detected
      intent are triggered. When no text is returned and more than
      ``active_time_delay`` seconds have passed since the last wake-up word or
      recognized text, the controller goes back to sleep mode.
    """

    def __init__(self, active_time_delay=10):
        """
        :param active_time_delay: time in seconds after the keyword was said
            before being not "active"
        """
        self._rasa_intent = RasaIntent()
        self.active = False
        self._stop = False
        self.active_time_delay = active_time_delay
        # time() of the last wake-up word / recognized text; None until then
        self.last_active_time = None
        self._thread = self._new_thread()

    def _new_thread(self):
        """Create the (not yet started) daemon thread that executes run()."""
        return threading.Thread(target=self.run, daemon=True)

    def stop(self):
        """Stopping gracefully, might take a few seconds.

        Safe to call when the controller was never started and from a callback
        running on the listening thread itself; in both cases only the stop
        flag is set, because joining would raise RuntimeError.
        """
        self._stop = True
        worker = self._thread
        if worker.is_alive() and worker is not threading.current_thread():
            worker.join()

    def start(self):
        """Start listening in the background thread (calls run()).

        May be called again after a previous run has finished.

        :raises RuntimeError: if the controller is already running
        """
        if self._thread.is_alive():
            raise RuntimeError("VoiceController is already running")
        if self._thread.ident is not None:
            # A Thread object can only be started once: build a fresh one.
            self._thread = self._new_thread()
        # Reset the flag here rather than inside run(): resetting it in the
        # worker could overwrite a stop() issued right after start().
        self._stop = False
        self._thread.start()

    def run(self):
        """Listening loop; returns once the stop flag is set.

        The flag is only checked between two blocking listening calls.
        """
        while not self._stop:
            if self.active:  # We actively listen to user command
                if self.last_active_time is None:
                    # `active` was switched on without a wake-up word:
                    # start the inactivity timer now instead of crashing below.
                    self.last_active_time = time()
                text = speech_to_text()
                print(f"text: {text}", flush=True)
                if text:
                    intent = self._rasa_intent.detect_intent(text)
                    print(f"intent: {intent}\n", flush=True)
                    _trigger_function_on_intent(intent)
                    self.last_active_time = time()
                elif time() - self.last_active_time > self.active_time_delay:
                    print("SLEEP MODE", flush=True)
                    self.active = False
            elif is_wake_up_word_said():
                print("ACTIVE MODE", flush=True)
                self.active = True
                self.last_active_time = time()
            else:
                print("😴", end='', flush=True)  # Still in sleep mode
if __name__ == '__main__':
    # Example on how to use the register_function_for_intent wrapper:
    # two functions react to GREET, one to GOODBYE.

    @register_function_for_intent(Intent.GREET)
    def greeting():
        print("Hello !")

    @register_function_for_intent(Intent.GREET)
    def greeting2():
        print("Hello 2 !")

    @register_function_for_intent(Intent.GOODBYE)
    def goodbye():
        print("goodbye !")

    # The controller listens on its own thread, so the main thread stays free.
    controller = VoiceController()
    controller.start()
    print("I can continue to do stuff")
    sleep(60)
    print("Time to stop")
    controller.stop()