diff --git a/airflow/providers/google/cloud/triggers/pubsub.py b/airflow/providers/google/cloud/triggers/pubsub.py index 6b95e283adddb..535bfe2ba1c68 100644 --- a/airflow/providers/google/cloud/triggers/pubsub.py +++ b/airflow/providers/google/cloud/triggers/pubsub.py @@ -97,22 +97,19 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] try: - pulled_messages = None while True: - if pulled_messages: + if pulled_messages := await self.hook.pull( + project_id=self.project_id, + subscription=self.subscription, + max_messages=self.max_messages, + return_immediately=True, + ): if self.ack_messages: await self.message_acknowledgement(pulled_messages) yield TriggerEvent({"status": "success", "message": pulled_messages}) return - else: - pulled_messages = await self.hook.pull( - project_id=self.project_id, - subscription=self.subscription, - max_messages=self.max_messages, - return_immediately=True, - ) - self.log.info("Sleeping for %s seconds.", self.poke_interval) - await asyncio.sleep(self.poke_interval) + self.log.info("Sleeping for %s seconds.", self.poke_interval) + await asyncio.sleep(self.poke_interval) except Exception as e: yield TriggerEvent({"status": "error", "message": str(e)}) return