diff --git a/README.md b/README.md index d35b6f2..929191c 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,24 @@ This repository contains a Python module for the Charge Amps' electric vehicle c The module is developed by [Kirei AB](https://www.kirei.se) and is not supported by [Charge Amps AB](https://chargeamps.com). +## How to use + +The simplest way is to install via `pip`, as in: + +``` +pip install chargeamps +``` + +If you need access to the organisation API calls, you need to specify this feature upon installation: + +``` +pip install chargeamps["organisations"] +``` ## External API Key You need an API key to use the Charge Amps external API. Contact [Charge Amps Support](mailto:support@chargeamps.com) for extensive API documentation and API key. - ## References - [Charge Amps External REST API](https://eapi.charge.space/swagger/) diff --git a/chargeamps/external.py b/chargeamps/external.py index f40726d..37df430 100644 --- a/chargeamps/external.py +++ b/chargeamps/external.py @@ -19,6 +19,8 @@ ChargePointStatus, ChargingSession, StartAuth, + User, + Partner, ) API_BASE_URL = "https://eapi.charge.space" @@ -44,6 +46,7 @@ def __init__( self._token = None self._token_expire = 0 self._refresh_token = None + self._user: User = None async def shutdown(self) -> None: await self._session.close() @@ -72,6 +75,7 @@ async def _ensure_token(self) -> None: except HTTPException: self._logger.warning("Token refresh failed") self._token = None + self._user = None self._refresh_token = None else: self._token = None @@ -89,6 +93,7 @@ async def _ensure_token(self) -> None: except HTTPException as exc: self._logger.error("Login failed") self._token = None + self._user = None self._refresh_token = None self._token_expire = 0 raise exc @@ -100,6 +105,7 @@ async def _ensure_token(self) -> None: response_payload = await response.json() self._token = response_payload["token"] + self._user = response_payload["user"] self._refresh_token = response_payload["refreshToken"] token_payload = jwt.decode(self._token, options={"verify_signature": False}) @@ -209,6 +215,13 @@ async def set_chargepoint_connector_settings( request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/connectors/{connector_id}/settings" await self._put(request_uri, json=payload) + async def get_partner(self, charge_point_id: str) -> Partner: + """Get partner details""" + request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/partner" + response = await self._get(request_uri) + payload = await response.json() + return Partner.model_validate(payload) + async def remote_start( self, charge_point_id: str, connector_id: int, start_auth: StartAuth ) -> None: @@ -226,3 +239,16 @@ async def reboot(self, charge_point_id) -> None: """Reboot chargepoint""" request_uri = f"/api/{API_VERSION}/chargepoints/{charge_point_id}/reboot" await self._put(request_uri, json="{}") + + async def get_logged_in_user(self) -> User: + """Get authenticated user info""" + if not self._user or not isinstance(self._user, dict): + raise ValueError("No user is currently logged in") + + user_id = self._user["id"] + + request_uri = f"/api/{API_VERSION}/users/{user_id}" + response = await self._get(request_uri) + payload = await response.json() + + return User.model_validate(payload) diff --git a/chargeamps/models.py b/chargeamps/models.py index 1382cc7..3a33315 100644 --- a/chargeamps/models.py +++ b/chargeamps/models.py @@ -15,6 +15,16 @@ ] +def feature_required(feature_flag): + def decorator(cls): + if feature_flag == "organisations": + return cls + else: + raise ImportError(f"Feature '{feature_flag}' is not enabled") + + return decorator + + class FrozenBaseSchema(BaseModel): model_config = ConfigDict( alias_generator=to_camel, @@ -94,3 +104,61 @@ class StartAuth(FrozenBaseSchema): rfid_format: str rfid: str external_transaction_id: str + + +class RfidTag(FrozenBaseSchema): + active: bool + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + + +class User(FrozenBaseSchema): + id: str + first_name: str | None + last_name: str | None + email: str | None + mobile: str | None + rfid_tags: Optional[list[RfidTag]] + user_status: str + + +class Partner(FrozenBaseSchema): + id: int + name: str + description: str + email: str + phone: str + + +# Only way to register new RFID seems to be through an Organization +@feature_required("organisations") +class Rfid(FrozenBaseSchema): + rfid: str | None + rfidDec: str | None + rfidDecReverse: str | None + + +@feature_required("organisations") +class Organisation(FrozenBaseSchema): + id: str + name: str + description: str + + +@feature_required("organisations") +class OrganisationChargingSession(FrozenBaseSchema): + id: int + charge_point_id: Optional[str] + connector_id: int + user_id: str + rfid: Optional[str] + rfidDec: Optional[str] + rfidDecReverse: Optional[str] + organisation_id: Optional[str] + session_type: str + start_time: Optional[CustomDateTime] = None + end_time: Optional[CustomDateTime] = None + external_transaction_id: Optional[str] + total_consumption_kwh: float + external_id: Optional[str] diff --git a/chargeamps/organisation.py b/chargeamps/organisation.py new file mode 100644 index 0000000..4903ca9 --- /dev/null +++ b/chargeamps/organisation.py @@ -0,0 +1,261 @@ +from datetime import datetime +import textwrap + +from chargeamps.external import ChargeAmpsExternalClient + +from .models import ( + ChargePoint, + ChargePointStatus, + Rfid, + RfidTag, + Partner, + Organisation, + User, + OrganisationChargingSession, + feature_required, +) + +API_BASE_URL = "https://eapi.charge.space" +API_VERSION = "v5" + + +@feature_required("organisations") +class OrganisationClient(ChargeAmpsExternalClient): + def __init__( + self, + email: str, + password: str, + api_key: str, + api_base_url: str = None, + ): + super().__init__(email, password, api_key, api_base_url) + + async def get_organisations(self) -> list[Organisation]: + """Get all associated organisation's details""" + request_uri = f"/api/{API_VERSION}/organisations" + response = await self._get(request_uri) + payload = await response.json() + + return [Organisation.model_validate(org) for org in payload] + + async def get_organisation(self, org_id: str) -> Organisation: + """Get organisation details""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}" + response = await self._get(request_uri) + payload = await response.json() + return Organisation.model_validate(payload) + + async def get_organisation_chargepoints(self, org_id: str) -> list[ChargePoint]: + """Get all charge points for organisation""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints" + response = await self._get(request_uri) + payload = await response.json() + + return [ChargePoint.model_validate(cp) for cp in payload] + + async def get_organisation_chargepoint_statuses( + self, org_id: str + ) -> list[ChargePointStatus]: + """Get all charge points' status""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargepoints/statuses" + response = await self._get(request_uri) + payload = await response.json() + + return [ChargePointStatus.model_validate(cp) for cp in payload] + + def is_valid_hex(self, rfid: str) -> bool: + return len(rfid) % 2 == 0 and all(char in "0123456789ABCDEF" for char in rfid) + + def verify_rfid_length(self, rfid: str, length: int | None = None) -> int: + length_in_bytes = len(rfid) // 2 + + if length and length != length_in_bytes: + raise ValueError(textwrap.dedent(f""" + The provided RFID does not match the provided length: + RFID {rfid}, expected length: {length}, calculated length: {length_in_bytes} + """)) + + if length_in_bytes in {4, 7, 10}: + return length_in_bytes + else: + raise ValueError("RFID length invalid, should be either 4, 7 or 10 bytes") + + def verify_rfid( + self, rfid: str, rfid_format: str | None, rfid_length: str | None, rfid_dec_format_length: str | None + ) -> dict[str, str]: + result = {} + if self.is_valid_hex(rfid): + result["rfid"] = rfid + else: + raise ValueError(f"The provided RFID value is not a valid hex value: rfid: {rfid}") + + rfid_actual_length = self.verify_rfid_length(rfid, rfid_length) + if rfid_format != "Hex": + result["rfidFormat"] = rfid_format + result["rfidLength"] = rfid_actual_length + + if rfid_format == "Hex": + if rfid_actual_length != 7: + raise ValueError(f"RFID length must be 7 bytes if the (default) format type 'Hex' is set.") + elif rfid_format == "Dec" or rfid_format == "ReverseDec": + if rfid_dec_format_length: + result["rfidDecimalFormatLength"] = rfid_dec_format_length + else: + raise ValueError("Invalid RFID format") + + return result + + async def get_organisation_charging_sessions( + self, + org_id: str, + start_time: datetime | None = None, + end_time: datetime | None = None, + rfid: str | None = None, + rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" + rfid_length: int = None, + rfid_dec_format_length: int = None, + ) -> list[OrganisationChargingSession]: + """Get organisation's charging sessions""" + query_params = {} + if start_time: + query_params["startTime"] = start_time.isoformat() + if end_time: + query_params["endTime"] = end_time.isoformat() + + if rfid: + query_params.update( + self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length) + ) + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/chargingsessions" + response = await self._get(request_uri, params=query_params) + payload = await response.json() + + return [OrganisationChargingSession.model_validate(cp) for cp in payload] + + async def get_partner(self, org_id: str) -> Partner: + """Get partner details""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/partner" + response = await self._get(request_uri) + payload = await response.json() + return Partner.model_validate(payload) + + async def get_organisation_rfids(self, org_id: str) -> list[RfidTag]: + """Get organisation's registered rfid tags""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" + response = await self._get(request_uri) + payload = await response.json() + + return [RfidTag.model_validate(cp) for cp in payload] + + async def add_organisation_rfid( + self, org_id: str, rfid: Rfid, rfid_dec_format_length: int | None = None + ) -> RfidTag: + """Add a new RFID tag to the organisation""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids" + payload = rfid.model_dump(by_alias=True) + if rfid_dec_format_length: + payload["rfidDecimalFormatLength"] = rfid_dec_format_length + + response = await self._put(request_uri, json=payload) + payload = await response.json() + return RfidTag.model_validate(payload) + + async def get_organisation_rfid( + self, + org_id: str, + rfid: str, + rfid_format: str = "Hex", # Possible values: "Hex", "Dec" and "ReverseDec" + rfid_length: int | None = None, + rfid_dec_format_length: int | None = None + ) -> RfidTag: + """Get information about a specific RFID tag""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/{rfid}" + query_params = {"organisationId": org_id} + query_params.update(self.verify_rfid(rfid, rfid_format, rfid_length, rfid_dec_format_length)) + + response = await self._get(request_uri, params=query_params) + payload = await response.json() + return RfidTag.model_validate(payload) + + async def revoke_organisation_rfid(self, org_id: str, rfid: Rfid) -> None: + """Revoke an RFID tag""" + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/rfids/revoke" + rfid_id = rfid.rfid + payload = {"rfid": rfid_id} + await self._put(request_uri, json=payload) + + async def get_organisation_users( + self, org_id: str, rfid: bool = False, rfid_dec_format_length: int | None = None + ) -> list[User]: + """Get organisation's registered users""" + query_params = {} + if rfid_dec_format_length: + query_params["rfidDecimalFormatLength"] = rfid_dec_format_length + if rfid: + query_params["expand"] = "rfid" + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" + response = await self._get(request_uri, params=query_params) + payload = await response.json() + + res = [] + for rfid in payload: + res.append(User.model_validate(rfid)) + return res + + async def add_organisation_user( + self, + org_id: str, + first_name: str | None = None, + last_name: str | None = None, + email: str | None = None, + mobile: str | None = None, + rfid: list[Rfid] | None = None, + password: str | None = None, + ) -> User: + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users" + payload = {} + + if first_name: + payload["firstName"] = first_name + if last_name: + payload["lastName"] = last_name + if email: + payload["email"] = email + if mobile: + payload["mobile"] = mobile + if rfid: + payload["rfidTags"] = [tag.model_dump(by_alias=True) for tag in rfid] + if password: + if len(password) >= 8: + payload["password"] = password + else: + raise ValueError( + "The provided password is too short, must be at least 8 characters" + ) + + response = await self._post(request_uri, json=payload) + new_user = await response.json() + + return User.model_validate(new_user) + + async def get_organisation_user( + self, + org_id: str, + user_id: str, + rfid: bool = False, + rfid_dec_format_length: int | None = None, + ) -> User: + """Get organisation's registered users""" + query_params = {} + if rfid_dec_format_length: + query_params["rfidDecimalFormatLength"] = rfid_dec_format_length + if rfid: + query_params["expand"] = "rfid" + + request_uri = f"/api/{API_VERSION}/organisations/{org_id}/users/{user_id}" + response = await self._get(request_uri) + payload = await response.json() + + return User.model_validate(payload) diff --git a/examples/org_test.py b/examples/org_test.py new file mode 100644 index 0000000..553c79c --- /dev/null +++ b/examples/org_test.py @@ -0,0 +1,33 @@ +import asyncio +import json + +from chargeamps.organisation import OrganisationClient + + +async def test(): + with open("config.json") as input_file: + c = json.load(input_file) + client = OrganisationClient( + email=c["username"], password=c["password"], api_key=c["api_key"] + ) + + chargepoints = await client.get_organisation_chargepoints() + print(chargepoints) + + status = await client.get_chargepoint_status(chargepoints[0].id) + print("Status:", status) + + for c in status.connector_statuses: + settings = await client.get_chargepoint_connector_settings( + c.charge_point_id, c.connector_id + ) + print("Before:", settings) + settings.max_current = 11 + await client.set_chargepoint_connector_settings(settings) + print("After:", settings) + + await client.shutdown() + + +if __name__ == "__main__": + asyncio.run(test()) diff --git a/examples/test.py b/examples/test.py index e3a9f7a..0a5247a 100644 --- a/examples/test.py +++ b/examples/test.py @@ -5,7 +5,7 @@ async def test(): - with open("credentials.json") as input_file: + with open("config.json") as input_file: c = json.load(input_file) client = ChargeAmpsExternalClient( email=c["username"], password=c["password"], api_key=c["api_key"]