From e306273ca38b0956e4fa44ad29e1f61e088cc9a9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?M=C3=A1ty=C3=A1s=20Kuti?=
Date: Wed, 29 Nov 2023 16:30:39 +0100
Subject: [PATCH] Confluent-kafka consumer

---
 karapace/backup/api.py                        |  25 ++---
 karapace/backup/backends/v3/backend.py        |  18 ++--
 karapace/backup/backends/writer.py            |   8 +-
 karapace/backup/poll_timeout.py               |   5 +
 karapace/kafka/common.py                      |  16 ++-
 karapace/kafka/consumer.py                    |  33 ++++++
 karapace/kafka_utils.py                       |   4 +-
 karapace/master_coordinator.py                |   3 +-
 karapace/schema_reader.py                     | 100 ++++++++----------
 pytest.ini                                    |   2 +-
 stubs/confluent_kafka/__init__.pyi            |   4 +-
 stubs/confluent_kafka/cimpl.pyi               |  21 +++-
 .../integration/backup/test_legacy_backup.py  |  27 +++--
 tests/integration/backup/test_v3_backup.py    |  42 ++++----
 tests/unit/backup/test_api.py                 |   4 +-
 tests/unit/backup/test_poll_timeout.py        |   3 +
 16 files changed, 185 insertions(+), 130 deletions(-)
 create mode 100644 karapace/kafka/consumer.py

diff --git a/karapace/backup/api.py b/karapace/backup/api.py
index 04cc8f989..52a0dee36 100644
--- a/karapace/backup/api.py
+++ b/karapace/backup/api.py
@@ -24,24 +24,22 @@
 from concurrent.futures import Future
 from enum import Enum
 from functools import partial
-from kafka import KafkaConsumer
-from kafka.consumer.fetcher import ConsumerRecord
 from kafka.errors import KafkaError, TopicAlreadyExistsError
-from kafka.structs import TopicPartition
 from karapace import constants
 from karapace.backup.backends.v1 import SchemaBackupV1Reader
 from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer, V2_MARKER
 from karapace.backup.backends.v3.backend import SchemaBackupV3Reader, SchemaBackupV3Writer, VerifyFailure, VerifySuccess
 from karapace.config import Config
-from karapace.kafka.admin import KafkaAdminClient
-from karapace.kafka.producer import KafkaProducer
+from karapace.kafka.admin import KafkaAdminClient, TopicPartition
+from karapace.kafka.consumer import KafkaConsumer
+from karapace.kafka.producer import KafkaProducer, Message
 from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config
 from karapace.key_format import KeyFormatter
 from karapace.utils import assert_never
 from pathlib import Path
 from rich.console import Console
 from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
-from typing import Callable, Collection, Iterator, Literal, Mapping, NewType, Sized, TypeVar
+from typing import Callable, Iterator, Literal, Mapping, NewType, Sized, TypeVar
 
 import contextlib
 import datetime
@@ -282,9 +280,8 @@ def _consume_records(
     consumer: KafkaConsumer,
     topic_partition: TopicPartition,
     poll_timeout: PollTimeout,
-) -> Iterator[ConsumerRecord]:
-    start_offset: int = consumer.beginning_offsets([topic_partition])[topic_partition]
-    end_offset: int = consumer.end_offsets([topic_partition])[topic_partition]
+) -> Iterator[Message]:
+    start_offset, end_offset = consumer.get_watermark_offsets(topic_partition)
     last_offset = start_offset
 
     LOG.info(
@@ -301,12 +298,11 @@
         end_offset -= 1  # high watermark to actual end offset
 
     while True:
-        records: Collection[ConsumerRecord] = consumer.poll(poll_timeout.milliseconds).get(topic_partition, [])
-        if len(records) == 0:
+        record: Message | None = consumer.poll(timeout=poll_timeout.seconds)
+        if record is None:
             raise StaleConsumerError(topic_partition, start_offset, end_offset, last_offset, poll_timeout)
-        for record in records:
-            yield record
-        last_offset = record.offset  # pylint: disable=undefined-loop-variable
+        yield record
+        last_offset = record.offset()
         if last_offset >= end_offset:
             break
 
@@ -528,6 +524,7 @@ def create_backup(
         with _consumer(config, topic_name) as consumer:
             (partition,) = consumer.partitions_for_topic(topic_name)
             topic_partition = TopicPartition(topic_name, partition)
+            consumer.assign([topic_partition])
 
             try:
                 data_file = _write_partition(
diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py
index bd9b35dbd..d95b3c80b 100644
--- a/karapace/backup/backends/v3/backend.py
+++ b/karapace/backup/backends/v3/backend.py
@@ -10,11 +10,11 @@
 from .schema import ChecksumAlgorithm, DataFile, Header, Metadata, Record
 from .writers import write_metadata, write_record
 from dataclasses import dataclass
-from kafka.consumer.fetcher import ConsumerRecord
 from karapace.backup.backends.reader import BaseBackupReader, Instruction, ProducerSend, RestoreTopic
 from karapace.backup.backends.writer import BytesBackupWriter, StdOut
 from karapace.backup.safe_writer import bytes_writer, staging_directory
 from karapace.dataclasses import default_dataclass
+from karapace.kafka.producer import Message
 from karapace.utils import assert_never
 from karapace.version import __version__
 from pathlib import Path
@@ -334,9 +334,9 @@ def store_metadata(
     def store_record(
         self,
         buffer: IO[bytes],
-        record: ConsumerRecord,
+        record: Message,
     ) -> None:
-        stats: Final = self._partition_stats[record.partition]
+        stats: Final = self._partition_stats[record.partition()]
         checksum_checkpoint: Final = stats.get_checkpoint(
             records_threshold=self._max_records_per_checkpoint,
             bytes_threshold=self._max_bytes_per_checkpoint,
@@ -345,16 +345,16 @@
         write_record(
             buffer,
             record=Record(
-                key=record.key,
-                value=record.value,
-                headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers),
-                offset=record.offset,
-                timestamp=record.timestamp,
+                key=record.key(),
+                value=record.value(),
+                headers=tuple(Header(key=key.encode(), value=value) for key, value in record.headers() or []),
+                offset=record.offset(),
+                timestamp=record.timestamp()[1],
                 checksum_checkpoint=checksum_checkpoint,
             ),
             running_checksum=stats.running_checksum,
         )
         stats.update(
             bytes_offset=buffer.tell() - offset_start,
-            record_offset=record.offset,
+            record_offset=record.offset(),
         )
diff --git a/karapace/backup/backends/writer.py b/karapace/backup/backends/writer.py
index c2079eb04..08b4eed19 100644
--- a/karapace/backup/backends/writer.py
+++ b/karapace/backup/backends/writer.py
@@ -4,8 +4,8 @@
 """
 from __future__ import annotations
 
-from kafka.consumer.fetcher import ConsumerRecord
 from karapace.backup.safe_writer import bytes_writer, str_writer
+from karapace.kafka.producer import Message
 from pathlib import Path
 from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar
 from typing_extensions import TypeAlias
@@ -98,7 +98,7 @@ def store_metadata(
     def store_record(
         self,
         buffer: IO[B],
-        record: ConsumerRecord,
+        record: Message,
     ) -> None:
         """
         Called in order for each record read from a topic to be backed up. It's safe to
@@ -154,9 +154,9 @@ class BaseKVBackupWriter(StrBackupWriter, abc.ABC):
     def store_record(
         self,
         buffer: IO[str],
-        record: ConsumerRecord,
+        record: Message,
     ) -> None:
-        buffer.write(self.serialize_record(record.key, record.value))
+        buffer.write(self.serialize_record(record.key(), record.value()))
 
     @staticmethod
     @abc.abstractmethod
diff --git a/karapace/backup/poll_timeout.py b/karapace/backup/poll_timeout.py
index 0a1b9e157..91d5871f1 100644
--- a/karapace/backup/poll_timeout.py
+++ b/karapace/backup/poll_timeout.py
@@ -49,3 +49,8 @@ def __repr__(self) -> str:
     def milliseconds(self) -> int:
         """Returns this poll timeout in milliseconds, anything smaller than a milliseconds is ignored (no rounding)."""
         return self.__value // timedelta(milliseconds=1)
+
+    @cached_property
+    def seconds(self) -> float:
+        """Returns this poll timeout in seconds."""
+        return self.__value / timedelta(seconds=1)
diff --git a/karapace/kafka/common.py b/karapace/kafka/common.py
index cb38165c8..0270e9846 100644
--- a/karapace/kafka/common.py
+++ b/karapace/kafka/common.py
@@ -10,7 +10,7 @@
 from confluent_kafka.error import KafkaError, KafkaException
 from kafka.errors import AuthenticationFailedError, for_code, NoBrokersAvailable, UnknownTopicOrPartitionError
 from typing import Any, Callable, NoReturn, Protocol, TypedDict, TypeVar
-from typing_extensions import Unpack
+from typing_extensions import Literal, Unpack
 
 import logging
 
@@ -85,6 +85,13 @@ class KafkaClientParams(TypedDict, total=False):
     ssl_certfile: str | None
     ssl_keyfile: str | None
     sasl_oauth_token_provider: TokenWithExpiryProvider
+    # Consumer-only
+    auto_offset_reset: Literal["smallest", "earliest", "beginning", "largest", "latest", "end", "error"]
+    fetch_max_wait_ms: int
+    enable_auto_commit: bool
+    group_id: str
+    request_timeout_ms: int
+    session_timeout_ms: int
 
 
 class _KafkaConfigMixin:
@@ -128,6 +135,13 @@ def _get_config_from_params(self, bootstrap_servers: Iterable[str] | str, **para
             "ssl.certificate.location": params.get("ssl_certfile"),
             "ssl.key.location": params.get("ssl_keyfile"),
             "error_cb": self._error_callback,
+            # Consumer-only
+            "auto.offset.reset": params.get("auto_offset_reset"),
+            "fetch.wait.max.ms": params.get("fetch_max_wait_ms"),
+            "enable.auto.commit": params.get("enable_auto_commit"),
+            "request.timeout.ms": params.get("request_timeout_ms"),
+            "session.timeout.ms": params.get("session_timeout_ms"),
+            "group.id": params.get("group_id"),
         }
 
         config = {key: value for key, value in config.items() if value is not None}
diff --git a/karapace/kafka/consumer.py b/karapace/kafka/consumer.py
new file mode 100644
index 000000000..95be2bab6
--- /dev/null
+++ b/karapace/kafka/consumer.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+from confluent_kafka import Consumer
+from confluent_kafka.admin import PartitionMetadata
+from confluent_kafka.error import KafkaException
+from karapace.kafka.common import _KafkaConfigMixin, KafkaClientParams, raise_from_kafkaexception
+from typing import Iterable
+from typing_extensions import Unpack
+
+import secrets
+
+
+class KafkaConsumer(_KafkaConfigMixin, Consumer):
+    def __init__(
+        self,
+        topic: str,
+        bootstrap_servers: Iterable[str] | str,
+        verify_connection: bool = True,
+        **params: Unpack[KafkaClientParams],
+    ) -> None:
+        if "group_id" not in params:
+            params["group_id"] = f"karapace-{secrets.token_hex(4)}"
+
+        super().__init__(bootstrap_servers, verify_connection, **params)
+
+        self.subscribe([topic])
+
+    def partitions_for_topic(self, topic: str) -> dict[int, PartitionMetadata]:
+        """Returns all partition metadata for the given topic."""
+        try:
+            return self.list_topics(topic).topics[topic].partitions
+        except KafkaException as exc:
+            raise_from_kafkaexception(exc)
diff --git a/karapace/kafka_utils.py b/karapace/kafka_utils.py
index c70cd530c..0e88026d7 100644
--- a/karapace/kafka_utils.py
+++ b/karapace/kafka_utils.py
@@ -3,9 +3,8 @@
 See LICENSE for details
 """
 from .config import Config
-from .utils import KarapaceKafkaClient
-from kafka import KafkaConsumer
 from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.kafka.producer import KafkaProducer
 from typing import Iterator
 
@@ -42,7 +41,6 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaCons
         sasl_plain_password=config["sasl_plain_password"],
         auto_offset_reset="earliest",
         metadata_max_age_ms=config["metadata_max_age_ms"],
-        kafka_client=KarapaceKafkaClient,
     )
     try:
         yield consumer
diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py
index 45497daa6..27e746bfb 100644
--- a/karapace/master_coordinator.py
+++ b/karapace/master_coordinator.py
@@ -5,7 +5,6 @@
 See LICENSE for details
 """
 from dataclasses import dataclass
-from kafka import KafkaConsumer
 from kafka.coordinator.base import BaseCoordinator
 from kafka.errors import NoBrokersAvailable, NodeNotReadyError
 from kafka.metrics import MetricConfig, Metrics
@@ -238,7 +237,7 @@ def init_schema_coordinator(self) -> None:
             election_strategy=self.config.get("master_election_strategy", "lowest"),
             group_id=self.config["group_id"],
             session_timeout_ms=session_timeout_ms,
-            request_timeout_ms=max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"]),
+            request_timeout_ms=max(session_timeout_ms, 30000),
         )
         self.schema_coordinator_ready.set()
 
diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index b2fde9bbf..599af7d16 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -10,7 +10,6 @@
 from contextlib import closing, ExitStack
 from enum import Enum
 from jsonschema.validators import Draft7Validator
-from kafka import KafkaConsumer, TopicPartition
 from kafka.errors import (
     InvalidReplicationFactorError,
     KafkaConfigurationError,
@@ -24,7 +23,8 @@
 from karapace.dependency import Dependency
 from karapace.errors import InvalidReferences, InvalidSchema
 from karapace.in_memory_database import InMemoryDatabase
-from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka.admin import KafkaAdminClient, TopicPartition
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
 from karapace.master_coordinator import MasterCoordinator
 from karapace.offset_watcher import OffsetWatcher
@@ -33,7 +33,7 @@
 from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
 from karapace.statsd import StatsClient
 from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
-from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
+from karapace.utils import json_decode, JSONDecodeError
 from threading import Event, Thread
 from typing import Final, Mapping, Sequence
 
@@ -69,11 +69,10 @@ class MessageType(Enum):
 def _create_consumer_from_config(config: Config) -> KafkaConsumer:
     # Group not set on purpose, all consumers read the same data
     session_timeout_ms = config["session_timeout_ms"]
-    request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"])
+    request_timeout_ms = max(session_timeout_ms, 30000)
     return KafkaConsumer(
         config["topic_name"],
         enable_auto_commit=False,
-        api_version=(1, 0, 0),
         bootstrap_servers=config["bootstrap_uri"],
         client_id=config["client_id"],
         fetch_max_wait_ms=50,
@@ -87,7 +86,6 @@ def _create_consumer_from_config(config: Config) -> KafkaConsumer:
         auto_offset_reset="earliest",
         session_timeout_ms=session_timeout_ms,
         request_timeout_ms=request_timeout_ms,
-        kafka_client=KarapaceKafkaClient,
         metadata_max_age_ms=config["metadata_max_age_ms"],
     )
 
@@ -117,7 +115,7 @@ def __init__(
     ) -> None:
         Thread.__init__(self, name="schema-reader")
         self.master_coordinator = master_coordinator
-        self.timeout_ms = 200
+        self.timeout_s = 0.2
         self.config = config
         self.database = database
 
@@ -234,10 +232,7 @@ def _get_beginning_offset(self) -> int:
         assert self.consumer is not None, "Thread must be started"
 
         try:
-            offsets = self.consumer.beginning_offsets([TopicPartition(self.config["topic_name"], 0)])
-            # Offset in the response is the offset for last offset.
-            # Reduce by one for matching on startup.
-            beginning_offset = list(offsets.values())[0] - 1
-            return beginning_offset
+            beginning_offset, _ = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0))
+            return beginning_offset - 1  # reduce by one for matching on startup, as before
         except KafkaTimeoutError:
             LOG.exception("Reading begin offsets timed out.")
@@ -253,7 +248,7 @@ def _is_ready(self) -> bool:
         assert self.consumer is not None, "Thread must be started"
 
         try:
-            offsets = self.consumer.end_offsets([TopicPartition(self.config["topic_name"], 0)])
+            _, end_offset = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0))
         except KafkaTimeoutError:
             LOG.exception("Reading end offsets timed out.")
             return False
@@ -263,7 +258,7 @@
             return False
         # Offset in the response is the offset for the next upcoming message.
         # Reduce by one for actual highest offset.
-        self._highest_offset = list(offsets.values())[0] - 1
+        self._highest_offset = end_offset - 1
         cur_time = time.monotonic()
         time_from_last_check = cur_time - self.last_check
         progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2)
@@ -292,7 +287,7 @@
     def handle_messages(self) -> None:
         assert self.consumer is not None, "Thread must be started"
-        raw_msgs = self.consumer.poll(timeout_ms=self.timeout_ms)
+        msgs = self.consumer.consume(timeout=self.timeout_s)
 
         if self.ready is False:
             self.ready = self._is_ready()
 
@@ -306,49 +301,48 @@
             if are_we_master is True:
                 watch_offsets = True
 
-        for _, msgs in raw_msgs.items():
-            schema_records_processed_keymode_canonical = 0
-            schema_records_processed_keymode_deprecated_karapace = 0
-            for msg in msgs:
+        schema_records_processed_keymode_canonical = 0
+        schema_records_processed_keymode_deprecated_karapace = 0
+        for msg in msgs:
+            try:
+                key = json_decode(msg.key())
+            except JSONDecodeError:
+                LOG.exception("Invalid JSON in msg.key()")
+                continue
+
+            assert isinstance(key, dict)
+            msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE
+            # Key mode detection happens on startup.
+            # Default keymode is CANONICAL and preferred unless any data consumed
+            # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE
+            # the subsequent keys are omitted from detection.
+            if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL:
+                if msg_keymode == KeyMode.DEPRECATED_KARAPACE:
+                    self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE)
+
+            value = None
+            if msg.value():
                 try:
-                    key = json_decode(msg.key)
+                    value = self._parse_message_value(msg.value())
                 except JSONDecodeError:
-                    LOG.exception("Invalid JSON in msg.key")
+                    LOG.exception("Invalid JSON in msg.value()")
                     continue
 
-                assert isinstance(key, dict)
-                msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE
-                # Key mode detection happens on startup.
-                # Default keymode is CANONICAL and preferred unless any data consumed
-                # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE
-                # the subsequent keys are omitted from detection.
-                if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL:
-                    if msg_keymode == KeyMode.DEPRECATED_KARAPACE:
-                        self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE)
-
-                value = None
-                if msg.value:
-                    try:
-                        value = self._parse_message_value(msg.value)
-                    except JSONDecodeError:
-                        LOG.exception("Invalid JSON in msg.value")
-                        continue
-
-                self.handle_msg(key, value)
-                self.offset = msg.offset
-
-                if msg_keymode == KeyMode.CANONICAL:
-                    schema_records_processed_keymode_canonical += 1
-                else:
-                    schema_records_processed_keymode_deprecated_karapace += 1
-
-            if self.ready and watch_offsets:
-                self._offset_watcher.offset_seen(self.offset)
-
-            self._report_schema_metrics(
-                schema_records_processed_keymode_canonical,
-                schema_records_processed_keymode_deprecated_karapace,
-            )
+            self.handle_msg(key, value)
+            self.offset = msg.offset()
+
+            if msg_keymode == KeyMode.CANONICAL:
+                schema_records_processed_keymode_canonical += 1
+            else:
+                schema_records_processed_keymode_deprecated_karapace += 1
+
+        if self.ready and watch_offsets:
+            self._offset_watcher.offset_seen(self.offset)
+
+        self._report_schema_metrics(
+            schema_records_processed_keymode_canonical,
+            schema_records_processed_keymode_deprecated_karapace,
+        )
 
     def _report_schema_metrics(
         self,
diff --git a/pytest.ini b/pytest.ini
index 0f19813c2..8ed1116d2 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,4 +1,4 @@
 [pytest]
 addopts = -ra --numprocesses auto --import-mode=importlib
-timeout = 60
+timeout = 90
 timeout_func_only = true
diff --git a/stubs/confluent_kafka/__init__.pyi b/stubs/confluent_kafka/__init__.pyi
index 5762cb52a..3520fa4fb 100644
--- a/stubs/confluent_kafka/__init__.pyi
+++ b/stubs/confluent_kafka/__init__.pyi
@@ -1,4 +1,4 @@
 from ._model import IsolationLevel
-from .cimpl import Message, Producer, TopicPartition
+from .cimpl import Consumer, Message, Producer, TopicPartition
 
-__all__ = ("IsolationLevel", "Message", "Producer", "TopicPartition")
+__all__ = ("Consumer", "IsolationLevel", "Message", "Producer", "TopicPartition")
diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi
index 9b573c5b9..c6641cfa6 100644
--- a/stubs/confluent_kafka/cimpl.pyi
+++ b/stubs/confluent_kafka/cimpl.pyi
@@ -31,16 +31,22 @@ class TopicPartition:
         partition: int = -1,
         offset: int = -1001,
         metadata: str | None = None,
-        leader_epoc: int | None = None,
-    ) -> None: ...
+        leader_epoch: int | None = None,
+    ) -> None:
+        self.topic: str
+        self.partition: int
+        self.offset: int
+        self.metadata: str | None
+        self.leader_epoch: int | None
 
 class Message:
     def offset(self) -> int: ...
     def timestamp(self) -> tuple[int, int]: ...
     def key(self) -> str | bytes | None: ...
     def value(self) -> str | bytes | None: ...
     def topic(self) -> str: ...
     def partition(self) -> int: ...
+    def headers(self) -> list[tuple[str, str | bytes | None]] | None: ...
 
 class Producer:
     def produce(
@@ -56,3 +62,14 @@
     def flush(self, timeout: float = -1) -> None: ...
     def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ...
    def poll(self, timeout: float = -1) -> int: ...
+
+class Consumer:
+    def subscribe(self, topics: list[str]) -> None: ...
+    def get_watermark_offsets(
+        self, partition: TopicPartition, timeout: float | None = None, cached: bool = False
+    ) -> tuple[int, int]: ...
+    def close(self) -> None: ...
+    def list_topics(self, topic: str | None = None, timeout: float = -1) -> ClusterMetadata: ...
+    def consume(self, num_messages: int = 1, timeout: float = -1) -> list[Message]: ...
+    def poll(self, timeout: float = -1) -> Message | None: ...
+    def assign(self, partitions: list[TopicPartition]) -> None: ...
diff --git a/tests/integration/backup/test_legacy_backup.py b/tests/integration/backup/test_legacy_backup.py
index 3d732bd58..6b7cf8ae1 100644
--- a/tests/integration/backup/test_legacy_backup.py
+++ b/tests/integration/backup/test_legacy_backup.py
@@ -4,8 +4,6 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from datetime import timedelta
-from kafka import KafkaConsumer
 from karapace.backup import api
 from karapace.backup.api import BackupVersion
 from karapace.backup.errors import StaleConsumerError
@@ -13,6 +11,7 @@
 from karapace.client import Client
 from karapace.config import set_config_defaults
 from karapace.kafka.admin import KafkaAdminClient
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.key_format import is_key_in_canonical_format
 from karapace.utils import Expiration
 from pathlib import Path
@@ -131,18 +130,16 @@ def _assert_canonical_key_format(
         schemas_topic,
         group_id="assert-canonical-key-format-consumer",
         enable_auto_commit=False,
-        api_version=(1, 0, 0),
         bootstrap_servers=bootstrap_servers,
         auto_offset_reset="earliest",
     )
 
-    raw_msgs = consumer.poll(timeout_ms=2000)
-    while raw_msgs:
-        for _, messages in raw_msgs.items():
-            for message in messages:
-                key = json.loads(message.key)
-                assert is_key_in_canonical_format(key), f"Not in canonical format: {key}"
-        raw_msgs = consumer.poll()
+    messages = consumer.consume(num_messages=100, timeout=2)
+    while messages:
+        for message in messages:
+            key = json.loads(message.key())
+            assert is_key_in_canonical_format(key), f"Not in canonical format: {key}"
+        messages = consumer.consume(num_messages=100, timeout=2)
 
     consumer.close()
 
@@ -174,7 +171,7 @@ async def test_backup_restore(
 
     # The restored karapace should have the previously created subject
     all_subjects = []
-    expiration = Expiration.from_timeout(timeout=10)
+    expiration = Expiration.from_timeout(timeout=30)
     while subject not in all_subjects:
         expiration.raise_timeout_if_expired(
             msg_format="{subject} not in {all_subjects}",
@@ -184,7 +181,7 @@
         res = await registry_async_client.get("subjects")
         assert res.status_code == 200
         all_subjects = res.json()
-        time.sleep(0.1)
+        time.sleep(1)
 
 
 # Test a few exotic scenarios
@@ -260,13 +257,13 @@ async def test_stale_consumer(
     # The proper way to test this would be with quotas by throttling our client to death while using a very short
     # poll timeout. However, we have no way to set up quotas because all Kafka clients available to us do not
     # implement the necessary APIs.
-    with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}._poll_once") as poll_once_mock:
-        poll_once_mock.return_value = {}
+    with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock:
+        poll_mock.return_value = None
         api.create_backup(
             config=config,
             backup_location=tmp_path / "backup",
             topic_name=api.normalize_topic_name(None, config),
             version=BackupVersion.V2,
-            poll_timeout=PollTimeout(timedelta(seconds=1)),
+            poll_timeout=PollTimeout.of(seconds=1),
         )
     assert str(e.value) == f"{registry_cluster.schemas_topic}:0#0 (0,0) after PT1S"
diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py
index be336afcf..a01c80f59 100644
--- a/tests/integration/backup/test_v3_backup.py
+++ b/tests/integration/backup/test_v3_backup.py
@@ -5,8 +5,6 @@
 from __future__ import annotations
 
 from dataclasses import fields
-from kafka import TopicPartition
-from kafka.consumer.fetcher import ConsumerRecord
 from kafka.errors import UnknownTopicOrPartitionError
 from karapace.backup import api
 from karapace.backup.api import _consume_records, TopicName
@@ -16,8 +14,8 @@
 from karapace.backup.poll_timeout import PollTimeout
 from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations
 from karapace.config import Config, set_config_defaults
-from karapace.kafka.admin import KafkaAdminClient, NewTopic
-from karapace.kafka.producer import KafkaProducer
+from karapace.kafka.admin import KafkaAdminClient, NewTopic, TopicPartition
+from karapace.kafka.producer import KafkaProducer, Message
 from karapace.kafka_utils import kafka_consumer_from_config, kafka_producer_from_config
 from karapace.version import __version__
 from pathlib import Path
@@ -182,28 +180,28 @@ def test_roundtrip_from_kafka_state(
     )
 
     # First record.
-    assert isinstance(first_record, ConsumerRecord)
-    assert first_record.topic == new_topic.topic
-    assert first_record.partition == partition
+    assert isinstance(first_record, Message)
+    assert first_record.topic() == new_topic.topic
+    assert first_record.partition() == partition
     # Note: This might be unreliable due to not using idempotent producer, i.e. we have
     # no guarantee against duplicates currently.
-    assert first_record.offset == 0
-    assert first_record.timestamp == 1683474641
-    assert first_record.timestamp_type == 0
-    assert first_record.key == b"bar"
-    assert first_record.value == b"foo"
-    assert first_record.headers == []
+    assert first_record.offset() == 0
+    assert first_record.timestamp()[1] == 1683474641
+    assert first_record.timestamp()[0] == 1
+    assert first_record.key() == b"bar"
+    assert first_record.value() == b"foo"
+    assert first_record.headers() is None
 
     # Second record.
-    assert isinstance(second_record, ConsumerRecord)
-    assert second_record.topic == new_topic.topic
-    assert second_record.partition == partition
-    assert second_record.offset == 1
-    assert second_record.timestamp == 1683474657
-    assert second_record.timestamp_type == 0
-    assert second_record.key == b"foo"
-    assert second_record.value == b"bar"
-    assert second_record.headers == [
+    assert isinstance(second_record, Message)
+    assert second_record.topic() == new_topic.topic
+    assert second_record.partition() == partition
+    assert second_record.offset() == 1
+    assert second_record.timestamp()[1] == 1683474657
+    assert second_record.timestamp()[0] == 1
+    assert second_record.key() == b"foo"
+    assert second_record.value() == b"bar"
+    assert second_record.headers() == [
         ("some-header", b"some header value"),
         ("other-header", b"some other header value"),
     ]
diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py
index c112d5ffc..c451c8974 100644
--- a/tests/unit/backup/test_api.py
+++ b/tests/unit/backup/test_api.py
@@ -4,9 +4,7 @@
 """
 from __future__ import annotations
 
-from kafka import KafkaConsumer
 from kafka.errors import KafkaError, TopicAlreadyExistsError
-from kafka.structs import PartitionMetadata
 from karapace import config
 from karapace.backup.api import (
     _admin,
@@ -24,6 +22,8 @@
 from karapace.backup.errors import BackupError, PartitionCountError
 from karapace.config import Config
 from karapace.constants import DEFAULT_SCHEMA_TOPIC
+from karapace.kafka.admin import PartitionMetadata
+from karapace.kafka.consumer import KafkaConsumer
 from karapace.kafka.producer import KafkaProducer
 from pathlib import Path
 from types import FunctionType
diff --git a/tests/unit/backup/test_poll_timeout.py b/tests/unit/backup/test_poll_timeout.py
index 9a0614038..ecd9bfce4 100644
--- a/tests/unit/backup/test_poll_timeout.py
+++ b/tests/unit/backup/test_poll_timeout.py
@@ -37,3 +37,6 @@ def test__repr__(self) -> None:
 
     def test_milliseconds(self) -> None:
         assert PollTimeout(timedelta(milliseconds=1000.5)).milliseconds == 1000
+
+    def test_seconds(self) -> None:
+        assert PollTimeout(timedelta(milliseconds=1500)).seconds == 1.5
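-- 
A usage sketch, not part of the patch above: the consume pattern the backup code
switches to, a pair of watermark offsets plus a single-message poll loop, written
directly against the confluent-kafka API that the new stubs describe. The broker
address, group id, topic name, and timeout below are illustrative assumptions,
and the plain RuntimeError stands in for the StaleConsumerError the patch raises.

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # assumption: local broker
            "group.id": "karapace-backup-sketch",  # throwaway group; offsets are never committed
            "enable.auto.commit": False,
            "auto.offset.reset": "earliest",
        }
    )
    partition = TopicPartition("_schemas", 0)  # assumption: default schemas topic, partition 0
    consumer.assign([partition])

    # The low and high watermarks bound the records to back up. The high
    # watermark is one past the last written offset, hence the decrement
    # (this sketch assumes a non-empty topic).
    start_offset, end_offset = consumer.get_watermark_offsets(partition)
    end_offset -= 1

    last_offset = start_offset
    while True:
        message = consumer.poll(timeout=5.0)  # seconds; cf. the new PollTimeout.seconds property
        if message is None:
            # No record arrived within the timeout, so the backup would be incomplete.
            raise RuntimeError("consumer made no progress")  # the patch raises StaleConsumerError
        # Production code should also check message.error() before trusting the record.
        last_offset = message.offset()
        if last_offset >= end_offset:
            break

    consumer.close()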