diff --git a/aioaquarea/const.py b/aioaquarea/const.py index 11a5fee..e32d163 100644 --- a/aioaquarea/const.py +++ b/aioaquarea/const.py @@ -10,6 +10,7 @@ AQUAREA_SERVICE_CONSUMPTION = "remote/v1/api/consumption" AQUAREA_SERVICE_CONTRACT = "remote/contract" AQUAREA_SERVICE_A2W_STATUS_DISPLAY = "remote/a2wStatusDisplay" +AQUAREA_SERVICE_AUTH_CLIENT_ID = "vf2i6hW5hA2BB2BQGfTHXM4YFyW4I06K" PANASONIC = "Panasonic" diff --git a/aioaquarea/core.py b/aioaquarea/core.py index ef12f79..2d34d79 100644 --- a/aioaquarea/core.py +++ b/aioaquarea/core.py @@ -5,8 +5,10 @@ import datetime as dt import functools import logging +import re from typing import Optional import urllib.parse +import html import aiohttp @@ -18,6 +20,7 @@ AQUAREA_SERVICE_DEMO_BASE, AQUAREA_SERVICE_DEVICES, AQUAREA_SERVICE_LOGIN, + AQUAREA_SERVICE_AUTH_CLIENT_ID, AquareaEnvironment, ) from .data import ( @@ -146,6 +149,7 @@ def __init__( self._device_direct = ( device_direct if environment == AquareaEnvironment.PRODUCTION else False ) + self._access_token: Optional[str] = None @property def username(self) -> str: @@ -170,10 +174,10 @@ def token_expiration(self) -> Optional[dt.datetime]: @property def is_logged(self) -> bool: """Return True if the user is logged in.""" - if not self._token_expiration: + if not self._access_token: return False - now = dt.datetime.astimezone(dt.datetime.utcnow(), tz=dt.timezone.utc) + now = dt.datetime.now(tz=dt.timezone.utc) return now < self._token_expiration @property @@ -184,7 +188,8 @@ def logger(self) -> logging.Logger: async def request( self, method: str, - url: str, + url: str = None, + external_url: str = None, referer: str = AQUAREA_SERVICE_BASE, throw_on_error=True, content_type: str = "application/x-www-form-urlencoded", @@ -199,31 +204,41 @@ async def request( headers["content-type"] = content_type kwargs["headers"] = headers - resp = await self._sess.request(method, self._base_url + url, **kwargs) + if external_url is not None: + url = external_url + else: + url = self._base_url + url - # Aquarea returns a 200 even if the request failed, we need to check the message property to see if it's an error - # Some errors just require to login again, so we raise a AuthenticationError in those known cases - errors = [FaultError] - if throw_on_error: - errors = await self.look_for_errors(resp) - # If we have errors, let's look for authentication errors - for error in errors: - if error.error_code in list(AuthenticationErrorCodes): - raise AuthenticationError(error.error_code, error.error_message) + resp = await self._sess.request(method, url, **kwargs) - raise ApiError(error.error_code, error.error_message) + if resp.content_type == "application/json": + data = await resp.json() + + # let's check for access token and expiration time + if self._access_token and "accessToken" in data: + self._access_token = data["accessToken"]["token"] + self._token_expiration = dt.datetime.strptime( + data["accessToken"]["expires"], "%Y-%m-%dT%H:%M:%S%z" + ) + + # Aquarea returns a 200 even if the request failed, we need to check the message property to see if it's an error + # Some errors just require to login again, so we raise a AuthenticationError in those known cases + errors = [FaultError] + if throw_on_error: + errors = await self.look_for_errors(data) + # If we have errors, let's look for authentication errors + for error in errors: + if error.error_code in list(AuthenticationErrorCodes): + raise AuthenticationError(error.error_code, error.error_message) + + raise ApiError(error.error_code, error.error_message) return resp async def look_for_errors( - self, response: aiohttp.ClientResponse + self, data: dict ) -> list[FaultError]: """Look for errors in the response and return them as a list of FaultError objects.""" - if response.content_type != "application/json": - return [] - - data = await response.json() - if not isinstance(data, dict): return [] @@ -257,27 +272,133 @@ async def _login_demo(self) -> None: ) + dt.timedelta(days=1) async def _login_production(self) -> None: - params = { - "var.inputOmit": "false", - "var.loginId": self.username, - "var.password": self.password, - } - response: aiohttp.ClientResponse = await self.request( "POST", AQUAREA_SERVICE_LOGIN, referer=self._base_url, - data=urllib.parse.urlencode(params), + headers={ + "popup-screen-id": "1001", + "Registration-Id": "", + } ) - data = await response.json() + auth_state = response.cookies.get("com.auth0.state").value + query_params = { + "client_id": AQUAREA_SERVICE_AUTH_CLIENT_ID, + "audience": f"https://digital.panasonic.com/{AQUAREA_SERVICE_AUTH_CLIENT_ID}/api/v1/", + "response_type": "code", + "redirect_uri": "https://aquarea-smart.panasonic.com/authorizationCallback", + "state": auth_state, + "scope": "openid offline_access", + } - if not isinstance(data, dict): - raise InvalidData(data) + response: aiohttp.ClientResponse = await self.request( + "GET", + external_url="https://authglb.digital.panasonic.com/authorize", + referer=self._base_url, + params=query_params, + allow_redirects=False) - self._token_expiration = dt.datetime.strptime( - data["accessToken"]["expires"], "%Y-%m-%dT%H:%M:%S%z" - ) + location = response.headers.get("Location") + parsed_url = urllib.parse.urlparse(location) + + # Extract the value of the 'state' query parameter + query_params2 = urllib.parse.parse_qs(parsed_url.query) + state_value = query_params2.get('state', [None])[0] + + response: aiohttp.ClientResponse = await self.request( + "GET", + external_url=f"https://authglb.digital.panasonic.com{location}", + referer=self._base_url, + allow_redirects=False) + + csrf = response.cookies.get("_csrf").value + + query_params = { + "audience": f"https://digital.panasonic.com/{AQUAREA_SERVICE_AUTH_CLIENT_ID}/api/v1/", + "client": AQUAREA_SERVICE_AUTH_CLIENT_ID, + 'protocol': 'oauth2', + 'redirect_uri': 'https://aquarea-smart.panasonic.com/authorizationCallback', + "response_type": "code", + "state": state_value, + "scope": "openid offline_access", + } + + data = { + 'client_id' : AQUAREA_SERVICE_AUTH_CLIENT_ID, + 'redirect_uri':'https://aquarea-smart.panasonic.com/authorizationCallback?lang=en', + 'tenant':'pdpauthglb-a1', + 'response_type':'code', + 'scope':'openid offline_access', + 'audience':f'https://digital.panasonic.com/{AQUAREA_SERVICE_AUTH_CLIENT_ID}/api/v1/', + '_csrf':csrf, + 'state':state_value, + '_intstate':'deprecated', + 'username': self._username, + 'password': self._password, + 'lang':'en', + 'connection':'PanasonicID-Authentication', + } + + response: aiohttp.ClientResponse = await self.request( + "POST", + external_url="https://authglb.digital.panasonic.com/usernamepassword/login", + referer=f"https://authglb.digital.panasonic.com/login?{urllib.parse.urlencode(query_params)}", + content_type="application/json; charset=UTF-8", + headers={ + "Auth0-Client": "eyJuYW1lIjoiYXV0aDAuanMtdWxwIiwidmVyc2lvbiI6IjkuMjMuMiJ9" + }, + allow_redirects=False, + json=data, + throw_on_error=False) + + if not response.ok and response.content_type == "application/json": + data = await response.json() + if data.get("code") == "invalid_user_password": + raise AuthenticationError(AuthenticationErrorCodes.INVALID_USERNAME_OR_PASSWORD, "Invalid username or password") + + content = await response.text() + action_url = re.search(r'action="(.+?)"', content).group(1) + inputs = re.findall(r'', content, flags=re.IGNORECASE) + + form_data = {} + for input in inputs: + name = None + value = None + name_match = re.search(r'name="(.+?)"', input) + if name_match is not None: + name = name_match.group(1) + value = re.search(r'value="(.+?)"', input) + if(name and value): + form_data[name] = html.unescape(value.group(1)) + + + response: aiohttp.ClientResponse = await self.request( + "POST", + external_url=action_url, + referer=f"https://authglb.digital.panasonic.com/login?{urllib.parse.urlencode(query_params)}", + content_type="application/x-www-form-urlencoded; charset=UTF-8", + allow_redirects=False, + data=urllib.parse.urlencode(form_data)) + + location = response.headers.get("Location") + + response: aiohttp.ClientResponse = await self.request( + "GET", + external_url=f"https://authglb.digital.panasonic.com{location}", + referer=self._base_url, + allow_redirects=False) + + location = response.headers.get("Location") + + response: aiohttp.ClientResponse = await self.request( + "GET", + external_url=location, + referer=self._base_url, + allow_redirects=False) + + self._access_token = response.cookies.get("accessToken").value + self._token_expiration = None self._logger.info( f"Login successful for {self.username}. Access Token Expiration: {self._token_expiration}"