Skip to content

Commit

Permalink
Support new event.stream_path parameter (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper authored Jun 19, 2024
1 parent eed7bfa commit fa2fc26
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ async def _run_loop(self):
if event is None:
event = await loop.run_in_executor(None, self._q.get)
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
# ML-6065 – workaround for NUC-178
stream_path = event.stream_path if hasattr(event, "stream_path") else event.path
qualified_shard = (stream_path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_offsets_not_committed += 1
Expand Down Expand Up @@ -620,7 +622,9 @@ async def _run_loop(self):
if not event:
event = await self._q.get()
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
# ML-6065 – workaround for NUC-178
stream_path = event.stream_path if hasattr(event, "stream_path") else event.path
qualified_shard = (stream_path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_offsets_not_handled += 1
Expand Down

0 comments on commit fa2fc26

Please sign in to comment.