diff --git a/RELEASE.md b/RELEASE.md index 2335982..ac21e8c 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,5 +1,21 @@ # RELEASE NOTES + +## v0.12.1 - Scanner Update + +* Large-scale refactor of scan function by @Nexarian in https://github.com/jasonacox/pypowerwall/pull/117 + - Function `scan()` returns a list of the discovered devices for use as a utility function. + - Ability to silence output for use as a utility. + - Improve performance of multi-threaded scan by using a Queue. + - General code flow improvements and encapsulation. + - Add ability to work with standalone inverters. + +```python +from pypowerwall.scan import scan +found_devices = scan(interactive = False) +``` + + ## v0.12.0 - Add Controller Data * TEDAPI: Add `get_device_controller()` to get device data which includes Powerwall THC_AmbientTemp data. Credit to @ygelfand for discovery and reported in https://github.com/jasonacox/Powerwall-Dashboard/discussions/392#discussioncomment-11360474 diff --git a/pypowerwall/__init__.py b/pypowerwall/__init__.py index b577e5c..d025e93 100644 --- a/pypowerwall/__init__.py +++ b/pypowerwall/__init__.py @@ -88,7 +88,7 @@ from typing import Union, Optional import time -version_tuple = (0, 12, 0) +version_tuple = (0, 12, 1) version = __version__ = '%d.%d.%d' % version_tuple __author__ = 'jasonacox' diff --git a/pypowerwall/__main__.py b/pypowerwall/__main__.py index b58e16f..d931744 100644 --- a/pypowerwall/__main__.py +++ b/pypowerwall/__main__.py @@ -124,11 +124,13 @@ from pypowerwall import scan print("pyPowerwall [%s] - Scanner\n" % version) - color = not args.nocolor - ip = args.ip - hosts = args.hosts - timeout = args.timeout - scan.scan(color, timeout, hosts, ip) + scan.scan( + ip=args.ip, + max_threads=args.hosts, + timeout=args.timeout, + color=not args.nocolor, + interactive=True + ) # Set Powerwall Mode elif command == 'set': diff --git a/pypowerwall/scan.py b/pypowerwall/scan.py index 7c531a4..03ca3b4 100644 --- a/pypowerwall/scan.py +++ b/pypowerwall/scan.py @@ -8,200 +8,294 @@ Scan Function This tool will scan your local network looking for a Tesla Energy Gateway - and Powerwall. It uses your local IP address as a default. + and/or Powerwall. It uses your local IP address as a default. """ -# Modules import errno import ipaddress -import json import socket +import sys import threading import time -import sys +import unicodedata +from http import HTTPStatus +from ipaddress import IPv4Address +from queue import Queue +from typing import Any, Dict, Final, List, Optional import requests import pypowerwall -# Globals -discovered = {} -firmware = {} +UNKNOWN: Final[str] = "Unknown" +POWERWALL: Final[str] = "Powerwall" -# Terminal Color Formatting -bold = "\033[0m\033[97m\033[1m" -subbold = "\033[0m\033[32m" -normal = "\033[97m\033[0m" -dim = "\033[0m\033[97m\033[2m" -alert = "\033[0m\033[91m\033[1m" -alertdim = "\033[0m\033[91m\033[2m" +class ScanContext: + """Handles terminal formatting, interactive output, and other broad spectrum settings.""" + _BOLD: Final[str] = "bold" + _SUBBOLD: Final[str] = "subbold" + _NORMAL: Final[str] = "normal" + _DIM: Final[str] = "dim" + _ALERT: Final[str] = "alert" + _ALERTDIM: Final[str] = "alertdim" + def __init__(self, timeout: float, color: bool = True, interactive: bool = False): + self.timeout = timeout + self.interactive = interactive + self.color = False if not interactive else color + self.colors = { + self._BOLD: "\033[0m\033[97m\033[1m" if self.color else "", + self._SUBBOLD: "\033[0m\033[32m" if self.color else "", + self._NORMAL: "\033[97m\033[0m" if self.color else "", + self._DIM: "\033[0m\033[97m\033[2m" if self.color else "", + self._ALERT: "\033[0m\033[91m\033[1m" if self.color else "", + self._ALERTDIM: "\033[0m\033[91m\033[2m" if self.color else "", + } -# Helper Functions -def getmy_ip(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - r = s.getsockname()[0] - s.close() - return r + def bold(self): + return self.colors[self._BOLD] + def subbold(self): + return self.colors[self._SUBBOLD] -def scan_ip(color, timeout, addr): - """ - Thread Worker: Scan IP Address for Powerwall Gateway + def normal(self): + return self.colors[self._NORMAL] + + def dim(self): + return self.colors[self._DIM] + + def alert(self): + return self.colors[self._ALERT] + + def alertdim(self): + return self.colors[self._ALERTDIM] + +def normalize_caseless(text: str) -> str: + return unicodedata.normalize("NFKD", text.casefold()) - Parameter: - color = True or False, print output in color [Default: True] - timeout = Seconds to wait per host [Default: 1.0] - addr = IP address to scan +def caseless_equal(left: str, right: str) -> bool: + return normalize_caseless(left) == normalize_caseless(right) + +def get_my_ip() -> str: + """Get the local IP address of the machine. + + Returns: + str: IP address of the localhost. """ - # pylint: disable=global-statement,global-variable-not-assigned - global discovered, firmware - global bold, subbold, normal, dim, alert, alertdim - - if not color: - # Disable Terminal Color Formatting - bold = subbold = normal = dim = alert = alertdim = "" - - host = dim + '\r Host: ' + subbold + '%s ...' % addr + normal - print(host, end='') - a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - a_socket.settimeout(timeout) - location = (str(addr), 443) - while True: - result_of_check = a_socket.connect_ex(location) - if not result_of_check == errno.EAGAIN: - break - time.sleep(0.1) - if result_of_check == 0: - # Check to see if it is a Powerwall - url = 'https://%s/api/status' % addr - # noinspection PyBroadException + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: try: - g = requests.get(url, verify=False, timeout=5) - # Check if 404 response - if g.status_code == 404: - # Check if it is a Powerwall 3 - url = 'https://%s/tedapi/din' % addr - g = requests.get(url, verify=False, timeout=5) - # Expected response from PW3 - # {"code":403,"error":"Unable to GET to resource","message":"User does not have adequate access rights"} - if "User does not have adequate access rights" in g.text: - # Found PW3 - print(host + ' OPEN' + dim + ' - ' + subbold + 'Found Powerwall 3 [Supported in Cloud Mode only]') - discovered[addr] = 'Powerwall-3' - firmware[addr] = 'Supported in Cloud Mode only - See https://tinyurl.com/pw3support' - else: - # Not a Powerwall - print(host + ' OPEN' + dim + ' - Not a Powerwall') - else: - data = json.loads(g.text) - print(host + ' OPEN' + dim + ' - ' + subbold + 'Found Powerwall %s' % data[ - 'din'] + subbold + '\n [Firmware %s]' % data['version']) - discovered[addr] = data['din'] - firmware[addr] = data['version'] + s.connect(('8.8.8.8', 80)) + return s.getsockname()[0] except Exception: - print(host + ' OPEN' + dim + ' - Not a Powerwall') + raise + +def check_connection(addr: IPv4Address, context: ScanContext, port: int = 443, max_retries: int = 10, retry_delay: float = 0.1) -> bool: + """Checks for simple connection status to a provided address. - a_socket.close() + Args: + addr (IPv4Address): The address to attempt connection to. + context (ScanContext): Context controlling output interactivity, color, and timeout behaviors. + port (int, optional): The port to connect to. Defaults to 443. + max_retries (int, optional): Maximum number of retry attempts. Defaults to 10. + retry_delay (float, optional): Delay between connection retries in seconds. Defaults to 0.1. In the future this should be replaced with exponential backoff and jitter. + Returns: + bool: True if connection is successful, False otherwise. + """ -def scan(color=True, timeout=1.0, hosts=30, ip=None): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn: + conn.settimeout(context.timeout) + for _ in range(max_retries): + try: + result = conn.connect_ex((str(addr), port)) + if result == 0: + return True + elif result != errno.EAGAIN: + return False + except Exception as e: + if context.interactive: + print(f"An error occurred during connection attempt: {e}") + return False + time.sleep(retry_delay) + return False # Connection failed after max retries + +# Example Tesla Inverter response to https://{addr}/api/status +# { +# "din": "1538000-45-C--XXXXXXXXXXXXXX", +# "start_time": "2024-11-29 17:01:30 +0800", +# "up_time_seconds": "23h48m59.321804629s", +# "is_new": false, +# "version": "24.36.2 46990655", +# "git_hash": "469906551a97e7b41a60844c37be7ede868d5d56", +# "commission_count": 0, +# "device_type": "teg", +# "teg_type": "pvinverter", +# "sync_type": "v2.1", +# "cellular_disabled": false, +# "can_reboot": false +# } + +def scan_ip(addr: IPv4Address, context: ScanContext, result_queue: Queue) -> None: + """Thread Worker: Scan IP Address for presence of a Tesla Energy Gateway. + + Args: + addr (IPv4Address): IP address to scan + context (ScanContext): Context controlling output interactivity, color, and timeout behaviors. + result_queue (Queue): Thread safe queue to store results of the asynchronous scan. """ - pyPowerwall Network Scanner - Parameter: - color = True or False, print output in color [Default: True] - timeout = Seconds to wait per host [Default: 1.0] - hosts = Number of hosts to scan simultaneously [Default: 30] - ip = IP address within network to scan [Default: None (detect IP)] + host = f"{context.dim()}\r\t Host: {context.subbold()}{addr} ...{context.normal()}" + if context.interactive: + print(host, end='') - Description - This tool will scan your local network looking for a Tesla Energy Gateway - and Powerwall. It tries to use your local IP address as a default. + if not check_connection(addr, context): + return + # noinspection PyBroadException + try: + # Check to see if it is a Powerwall + response = requests.get(f'https://{addr}/api/status', verify=False, timeout=5) + if response.status_code != HTTPStatus.NOT_FOUND: + data: Final[str] = response.json() + din: Final[str] = data.get('din', UNKNOWN) + version: Final[str] = data.get('version', UNKNOWN) + if din == UNKNOWN and version == UNKNOWN: + return + + up_time: Final[str] = data.get('up_time_seconds', UNKNOWN) + type_selector = lambda type: type if not caseless_equal(type, UNKNOWN) else POWERWALL + teg_type: Final[str] = type_selector(data.get('teg_type', POWERWALL)) + + if context.interactive: + print(f"{host} OPEN{context.dim()} - {context.subbold()}Found {teg_type} {din}{context.subbold()}\n\t\t\t\t\t\t\t\t\t [Firmware {version}]{context.normal()}") + result_queue.put({ + 'ip': addr, + 'din': din, + 'firmware': version, + 'up_time': up_time + }) + return + + # Check if it is a Powerwall 3 + response_pw3 = requests.get(f'https://{addr}/tedapi/din', verify=False, timeout=5) + # Expected response from PW3 + # {"code":403,"error":"Unable to GET to resource","message":"User does not have adequate access rights"} + if "User does not have adequate access rights" in response_pw3.text: + # Found PW3 + if context.interactive: + print(f"{host} OPEN{context.dim()} - {context.subbold()}Found Powerwall 3 [Cloud and TEDAPI Mode only]{context.normal}") + result_queue.put({ + 'ip': addr, + 'din': 'Powerwall-3', + 'firmware': 'Cloud and TEDAPI Mode support only - See https://tinyurl.com/pw3support' + }) + elif context.interactive: + # Not a Powerwall + print(f"{host} OPEN{context.dim()} - Not a Tesla Energy Gateway") + except Exception: + if context.interactive: + print(f'{host} OPEN{context.dim()} - Not a Tesla Energy Gateway') + + +def scan( + ip: Optional[str] = None, + max_threads: int = 30, + timeout: float = 1.0, + color: bool = False, + interactive: bool = False +) -> List[Dict[str, str]]: + """Scan the local network for Tesla Powerwall Gateways. + + Args: + ip (Optional[str], optional): IP address to determine the network to scan. If None, autodetects. + max_threads (int, optional): Maximum number of concurrent threads/IP addresses to simultaneously scan. Defaults to 30. + timeout (float, optional): Timeout in seconds for each host scan. Defaults to 1.0. + color (bool, optional): If True, use colored output. Defaults to False. + interactive (bool, optional): Whether messages should be printed and input sought from a user. Defaults to False. + + Returns: + List[Dict[str, str]]: A list of discovered Tesla Energy Gateway (Powerwall/Inverter) devices. """ - # pylint: disable=global-statement,global-variable-not-assigned - global discovered, firmware - global bold, subbold, normal, dim, alert, alertdim - if not color: - # Disable Terminal Color Formatting - bold = subbold = normal = dim = alert = alertdim = "" + context = ScanContext(timeout=timeout, color=color, interactive=interactive) # Fetch my IP address and assume /24 network # noinspection PyBroadException try: - if ip is None: - ip = getmy_ip() - network = ipaddress.IPv4Interface(u'' + ip + '/24').network + ip = get_my_ip() if ip is None else ip + network = ipaddress.IPv4Network(ip + '/24', strict=False) except Exception: - print(alert + 'ERROR: Unable to get your IP address and network automatically.' + normal) - network = '192.168.0.0/24' + if context.interactive: + print(f"{context.alert()}ERROR: Unable to determine your IP address and network automatically.{context.normal()}") + network = ipaddress.IPv4Network('192.168.1.0/24') - print(bold + '\npyPowerwall Network Scanner' + dim + ' [%s]' % pypowerwall.version + normal) - print(dim + 'Scan local network for Tesla Powerwall Gateways') - print('') + if context.interactive: + print(f"{context.bold()}\npyPowerwall Network Scanner{context.dim()} [{pypowerwall.version}]{context.normal()}") + print(f'{context.dim()}Scan local network for Tesla Powerwall Gateways\n') - if hosts > 100: - # Limit simultaneous host scan to no more than 100 - hosts = 100 + max_threads = min(200, max_threads) - if timeout < 0.2: - print(alert + - ' WARNING: Setting a low timeout (%0.2fs) may cause misses.\n' % timeout) + if context.timeout < 0.2 and context.interactive: + print(f'{context.alert()}\tWARNING: Setting a low timeout ({context.timeout}) may cause misses.\n') # Ask user to verify network - print(dim + ' Your network appears to be: ' + bold + '%s' % network + normal) - print('') + if context.interactive: + print(f'{context.dim()}\tYour network appears to be: {context.bold()}{network}{context.normal()}\n') - # noinspection PyBroadException - try: - response = input(subbold + " Enter " + bold + "Network" + subbold + - " or press enter to use %s: " % network + normal) - except Exception: - # Assume user aborted - print(alert + ' Cancel\n\n' + normal) - sys.exit() - - if response != '': - # Verify we have a valid network # noinspection PyBroadException try: - network = ipaddress.IPv4Network(u'' + response) + response = input(f"{context.subbold()}\tEnter {context.bold()}Network{context.subbold()} or press enter to use {network}{context.normal()}") except Exception: - print('') - print(alert + ' ERROR: %s is not a valid network.' % response + normal) - print(dim + ' Proceeding with %s instead.' % network) - - # Scan network - print('') - print(bold + ' Running Scan...' + dim) - # Loop through each host + # Assume user aborted + print(f"{context.alert()} Cancel\n\n{context.normal()}") + sys.exit() + + if response: + # Verify we have a valid network + # noinspection PyBroadException + try: + network = ipaddress.IPv4Network(response, strict=False) + except Exception: + print(f'\n{context.alert()}\tERROR: {response} is not a valid network.{context.normal()}') + print(f'{context.dim()}\t\t Proceeding with {network} instead.') + + print(f"\n{context.bold()}\tRunning Scan...{context.dim()}") + + result_queue = Queue() + threads: List[threading.Thread] = [] try: - threads = [] - for addr in ipaddress.IPv4Network(network): + for addr in network.hosts(): # Scan each host in a separate thread - thread = threading.Thread(target=scan_ip, args=(color, timeout, addr)) + addr_str: Final = str(addr) + thread = threading.Thread(target=scan_ip, args=(addr_str, context, result_queue)) thread.start() threads.append(thread) - if threading.active_count() >= hosts: - # Wait for thread to exit when max simultaneous hosts reached - threads[0].join() - threads.pop(0) + + # Limit the number of concurrent threads + while len(threads) >= max_threads: + # Remove completed threads + threads = [t for t in threads if t.is_alive()] + time.sleep(0.01) + for thread in threads: # Wait for remaining threads to exit thread.join() - print(dim + '\r Done ') - print('') - + if context.interactive: + print(f"{context.dim()}\r\t Done\t\t\t\t\t\t \n{context.normal()}") except KeyboardInterrupt: - print(dim + '\r ** Interrupted by user ** ') - print('') - - print(normal + 'Discovered %d Powerwall Gateway' % len(discovered)) - for pw_ip in discovered: - print(dim + ' %s [%s] Firmware %s' % (pw_ip, discovered[pw_ip], firmware[pw_ip])) - - print(normal + ' ') + if context.interactive: + print(f"{context.dim()}\r\t ** Interrupted by user **\t\t\t\t\t\t\n{context.normal()}") + + # Collect results from the queue + discovered_devices: List[Dict[Any, Any]] = [] + while not result_queue.empty(): + device_info: Final = result_queue.get() + discovered_devices.append(device_info) + + if context.interactive: + print(f"{context.normal()}Discovered {len(discovered_devices)} Powerwall Gateway(s)") + for device in discovered_devices: + print(f"{context.dim()}\t {device['ip']} [{device['din']}] Firmware {device['firmware']}") + print(f"{context.normal()} ") + return discovered_devices