Skip to content

Commit

Permalink
Fix acknowledged functionality in deferrable mode for PubSubPullSensor
Browse files Browse the repository at this point in the history
  • Loading branch information
MaksYermak committed May 17, 2024
1 parent 8b19b78 commit 25ca41c
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions airflow/providers/google/cloud/triggers/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,19 @@ def serialize(self) -> tuple[str, dict[str, Any]]:

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
try:
pulled_messages = None
while True:
if pulled_messages:
if pulled_messages := await self.hook.pull(
project_id=self.project_id,
subscription=self.subscription,
max_messages=self.max_messages,
return_immediately=True,
):
if self.ack_messages:
await self.message_acknowledgement(pulled_messages)
yield TriggerEvent({"status": "success", "message": pulled_messages})
return
else:
pulled_messages = await self.hook.pull(
project_id=self.project_id,
subscription=self.subscription,
max_messages=self.max_messages,
return_immediately=True,
)
self.log.info("Sleeping for %s seconds.", self.poke_interval)
await asyncio.sleep(self.poke_interval)
self.log.info("Sleeping for %s seconds.", self.poke_interval)
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
return
Expand Down

0 comments on commit 25ca41c

Please sign in to comment.