diff --git a/tests/docker/ducktape-deps/kgo-verifier b/tests/docker/ducktape-deps/kgo-verifier index 176b425778ada..75b4be9f8d291 100644 --- a/tests/docker/ducktape-deps/kgo-verifier +++ b/tests/docker/ducktape-deps/kgo-verifier @@ -2,6 +2,6 @@ set -e git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git cd /opt/kgo-verifier -git reset --hard 7bbf8c883d1807cdf297fdb589d92f436604772b +git reset --hard bffac1f1358875ee6e91308229d908f40d5fe18e go mod tidy make diff --git a/tests/rptest/services/kgo_verifier_services.py b/tests/rptest/services/kgo_verifier_services.py index efbd4cab43930..286d904c53833 100644 --- a/tests/rptest/services/kgo_verifier_services.py +++ b/tests/rptest/services/kgo_verifier_services.py @@ -228,6 +228,7 @@ def clean_node(self, node: ClusterNode): self._redpanda.logger.info(f"{self.__class__.__name__}.clean_node") node.account.kill_process("kgo-verifier", clean_shutdown=False) node.account.remove("valid_offsets*json", True) + node.account.remove("latest_value*json", True) node.account.remove(f"/tmp/{self.__class__.__name__}*", True) def _remote(self, node, action, timeout=60): @@ -475,8 +476,8 @@ class ValidatorStatus: def __init__(self, name: str, valid_reads: int, invalid_reads: int, out_of_scope_invalid_reads: int, - max_offsets_consumed: Optional[int], lost_offsets: Dict[str, - int]): + max_offsets_consumed: Optional[int], + lost_offsets: Dict[str, int], tombstones_consumed: int): # Validator name is just a unique name per worker thread in kgo-verifier: useful in logging # but we mostly don't care self.name = name @@ -486,6 +487,7 @@ def __init__(self, name: str, valid_reads: int, invalid_reads: int, self.out_of_scope_invalid_reads = out_of_scope_invalid_reads self.max_offsets_consumed = max_offsets_consumed self.lost_offsets = lost_offsets + self.tombstones_consumed = tombstones_consumed @property def total_reads(self): @@ -510,7 +512,8 @@ def __str__(self): f"valid_reads={self.valid_reads}, " \ f"invalid_reads={self.invalid_reads}, " \ f"out_of_scope_invalid_reads={self.out_of_scope_invalid_reads}, " \ - f"lost_offsets={self.lost_offsets}>" + f"lost_offsets={self.lost_offsets}, " \ + f"tombstones_consumed={self.tombstones_consumed}>" class ConsumerStatus: @@ -531,7 +534,8 @@ def __init__(self, 'out_of_scope_invalid_reads': 0, 'name': "", 'max_offsets_consumed': dict(), - 'lost_offsets': dict() + 'lost_offsets': dict(), + 'tombstones_consumed': 0 } self.validator = ValidatorStatus(**validator) @@ -571,7 +575,9 @@ def __init__(self, msgs_per_producer_id=None, max_buffered_records=None, tolerate_data_loss=False, - tolerate_failed_produce=False): + tolerate_failed_produce=False, + tombstone_probability=0.0, + validate_latest_values=False): super(KgoVerifierProducer, self).__init__(context, redpanda, topic, msg_size, custom_node, debug_logs, trace_logs, username, password, @@ -590,6 +596,8 @@ def __init__(self, self._max_buffered_records = max_buffered_records self._tolerate_data_loss = tolerate_data_loss self._tolerate_failed_produce = tolerate_failed_produce + self._tombstone_probability = tombstone_probability + self._validate_latest_values = validate_latest_values @property def produce_status(self): @@ -697,6 +705,11 @@ def start_node(self, node, clean=False): if self._tolerate_failed_produce: cmd += " --tolerate-failed-produce" + if self._tombstone_probability is not None: + cmd += f" --tombstone-probability {self._tombstone_probability}" + if self._validate_latest_values: + cmd += " --validate-latest-values" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ProduceStatus) @@ -745,7 +758,9 @@ def __init__( username: Optional[str] = None, password: Optional[str] = None, enable_tls: Optional[bool] = False, - use_transactions: Optional[bool] = False): + use_transactions: Optional[bool] = False, + compacted: Optional[bool] = False, + validate_latest_values: Optional[bool] = False): super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs, trace_logs, username, password, enable_tls) self._max_msgs = max_msgs @@ -755,6 +770,8 @@ def __init__( self._tolerate_data_loss = tolerate_data_loss self._producer = producer self._use_transactions = use_transactions + self._compacted = compacted + self._validate_latest_values = validate_latest_values def start_node(self, node, clean=False): if clean: @@ -778,6 +795,11 @@ def start_node(self, node, clean=False): cmd += " --tolerate-data-loss" if self._use_transactions: cmd += " --use-transactions" + if self._compacted: + cmd += " --compacted" + if self._validate_latest_values: + cmd += " --validate-latest-values" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ConsumerStatus) @@ -877,7 +899,9 @@ def __init__(self, continuous=False, tolerate_data_loss=False, group_name=None, - use_transactions=False): + use_transactions=False, + compacted=False, + validate_latest_values=False): super().__init__(context, redpanda, topic, msg_size, nodes, debug_logs, trace_logs, username, password, enable_tls) @@ -889,6 +913,8 @@ def __init__(self, self._continuous = continuous self._tolerate_data_loss = tolerate_data_loss self._use_transactions = use_transactions + self._compacted = compacted + self._validate_latest_values = validate_latest_values def start_node(self, node, clean=False): if clean: @@ -915,6 +941,11 @@ def start_node(self, node, clean=False): cmd += f" --consumer_group_name {self._group_name}" if self._use_transactions: cmd += " --use-transactions" + if self._compacted: + cmd += " --compacted" + if self._validate_latest_values: + cmd += " --validate-latest-values" + self.spawn(cmd, node) self._status_thread = StatusThread(self, node, ConsumerStatus) @@ -933,7 +964,8 @@ def __init__(self, active=False, failed_transactions=0, aborted_transaction_msgs=0, - fails=0): + fails=0, + tombstones_produced=0): self.topic = topic self.sent = sent self.acked = acked @@ -947,7 +979,8 @@ def __init__(self, self.failed_transactions = failed_transactions self.aborted_transaction_messages = aborted_transaction_msgs self.fails = fails + self.tombstones_produced = tombstones_produced def __str__(self): l = self.latency - return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {l['p50']}/{l['p90']}/{l['p99']}>" + return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts} {self.failed_transactions} {self.aborted_transaction_messages} {self.fails} {self.tombstones_produced} {l['p50']}/{l['p90']}/{l['p99']}>"