Propagate timeout to _retrieve_offsets #33

Draft · wants to merge 2 commits into base: main
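The diff threads a caller-supplied timeout from the public `KafkaConsumer` API down to the internal `_retrieve_offsets` lookup. The propagation path, read off the diffs below:

- `KafkaConsumer.poll(..., positions_timeout_ms=...)` → `_poll_once(...)` → `_update_fetch_positions(partitions, timeout_ms)` (kafka/consumer/group.py)
- `_update_fetch_positions` derives a deadline once and hands the remaining budget to `Fetcher.reset_offsets_if_needed`, `ConsumerCoordinator.refresh_committed_offsets_if_needed`, and `Fetcher.update_fetch_positions`
- `Fetcher.reset_offsets_if_needed` / `Fetcher.update_fetch_positions` → `_reset_offset(tp, timeout_ms=...)` → `_retrieve_offsets({...}, timeout_ms)` (kafka/consumer/fetcher.py)
- `refresh_committed_offsets_if_needed(timeout_ms=...)` → `fetch_committed_offsets(..., timeout_ms=...)` → `ensure_coordinator_ready(timeout_ms=...)`, which now raises `KafkaTimeoutError` once the deadline passes (kafka/coordinator/consumer.py, kafka/coordinator/base.py)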
kafka/consumer/fetcher.py (16 changes: 9 additions & 7 deletions)
@@ -131,17 +131,18 @@ def send_fetches(self):
         self._clean_done_fetch_futures()
         return futures
 
-    def reset_offsets_if_needed(self, partitions):
+    def reset_offsets_if_needed(self, partitions, timeout_ms=float("inf")):
         """Lookup and set offsets for any partitions which are awaiting an
         explicit reset.
 
         Arguments:
             partitions (set of TopicPartitions): the partitions to reset
         """
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             # TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
 
     def _clean_done_fetch_futures(self):
         while True:
@@ -156,7 +157,7 @@ def in_flight_fetches(self):
         self._clean_done_fetch_futures()
         return bool(self._fetch_futures)
 
-    def update_fetch_positions(self, partitions):
+    def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
         """Update the fetch positions for the provided partitions.
 
         Arguments:
@@ -167,6 +168,7 @@ def update_fetch_positions(self, partitions):
                 partition and no reset policy is available
         """
         # reset the fetch position to the committed position
+        end_time = time.time() + timeout_ms / 1000
         for tp in partitions:
             if not self._subscriptions.is_assigned(tp):
                 log.warning("partition %s is not assigned - skipping offset"
@@ -178,12 +180,12 @@ def update_fetch_positions(self, partitions, timeout_ms=float("inf")):
                 continue
 
             if self._subscriptions.is_offset_reset_needed(tp):
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             elif self._subscriptions.assignment[tp].committed is None:
                 # there's no committed position, so we need to reset with the
                 # default strategy
                 self._subscriptions.need_offset_reset(tp)
-                self._reset_offset(tp)
+                self._reset_offset(tp, timeout_ms=max(0.0, 1000 * (end_time - time.time())))
             else:
                 committed = self._subscriptions.assignment[tp].committed.offset
                 log.debug("Resetting offset for partition %s to the committed"
@@ -215,7 +217,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
             offsets[tp] = offsets[tp][0]
         return offsets
 
-    def _reset_offset(self, partition):
+    def _reset_offset(self, partition, timeout_ms):
         """Reset offsets for the given partition using the offset reset strategy.
 
         Arguments:
@@ -234,7 +236,7 @@
 
         log.debug("Resetting offset for partition %s to %s offset.",
                   partition, strategy)
-        offsets = self._retrieve_offsets({partition: timestamp})
+        offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms)
 
         if partition in offsets:
             offset = offsets[partition][0]
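Every updated call site in this file repeats the same remaining-budget expression. A minimal sketch of that pattern in isolation (the `remaining_ms` helper below is illustrative only, not something the diff adds):

```python
import time

def remaining_ms(end_time):
    """Milliseconds left until end_time, clamped at zero (illustrative helper)."""
    return max(0.0, 1000 * (end_time - time.time()))

# The deadline is derived once from the caller-supplied timeout_ms...
timeout_ms = 5000
end_time = time.time() + timeout_ms / 1000

# ...and each nested call is handed only the budget that is still left, e.g.
# self._reset_offset(tp, timeout_ms=remaining_ms(end_time))
print(remaining_ms(end_time))  # a bit under 5000.0
```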
kafka/consumer/group.py (26 changes: 15 additions & 11 deletions)
@@ -615,7 +615,7 @@ def partitions_for_topic(self, topic):
             partitions = cluster.partitions_for_topic(topic)
         return partitions
 
-    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
+    def poll(self, timeout_ms=0, max_records=None, update_offsets=True, *, positions_timeout_ms=float("inf")):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -656,7 +656,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         start = time.time()
         remaining = timeout_ms
         while not self._closed:
-            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
+            records = self._poll_once(remaining, positions_timeout_ms, max_records, update_offsets=update_offsets)
             if records:
                 return records
 
@@ -668,7 +668,7 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
 
         return {}
 
-    def _poll_once(self, timeout_ms, max_records, update_offsets=True):
+    def _poll_once(self, timeout_ms, positions_timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
@@ -683,7 +683,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
-            self._update_fetch_positions(self._subscription.missing_fetch_positions())
+            self._update_fetch_positions(self._subscription.missing_fetch_positions(), positions_timeout_ms)
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
@@ -714,7 +714,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
 
-    def position(self, partition):
+    def position(self, partition, timeout_ms=float("inf")):
         """Get the offset of the next record that will be fetched
 
         Arguments:
@@ -728,7 +728,7 @@ def position(self, partition):
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
         offset = self._subscription.assignment[partition].position
         if offset is None:
-            self._update_fetch_positions([partition])
+            self._update_fetch_positions([partition], timeout_ms)
             offset = self._subscription.assignment[partition].position
         return offset
 
@@ -1087,7 +1087,7 @@ def _use_consumer_group(self):
             return False
         return True
 
-    def _update_fetch_positions(self, partitions):
+    def _update_fetch_positions(self, partitions, timeout_ms):
         """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
@@ -1099,23 +1099,26 @@ def _update_fetch_positions(self, partitions):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
+        end_time = time.time() + timeout_ms / 1000
         # Lookup any positions for partitions which are awaiting reset (which may be the
        # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
         # this check first to avoid an unnecessary lookup of committed offsets (which
         # typically occurs when the user is manually assigning partitions and managing
         # their own offsets).
-        self._fetcher.reset_offsets_if_needed(partitions)
+        self._fetcher.reset_offsets_if_needed(partitions, timeout_ms)
 
         if not self._subscription.has_all_fetch_positions():
             # if we still don't have offsets for all partitions, then we should either seek
             # to the last committed position or reset using the auto reset policy
             if (self.config['api_version'] >= (0, 8, 1) and
                     self.config['group_id'] is not None):
                 # first refresh commits for all assigned partitions
-                self._coordinator.refresh_committed_offsets_if_needed()
+                refresh_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
+                self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=refresh_timeout_ms)
 
             # Then, do any offset lookups in case some positions are not known
-            self._fetcher.update_fetch_positions(partitions)
+            update_timeout_ms = max(0.0, 1000 * (end_time - time.time()))
+            self._fetcher.update_fetch_positions(partitions, update_timeout_ms)
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * (self._consumer_timeout - time.time())
@@ -1145,7 +1148,8 @@ def _message_generator(self):
             # Fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
-                self._update_fetch_positions(partitions)
+                update_timeout_ms = max(0.0, 1000 * (self._consumer_timeout - time.time()))
+                self._update_fetch_positions(partitions, update_timeout_ms)
 
             poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
             self._client.poll(timeout_ms=poll_ms)
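With the `group.py` changes, callers can bound how long the initial position lookup may block independently of the fetch timeout. A hedged usage sketch (the broker address, topic, and group id are placeholders):

```python
from kafka import KafkaConsumer
from kafka.errors import KafkaTimeoutError

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         group_id='my-group')
try:
    # timeout_ms bounds the poll for records; positions_timeout_ms (the new
    # keyword-only argument from this diff) bounds the offset-position lookup.
    records = consumer.poll(timeout_ms=1000, positions_timeout_ms=5000)
except KafkaTimeoutError:
    # Raised from ensure_coordinator_ready() when the coordinator cannot be
    # located within the remaining budget (see kafka/coordinator/base.py below).
    records = {}
```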
kafka/coordinator/base.py (8 changes: 6 additions & 2 deletions)
@@ -238,13 +238,17 @@ def coordinator(self):
         else:
             return self.coordinator_id
 
-    def ensure_coordinator_ready(self):
+    def ensure_coordinator_ready(self, timeout_ms=float("inf")):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
+        end_time = time.time() + timeout_ms / 1000
         with self._client._lock, self._lock:
             while self.coordinator_unknown():
-
+                if time.time() >= end_time:
+                    raise Errors.KafkaTimeoutError(
+                        "Failed to ensure coordinator is ready in %s ms" % (timeout_ms,)
+                    )
                 # Prior to 0.8.2 there was no group coordinator
                 # so we will just pick a node at random and treat
                 # it as the "coordinator"
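The `float("inf")` default keeps existing callers of `ensure_coordinator_ready` blocking indefinitely, because an infinite deadline can never be reached; a small sanity sketch:

```python
import time

timeout_ms = float("inf")                     # default: behaves like the old blocking call
end_time = time.time() + timeout_ms / 1000    # still float("inf")
assert not (time.time() >= end_time)          # the new deadline check never fires

timeout_ms = 0                                # a finite budget produces a reachable deadline
end_time = time.time() + timeout_ms / 1000
assert time.time() >= end_time                # the check trips (the real method would raise KafkaTimeoutError)
```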
kafka/coordinator/consumer.py (9 changes: 5 additions & 4 deletions)
@@ -383,17 +383,17 @@ def need_rejoin(self):
 
         return super(ConsumerCoordinator, self).need_rejoin()
 
-    def refresh_committed_offsets_if_needed(self):
+    def refresh_committed_offsets_if_needed(self, timeout_ms=float("inf")):
         """Fetch committed offsets for assigned partitions."""
         if self._subscription.needs_fetch_committed_offsets:
-            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
+            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
             for partition, offset in six.iteritems(offsets):
                 # verify assignment is still active
                 if self._subscription.is_assigned(partition):
                     self._subscription.assignment[partition].committed = offset
             self._subscription.needs_fetch_committed_offsets = False
 
-    def fetch_committed_offsets(self, partitions):
+    def fetch_committed_offsets(self, partitions, timeout_ms=float("inf")):
         """Fetch the current committed offsets for specified partitions
 
         Arguments:
@@ -405,8 +405,9 @@ def fetch_committed_offsets(self, partitions):
         if not partitions:
             return {}
 
+        end_time = time.time() + timeout_ms / 1000
         while True:
-            self.ensure_coordinator_ready()
+            self.ensure_coordinator_ready(timeout_ms=max(0.0, 1000 * (end_time - time.time())))
 
             # contact coordinator to fetch committed offsets
             future = self._send_offset_fetch_request(partitions)
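Because `fetch_committed_offsets` retries in a loop, the remaining budget is recomputed on every iteration, so repeated `ensure_coordinator_ready` calls can never exceed what the caller allowed. The skeleton of that pattern, sketched with illustrative names:

```python
import time

def retry_until(condition, timeout_ms=float("inf"), backoff_s=0.1):
    """Retry-loop skeleton mirroring the budget handling in fetch_committed_offsets."""
    end_time = time.time() + timeout_ms / 1000
    while True:
        # Each attempt gets only the time that is still left on the clock.
        budget_ms = max(0.0, 1000 * (end_time - time.time()))
        if condition():
            return True
        if budget_ms == 0.0:
            return False
        time.sleep(min(backoff_s, budget_ms / 1000))

# Example: a condition that becomes true after ~0.2 seconds.
ready_at = time.time() + 0.2
print(retry_until(lambda: time.time() >= ready_at, timeout_ms=1000))  # True
```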
test/test_fetcher.py (9 changes: 5 additions & 4 deletions)
@@ -4,6 +4,7 @@
 import pytest
 
 from collections import OrderedDict
+from unittest.mock import ANY
 import itertools
 import time
 
@@ -114,11 +115,11 @@ def test_update_fetch_positions(fetcher, topic, mocker):
     # partition needs reset, no committed offset
     fetcher._subscriptions.need_offset_reset(partition)
     fetcher._subscriptions.assignment[partition].awaiting_reset = False
-    fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition)
+    fetcher.update_fetch_positions([partition], timeout_ms=1234)
+    fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
     assert fetcher._subscriptions.assignment[partition].awaiting_reset is True
     fetcher.update_fetch_positions([partition])
-    fetcher._reset_offset.assert_called_with(partition)
+    fetcher._reset_offset.assert_called_with(partition, timeout_ms=ANY)
 
     # partition needs reset, has committed offset
     fetcher._reset_offset.reset_mock()
@@ -139,7 +140,7 @@ def test__reset_offset(fetcher, mocker):
     mocked = mocker.patch.object(fetcher, '_retrieve_offsets')
 
     mocked.return_value = {tp: (1001, None)}
-    fetcher._reset_offset(tp)
+    fetcher._reset_offset(tp, timeout_ms=1234)
     assert not fetcher._subscriptions.assignment[tp].awaiting_reset
     assert fetcher._subscriptions.assignment[tp].position == 1001
 
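The tests assert with `unittest.mock.ANY` because the exact remaining-time value handed to `_reset_offset` depends on `time.time()` at the moment of the call. A standalone illustration of why `ANY` is the right matcher here:

```python
from unittest.mock import ANY, MagicMock

reset_offset = MagicMock()
reset_offset("tp0", timeout_ms=987.6)   # the real value jitters from run to run

# ANY compares equal to anything, so the assertion pins the call shape
# without depending on the exact timeout value.
reset_offset.assert_called_with("tp0", timeout_ms=ANY)
```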