Skip to content

Commit

Permalink
fix: More appropriate redis stats, run black
Browse files Browse the repository at this point in the history
  • Loading branch information
bmtcril committed Mar 13, 2024
1 parent e276143 commit e57c796
Showing 1 changed file with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def run(self) -> None:
if collect_celery:
current_stats["celery"] = self.get_celery_stats()

Check warning on line 63 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L63

Added line #L63 was not covered by tests
# if collect_kafka_bus:
# current_stats["kafka_bus"] = self.get_kafka_stats()
# current_stats["kafka_bus"] = self.get_kafka_bus_stats()
# if collect_vector:
# current_stats["vector"] = self.get_vector_stats()

Expand Down Expand Up @@ -160,15 +160,74 @@ def get_redis_bus_stats(self):
'pending': []}],
"""

lag = []

Check warning on line 163 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L163

Added line #L163 was not covered by tests
for g in info["groups"]:
lag.append({g["name"]: g["lag"]})

Check warning on line 165 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L165

Added line #L165 was not covered by tests

consumer_stats = {

Check warning on line 167 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L167

Added line #L167 was not covered by tests
"total_events": info["length"],
"queue_length": info["lag"],
"queue_lengths": lag,
}

print(f"Redis bus queue length: {consumer_stats['queue_length']}")

Check warning on line 172 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L172

Added line #L172 was not covered by tests

return consumer_stats

Check warning on line 174 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L174

Added line #L174 was not covered by tests

def get_kafka_bus_stats(self):
import sys
import confluent_kafka

Check warning on line 178 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L176-L178

Added lines #L176 - L178 were not covered by tests

if len(sys.argv) < 4:
sys.stderr.write("Usage: {} <brokers> <group.id> <topic> <topic2..>\n".format(sys.argv[0]))
sys.exit(1)

Check warning on line 182 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L181-L182

Added lines #L181 - L182 were not covered by tests

brokers, group = sys.argv[1:3]

Check warning on line 184 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L184

Added line #L184 was not covered by tests

# Create consumer.
# This consumer will not join the group, but the group.id is required by
# committed() to know which group to get offsets for.
consumer = confluent_kafka.Consumer({'bootstrap.servers': brokers,

Check warning on line 189 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L189

Added line #L189 was not covered by tests
'group.id': group})

print("%-50s %9s %9s" % ("Topic [Partition]", "Committed", "Lag"))
print("=" * 72)

Check warning on line 193 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L192-L193

Added lines #L192 - L193 were not covered by tests

for topic in sys.argv[3:]:
# Get the topic's partitions
metadata = consumer.list_topics(topic, timeout=10)

Check warning on line 197 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L197

Added line #L197 was not covered by tests
if metadata.topics[topic].error is not None:
raise confluent_kafka.KafkaException(metadata.topics[topic].error)

Check warning on line 199 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L199

Added line #L199 was not covered by tests

# Construct TopicPartition list of partitions to query
partitions = [confluent_kafka.TopicPartition(topic, p) for p in metadata.topics[topic].partitions]

# Query committed offsets for this group and the given partitions
committed = consumer.committed(partitions, timeout=10)

Check warning on line 205 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L205

Added line #L205 was not covered by tests

for partition in committed:
# Get the partitions low and high watermark offsets.
(lo, hi) = consumer.get_watermark_offsets(partition, timeout=10, cached=False)

Check warning on line 209 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L209

Added line #L209 was not covered by tests

if partition.offset == confluent_kafka.OFFSET_INVALID:
offset = "-"

Check warning on line 212 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L212

Added line #L212 was not covered by tests
else:
offset = "%d" % (partition.offset)

Check warning on line 214 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L214

Added line #L214 was not covered by tests

if hi < 0:
lag = "no hwmark" # Unlikely

Check warning on line 217 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L217

Added line #L217 was not covered by tests
elif partition.offset < 0:
# No committed offset, show total message count as lag.
# The actual message count may be lower due to compaction
# and record deletions.
lag = "%d" % (hi - lo)

Check warning on line 222 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L222

Added line #L222 was not covered by tests
else:
lag = "%d" % (hi - partition.offset)

Check warning on line 224 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L224

Added line #L224 was not covered by tests

print("%-50s %9s %9s" % (

Check warning on line 226 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L226

Added line #L226 was not covered by tests
"{} [{}]".format(partition.topic, partition.partition), offset, lag))

consumer.close()

Check warning on line 229 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L229

Added line #L229 was not covered by tests


class Command(BaseCommand):

Check warning on line 232 in platform_plugin_aspects/management/commands/monitor_load_test_tracking.py

View check run for this annotation

Codecov / codecov/patch

platform_plugin_aspects/management/commands/monitor_load_test_tracking.py#L232

Added line #L232 was not covered by tests
"""
Expand Down

0 comments on commit e57c796

Please sign in to comment.