Skip to content

Commit

Permalink
Fixes Panasonic auth breaking changes (#45)
Browse files Browse the repository at this point in the history
* Initial auth changes

* Adding more requests

* Fix cookie

* Response 5

* We got the token!

* Cleanup and token refresh
  • Loading branch information
cjaliaga authored Mar 29, 2024
1 parent e702330 commit c692167
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 33 deletions.
1 change: 1 addition & 0 deletions aioaquarea/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
AQUAREA_SERVICE_CONSUMPTION = "remote/v1/api/consumption"
AQUAREA_SERVICE_CONTRACT = "remote/contract"
AQUAREA_SERVICE_A2W_STATUS_DISPLAY = "remote/a2wStatusDisplay"
AQUAREA_SERVICE_AUTH_CLIENT_ID = "vf2i6hW5hA2BB2BQGfTHXM4YFyW4I06K"

PANASONIC = "Panasonic"

Expand Down
187 changes: 154 additions & 33 deletions aioaquarea/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import datetime as dt
import functools
import logging
import re
from typing import Optional
import urllib.parse
import html

import aiohttp

Expand All @@ -18,6 +20,7 @@
AQUAREA_SERVICE_DEMO_BASE,
AQUAREA_SERVICE_DEVICES,
AQUAREA_SERVICE_LOGIN,
AQUAREA_SERVICE_AUTH_CLIENT_ID,
AquareaEnvironment,
)
from .data import (
Expand Down Expand Up @@ -146,6 +149,7 @@ def __init__(
self._device_direct = (
device_direct if environment == AquareaEnvironment.PRODUCTION else False
)
self._access_token: Optional[str] = None

@property
def username(self) -> str:
Expand All @@ -170,10 +174,10 @@ def token_expiration(self) -> Optional[dt.datetime]:
@property
def is_logged(self) -> bool:
"""Return True if the user is logged in."""
if not self._token_expiration:
if not self._access_token:
return False

now = dt.datetime.astimezone(dt.datetime.utcnow(), tz=dt.timezone.utc)
now = dt.datetime.now(tz=dt.timezone.utc)
return now < self._token_expiration

@property
Expand All @@ -184,7 +188,8 @@ def logger(self) -> logging.Logger:
async def request(
self,
method: str,
url: str,
url: str = None,
external_url: str = None,
referer: str = AQUAREA_SERVICE_BASE,
throw_on_error=True,
content_type: str = "application/x-www-form-urlencoded",
Expand All @@ -199,31 +204,41 @@ async def request(
headers["content-type"] = content_type
kwargs["headers"] = headers

resp = await self._sess.request(method, self._base_url + url, **kwargs)
if external_url is not None:
url = external_url
else:
url = self._base_url + url

# Aquarea returns a 200 even if the request failed, we need to check the message property to see if it's an error
# Some errors just require to login again, so we raise a AuthenticationError in those known cases
errors = [FaultError]
if throw_on_error:
errors = await self.look_for_errors(resp)
# If we have errors, let's look for authentication errors
for error in errors:
if error.error_code in list(AuthenticationErrorCodes):
raise AuthenticationError(error.error_code, error.error_message)
resp = await self._sess.request(method, url, **kwargs)

raise ApiError(error.error_code, error.error_message)
if resp.content_type == "application/json":
data = await resp.json()

# let's check for access token and expiration time
if self._access_token and "accessToken" in data:
self._access_token = data["accessToken"]["token"]
self._token_expiration = dt.datetime.strptime(
data["accessToken"]["expires"], "%Y-%m-%dT%H:%M:%S%z"
)

# Aquarea returns a 200 even if the request failed, we need to check the message property to see if it's an error
# Some errors just require to login again, so we raise a AuthenticationError in those known cases
errors = [FaultError]
if throw_on_error:
errors = await self.look_for_errors(data)
# If we have errors, let's look for authentication errors
for error in errors:
if error.error_code in list(AuthenticationErrorCodes):
raise AuthenticationError(error.error_code, error.error_message)

raise ApiError(error.error_code, error.error_message)

return resp

async def look_for_errors(
self, response: aiohttp.ClientResponse
self, data: dict
) -> list[FaultError]:
"""Look for errors in the response and return them as a list of FaultError objects."""
if response.content_type != "application/json":
return []

data = await response.json()

if not isinstance(data, dict):
return []

Expand Down Expand Up @@ -257,27 +272,133 @@ async def _login_demo(self) -> None:
) + dt.timedelta(days=1)

async def _login_production(self) -> None:
params = {
"var.inputOmit": "false",
"var.loginId": self.username,
"var.password": self.password,
}

response: aiohttp.ClientResponse = await self.request(
"POST",
AQUAREA_SERVICE_LOGIN,
referer=self._base_url,
data=urllib.parse.urlencode(params),
headers={
"popup-screen-id": "1001",
"Registration-Id": "",
}
)

data = await response.json()
auth_state = response.cookies.get("com.auth0.state").value
query_params = {
"client_id": AQUAREA_SERVICE_AUTH_CLIENT_ID,
"audience": f"https://digital.panasonic.com/{AQUAREA_SERVICE_AUTH_CLIENT_ID}/api/v1/",
"response_type": "code",
"redirect_uri": "https://aquarea-smart.panasonic.com/authorizationCallback",
"state": auth_state,
"scope": "openid offline_access",
}

if not isinstance(data, dict):
raise InvalidData(data)
response: aiohttp.ClientResponse = await self.request(
"GET",
external_url="https://authglb.digital.panasonic.com/authorize",
referer=self._base_url,
params=query_params,
allow_redirects=False)

self._token_expiration = dt.datetime.strptime(
data["accessToken"]["expires"], "%Y-%m-%dT%H:%M:%S%z"
)
location = response.headers.get("Location")
parsed_url = urllib.parse.urlparse(location)

# Extract the value of the 'state' query parameter
query_params2 = urllib.parse.parse_qs(parsed_url.query)
state_value = query_params2.get('state', [None])[0]

response: aiohttp.ClientResponse = await self.request(
"GET",
external_url=f"https://authglb.digital.panasonic.com{location}",
referer=self._base_url,
allow_redirects=False)

csrf = response.cookies.get("_csrf").value

query_params = {
"audience": f"https://digital.panasonic.com/{AQUAREA_SERVICE_AUTH_CLIENT_ID}/api/v1/",
"client": AQUAREA_SERVICE_AUTH_CLIENT_ID,
'protocol': 'oauth2',
'redirect_uri': 'https://aquarea-smart.panasonic.com/authorizationCallback',
"response_type": "code",
"state": state_value,
"scope": "openid offline_access",
}

data = {
'client_id' : AQUAREA_SERVICE_AUTH_CLIENT_ID,
'redirect_uri':'https://aquarea-smart.panasonic.com/authorizationCallback?lang=en',
'tenant':'pdpauthglb-a1',
'response_type':'code',
'scope':'openid offline_access',
'audience':f'https://digital.panasonic.com/{AQUAREA_SERVICE_AUTH_CLIENT_ID}/api/v1/',
'_csrf':csrf,
'state':state_value,
'_intstate':'deprecated',
'username': self._username,
'password': self._password,
'lang':'en',
'connection':'PanasonicID-Authentication',
}

response: aiohttp.ClientResponse = await self.request(
"POST",
external_url="https://authglb.digital.panasonic.com/usernamepassword/login",
referer=f"https://authglb.digital.panasonic.com/login?{urllib.parse.urlencode(query_params)}",
content_type="application/json; charset=UTF-8",
headers={
"Auth0-Client": "eyJuYW1lIjoiYXV0aDAuanMtdWxwIiwidmVyc2lvbiI6IjkuMjMuMiJ9"
},
allow_redirects=False,
json=data,
throw_on_error=False)

if not response.ok and response.content_type == "application/json":
data = await response.json()
if data.get("code") == "invalid_user_password":
raise AuthenticationError(AuthenticationErrorCodes.INVALID_USERNAME_OR_PASSWORD, "Invalid username or password")

content = await response.text()
action_url = re.search(r'action="(.+?)"', content).group(1)
inputs = re.findall(r'<input([^\0]+?)>', content, flags=re.IGNORECASE)

form_data = {}
for input in inputs:
name = None
value = None
name_match = re.search(r'name="(.+?)"', input)
if name_match is not None:
name = name_match.group(1)
value = re.search(r'value="(.+?)"', input)
if(name and value):
form_data[name] = html.unescape(value.group(1))


response: aiohttp.ClientResponse = await self.request(
"POST",
external_url=action_url,
referer=f"https://authglb.digital.panasonic.com/login?{urllib.parse.urlencode(query_params)}",
content_type="application/x-www-form-urlencoded; charset=UTF-8",
allow_redirects=False,
data=urllib.parse.urlencode(form_data))

location = response.headers.get("Location")

response: aiohttp.ClientResponse = await self.request(
"GET",
external_url=f"https://authglb.digital.panasonic.com{location}",
referer=self._base_url,
allow_redirects=False)

location = response.headers.get("Location")

response: aiohttp.ClientResponse = await self.request(
"GET",
external_url=location,
referer=self._base_url,
allow_redirects=False)

self._access_token = response.cookies.get("accessToken").value
self._token_expiration = None

self._logger.info(
f"Login successful for {self.username}. Access Token Expiration: {self._token_expiration}"
Expand Down

0 comments on commit c692167

Please sign in to comment.