Skip to content

Commit

Permalink
Added MVG Messages sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
g4bri3lDev committed Oct 19, 2024
1 parent 8ac2fcd commit fa78516
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 21 deletions.
23 changes: 23 additions & 0 deletions custom_components/munich_public_transport/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,29 @@ def get_icon(transport_type: str) -> str:
}
return icons.get(transport_type, "mdi:train-car")

@staticmethod
async def fetch_messages() -> List[Dict[str, Any]]:
"""Fetch messages from the API."""
try:
data = await MunichTransportAPI._make_request(f"{MunichTransportAPI.BASE_URL}/messages")
messages = [
{
"title": msg["title"],
"description": msg["description"],
"type": msg["type"],
"valid_from": datetime.fromtimestamp(msg.get("validFrom", 0) / 1000).isoformat() if msg.get("validFrom") else None,
"valid_to": datetime.fromtimestamp(msg.get("validTo", 0) / 1000).isoformat() if msg.get("validTo") else None,
"lines": [line["label"] for line in msg.get("lines", [])],
}
for msg in data
]
if not messages:
_LOGGER.warning("No messages found")
return messages
except MunichTransportAPIError as e:
_LOGGER.error(f"Error fetching messages: {e}")
raise

@staticmethod
def calculate_minutes_until(timestamp: int) -> int:
"""Calculate minutes until the given timestamp."""
Expand Down
149 changes: 128 additions & 21 deletions custom_components/munich_public_transport/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def async_setup_entry(

_LOGGER.debug(f"Station: {station_name}, Lines: {selected_lines}, Directions: {selected_directions}, Count: {departure_count}, Scan Interval: {scan_interval}")

async def async_update_data():
async def async_update_departures():
"""Fetch data from API."""
try:
_LOGGER.debug(f"Fetching departures for station {station_id} ({station_name})")
Expand Down Expand Up @@ -103,27 +103,48 @@ async def async_update_data():
"next": None
}

coordinator = DataUpdateCoordinator(
async def async_update_messages():
"""Fetch message data from API."""
try:
_LOGGER.debug("Fetching transport messages")
messages = await MunichTransportAPI.fetch_messages()
_LOGGER.debug(f"Fetched {len(messages)} messages")
return {"messages": messages}
except Exception as err:
_LOGGER.error(f"Error fetching messages: {err}", exc_info=True)
return {"messages": []}

departure_coordinator = DataUpdateCoordinator(
hass,
_LOGGER,
name="munich_public_transport",
update_method=async_update_data,
update_method=async_update_departures,
update_interval=scan_interval,
)

await coordinator.async_config_entry_first_refresh()
message_coordinator = DataUpdateCoordinator(
hass,
_LOGGER,
name="munich_public_transport_messages",
update_method=async_update_messages,
update_interval=timedelta(minutes=30),
)

if coordinator.data is None:
await departure_coordinator.async_config_entry_first_refresh()
await message_coordinator.async_config_entry_first_refresh()

if departure_coordinator.data is None or message_coordinator.data is None:
raise ConfigEntryNotReady("Failed to fetch initial data")

entities = [
NextDepartureSensor(coordinator, station_name, config_entry),
AllDeparturesSensor(coordinator, station_name, config_entry)
NextDepartureSensor(departure_coordinator, station_name, config_entry),
AllDeparturesSensor(departure_coordinator, station_name, config_entry),
MessagesSensor(message_coordinator, station_name, config_entry, selected_lines)
]

for (line, destination) in coordinator.data["grouped"].keys():
for (line, destination) in departure_coordinator.data["grouped"].keys():
if line in selected_lines and destination in selected_directions:
entities.append(LineSensor(coordinator, station_name, line, destination, config_entry))
entities.append(LineSensor(departure_coordinator, station_name, line, destination, config_entry))

async_add_entities(entities, True)

Expand Down Expand Up @@ -156,12 +177,12 @@ def async_update_sensors(entry: ConfigEntry) -> None:
unique_id = f"{DOMAIN}_{station_name}_{line}_{direction}"
if unique_id not in current_entities:
_LOGGER.debug(f"Adding new entity: {unique_id}")
new_entity = LineSensor(coordinator, station_name, line, direction, entry)
new_entity = LineSensor(departure_coordinator, station_name, line, direction, entry)
entities.append(new_entity)
async_add_entities([new_entity], True)

# Update coordinator
coordinator.update_interval = timedelta(minutes=int(entry.options.get("scan_interval", entry.data.get("scan_interval", DEFAULT_SCAN_INTERVAL))))
departure_coordinator.update_interval = timedelta(minutes=int(entry.options.get("scan_interval", entry.data.get("scan_interval", DEFAULT_SCAN_INTERVAL))))

config_entry.async_on_unload(config_entry.add_update_listener(async_update_sensors))

Expand Down Expand Up @@ -208,12 +229,24 @@ def __init__(self, coordinator, station_name: str, config_entry: ConfigEntry) ->
"model": "Public Transport Station",
}

