Skip to content

Commit

Permalink
Make zeroconf advertisement more robust to ip address changes
Browse files Browse the repository at this point in the history
  • Loading branch information
linknum23 authored and Lohrer committed Dec 28, 2023
1 parent 71142b4 commit e8be7c7
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 89 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ good-names=i,
c,
id,
ex,
ok,
Run,

[FORMAT]
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# AmpliPi Software Releases

## Upcoming Release
* Make zeroconf advertisement more robust to ip address changes
* Remove deprecated old zeroconf advertisement

## 0.3.1
* System
* Fix bug between various hardware versions
Expand Down
103 changes: 67 additions & 36 deletions amplipi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
import netifaces as ni
from socket import gethostname, inet_aton
from zeroconf import IPVersion, ServiceInfo, Zeroconf
from multiprocessing import Queue
from multiprocessing import Event, Queue
from multiprocessing.synchronize import Event as SyncEvent

# amplipi
import amplipi.utils as utils
Expand Down Expand Up @@ -897,60 +898,78 @@ def get_ip_info(iface: str = 'eth0') -> Tuple[Optional[str], Optional[str]]:
return None, None


def advertise_service(port, que: Queue):
def advertise_service(port, event: SyncEvent):
""" Advertise the AmpliPi api via zeroconf, can be verified with 'avahi-browse -ar'
Expected to be run as a seperate process, eg:
q = Queue()
ad = Process(target=amplipi.app.advertise_service, args=(5000, q))
event = multiprocessing.Event()
ad = Process(target=amplipi.app.advertise_service, args=(5000, event))
ad.start()
...
q.put('done')
event.set()
ad.join()
NOTE: multiprocessing.Event() is a function that returns a multiprocessing.synchronize.Event type
Here the type is aliased to SyncEvent
"""
def ok():
""" Was a stop requested by the parent process? """
return not event.is_set()

while ok():
try:
_advertise_service(port, ok)
except Exception as exc:
if not 'change registration' in str(exc):
print(f'Failed to advertise AmpliPi service, retrying in 30s: {exc}')
# delay for a bit after a failure
delay_ct = 300
while ok() and delay_ct > 0:
sleep(0.1)
delay_ct -= 1


def _find_best_iface(ifaces: List[str], ok: Callable) -> Union[Tuple[str, str, str], Tuple[None, None, None]]:
# Try to find the best interface to advertise on,
# retrying in case DHCP resolution is delayed
ip_addr, mac_addr = None, None
retry_count = 5
while ok() and not (ip_addr and mac_addr) and retry_count > 0:
for iface in ifaces:
ip_addr, mac_addr = get_ip_info(iface)
if ip_addr and mac_addr:
return ip_addr, mac_addr, iface
sleep(2) # wait a bit in case this was started before DHCP was started
retry_count -= 1
return None, None, None


def _advertise_service(port: int, ok: Callable) -> None:
""" Underlying advertisement, can throw Exceptions when network is not connected """

hostname = f'{gethostname()}.local'
url = f'http://{hostname}'

# search for the best interface to advertise on
# enumerate interface to advertise on, starting with the pi defaults first
# ordering is important, as the pi defaults will be tried first
ifaces = ['eth0', 'wlan0'] # default pi interfaces
try:
for iface in ni.interfaces():
if iface.startswith('w') or iface.startswith('e'):
ifaces.append(iface)
except:
pass
ip_addr, mac_addr = None, None
for iface in ifaces:
ip_addr, mac_addr = get_ip_info(iface)
if ip_addr and mac_addr:
break # take the first good interface found

ip_addr, mac_addr, good_iface = _find_best_iface(ifaces, ok)

if not ip_addr:
print(f'AmpliPi zeroconf - unable to register service on one of {ifaces}, \
they all are either not available or have no IP address.')
print(f'AmpliPi zeroconf - is this running on AmpliPi?')
ip_addr = '0.0.0.0' # Any hosted ip on this device
# Fallback to any hosted ip on this device.
# This will be resolved to an ip address by the advertisement
ip_addr = '0.0.0.0'
if port != 80:
url += f':{port}'
info_deprecated = ServiceInfo(
'_http._tcp.local.',
# this is named AmpliPi-api to distinguish from the common Spotify/Airport name of AmpliPi
'amplipi-api._http._tcp.local.',
addresses=[inet_aton(ip_addr)],
port=port,
properties={
# standard info
'path': '/api/',
# extra info - for interfacing
'name': 'AmpliPi',
'vendor': 'MicroNova',
'version': utils.detect_version(),
# extra info - for user
'web_app': url,
'documentation': f'{url}/doc'
},
server=f'{hostname}.', # Trailing '.' is required by the SRV_record specification
)

info = ServiceInfo(
# use a custom type to easily support multiple amplipi device enumeration
Expand All @@ -973,17 +992,29 @@ def advertise_service(port, que: Queue):
server=f'{hostname}.', # Trailing '.' is required by the SRV_record specification
)

if not ok():
print("AmpliPi zeroconf - cancelled")
return

