diff --git a/CHANGELOG.md b/CHANGELOG.md index 21f1eae..c7fceff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +### Added +- Support for UDMP + +### Fixed +- Support for CSRF + +## [2.20.1] - 2020-03-30 +### Fixed +- Lint failures in controller.py + +## [2.20.0] - 2020-03-30 +### Added +- CHANGELOG +- Added support for UnifiOS: `version = 'unifiOS'` ## [2.19.0] - 2019-10-28 ### Added diff --git a/README.md b/README.md index 770b772..3d32c88 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Create a Controller object. - `username` -- the username to log in with - `password` -- the password to log in with - `port` -- the port of the controller host - - `version` -- the base version of the controller API [v4|v5] + - `version` -- the base version of the controller API [v4|v5|unifiOS|UDMP-unifiOS] - `site_id` -- the site ID to access - `ssl_verify` -- Verify the controllers SSL certificate, default=True, can also be False or "path/to/custom_cert.pem" @@ -208,6 +208,29 @@ Gets the current state & configuration of the given device based on its MAC Addr - `target_mac` -- MAC address of the device +### `get_radius_users(self)` +Returns a list of all RADIUS users, name, password, 24 digit user id, and 24 digit site id. + +### `add_radius_user(self, name, password)` +Add a new RADIUS user with this username and password. + +- `name` -- the new user's username +- `password` -- the new user's password + +### `update_radius_user(self, name, password, id)` +Update a RADIUS user to this new username and password. +Requires the user's 24 digit user id, which can be gotten from `get_radius_users(self)`. + +- `name` -- the user's new username +- `password` -- the user's new password +- `id` -- the user's 24 digit user id. + +### `delete_radius_user(self, id)` +Delete a RADIUS user. +Requires the user's 24 digit user id, which can be gotten from `get_radius_users(self)`. + +- `id` -- the user's 24 digit user id. + ### `get_switch_port_overrides(self, target_mac)` Gets a list of port overrides, in dictionary format, for the given target MAC address. The dictionary contains the port_idx, portconf_id, poe_mode, & name. @@ -232,8 +255,17 @@ The following small utilities are bundled with the API: ### unifi-ls-clients -Lists the currently active clients on the networks. Takes parameters for -controller, username, password, controller version and site ID (UniFi >= 3.x) +Lists the currently active clients on the networks. Can take the following parameters: +|Parameters |Description |Default | +| ------------- |---------------------------------------| -------| +| -c | controller address |unifi | +| -u | controller username |admin | +| -p | controller password | | +| -b | controller port |8443 | +| -v | controller base version |v5 | +| -s | site ID, UniFi >=3.x only |default | +| -V | ignore SSL certificates | | +| -C | verify with ssl certificate pem file | | ``` jb@unifi:~ % unifi-ls-clients -c localhost -u admin -p p4ssw0rd -v v3 -s default diff --git a/pyunifi/__init__.py b/pyunifi/__init__.py index f026d48..23557e5 100644 --- a/pyunifi/__init__.py +++ b/pyunifi/__init__.py @@ -1,5 +1,9 @@ +""" +Python __init__ to interact with UniFi Controller +""" +import urllib3 + def http_debug_log_stderr(): """Dump requests urllib3 debug messages to stderr""" - import requests - requests.packages.urllib3.add_stderr_logger() + urllib3.add_stderr_logger() diff --git a/pyunifi/controller.py b/pyunifi/controller.py index 2334312..a56bbf2 100644 --- a/pyunifi/controller.py +++ b/pyunifi/controller.py @@ -1,40 +1,46 @@ -import json -import logging -import requests +""" +Python package to interact with UniFi Controller +""" import shutil import time import warnings +import json +import logging + +import requests +from urllib3.exceptions import InsecureRequestWarning """For testing purposes: logging.basicConfig(filename='pyunifi.log', level=logging.WARN, format='%(asctime)s %(message)s') -""" -log = logging.getLogger(__name__) +""" # pylint: disable=W0105 +CONS_LOG = logging.getLogger(__name__) class APIError(Exception): - pass + """API Error exceptions""" -def retry_login(func, *args, **kwargs): +def retry_login(func, *args, **kwargs): # pylint: disable=w0613 """To reattempt login if requests exception(s) occur at time of call""" + def wrapper(*args, **kwargs): try: try: return func(*args, **kwargs) - except (requests.exceptions.RequestException, - APIError) as err: - log.warning("Failed to perform %s due to %s" % (func, err)) + except (requests.exceptions.RequestException, APIError) as err: + CONS_LOG.warning("Failed to perform %s due to %s", func, err) controller = args[0] - controller._login() + controller._login() # pylint: disable=w0212 return func(*args, **kwargs) except Exception as err: raise APIError(err) + return wrapper -class Controller(object): +class Controller: # pylint: disable=R0902,R0904 """Interact with a UniFi controller. @@ -54,8 +60,16 @@ class Controller(object): """ - def __init__(self, host, username, password, port=8443, - version='v5', site_id='default', ssl_verify=True): + def __init__( # pylint: disable=r0913 + self, + host, + username, + password, + port=8443, + version="v5", + site_id="default", + ssl_verify=True, + ): """ :param host: the address of the controller host; IP or name :param username: the username to log in with @@ -66,82 +80,126 @@ def __init__(self, host, username, password, port=8443, :param ssl_verify: Verify the controllers SSL certificate, can also be "path/to/custom_cert.pem" """ + self.log = logging.getLogger(__name__ + ".Controller") - if float(version[1:]) < 4: - raise APIError("%s controllers no longer supported" % version) self.host = host - self.port = port + self.headers = None self.version = version + self.port = port self.username = username self.password = password self.site_id = site_id - self.url = 'https://' + host + ':' + str(port) + '/' self.ssl_verify = ssl_verify - if ssl_verify is False: - warnings.simplefilter("default", category=requests.packages. - urllib3.exceptions.InsecureRequestWarning) + if version == "unifiOS": + self.url = "https://" + host + "/proxy/network/" + self.auth_url = self.url + "api/login" + elif version == "UDMP-unifiOS": + self.auth_url = "https://" + host + "/api/auth/login" + self.url = "https://" + host + "/proxy/network/" + elif version[:1] == "v": + if float(version[1:]) < 4: + raise APIError("%s controllers no longer supported" % version) + self.url = "https://" + host + ":" + str(port) + "/" + self.auth_url = self.url + "api/login" + else: + raise APIError("%s controllers no longer supported" % version) - self.session = requests.Session() - self.session.verify = ssl_verify + if ssl_verify is False: + warnings.simplefilter("default", category=InsecureRequestWarning) - self.log.debug('Controller for %s', self.url) + self.log.debug("Controller for %s", self.url) self._login() @staticmethod def _jsondec(data): obj = json.loads(data) - if 'meta' in obj: - if obj['meta']['rc'] != 'ok': - raise APIError(obj['meta']['msg']) - if 'data' in obj: - return obj['data'] + if "meta" in obj: + if obj["meta"]["rc"] != "ok": + raise APIError(obj["meta"]["msg"]) + if "data" in obj: + result = obj["data"] else: - return obj + result = obj + + return result def _api_url(self): - return self.url + 'api/s/' + self.site_id + '/' + return self.url + "api/s/" + self.site_id + "/" @retry_login def _read(self, url, params=None): # Try block to handle the unifi server being offline. - r = self.session.get(url, params=params) - return self._jsondec(r.text) + response = self.session.get(url, params=params, headers=self.headers) + + if response.headers.get("X-CSRF-Token"): + self.headers = {"X-CSRF-Token": response.headers["X-CSRF-Token"]} + + return self._jsondec(response.text) def _api_read(self, url, params=None): return self._read(self._api_url() + url, params) @retry_login def _write(self, url, params=None): - r = self.session.post(url, json=params) - return self._jsondec(r.text) + response = self.session.post(url, json=params, headers=self.headers) + + if response.headers.get("X-CSRF-Token"): + self.headers = {"X-CSRF-Token": response.headers["X-CSRF-Token"]} + + return self._jsondec(response.text) def _api_write(self, url, params=None): return self._write(self._api_url() + url, params) @retry_login def _update(self, url, params=None): - r = self.session.put(url, json=params) - return self._jsondec(r.text) + response = self.session.put(url, json=params, headers=self.headers) + + if response.headers.get("X-CSRF-Token"): + self.headers = {"X-CSRF-Token": response.headers["X-CSRF-Token"]} + + return self._jsondec(response.text) def _api_update(self, url, params=None): return self._update(self._api_url() + url, params) + @retry_login + def _delete(self, url, params=None): + response = self.session.delete(url, json=params, headers=self.headers) + + if response.headers.get("X-CSRF-Token"): + self.headers = {"X-CSRF-Token": response.headers["X-CSRF-Token"]} + + return self._jsondec(response.text) + + def _api_delete(self, url, params=None): + return self._delete(self._api_url() + url, params) + def _login(self): - log.debug('login() as %s', self.username) + self.log.debug("login() as %s", self.username) + self.session = requests.Session() + self.session.verify = self.ssl_verify + + response = self.session.post( + self.auth_url, + json={"username": self.username, "password": self.password}, + headers=self.headers, + ) - # XXX Why doesn't passing in the dict work? - params = {'username': self.username, 'password': self.password} - login_url = self.url + 'api/login' + if response.headers.get("X-CSRF-Token"): + self.headers = {"X-CSRF-Token": response.headers["X-CSRF-Token"]} - r = self.session.post(login_url, json=params) - if r.status_code != 200: - raise APIError("Login failed - status code: %i" % r.status_code) + if response.status_code != 200: + raise APIError( + "Login failed - status code: %i" % response.status_code + ) def _logout(self): - log.debug('logout()') - self._api_write('logout') + self.log.debug("logout()") + self._api_write("logout") + self.session.close() def switch_site(self, name): """ @@ -150,19 +208,27 @@ def switch_site(self, name): :param name: Site Name :return: True or APIError """ + + # TODO: Not currently supported on UDMP as site support doesn't exist. + if self.version == "UDMP-unifiOS": + raise APIError( + "Controller version not supported: %s" % self.version + ) + for site in self.get_sites(): - if site['desc'] == name: - self.site_id = site['name'] + if site["desc"] == name: + self.site_id = site["name"] return True raise APIError("No site %s found" % name) def get_alerts(self): """Return a list of all Alerts.""" - return self._api_write('stat/alarm') + return self._api_write("stat/alarm") def get_alerts_unarchived(self): """Return a list of Alerts unarchived.""" - return self._api_write('stat/alarm', params={'archived': False}) + params = {"archived": False} + return self._api_write("stat/alarm", params=params) def get_statistics_last_24h(self): """Returns statistical data of the last 24h""" @@ -170,24 +236,24 @@ def get_statistics_last_24h(self): def get_statistics_24h(self, endtime): """Return statistical data last 24h from time""" - params = { - 'attrs': ["bytes", "num_sta", "time"], - 'start': int(endtime - 86400) * 1000, - 'end': int(endtime - 3600) * 1000} - return self._write(self._api_url() + 'stat/report/hourly.site', params) + "attrs": ["bytes", "num_sta", "time"], + "start": int(endtime - 86400) * 1000, + "end": int(endtime - 3600) * 1000, + } + return self._api_write("stat/report/hourly.site", params) def get_events(self): """Return a list of all Events.""" - return self._api_read('stat/event') + return self._api_read("stat/event") def get_aps(self): """Return a list of all APs, with significant information about each. """ # Set test to 0 instead of NULL - params = {'_depth': 2, 'test': 0} - return self._api_read('stat/device', params) + params = {"_depth": 2, "test": 0} + return self._api_read("stat/device", params) def get_client(self, mac): """Get details about a specific client""" @@ -195,51 +261,55 @@ def get_client(self, mac): # stat/user/ works better than stat/sta/ # stat/sta seems to be only active clients # stat/user includes known but offline clients - return self._api_read('stat/user/' + mac)[0] + return self._api_read("stat/user/" + mac)[0] def get_clients(self): """Return a list of all active clients, with significant information about each. """ - return self._api_read('stat/sta') + return self._api_read("stat/sta") def get_users(self): """Return a list of all known clients, with significant information about each. """ - return self._api_read('list/user') + return self._api_read("list/user") def get_user_groups(self): """Return a list of user groups with its rate limiting settings.""" - return self._api_read('list/usergroup') + return self._api_read("list/usergroup") def get_sysinfo(self): """Return basic system informations.""" - return self._api_read('stat/sysinfo') + return self._api_read("stat/sysinfo") def get_healthinfo(self): """Return health information.""" - return self._api_read('stat/health') + return self._api_read("stat/health") def get_sites(self): """Return a list of all sites, with their UID and description""" - return self._read(self.url + 'api/self/sites') + return self._read(self.url + "api/self/sites") def get_wlan_conf(self): """Return a list of configured WLANs with their configuration parameters. """ - return self._api_read('list/wlanconf') + return self._api_read("list/wlanconf") - def _run_command(self, command, params={}, mgr='stamgr'): - log.debug('_run_command(%s)', command) - params.update({'cmd': command}) - return self._write(self._api_url() + 'cmd/' + mgr, params=params) + def _run_command(self, command, params=None, mgr="stamgr"): + if params is None: + params = {} + self.log.debug("_run_command(%s)", command) + params.update({"cmd": command}) + return self._api_write("cmd/" + mgr, params=params) - def _mac_cmd(self, target_mac, command, mgr='stamgr', params={}): - log.debug('_mac_cmd(%s, %s)', target_mac, command) - params['mac'] = target_mac + def _mac_cmd(self, target_mac, command, mgr="stamgr", params=None): + if params is None: + params = {} + self.log.debug("_mac_cmd(%s, %s)", target_mac, command) + params["mac"] = target_mac return self._run_command(command, params, mgr) def get_device_stat(self, target_mac): @@ -251,9 +321,42 @@ def get_device_stat(self, target_mac): capabilities and configuration of the device :rtype: dict() """ - log.debug('get_device_stat(%s)', target_mac) + self.log.debug("get_device_stat(%s)", target_mac) params = {"macs": [target_mac]} - return self._api_read('stat/device/' + target_mac, params)[0] + return self._api_read("stat/device/" + target_mac, params)[0] + + def get_radius_users(self): + """Return a list of all users, with their + name, password, 24 digit user id, and 24 digit site id + """ + return self._api_read('rest/account') + + def add_radius_user(self, name, password): + """Add a new user with this username and password + :param name: new user's username + :param password: new user's password + :returns: user's name, password, 24 digit user id, and 24 digit site id + """ + params = {'name': name, 'x_password': password} + return self._api_write('rest/account/', params) + + def update_radius_user(self, name, password, user_id): + """Update a user to this new username and password + :param name: user's new username + :param password: user's new password + :param id: the user's 24 digit user id, from get_radius_users() or add_radius_user() + :returns: user's name, password, 24 digit user id, and 24 digit site id + :returns: [] if no change was made + """ + params = {'name': name, '_id': user_id, 'x_password': password} + return self._api_update('rest/account/' + user_id, params) + + def delete_radius_user(self, user_id): + """Delete user + :param id: the user's 24 digit user id, from get_radius_users() or add_radius_user() + :returns: [] if successful + """ + return self._api_delete('rest/account/' + user_id) def get_switch_port_overrides(self, target_mac): """Gets a list of port overrides, in dictionary @@ -267,8 +370,8 @@ def get_switch_port_overrides(self, target_mac): 'poe_mode': str, 'name': str } ] :rtype: list( dict() ) """ - log.debug('get_switch_port_overrides(%s)', target_mac) - return self.get_device_stat(target_mac)['port_overrides'] + self.log.debug("get_switch_port_overrides(%s)", target_mac) + return self.get_device_stat(target_mac)["port_overrides"] def _switch_port_power(self, target_mac, port_idx, mode): """Helper method to set the given PoE mode the port/switch. @@ -285,34 +388,38 @@ def _switch_port_power(self, target_mac, port_idx, mode): """ # TODO: Switch operations should most likely happen in a # different Class, Switch. - log.debug('_switch_port_power(%s, %s, %s)', target_mac, port_idx, mode) + self.log.debug( + "_switch_port_power(%s, %s, %s)", target_mac, port_idx, mode + ) device_stat = self.get_device_stat(target_mac) - device_id = device_stat['_id'] - overrides = device_stat['port_overrides'] + device_id = device_stat.get("_id") + overrides = device_stat.get("port_overrides") found = False - for i in range(0, len(overrides)): - if overrides[i]['port_idx'] == port_idx: - # Override already exists, update.. - overrides[i]['poe_mode'] = mode - found = True - break + if overrides: + for i in overrides: + if overrides[i]["port_idx"] == port_idx: + # Override already exists, update.. + overrides[i]["poe_mode"] = mode + found = True + break if not found: # Retrieve portconf portconf_id = None - for port in device_stat['port_table']: - if port['port_idx'] == port_idx: - portconf_id = port['portconf_id'] + for port in device_stat["port_table"]: + if port["port_idx"] == port_idx: + portconf_id = port["portconf_id"] break if portconf_id is None: - log.error("Port ID %s could not be found in the port_table.") raise APIError( - 'Port ID %s not found in port_table' % str(port_idx) - ) - overrides.append({ - "port_idx": port_idx, - "portconf_id": portconf_id, - "poe_mode": mode - }) + "Port ID %s not found in port_table" % str(port_idx) + ) + overrides.append( + { + "port_idx": port_idx, + "portconf_id": portconf_id, + "poe_mode": mode + } + ) # We return the device_id as it's needed by the parent method return {"port_overrides": overrides, "device_id": device_id} @@ -327,11 +434,11 @@ def switch_port_power_off(self, target_mac, port_idx): :returns: API Response which is the resulting complete port overrides :rtype: list( dict() ) """ - log.debug('switch_port_power_off(%s, %s)', target_mac, port_idx) + self.log.debug("switch_port_power_off(%s, %s)", target_mac, port_idx) params = self._switch_port_power(target_mac, port_idx, "off") - device_id = params['device_id'] - del params['device_id'] - return self._api_update('rest/device/' + device_id, params) + device_id = params["device_id"] + del params["device_id"] + return self._api_update("rest/device/" + device_id, params) def switch_port_power_on(self, target_mac, port_idx): """Powers On the given port on the Switch identified @@ -344,33 +451,43 @@ def switch_port_power_on(self, target_mac, port_idx): :returns: API Response which is the resulting complete port overrides :rtype: list( dict() ) """ - log.debug('switch_port_power_on(%s, %s)', target_mac, port_idx) + self.log.debug("switch_port_power_on(%s, %s)", target_mac, port_idx) params = self._switch_port_power(target_mac, port_idx, "auto") - device_id = params['device_id'] - del params['device_id'] - return self._api_update('rest/device/' + device_id, params) + device_id = params["device_id"] + del params["device_id"] + return self._api_update("rest/device/" + device_id, params) - def create_site(self, desc='desc'): + def create_site(self, desc="desc"): """Create a new site. :param desc: Name of the site to be created. """ - return self._run_command('add-site', params={"desc": desc}, - mgr='sitemgr') + + # TODO: Not currently supported on UDMP as site support doesn't exist. + if self.version == "UDMP-unifiOS": + raise APIError( + "Controller version not supported: %s" % self.version + ) + + return self._run_command( + "add-site", + params={"desc": desc}, + mgr="sitemgr" + ) def block_client(self, mac): """Add a client to the block list. :param mac: the MAC address of the client to block. """ - return self._mac_cmd(mac, 'block-sta') + return self._mac_cmd(mac, "block-sta") def unblock_client(self, mac): """Remove a client from the block list. :param mac: the MAC address of the client to unblock. """ - return self._mac_cmd(mac, 'unblock-sta') + return self._mac_cmd(mac, "unblock-sta") def disconnect_client(self, mac): """Disconnect a client. @@ -380,14 +497,14 @@ def disconnect_client(self, mac): :param mac: the MAC address of the client to disconnect. """ - return self._mac_cmd(mac, 'kick-sta') + return self._mac_cmd(mac, "kick-sta") def restart_ap(self, mac): """Restart an access point (by MAC). :param mac: the MAC address of the AP to restart. """ - return self._mac_cmd(mac, 'restart', 'devmgr') + return self._mac_cmd(mac, "restart", "devmgr") def restart_ap_name(self, name): """Restart an access point (by name). @@ -395,16 +512,21 @@ def restart_ap_name(self, name): :param name: the name address of the AP to restart. """ if not name: - raise APIError('%s is not a valid name' % str(name)) - for ap in self.get_aps(): - if ap.get('state', 0) == 1 and ap.get('name', None) == name: - return self.restart_ap(ap['mac']) + raise APIError("%s is not a valid name" % str(name)) + for access_point in self.get_aps(): + if ( + access_point.get("state", 0) == 1 + and access_point.get("name", None) == name + ): + result = self.restart_ap(access_point["mac"]) + return result def archive_all_alerts(self): """Archive all Alerts""" - return self._run_command('archive-all-alarms', mgr='evtmgr') + return self._run_command("archive-all-alarms", mgr="evtmgr") - def create_backup(self, days='0'): + # TODO: Not currently supported on UDMP as it now utilizes async-backups. + def create_backup(self, days="0"): """Ask controller to create a backup archive file ..warning: @@ -415,25 +537,51 @@ def create_backup(self, days='0'): '-1' backup all metrics. '0' backup only the configuration. :return: URL path to backup file """ - res = self._run_command('backup', mgr='system', params={'days': days}) - return res[0]['url'] + if self.version == "UDMP-unifiOS": + raise APIError( + "Controller version not supported: %s" % self.version + ) - def get_backup(self, download_path=None, target_file='unifi-backup.unf'): + res = self._run_command( + "backup", + mgr="system", + params={"days": days} + ) + return res[0]["url"] + + # TODO: Not currently supported on UDMP as it now utilizes async-backups. + def get_backup(self, download_path=None, target_file="unifi-backup.unf"): """ :param download_path: path to backup; if None is given one will be created :param target_file: Filename or full path to download the backup archive to, should have .unf extension for restore. """ + if self.version == "UDMP-unifiOS": + raise APIError( + "Controller version not supported: %s" % self.version + ) + if not download_path: download_path = self.create_backup() - r = self.session.get(self.url + download_path, stream=True) - with open(target_file, 'wb') as _backfh: - return shutil.copyfileobj(r.raw, _backfh) + response = self.session.get(self.url + download_path, stream=True) + + if response != 200: + raise APIError("API backup failed: %i" % response.status_code) - def authorize_guest(self, guest_mac, minutes, up_bandwidth=None, - down_bandwidth=None, byte_quota=None, ap_mac=None): + with open(target_file, "wb") as _backfh: + return shutil.copyfileobj(response.raw, _backfh) + + def authorize_guest( # pylint: disable=R0913 + self, + guest_mac, + minutes, + up_bandwidth=None, + down_bandwidth=None, + byte_quota=None, + ap_mac=None, + ): """ Authorize a guest based on his MAC address. @@ -444,17 +592,17 @@ def authorize_guest(self, guest_mac, minutes, up_bandwidth=None, :param byte_quota: quantity of bytes allowed in MB :param ap_mac: access point MAC address """ - cmd = 'authorize-guest' - params = {'mac': guest_mac, 'minutes': minutes} + cmd = "authorize-guest" + params = {"mac": guest_mac, "minutes": minutes} if up_bandwidth: - params['up'] = up_bandwidth + params["up"] = up_bandwidth if down_bandwidth: - params['down'] = down_bandwidth + params["down"] = down_bandwidth if byte_quota: - params['bytes'] = byte_quota + params["bytes"] = byte_quota if ap_mac: - params['ap_mac'] = ap_mac + params["ap_mac"] = ap_mac return self._run_command(cmd, params=params) def unauthorize_guest(self, guest_mac): @@ -463,12 +611,21 @@ def unauthorize_guest(self, guest_mac): :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff' """ - cmd = 'unauthorize-guest' - params = {'mac': guest_mac} - return self._run_command(cmd, params=params) + cmd = "unauthorize-guest" + params = {"mac": guest_mac} + return self._run_command( + cmd, + params=params + ) + + def get_firmware( + self, + cached=True, + available=True, + known=False, + site=False + ): - def get_firmware(self, cached=True, available=True, - known=False, site=False): """ Return a list of available/cached firmware versions @@ -480,14 +637,14 @@ def get_firmware(self, cached=True, available=True, """ res = [] if cached: - res.extend(self._run_command('list-cached', mgr='firmware')) + res.extend(self._run_command("list-cached", mgr="firmware")) if available: - res.extend(self._run_command('list-available', mgr='firmware')) + res.extend(self._run_command("list-available", mgr="firmware")) if known: - res = [fw for fw in res if fw['knownDevice']] + res = [fw for fw in res if fw["knownDevice"]] if site: - res = [fw for fw in res if fw['siteDevice']] + res = [fw for fw in res if fw["siteDevice"]] return res def cache_firmware(self, version, device): @@ -502,8 +659,13 @@ def cache_firmware(self, version, device): :return: True/False """ return self._run_command( - 'download', mgr='firmware', - params={'device': device, 'version': version})[0]['result'] + "download", + mgr="firmware", + params={ + "device": device, + "version": version + } + )[0]["result"] def remove_firmware(self, version, device): """ @@ -517,12 +679,17 @@ def remove_firmware(self, version, device): :return: True/false """ return self._run_command( - 'remove', mgr='firmware', - params={'device': device, 'version': version})[0]['result'] + "remove", + mgr="firmware", + params={ + "device": device, + "version": version + } + )[0]["result"] def get_tag(self): """Get all tags and their member MACs""" - return self._api_read('rest/tag') + return self._api_read("rest/tag") def upgrade_device(self, mac, version): """ @@ -530,38 +697,48 @@ def upgrade_device(self, mac, version): :param mac: MAC of dev :param version: version to upgrade to """ - self._mac_cmd(mac, 'upgrade', mgr='devmgr', - params={'upgrade_to_firmware': version}) + self._mac_cmd( + mac, + "upgrade", + mgr="devmgr", + params={ + "upgrade_to_firmware": version + } + ) def provision(self, mac): """ Force provisioning of a device :param mac: MAC of device """ - self._mac_cmd(mac, 'force-provision', mgr='devmgr') + self._mac_cmd(mac, "force-provision", mgr="devmgr") - def get_setting(self, section=None, super=False): + def get_setting(self, section=None, cs_settings=False): """ Return settings for this site or controller - :param super: Return only controller-wide settings + :param cs_settings: Return only controller-wide settings :param section: Only return this/these section(s) :return: {section:settings} """ res = {} - settings = self._api_read('get/setting') + all_settings = self._api_read("get/setting") if section and not isinstance(section, (list, tuple)): section = [section] - for s in settings: - s_sect = s['key'] - if (super and 'site_id' in s) or \ - (not super and 'site_id' not in s) or \ - (section and s_sect not in section): + for setting in all_settings: + s_sect = setting["key"] + if ( + ( # pylint: disable=R0916 + cs_settings and "site_id" in setting + ) + or (not cs_settings and "site_id" not in setting) + or (section and s_sect not in section) + ): continue - for k in ('_id', 'site_id', 'key'): - s.pop(k, None) - res[s_sect] = s + for k in ("_id", "site_id", "key"): + setting.pop(k, None) + res[s_sect] = setting return res def update_setting(self, settings): @@ -573,7 +750,7 @@ def update_setting(self, settings): """ res = [] for sect, setting in settings.items(): - res.extend(self._api_write('set/setting/' + sect, setting)) + res.extend(self._api_write("set/setting/" + sect, setting)) return res def update_user_group(self, group_id, down_kbps=-1, up_kbps=-1): @@ -591,13 +768,16 @@ def update_user_group(self, group_id, down_kbps=-1, up_kbps=-1): for group in groups: if group["_id"] == group_id: # Apply setting change - res = self._api_update("rest/usergroup/{0}".format(group_id), { - "qos_rate_max_down": down_kbps, - "qos_rate_max_up": up_kbps, - "name": group["name"], - "_id": group_id, - "site_id": self.site_id - }) + res = self._api_update( + "rest/usergroup/{0}".format(group_id), + { + "qos_rate_max_down": down_kbps, + "qos_rate_max_up": up_kbps, + "name": group["name"], + "_id": group_id, + "site_id": self.site_id, + }, + ) return res raise ValueError("Group ID {0} is not valid.".format(group_id)) @@ -608,11 +788,19 @@ def set_client_alias(self, mac, alias): :param mac: The MAC of the client to rename :param alias: The alias to set """ - client = self.get_client(mac)['_id'] - return self._api_update('rest/user/' + client, {'name': alias}) + client = self.get_client(mac)["_id"] + return self._api_update("rest/user/" + client, {"name": alias}) - def create_voucher(self, number, quota, expire, up_bandwidth=None, - down_bandwidth=None, byte_quota=None, note=None): + def create_voucher( # pylint: disable=R0913 + self, + number, + quota, + expire, + up_bandwidth=None, + down_bandwidth=None, + byte_quota=None, + note=None, + ): """ Create voucher for guests. @@ -624,47 +812,52 @@ def create_voucher(self, number, quota, expire, up_bandwidth=None, :param byte_quota: quantity of bytes allowed in MB :param note: description """ - cmd = 'create-voucher' - params = {'n': number, 'quota': quota, 'expire': 'custom', - 'expire_number': expire, 'expire_unit': 1} + cmd = "create-voucher" + params = { + "n": number, + "quota": quota, + "expire": "custom", + "expire_number": expire, + "expire_unit": 1, + } if up_bandwidth: - params['up'] = up_bandwidth + params["up"] = up_bandwidth if down_bandwidth: - params['down'] = down_bandwidth + params["down"] = down_bandwidth if byte_quota: - params['bytes'] = byte_quota + params["bytes"] = byte_quota if note: - params['note'] = note - res = self._run_command(cmd, mgr='hotspot', params=params) - return self.list_vouchers(create_time=res[0]['create_time']) + params["note"] = note + res = self._run_command(cmd, mgr="hotspot", params=params) + return self.list_vouchers(create_time=res[0]["create_time"]) - def list_vouchers(self, **filter): + def list_vouchers(self, **filter_voucher): """ Get list of vouchers - :param filter: Filter vouchers by create_time, code, quota, + :param filter_voucher: Filter vouchers by create_time, code, quota, used, note, status_expires, status, ... """ - if 'code' in filter: - filter['code'] = filter['code'].replace('-', '') + if "code" in filter_voucher: + filter_voucher["code"] = filter_voucher["code"].replace("-", "") vouchers = [] - for voucher in self._api_read('stat/voucher'): + for voucher in self._api_read("stat/voucher"): voucher_match = True - for key, val in filter.items(): + for key, val in filter_voucher.items(): voucher_match &= voucher.get(key) == val if voucher_match: vouchers.append(voucher) return vouchers - def delete_voucher(self, id): + def delete_voucher(self, voucher_id): """ Delete / revoke voucher :param id: id of voucher """ - cmd = 'delete-voucher' - params = {'_id': id} - self._run_command(cmd, mgr='hotspot', params=params) + cmd = "delete-voucher" + params = {"_id": voucher_id} + self._run_command(cmd, mgr="hotspot", params=params) diff --git a/setup.py b/setup.py index d6d259f..3cd994b 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='pyunifi', - version='2.19.0', + version='2.20.1', description='API for Ubiquity Networks UniFi controller', author='Caleb Dunn', author_email='finish.06@gmail.com', diff --git a/unifi-copy-radius b/unifi-copy-radius new file mode 100755 index 0000000..22a7f72 --- /dev/null +++ b/unifi-copy-radius @@ -0,0 +1,125 @@ +#!/usr/bin/env python + +import argparse +import json + +from pyunifi.controller import Controller + +parser = argparse.ArgumentParser() +parser.add_argument('-c', '--controller', default='unifi', help='the controller address (default "unifi")') +parser.add_argument('-u', '--username', default='admin', help='the controller username (default("admin")') +parser.add_argument('-p', '--password', default='', help='the controller password') +parser.add_argument('-b', '--port', default='8443', help='the controller port (default "8443")') +parser.add_argument('-v', '--version', default='v5', help='the controller base version (default "v5")') +parser.add_argument('-s', '--siteid', default='default', help='the source site ID, (default "default")') +parser.add_argument('-S', '--siteid2', default='', help='the destination site ID, to copy to') +parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates') +parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file') +args = parser.parse_args() + +ssl_verify = (not args.no_ssl_verify) + +if ssl_verify and len(args.certificate) > 0: + ssl_verify = args.certificate + +controller_source = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify) +controller_dest = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid2, ssl_verify=ssl_verify) + +source_users = controller_source.get_radius_users() +dest_users = controller_dest.get_radius_users() + +for user in source_users: + # remove irrelevent fields + user.pop("site_id", None) + user.pop("vlan", None) + user.pop("tunnel_type", None) + user.pop("tunnel_medium_type", None) + # add status field to keep track of which + # users should be added or deleted or modified + user["status"] = "None" +for user in dest_users: + # remove irrelevent fields + user.pop("site_id", None) + user.pop("vlan", None) + user.pop("tunnel_type", None) + user.pop("tunnel_medium_type", None) + # add status field to keep track of which + # users should be added or deleted or modified + user["status"] = "None" + +source_users.sort(key=lambda x: x['name']) +print("source_users\n", json.dumps(source_users, indent=2, sort_keys=False), "\n") +dest_users.sort(key=lambda x: x['name']) +print("dest_users\n", json.dumps(dest_users, indent=2, sort_keys=False), "\n") + +unchanged_users = [] +modified_users = [] +temp_user = {} + +# Compare source and destination usernames and passwords +# to decide which users have been unchanged or modified +# +for source_user in source_users: + for dest_user in dest_users: + if source_user['name'] == dest_user['name']: + # usernames are the same + if source_user['x_password'] == dest_user['x_password']: + # username and password are the same + dest_user["status"] = "unchanged" + source_user["status"] = "unchanged" + unchanged_users.append (source_user) + else: + # username is the same but password has changed + dest_user["status"] = "modified" + source_user["status"] = "modified" + # Strange problem solved by temp_user. + # We need the username/password of source_user + temp_user['name'] = source_user['name'] + temp_user['x_password'] = source_user['x_password'] + # but: we need the id of the destination user to modify it + temp_user['_id'] = dest_user['_id'] + modified_users.append (temp_user) + +unchanged_users.sort(key=lambda x: x['name']) +print("unchanged_users\n", json.dumps(unchanged_users, indent=2, sort_keys=False), "\n") +modified_users.sort(key=lambda x: x['name']) +print("modified_users\n", json.dumps(modified_users, indent=2, sort_keys=False), "\n") + +added_users = [] +deleted_users = [] + +# Any users who are not unchanged or modified +# are unique to either the source or destination +# +# Unique users on the source will be added to the destination +for source_user in source_users: + if source_user['status'] == 'None': + source_user['status'] == 'added' + added_users.append(source_user) +# +# Unique users on the destination will be deleted from the destination +for dest_user in dest_users: + if dest_user['status'] == 'None': + dest_user['status'] == 'deleted' + deleted_users.append(dest_user) + +added_users.sort(key=lambda x: x['name']) +print("added_users\n", json.dumps(added_users, indent=2, sort_keys=False), "\n") +deleted_users.sort(key=lambda x: x['name']) +print("deleted_users\n", json.dumps(deleted_users, indent=2, sort_keys=False), "\n") + +print () +if (len(added_users) == 0) and (len(modified_users) == 0) and (len(deleted_users) == 0): + print ("No users to add, modify, or delete") +else: + for user in added_users: + print ("adding user:", user['name']) + controller_dest.add_radius_user(user['name'], user['x_password']) + + for user in modified_users: + print ("updating user:", user['name']) + controller_dest.update_radius_user(user['name'], user['x_password'], user['_id']) + + for user in deleted_users: + print ("deleting user:", user['name']) + controller_dest.delete_radius_user(user['_id']) diff --git a/unifi-ls-clients b/unifi-ls-clients index 34cb118..66a675a 100755 --- a/unifi-ls-clients +++ b/unifi-ls-clients @@ -19,7 +19,7 @@ ssl_verify = (not args.no_ssl_verify) if ssl_verify and len(args.certificate) > 0: ssl_verify = args.certificate - + c = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify) aps = c.get_aps() diff --git a/unifi-ls-radius b/unifi-ls-radius new file mode 100755 index 0000000..a4bef8a --- /dev/null +++ b/unifi-ls-radius @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +import argparse + +from pyunifi.controller import Controller + +parser = argparse.ArgumentParser() +parser.add_argument('-c', '--controller', default='unifi', help='the controller address (default "unifi")') +parser.add_argument('-u', '--username', default='admin', help='the controller username (default("admin")') +parser.add_argument('-p', '--password', default='', help='the controller password') +parser.add_argument('-b', '--port', default='8443', help='the controller port (default "8443")') +parser.add_argument('-v', '--version', default='v5', help='the controller base version (default "v5")') +parser.add_argument('-s', '--siteid', default='default', help='the site ID, UniFi >=3.x only (default "default")') +parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates') +parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file') +args = parser.parse_args() + +ssl_verify = (not args.no_ssl_verify) + +if ssl_verify and len(args.certificate) > 0: + ssl_verify = args.certificate + +c = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify) + +users = c.get_radius_users() +users.sort(key=lambda x: x['name']) + +FORMAT = '%-26s %-16s %-26s %-26s' +print(FORMAT % ('USERNAME', 'PASSWORD', 'ID', 'SITE ID')) +for user in users: + name = user["name"] + password = user["x_password"] + id = user["_id"] + site_id = user["site_id"] + + print(FORMAT % (name, password, id, site_id)) diff --git a/unifi-save-radius b/unifi-save-radius new file mode 100755 index 0000000..e3b2a66 --- /dev/null +++ b/unifi-save-radius @@ -0,0 +1,46 @@ +#!/usr/bin/env python + +import argparse + +from pyunifi.controller import Controller + +parser = argparse.ArgumentParser() +parser.add_argument('-c', '--controller', default='unifi', help='the controller address (default "unifi")') +parser.add_argument('-u', '--username', default='admin', help='the controller username (default("admin")') +parser.add_argument('-p', '--password', default='', help='the controller password') +parser.add_argument('-b', '--port', default='8443', help='the controller port (default "8443")') +parser.add_argument('-v', '--version', default='v5', help='the controller base version (default "v5")') +parser.add_argument('-s', '--siteid', default='default', help='the site ID, UniFi >=3.x only (default "default")') +parser.add_argument('-V', '--no-ssl-verify', default=False, action='store_true', help='Don\'t verify ssl certificates') +parser.add_argument('-C', '--certificate', default='', help='verify with ssl certificate pem file') +parser.add_argument('-f', '--file', default='radius-unifi.csv', help='the filename of write statistics') +args = parser.parse_args() + +ssl_verify = (not args.no_ssl_verify) + +if ssl_verify and len(args.certificate) > 0: + ssl_verify = args.certificate + +c = Controller(args.controller, args.username, args.password, args.port, args.version, args.siteid, ssl_verify=ssl_verify) + +users = c.get_radius_users() +users.sort(key=lambda x: x['name']) + +#open file +fo = open(args.file, "wb") + +FORMAT_CSV = '%s, %s, %s, %s\n' +fo.write(FORMAT_CSV % ('USERNAME', 'PASSWORD', 'ID', 'SITE ID')) +for user in users: + name = user["name"] + password = user["x_password"] + id = user["_id"] + site_id = user["site_id"] + + fo.write(FORMAT_CSV % (name, password, id, site_id)) + +# Close file +fo.close() + +# Print result of file +print(open(args.file,"rb").read())