async def async_added_to_hass(self):
"""When entity is added to hass."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.async_add_listener(self.async_write_ha_state)
)

@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.async_write_ha_state()

class NextDepartureSensor(MunichTransportBaseSensor):
"""Sensor for the next departure."""

def __init__(self, coordinator, station_name: str, config_entry: ConfigEntry) -> None:
def __init__(self, departure_coordinator, station_name: str, config_entry: ConfigEntry) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, station_name, config_entry)
super().__init__(departure_coordinator, station_name, config_entry)
self._attr_unique_id = f"{config_entry.entry_id}_{station_name}_next_departure"
self._attr_name = "Next Departure"

Expand Down Expand Up @@ -245,17 +278,16 @@ def extra_state_attributes(self) -> dict[str, Any]:
"type": next_dep['type'],
"occupancy": next_dep['occupancy'],
"cancelled": next_dep['cancelled'],
"messages": next_dep['messages'],
"network": next_dep['network'],
})
return attrs

class AllDeparturesSensor(MunichTransportBaseSensor):
"""Sensor for all departures."""

def __init__(self, coordinator, station_name: str, config_entry: ConfigEntry) -> None:
def __init__(self, departure_coordinator, station_name: str, config_entry: ConfigEntry) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, station_name, config_entry)
super().__init__(departure_coordinator, station_name, config_entry)
self._attr_unique_id = f"{config_entry.entry_id}_{station_name}_all_departures"
self._attr_name = "All Departures"
self._attr_icon = "mdi:train-car-multiple"
Expand Down Expand Up @@ -288,7 +320,6 @@ def extra_state_attributes(self) -> dict[str, Any]:
"type": dep['type'],
"occupancy": dep['occupancy'],
"cancelled": dep['cancelled'],
"messages": dep['messages'],
"network": dep['network'],
} for dep in self.coordinator.data["all"]
]
Expand All @@ -298,9 +329,9 @@ def extra_state_attributes(self) -> dict[str, Any]:
class LineSensor(MunichTransportBaseSensor):
"""Sensor for specific line and destination."""

def __init__(self, coordinator, station_name: str, line: str, destination: str, config_entry: ConfigEntry) -> None:
def __init__(self, departure_coordinator, station_name: str, line: str, destination: str, config_entry: ConfigEntry) -> None:
"""Initialize the sensor."""
super().__init__(coordinator, station_name, config_entry)
super().__init__(departure_coordinator, station_name, config_entry)
self._line = line
self._destination = destination
self._attr_unique_id = f"{config_entry.entry_id}_{station_name}_{line}_{destination}"
Expand Down Expand Up @@ -335,9 +366,85 @@ def extra_state_attributes(self) -> dict[str, Any]:
"minutes_until_departure": MunichTransportAPI.calculate_minutes_until(dep['realtime_departure']),
"occupancy": dep['occupancy'],
"cancelled": dep['cancelled'],
"messages": dep['messages'],
"network": dep['network'],
} for dep in departures
]
attrs["type"] = departures[0]['type']
return attrs
return attrs

class MessagesSensor(CoordinatorEntity, SensorEntity):
"""Sensor for transport messages."""

_attr_has_entity_name = True
_attr_state_class = SensorStateClass.MEASUREMENT

def __init__(self, coordinator, station_name: str, config_entry: ConfigEntry, selected_lines: list[str]) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self._attr_unique_id = f"{config_entry.entry_id}_{station_name}_messages"
self._attr_name = "Messages"
self._attr_icon = "mdi:message-alert"
self._selected_lines = selected_lines
self._attr_native_unit_of_measurement = "messages"
self._attr_device_info = {
"identifiers": {(DOMAIN, f"{config_entry.entry_id}_{station_name}")},
"name": station_name,
"manufacturer": "MVG",
"model": "Public Transport Station",
}

@property
def native_value(self) -> StateType:
"""Return the state of the sensor."""
return len(self._filter_messages(self.coordinator.data["messages"]))

@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes."""
attrs = {ATTR_ATTRIBUTION: ATTRIBUTION}
filtered_messages = self._filter_messages(self.coordinator.data["messages"])
attrs["messages"] = [self._format_message(msg) for msg in filtered_messages]
return attrs

def _filter_messages(self, messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Filter messages relevant to this station."""
now = datetime.now()
return [
msg for msg in messages
if (not msg['lines'] or any(line in self._selected_lines for line in msg['lines'])) and
(not msg['valid_from'] or datetime.fromisoformat(msg['valid_from']) <= now) and
(not msg['valid_to'] or datetime.fromisoformat(msg['valid_to']) >= now)
]

def _format_message(self, msg: dict[str, Any]) -> dict[str, Any]:
"""Format a single message."""
return {
"title": self._truncate_title(msg['title']),
"lines": self._format_lines(msg['lines']),
# "valid_from": msg['valid_from'],
# "valid_to": msg['valid_to'],
"validity": self._format_validity(msg['valid_from'], msg['valid_to'])
}

def _format_lines(self, lines: list[str]) -> list[str]:
"""Format the affected lines, removing duplicates."""
return sorted(set(lines)) if lines else ["All lines"]

def _format_validity(self, valid_from: str, valid_to: str) -> str:
"""Format the validity period."""
from_date = datetime.fromisoformat(valid_from) if valid_from else None
to_date = datetime.fromisoformat(valid_to) if valid_to else None

if from_date and to_date:
if from_date.date() == to_date.date():
return f"{from_date.strftime('%d.%m.%Y')} {from_date.strftime('%H:%M')} - {to_date.strftime('%H:%M')}"
return f"{from_date.strftime('%d.%m.%Y %H:%M')} - {to_date.strftime('%d.%m.%Y %H:%M')}"
elif from_date:
return f"From {from_date.strftime('%d.%m.%Y %H:%M')}"
elif to_date:
return f"Until {to_date.strftime('%Y.%m.%d %H:%M')}"
return "No specific time"

def _truncate_title(self, title: str, max_length: int = 100) -> str:
"""Truncate the title if it's too long."""
return title if len(title) <= max_length else title[:max_length-3] + "..."

0 comments on commit fa78516

Please sign in to comment.