print(f'AmpliPi zeroconf - registering service: {info}')
# right now the AmpliPi webserver is ipv4 only
zeroconf = Zeroconf(ip_version=IPVersion.V4Only, interfaces=[ip_addr])
zeroconf.register_service(info_deprecated, cooperating_responders=True)
zeroconf.register_service(info)
print('AmpliPi zeroconf - finished registering service')
try:
while que.empty():
sleep(0.1)
except:
pass
# poll for changes to the IP address
# this attempts to handle events like router/switch resets
while ok():
delay_ct = 100
while ok() and delay_ct > 0:
sleep(0.1)
delay_ct -= 1
if ok():
new_ip_addr, _, new_iface = _find_best_iface(ifaces, ok)
if new_ip_addr != ip_addr:
print(f'AmpliPi zeroconf - IP address changed from {ip_addr} ({good_iface}) to {new_ip_addr} ({new_iface})')
print('AmpliPi zeroconf - Registration change needed')
raise Exception("change registration")
finally:
print('AmpliPi zeroconf - unregistering service')
zeroconf.unregister_service(info)
Expand Down
12 changes: 7 additions & 5 deletions amplipi/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""

import os
from multiprocessing import Process, Queue
from multiprocessing import Process, Event
import uvicorn
import amplipi.app

Expand All @@ -35,15 +35,17 @@
application = amplipi.app.create_app(delay_saves=True, mock_ctrl=MOCK_CTRL, mock_streams=MOCK_STREAMS)

# advertise the service here, to avoid adding bloat to underlying app, especially for test startup
# this needs to be done as a separate process to avoid interfering with webserver (ZeroConf makes its own event loop)
zc_que: "Queue[str]" = Queue()
zc_reg = Process(target=amplipi.app.advertise_service, args=(PORT, zc_que))
# NOTE: Zeroconf advertisements need to be done as a separate process to avoid blocking the
# webserver startup since behind the scenes zeroconf makes its own event loop.
zc_event = Event()
zc_reg = Process(target=amplipi.app.advertise_service, args=(PORT, zc_event))
zc_reg.start()


@application.on_event('shutdown')
def on_shutdown():
zc_que.put('done')
""" Notify the mdns advertisement to shutdown"""
zc_event.set()
zc_reg.join()


Expand Down
54 changes: 6 additions & 48 deletions tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import os
from copy import deepcopy # copy test config

import time

import pytest
from fastapi.testclient import TestClient

Expand Down Expand Up @@ -985,56 +983,16 @@ def test_generate(client):
# if os.path.exists('{}/{}'.format(fullpath, fn)):
# os.remove('{}/{}'.format(fullpath, fn))

def test_zeroconf_deprecated():
""" Unit test for older, deprecated zeroconf advertisement """
# TODO: migrate this test into its own module
from zeroconf import Zeroconf, ServiceStateChange, ServiceBrowser, IPVersion
from time import sleep
from multiprocessing import Process, Queue

AMPLIPI_ZC_NAME = 'amplipi-api._http._tcp.local.'

services_advertised = {}
def on_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange):
if state_change is ServiceStateChange.Added:
info = zeroconf.get_service_info(service_type, name)
if info and info.port != 80: # ignore the actual amplipi service on your network
services_advertised[name] = info

# advertise amplipi-api service (start this before the listener to verify it can be found after advertisement)
q = Queue()
zc_reg = Process(target=amplipi.app.advertise_service, args=(9898,q))
zc_reg.start()
sleep(4) # wait for a bit to make sure the service is started

# start listener that adds available services
zeroconf = Zeroconf(ip_version=IPVersion.V4Only)
services = ["_http._tcp.local."]
_ = ServiceBrowser(zeroconf, services, handlers=[on_service_state_change])

# wait enough time for a response from the serice
sleep(2)

# stop the advertiser
q.put('done')
zc_reg.join()

# stop the listener
zeroconf.close()

# check advertisememts
assert AMPLIPI_ZC_NAME in services_advertised
assert services_advertised[AMPLIPI_ZC_NAME].port == 9898

def test_zeroconf():
""" Unit test for zeroconf advertisement """
# TODO: migrate this test into its own module
from zeroconf import Zeroconf, ServiceStateChange, ServiceBrowser, IPVersion
from time import sleep
from multiprocessing import Process, Queue
from multiprocessing import Process, Event

# get default network interface
iface = ni.gateways()['default'][ni.AF_INET][1]
FAKE_PORT = 9898

# first time ni.ifaddresses is called in the CI system it fails
try:
Expand All @@ -1051,8 +1009,8 @@ def on_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, st
services_advertised[name] = info

# advertise amplipi-api service (start this before the listener to verify it can be found after advertisement)
q = Queue()
zc_reg = Process(target=amplipi.app.advertise_service, args=(9898,q))
event = Event()
zc_reg = Process(target=amplipi.app.advertise_service, args=(FAKE_PORT,event))
zc_reg.start()
sleep(4) # wait for a bit to make sure the service is started

Expand All @@ -1074,15 +1032,15 @@ def on_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, st
AMPLIPI_ZC_NAME = f'amplipi-{mac_addr}._amplipi._tcp.local.'

# stop the advertiser
q.put('done')
event.set()
zc_reg.join()

# stop the listener
zeroconf.close()

# check advertisememts
assert AMPLIPI_ZC_NAME in services_advertised
assert services_advertised[AMPLIPI_ZC_NAME].port == 9898
assert services_advertised[AMPLIPI_ZC_NAME].port == FAKE_PORT

@pytest.mark.parametrize('zid', base_zone_ids())
def test_set_zone_vol(client, zid):
Expand Down

0 comments on commit e8be7c7

Please sign in to comment.