generated from ludeeus/integration_blueprint
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
7 changed files
with
456 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
"""Fuel Prices integration.""" | ||
|
||
import logging | ||
|
||
from pyfuelprices import FuelPrices, SOURCE_MAP | ||
|
||
from homeassistant.config_entries import ConfigEntry | ||
from homeassistant.const import Platform | ||
from homeassistant.core import ( | ||
HomeAssistant, | ||
ServiceCall, | ||
ServiceResponse, | ||
SupportsResponse, | ||
) | ||
from homeassistant.exceptions import HomeAssistantError | ||
|
||
from .const import DOMAIN | ||
from .coordinator import FuelPricesCoordinator | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
PLATFORMS = [] | ||
|
||
|
||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: | ||
"""Create ConfigEntry.""" | ||
hass.data.setdefault(DOMAIN, {}) | ||
_LOGGER.debug("Got request to setup entry.") | ||
try: | ||
fuel_prices: FuelPrices = FuelPrices.create( | ||
enabled_sources=entry.data.get("sources", []) | ||
) | ||
await fuel_prices.update() | ||
hass.data[DOMAIN][entry.entry_id] = FuelPricesCoordinator( | ||
hass, fuel_prices, entry.entry_id | ||
) | ||
except Exception as err: | ||
_LOGGER.error(err) | ||
raise CannotConnect from err | ||
|
||
async def update_listener(hass: HomeAssistant, entry: ConfigEntry): | ||
"""Update listener.""" | ||
await hass.config_entries.async_reload(entry.entry_id) | ||
|
||
entry.async_on_unload(entry.add_update_listener(update_listener)) | ||
|
||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) | ||
|
||
def handle_fuel_location_lookup(call: ServiceCall) -> ServiceResponse: | ||
"""Handle a fuel location lookup call.""" | ||
radius = call.data.get("radius", 5.0) | ||
lat = call.data.get("latitude") | ||
long = call.data.get("longitude") | ||
location_ids = fuel_prices.find_fuel_locations_from_point((lat, long), radius) | ||
locations = [] | ||
for loc_id in location_ids: | ||
loc = fuel_prices.get_fuel_location(loc_id) | ||
built = { | ||
"name": loc.name, | ||
"last_update": loc.last_updated, | ||
"address": loc.address, | ||
"latitude": loc.lat, | ||
"longitude": loc.long, | ||
"brand": loc.brand, | ||
} | ||
for fuel in loc.available_fuels: | ||
built[fuel.fuel_type] = fuel.cost | ||
locations.append(built) | ||
|
||
return {"items": locations, "sources": []} | ||
|
||
hass.services.async_register( | ||
DOMAIN, | ||
"find_fuel_station", | ||
handle_fuel_location_lookup, | ||
supports_response=SupportsResponse.ONLY, | ||
) | ||
|
||
return True | ||
|
||
|
||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: | ||
"""Unload a config entry.""" | ||
_LOGGER.debug("Unloading config entry %s", entry.entry_id) | ||
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): | ||
hass.data[DOMAIN].pop(entry.entry_id) | ||
|
||
return unload_ok | ||
|
||
|
||
class CannotConnect(HomeAssistantError): | ||
"""Error to indicate we cannot connect.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,249 @@ | ||
"""Config flow for MSFT Family Safety.""" | ||
|
||
import logging | ||
from typing import Any | ||
|
||
from pyfuelprices import SOURCE_MAP | ||
import voluptuous as vol | ||
|
||
from homeassistant import config_entries | ||
from homeassistant.data_entry_flow import FlowResult | ||
from homeassistant.exceptions import HomeAssistantError | ||
from homeassistant.helpers import selector | ||
from homeassistant.helpers import config_validation as cv | ||
from homeassistant.const import CONF_LATITUDE, CONF_LONGITUDE, CONF_RADIUS, CONF_NAME | ||
|
||
from .const import DOMAIN, NAME | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
AREA_SCHEMA = vol.Schema( | ||
{ | ||
vol.Required(CONF_NAME): selector.TextSelector(), | ||
vol.Required(CONF_RADIUS, default=5.0): selector.NumberSelector( | ||
selector.NumberSelectorConfig( | ||
mode=selector.NumberSelectorMode.BOX, | ||
unit_of_measurement="miles", | ||
min=1, | ||
max=50, | ||
step=0.1, | ||
) | ||
), | ||
vol.Inclusive( | ||
CONF_LATITUDE, "coordinates", "Latitude and longitude must exist together" | ||
): cv.latitude, | ||
vol.Inclusive( | ||
CONF_LONGITUDE, "coordinates", "Latitude and longitude must exist together" | ||
): cv.longitude, | ||
} | ||
) | ||
|
||
|
||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): | ||
"""Handle a config flow.""" | ||
|
||
VERSION = 1 | ||
configured_areas: list[dict] = [] | ||
configured_sources = [] | ||
configuring_area = {} | ||
configuring_index = -1 | ||
|
||
@property | ||
def configured_area_names(self) -> list[str]: | ||
"""Return a list of area names.""" | ||
items = [] | ||
for area in self.configured_areas: | ||
items.append(area["name"]) | ||
return items | ||
|
||
async def async_step_user( | ||
self, user_input: dict[str, Any] | None = None | ||
) -> FlowResult: | ||
"""Handle the intial step.""" | ||
# only one config entry allowed | ||
# users should use the options flow to adjust areas and sources. | ||
await self.async_set_unique_id(NAME) | ||
self._abort_if_unique_id_configured() | ||
return await self.async_step_main_menu() | ||
|
||
async def async_step_main_menu(self, _: None = None): | ||
"""Main menu.""" | ||
return self.async_show_menu( | ||
step_id="main_menu", | ||
menu_options={ | ||
"area_menu": "Configure areas to create devices/sensors", | ||
"sources": "Configure data collector sources", | ||
"finished": "Complete setup", | ||
}, | ||
) | ||
|
||
async def async_step_sources(self, user_input: dict[str, Any] | None = None): | ||
"""Sources configuration step.""" | ||
if user_input is not None: | ||
self.configured_sources = user_input["sources"] | ||
return await self.async_step_main_menu(None) | ||
return self.async_show_form( | ||
step_id="sources", | ||
data_schema=vol.Schema( | ||
{ | ||
vol.Optional( | ||
"sources", default=self.configured_sources | ||
): selector.SelectSelector( | ||
selector.SelectSelectorConfig( | ||
mode=selector.SelectSelectorMode.DROPDOWN, | ||
options=[k for k in SOURCE_MAP], | ||
multiple=True, | ||
) | ||
) | ||
} | ||
), | ||
) | ||
|
||
async def async_step_area_menu(self, _: None = None) -> FlowResult: | ||
"""Show the area menu.""" | ||
return self.async_show_menu( | ||
step_id="area_menu", | ||
menu_options=[ | ||
"area_create", | ||
"area_update_select", | ||
"area_delete", | ||
"main_menu", | ||
], | ||
) | ||
|
||
async def async_step_area_create(self, user_input: dict[str, Any] | None = None): | ||
"""Handle an area configuration.""" | ||
errors: dict[str, str] = {} | ||
if user_input is not None: | ||
self.configured_areas.append( | ||
{ | ||
CONF_NAME: user_input[CONF_NAME], | ||
CONF_LATITUDE: user_input[CONF_LATITUDE], | ||
CONF_LONGITUDE: user_input[CONF_LONGITUDE], | ||
CONF_RADIUS: user_input[CONF_RADIUS], | ||
} | ||
) | ||
return await self.async_step_area_menu() | ||
return self.async_show_form( | ||
step_id="area_create", data_schema=AREA_SCHEMA, errors=errors | ||
) | ||
|
||
async def async_step_area_update_select( | ||
self, user_input: dict[str, Any] | None = None | ||
): | ||
"""Show a menu to allow the user to select what option to update.""" | ||
if user_input is not None: | ||
for i, data in enumerate(self.configured_areas): | ||
if self.configured_areas[i]["name"] == user_input[CONF_NAME]: | ||
self.configuring_area = data | ||
self.configuring_index = i | ||
break | ||
return await self.async_step_area_update() | ||
if len(self.configured_areas) > 0: | ||
return self.async_show_form( | ||
step_id="area_update_select", | ||
data_schema=vol.Schema( | ||
{ | ||
vol.Required(CONF_NAME): selector.SelectSelector( | ||
selector.SelectSelectorConfig( | ||
mode=selector.SelectSelectorMode.LIST, | ||
options=self.configured_area_names, | ||
) | ||
) | ||
} | ||
), | ||
) | ||
return await self.async_step_area_menu() | ||
|
||
async def async_step_area_update(self, user_input: dict[str, Any] | None = None): | ||
"""Handle an area update.""" | ||
errors: dict[str, str] = {} | ||
if user_input is not None: | ||
self.configured_areas.pop(self.configuring_index) | ||
self.configured_areas.append( | ||
{ | ||
CONF_NAME: user_input[CONF_NAME], | ||
CONF_LATITUDE: user_input[CONF_LATITUDE], | ||
CONF_LONGITUDE: user_input[CONF_LONGITUDE], | ||
CONF_RADIUS: user_input[CONF_RADIUS], | ||
} | ||
) | ||
return await self.async_step_area_menu() | ||
return self.async_show_form( | ||
step_id="area_update", | ||
data_schema=vol.Schema( | ||
{ | ||
vol.Required( | ||
CONF_NAME, default=self.configuring_area[CONF_NAME] | ||
): selector.TextSelector(), | ||
vol.Required( | ||
CONF_RADIUS, default=self.configuring_area[CONF_RADIUS] | ||
): selector.NumberSelector( | ||
selector.NumberSelectorConfig( | ||
mode=selector.NumberSelectorMode.BOX, | ||
unit_of_measurement="miles", | ||
min=1, | ||
max=50, | ||
step=0.1, | ||
) | ||
), | ||
vol.Inclusive( | ||
CONF_LATITUDE, | ||
"coordinates", | ||
"Latitude and longitude must exist together", | ||
default=self.configuring_area[CONF_LATITUDE], | ||
): cv.latitude, | ||
vol.Inclusive( | ||
CONF_LONGITUDE, | ||
"coordinates", | ||
"Latitude and longitude must exist together", | ||
default=self.configuring_area[CONF_LONGITUDE], | ||
): cv.longitude, | ||
} | ||
), | ||
errors=errors, | ||
) | ||
|
||
async def async_step_area_delete(self, user_input: dict[str, Any] | None = None): | ||
"""Delete a configured area.""" | ||
if user_input is not None: | ||
for i, data in enumerate(self.configured_areas): | ||
if data["name"] == user_input[CONF_NAME]: | ||
self.configured_areas.pop(i) | ||
break | ||
return await self.async_step_area_menu() | ||
if len(self.configured_areas) > 0: | ||
return self.async_show_form( | ||
step_id="area_delete", | ||
data_schema=vol.Schema( | ||
{ | ||
vol.Required(CONF_NAME): selector.SelectSelector( | ||
selector.SelectSelectorConfig( | ||
mode=selector.SelectSelectorMode.LIST, | ||
options=self.configured_area_names, | ||
) | ||
) | ||
} | ||
), | ||
) | ||
return await self.async_step_area_menu() | ||
|
||
async def async_step_finished(self, user_input: dict[str, Any] | None = None): | ||
"""Final confirmation step.""" | ||
errors: dict[str, str] = {} | ||
if user_input is not None: | ||
user_input["sources"] = ( | ||
self.configured_sources | ||
if len(self.configured_sources) > 0 | ||
else [k for k in SOURCE_MAP] | ||
) | ||
user_input["areas"] = self.configured_areas | ||
return self.async_create_entry(title=NAME, data=user_input) | ||
return self.async_show_form( | ||
step_id="finished", | ||
errors=errors, | ||
) | ||
|
||
|
||
class CannotConnect(HomeAssistantError): | ||
"""Error to indicate we cannot connect.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
"""Fuel Prices integration const.""" | ||
|
||
DOMAIN = "fuel_prices" | ||
NAME = "Fuel Prices" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
"""Fuel Prices data hub.""" | ||
|
||
import logging | ||
from datetime import timedelta | ||
|
||
import async_timeout | ||
|
||
from homeassistant.core import HomeAssistant | ||
from pyfuelprices import FuelPrices | ||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class FuelPricesCoordinator(DataUpdateCoordinator): | ||
"""Fuel Prices data coordinator.""" | ||
|
||
def __init__(self, hass: HomeAssistant, api: FuelPrices, name: str) -> None: | ||
"""Init the coordinator.""" | ||
super().__init__( | ||
hass=hass, | ||
logger=_LOGGER, | ||
name=name, | ||
update_interval=timedelta(seconds=7200), | ||
) | ||
self.api: FuelPrices = api | ||
|
||
async def _async_update_data(self): | ||
"""Fetch and update data from the API.""" | ||
try: | ||
async with async_timeout.timeout(240): | ||
return await self.api.update() | ||
except Exception as err: | ||
raise UpdateFailed(f"Error communicating with API {err}") from err |
Oops, something went wrong.