Skip to content

Commit

Permalink
Update subscription and topic assumptions given upcoming breaking cha…
Browse files Browse the repository at this point in the history
…nges (#1121)
  • Loading branch information
allenporter authored Oct 19, 2024
1 parent c24ab57 commit e116e27
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 27 deletions.
47 changes: 26 additions & 21 deletions google_nest_sdm/google_nest_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
EXPECTED_SUBSCRIBER_REGEXP = re.compile("projects/.*/subscriptions/.*")

# Used to catch a topic misconfiguration
EXPECTED_TOPIC_REGEXP = re.compile("projects/sdm-[a-z]+/topics/.*")
EXPECTED_TOPIC_REGEXP = re.compile("projects/.*/topics/.*")

WATCHDOG_CHECK_INTERVAL_SECONDS = 10

Expand Down Expand Up @@ -336,15 +336,20 @@ def __init__(
self,
auth: AbstractAuth,
project_id: str,
subscriber_id: str,
subscriber_name: str,
subscriber_factory: AbstractSubscriberFactory = DefaultSubscriberFactory(),
topic_name: str | None = None,
loop: asyncio.AbstractEventLoop | None = None,
watchdog_check_interval_seconds: float = WATCHDOG_CHECK_INTERVAL_SECONDS,
watchdog_restart_delay_min_seconds: float = WATCHDOG_RESTART_DELAY_MIN_SECONDS,
):
"""Initialize the subscriber for the specified topic."""
self._auth = auth
self._subscriber_id = subscriber_id
self._subscriber_name = subscriber_name
if topic_name is None:
self._topic_name = TOPIC_FORMAT.format(project_id=project_id)
else:
self._topic_name = topic_name
self._project_id = project_id
self._api = GoogleNestAPI(auth, project_id)
self._loop = loop or asyncio.get_running_loop()
Expand All @@ -363,8 +368,8 @@ def __init__(

@property
def subscriber_id(self) -> str:
"""Return the configured subscriber_id."""
return self._subscriber_id
"""Return the configured subscriber name."""
return self._subscriber_name

@property
def project_id(self) -> str:
Expand All @@ -385,7 +390,7 @@ def set_update_callback(

async def create_subscription(self) -> None:
"""Create the subscription if it does not already exist."""
_validate_subscription_name(self._subscriber_id)
_validate_subscription_name(self._subscriber_name)
DIAGNOSTICS.increment("create_subscription.attempt")
try:
creds = await self._auth.async_get_creds()
Expand All @@ -395,14 +400,14 @@ async def create_subscription(self) -> None:
try:
await self._subscriber_factory.async_create_subscription(
creds,
self._subscriber_id,
TOPIC_FORMAT.format(project_id=self._project_id),
self._subscriber_name,
self._topic_name,
self._loop,
)
except NotFound as err:
DIAGNOSTICS.increment("create_subscription.not_found")
raise ConfigurationException(
f"Failed to create subscription '{self._subscriber_id}' "
f"Failed to create subscription '{self._subscriber_name}' "
+ "(cloud project id incorrect?)"
) from err
except Unauthenticated as err:
Expand All @@ -411,7 +416,7 @@ async def create_subscription(self) -> None:
except GoogleAPIError as err:
DIAGNOSTICS.increment("create_subscription.api_error")
raise SubscriberException(
f"Failed to create subscription '{self._subscriber_id}': {err}"
f"Failed to create subscription '{self._subscriber_name}': {err}"
) from err

async def delete_subscription(self) -> None:
Expand All @@ -425,7 +430,7 @@ async def delete_subscription(self) -> None:

try:
await self._subscriber_factory.async_delete_subscription(
creds, self._subscriber_id, self._loop
creds, self._subscriber_name, self._loop
)
except NotFound:
DIAGNOSTICS.increment("delete_subscription.not_found")
Expand All @@ -437,14 +442,14 @@ async def delete_subscription(self) -> None:
except GoogleAPIError as err:
DIAGNOSTICS.increment("delete_subscription.api_error")
raise SubscriberException(
f"Failed to delete subscription '{self._subscriber_id}': {err}"
f"Failed to delete subscription '{self._subscriber_name}': {err}"
) from err

async def start_async(self) -> None:
"""Start the subscriber."""
_LOGGER.debug("Starting subscriber %s", self._subscriber_id)
_LOGGER.debug("Starting subscriber %s", self._subscriber_name)
DIAGNOSTICS.increment("start")
_validate_subscription_name(self._subscriber_id)
_validate_subscription_name(self._subscriber_name)
try:
creds = await self._auth.async_get_creds()
except ClientError as err:
Expand All @@ -455,30 +460,30 @@ async def start_async(self) -> None:
async with asyncio.timeout(NEW_SUBSCRIBER_THREAD_TIMEOUT_SECONDS):
self._subscriber_future = (
await self._subscriber_factory.async_new_subscriber(
creds, self._subscriber_id, self._loop, self._async_message_callback_with_timeout
creds, self._subscriber_name, self._loop, self._async_message_callback_with_timeout
)
)
except asyncio.TimeoutError as err:
_LOGGER.debug("Failed to create subscriber '%s' with timeout: %s", self._subscriber_id, err)
_LOGGER.debug("Failed to create subscriber '%s' with timeout: %s", self._subscriber_name, err)
DIAGNOSTICS.increment("start.timeout_error")
raise SubscriberException(
f"Failed to create subscriber '{self._subscriber_id}' with timeout: {err}"
f"Failed to create subscriber '{self._subscriber_name}' with timeout: {err}"
) from err
except NotFound as err:
_LOGGER.debug("Failed to create subscriber '%s' id was not found: %s", self._subscriber_id, err)
_LOGGER.debug("Failed to create subscriber '%s' id was not found: %s", self._subscriber_name, err)
DIAGNOSTICS.increment("start.not_found_error")
raise ConfigurationException(
f"Failed to create subscriber '{self._subscriber_id}' id was not found"
f"Failed to create subscriber '{self._subscriber_name}' id was not found"
) from err
except Unauthenticated as err:
_LOGGER.debug("Failed to authenticate subscriber: %s", err)
DIAGNOSTICS.increment("start.unauthenticated")
raise AuthException("Failed to authenticate subscriber: {err}") from err
except GoogleAPIError as err:
_LOGGER.debug("Failed to create subscriber '%s' with api error: %s", self._subscriber_id, err)
_LOGGER.debug("Failed to create subscriber '%s' with api error: %s", self._subscriber_name, err)
DIAGNOSTICS.increment("start.api_error")
raise SubscriberException(
f"Failed to create subscriber '{self._subscriber_id}' with api error: {err}"
f"Failed to create subscriber '{self._subscriber_name}' with api error: {err}"
) from err

if not self._healthy:
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = google_nest_sdm
version = 5.0.1
version = 6.0.0
description = Library for the Google Nest SDM API
long_description = file: README.md
long_description_content_type = text/markdown
Expand Down
34 changes: 29 additions & 5 deletions tests/test_google_nest_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from .conftest import DeviceHandler, EventCallback, StructureHandler, assert_diagnostics

PROJECT_ID = "project-id1"
SUBSCRIBER_ID = "projects/some-project-id/subscriptions/subscriber-id1"
SUBSCRIPTION_NAME = "projects/some-project-id/subscriptions/subscriber-id1"


class FakeSubscriberFactory(AbstractSubscriberFactory):
Expand Down Expand Up @@ -89,7 +89,7 @@ async def make_subscriber(
) -> GoogleNestSubscriber:
auth = await auth_client()
assert factory
return GoogleNestSubscriber(auth, PROJECT_ID, SUBSCRIBER_ID, factory)
return GoogleNestSubscriber(auth, PROJECT_ID, SUBSCRIPTION_NAME, factory)

return make_subscriber

Expand Down Expand Up @@ -291,7 +291,7 @@ async def task2() -> None:
subscriber = GoogleNestSubscriber(
auth,
PROJECT_ID,
SUBSCRIBER_ID,
SUBSCRIPTION_NAME,
subscriber_factory=subscriber_factory,
watchdog_check_interval_seconds=0.1,
watchdog_restart_delay_min_seconds=0.1,
Expand Down Expand Up @@ -427,7 +427,7 @@ async def auth_handler(request: aiohttp.web.Request) -> aiohttp.web.Response:

auth = await refreshing_auth_client()
subscriber = GoogleNestSubscriber(
auth, PROJECT_ID, SUBSCRIBER_ID, subscriber_factory
auth, PROJECT_ID, SUBSCRIPTION_NAME, subscriber_factory
)
await subscriber.start_async()
device_manager = await subscriber.async_get_device_manager()
Expand Down Expand Up @@ -456,7 +456,7 @@ async def auth_handler(request: aiohttp.web.Request) -> aiohttp.web.Response:

auth = await refreshing_auth_client()
subscriber = GoogleNestSubscriber(
auth, PROJECT_ID, SUBSCRIBER_ID, subscriber_factory
auth, PROJECT_ID, SUBSCRIPTION_NAME, subscriber_factory
)

with pytest.raises(AuthException):
Expand Down Expand Up @@ -893,3 +893,27 @@ def test_api_env_preprod() -> None:
def test_api_env_invalid() -> None:
with pytest.raises(ValueError):
get_api_env("invalid")



async def test_topic_id(
app: aiohttp.web.Application,
device_handler: DeviceHandler,
structure_handler: StructureHandler,
auth_client: Callable[[], Awaitable[AbstractAuth]],
subscriber_factory: FakeSubscriberFactory,
) -> None:
"""Test that creates a client with a specific topic id."""

auth = await auth_client()
# Just exercise the constructor
subscriber = GoogleNestSubscriber(
auth,
PROJECT_ID,
subscriber_name=SUBSCRIPTION_NAME,
topic_name="projects/some-project-id/topics/some-topic-id",
subscriber_factory=subscriber_factory,
watchdog_check_interval_seconds=0.1,
watchdog_restart_delay_min_seconds=0.1,
)
await subscriber.create_subscription()

0 comments on commit e116e27

Please sign in to comment.