forked from antonylesuisse/sonos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsonos.py
executable file
·313 lines (252 loc) · 8.02 KB
/
sonos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python3
"""
Send audio from Pulse Audio to a Sonos device
by Antony Lesuisse 2020, public domain
"""
import argparse
import socket
import subprocess
import sys
from io import BufferedReader
from queue import Empty, Queue
from threading import Thread
from typing import Any, List, Optional, Union
import soco # type: ignore[import]
HTTP_PORT = 8888
STREAM_NAME = "linux_to_sonos.flac"
def get_ip() -> Any:
"""
Return the main IP address of the system
Returns:
Any: IP address
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(0)
try:
# doesn't even have to be reachable
sock.connect(('10.255.255.255', 1))
ip_addr = sock.getsockname()[0]
except socket.gaierror:
ip_addr = '127.0.0.1'
finally:
sock.close()
return ip_addr
def run(cmd: str, shell: bool = False) -> str:
"""
Function to run a command and return output
Args:
cmd (str): command to run
shell (bool, optional): Whether the supplied command should utilise the shell.
Defaults to False.
Returns:
str: _description_
"""
command: Union[str, List[str]]
if '|' in cmd:
shell = True
command = cmd
else:
command = cmd.split(' ')
try:
proc = subprocess.run(
command,
shell=shell,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
)
except subprocess.CalledProcessError as error:
print(f'Command "{" ".join(error.cmd)}" failed: {error.stdout.decode("utf-8")}')
return ''
return proc.stdout.decode('utf-8').strip()
def sonos_discover(preferred: str = '') -> Optional[soco.core.SoCo]:
"""
Discover a Sonos speaker
Args:
preferred (str, optional): IP of preferred Sonos device. Defaults to ''.
Returns:
Optional[soco.core.SoCo]: Sonos device found
"""
for device in soco.discover():
if not preferred:
print(
'No preferred Sonos device. Returning the first one found: '
f'{device.player_name}'
)
return device
if preferred == device.ip_address:
print(f'Preferred Sonos device found: {device.player_name}')
return device
return None
def sonos_play(sonos: soco.core.SoCo, volume: int) -> None:
"""
Play content on the Sonos device
Args:
sonos (soco.core.SoCo): Sonos device
volume (int): Volume level to set
"""
sonos.clear_queue()
sonos.add_uri_to_queue(f'http://{get_ip()}:{HTTP_PORT}/{STREAM_NAME}')
sonos.play_from_queue(0)
sonos.volume = volume
sonos.play()
def get_all_audio_sinks() -> str:
"""
Return the list of PA sinks
Returns:
str: list of PA sinks
"""
return run("pactl list sinks short | awk '{print $2}'")
def sonos_sink_exists() -> bool:
"""
Check if the Sonos sink exists
Returns:
bool: True if found, False otherwise
"""
return 'Sonos' in get_all_audio_sinks()
def pa_sink_load(volume: int) -> int:
"""
Load the Pulse Audio sink for the Sonos
Args:
volume (int): Initial volume to set
Returns:
int: ID of PA module just loaded. Defaults to 0 if no module was loaded
"""
module_id = 0
sonos_sink = sonos_sink_exists()
if not sonos_sink:
print('Loading PA module')
try:
module_id = int(
run(
'pactl load-module module-combine-sink sink_name=Sonos '
'sink_properties=device.description=Sonos slaves=@DEFAULT_SINK@ channels=2'
)
)
except ValueError:
print('There was an error loading the PA module. Check the daemon is running.')
return 0
sonos_sink = sonos_sink_exists()
if sonos_sink:
run('pactl set-sink-mute @DEFAULT_SINK@ true')
run(f'pactl set-sink-volume Sonos {volume}%')
run('pactl set-default-sink Sonos')
return module_id
def pa_sink_unload(module_id: int = 0) -> None:
"""
Unload the Pule Audio sink for the Sonos
Args:
module_id (int): ID of PA module to unload. Defaults to 0
"""
if module_id:
print('Unloading PA module')
run(f'pactl unload-module {module_id}')
run('pactl set-sink-mute @DEFAULT_SINK@ false')
def vlc() -> subprocess.Popen: # type: ignore[type-arg]
"""
Use VLC to transcode and send the audio as a FLAC stream
Returns:
subprocess.Popen: process handle for VLC
"""
command = (
"/usr/bin/cvlc "
"pulse://Sonos.monitor "
"--sout #transcode{vcodec=none,acodec=flac,"
"ab=1441,channels=2,samplerate=44100,scodec=none}:standard"
f"{{access=http,dst=/{STREAM_NAME}}} "
f'--http-host={get_ip()} '
f'--http-port={HTTP_PORT}'
)
proc = subprocess.Popen( # pylint: disable=consider-using-with
command.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
return proc
def silence() -> subprocess.Popen: # type: ignore[type-arg]
"""
Play a very quiet Pink noise to avoid sonos to cut when there is silence
Returns:
subprocess.Popen: process handle for ffplay
"""
command = "ffplay -loglevel 24 -nodisp -autoexit -f lavfi -i anoisesrc=c=pink:r=44100:a=0.001"
proc = subprocess.Popen( # pylint: disable=consider-using-with
command.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return proc
def stream_queue(stream: BufferedReader, queue: Queue) -> None: # type: ignore[type-arg]
"""
Read from a stream and push it to a queue
Args:
stream (BufferedReader): stream to follow
queue (Queue): queue to write to
"""
while not stream.closed:
for line in iter(stream.readline, b''):
queue.put(line)
stream.close()
if __name__ == '__main__':
def parse_args() -> argparse.Namespace:
"""
Parse the command line arguments
Returns:
argparse.Namespace: parsed arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-d',
'--device',
help='preferred Sonos device to play stream on',
)
parser.add_argument(
'-v',
'--volume',
default=60,
type=int,
)
return parser.parse_args()
def main() -> int:
"""
Main program
"""
args = parse_args()
module_id = pa_sink_load(args.volume)
if not sonos_sink_exists():
print('No Sonos audio sink exists.')
return 5
silence_proc = silence()
sonos = sonos_discover(preferred=args.device)
if not sonos:
print('No Sonos devices found')
return 3
vlc_proc = vlc()
if not vlc_proc.stdout: # or not vlc_proc.stderr:
print('There was a problem with VLC. Check errors.')
return 4
# Set up a thread and queue to read from the VLC CLI output
queue = Queue() # type: ignore[var-annotated]
thread = Thread(target=stream_queue, args=(vlc_proc.stdout, queue))
thread.daemon = True
thread.start()
sonos_play(sonos, args.volume)
try:
while True:
try:
buffer = queue.get().decode().strip()
except Empty:
if vlc_proc.stdout.closed:
print('Waiting for background thread to exit')
thread.join()
raise EOFError from Empty
continue
print(buffer)
except (EOFError, KeyboardInterrupt):
print('Killing ffplay')
silence_proc.terminate()
print('Killing VLC')
vlc_proc.terminate()
pa_sink_unload(module_id)
return 0
sys.exit(main())