
Commit fc86990
Log debug topic id header for misplaced events (#1591)
1. Reuse `HeaderTag` for adding the debug header.
2. Use `EnumMap` for slightly better efficiency.
3. Log the topic id when we detect a misplaced event (see the sketch below).
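In outline, the change works like this (a minimal sketch with simplified types, not the actual Nakadi signatures): the publisher copies any caller-supplied tags into a fresh `EnumMap`, stamps `DEBUG_PUBLISHER_TOPIC_ID` with the topic it writes to, and the consumer-side misplacement check reads the tag back when logging.

```java
import java.util.EnumMap;
import java.util.Map;

public class DebugHeaderFlowSketch {

    enum HeaderTag { CONSUMER_SUBSCRIPTION_ID, DEBUG_PUBLISHER_TOPIC_ID }

    // Publisher side: copy the caller's tags (which may be null) into a fresh
    // EnumMap and add the debug tag, so the caller's map is never mutated.
    static Map<HeaderTag, String> withDebugTopicId(final Map<HeaderTag, String> consumerTags,
                                                   final String topic) {
        final Map<HeaderTag, String> tags = new EnumMap<>(HeaderTag.class);
        if (consumerTags != null) {
            tags.putAll(consumerTags);
        }
        tags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, topic);
        return tags;
    }

    // Consumer side: on an event-type mismatch, report which topic the event
    // was actually published to.
    static boolean isMisplaced(final String expectedEventType, final String actualEventType,
                               final Map<HeaderTag, String> tags) {
        if (!expectedEventType.equals(actualEventType)) {
            System.err.printf("Consumed event for event type '%s', but expected '%s', topic id: %s%n",
                    actualEventType, expectedEventType,
                    tags.get(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID));
            return true;
        }
        return false;
    }
}
```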
a1exsh authored Feb 13, 2024
1 parent d23f3ac commit fc86990
Showing 12 changed files with 90 additions and 57 deletions.
@@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.nakadi.domain.ConsumedEvent;
import org.zalando.nakadi.domain.HeaderTag;
import org.zalando.nakadi.domain.NakadiCursor;
import org.zalando.nakadi.service.timeline.HighLevelConsumer;

@@ -84,7 +85,8 @@ public void streamEvents(final Runnable checkAuthorization) {
if (consumedEvents.isEmpty()) {
final List<ConsumedEvent> eventsFromKafka = eventConsumer.readEvents();
for (final ConsumedEvent evt : eventsFromKafka) {
if (eventStreamChecks.isConsumptionBlocked(evt) || !evt.getConsumerTags().isEmpty()) {
if (evt.getConsumerTags().containsKey(HeaderTag.CONSUMER_SUBSCRIPTION_ID)
|| eventStreamChecks.isConsumptionBlocked(evt)) {
continue;
}
consumedEvents.add(evt);
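The reordered guard above also changes semantics: once the publisher stamps every new event with `DEBUG_PUBLISHER_TOPIC_ID`, skipping on a non-empty tag map would drop all newly published events, so the check must look specifically for the subscription-id tag. A standalone sketch of the difference (the enum is redeclared here for illustration):

```java
import java.util.EnumMap;
import java.util.Map;

public class SkipGuardSketch {

    enum HeaderTag { CONSUMER_SUBSCRIPTION_ID, DEBUG_PUBLISHER_TOPIC_ID }

    public static void main(final String[] args) {
        final Map<HeaderTag, String> tags = new EnumMap<>(HeaderTag.class);
        tags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, "some-topic-id");

        // Old guard: any tag at all meant "skip" -- with the debug tag on every
        // new event, this would now skip everything.
        System.out.println(!tags.isEmpty());                                      // true

        // New guard: skip only events addressed to a specific subscription.
        System.out.println(tags.containsKey(HeaderTag.CONSUMER_SUBSCRIPTION_ID)); // false
    }
}
```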
@@ -311,10 +311,7 @@ public boolean isConsumptionBlocked(final ConsumedEvent event) {
return true;
}
}
if (event.getConsumerTags().isEmpty()) {
return eventStreamChecks.isConsumptionBlocked(event);
}
return !checkConsumptionAllowedFromConsumerTags(event)
return !isConsumptionAllowedFromConsumerTags(event)
|| eventStreamChecks.isConsumptionBlocked(event);
}

@@ -324,8 +321,9 @@ private boolean isMisplacedEvent(final ConsumedEvent event) {
try {
final String actualEventTypeName = kafkaRecordDeserializer.getEventTypeName(event.getEvent());
if (!expectedEventTypeName.equals(actualEventTypeName)) {
LOG.warn("Consumed event for event type '{}', but expected '{}' (at position {})",
actualEventTypeName, expectedEventTypeName, event.getPosition());
LOG.warn("Consumed event for event type '{}', but expected '{}' (at position {}), topic id: {}",
actualEventTypeName, expectedEventTypeName, event.getPosition(),
event.getConsumerTags().get(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID));
return true;
}
} catch (final IOException e) {
@@ -338,11 +336,10 @@ private boolean isMisplacedEvent(final ConsumedEvent event) {
return false;
}

private boolean checkConsumptionAllowedFromConsumerTags(final ConsumedEvent event) {
return event.getConsumerTags().
getOrDefault(HeaderTag.CONSUMER_SUBSCRIPTION_ID,
subscription.getId()).
equals(subscription.getId());
private boolean isConsumptionAllowedFromConsumerTags(final ConsumedEvent event) {
return event.getConsumerTags()
.getOrDefault(HeaderTag.CONSUMER_SUBSCRIPTION_ID, subscription.getId())
.equals(subscription.getId());
}

public CursorTokenService getCursorTokenService() {
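The renamed helper relies on a small `getOrDefault` trick: an event carrying no `CONSUMER_SUBSCRIPTION_ID` tag falls back to the consumer's own subscription id, so the `equals` check passes and untagged events stay consumable. A standalone sketch:

```java
import java.util.EnumMap;
import java.util.Map;

public class TagAllowedSketch {

    enum HeaderTag { CONSUMER_SUBSCRIPTION_ID, DEBUG_PUBLISHER_TOPIC_ID }

    static boolean isAllowed(final Map<HeaderTag, String> tags, final String mySubscriptionId) {
        // Untagged events default to our own id and therefore compare equal.
        return tags.getOrDefault(HeaderTag.CONSUMER_SUBSCRIPTION_ID, mySubscriptionId)
                .equals(mySubscriptionId);
    }

    public static void main(final String[] args) {
        final Map<HeaderTag, String> untagged = new EnumMap<>(HeaderTag.class);
        final Map<HeaderTag, String> tagged = new EnumMap<>(HeaderTag.class);
        tagged.put(HeaderTag.CONSUMER_SUBSCRIPTION_ID, "another-subscription-id");

        System.out.println(isAllowed(untagged, "my-subscription-id")); // true
        System.out.println(isAllowed(tagged, "my-subscription-id"));   // false
    }
}
```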
@@ -230,6 +230,7 @@ public void testIncorrectEventTypeDestination() throws IOException {
final var incorrectEventTypeName = "incorrect_event_type";
final var supportedEventTypeName = "correct_event_type";
final var subscription = new Subscription();
subscription.setId(UUID.randomUUID().toString());
subscription.setEventTypes(Set.of(supportedEventTypeName));
final var et = new EventType();
et.setCategory(EventCategory.BUSINESS);
@@ -42,9 +42,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -338,31 +339,39 @@ public static Map<HeaderTag, String> toHeaderTagMap(final String consumerString)
throw new InvalidConsumerTagException(X_CONSUMER_TAG + ": is empty!");
}

final var arr = consumerString.split(",");
final Map<HeaderTag, String> result = new HashMap<>();
final Map<HeaderTag, String> result = new EnumMap<>(HeaderTag.class);
final String[] arr = consumerString.split(",");
for (final String entry : arr) {
final var tagAndValue = entry.replaceAll("\\s", "").split("=");
if (tagAndValue.length != 2) {
throw new InvalidConsumerTagException("header tag parameter is imbalanced, " +
"expected: 2 but provided " + arr.length);
}

final var optHeaderTag = HeaderTag.fromString(tagAndValue[0]);
final Optional<HeaderTag> optHeaderTag = HeaderTag.fromString(tagAndValue[0]);
if (optHeaderTag.isEmpty()) {
throw new InvalidConsumerTagException("invalid header tag: " + tagAndValue[0]);
}
if (result.containsKey(optHeaderTag.get())) {
throw new InvalidConsumerTagException("duplicate header tag: "
+ optHeaderTag.get());

final HeaderTag headerTag = optHeaderTag.get();
if (result.containsKey(headerTag)) {
throw new InvalidConsumerTagException("duplicate header tag: " + headerTag);
}
if (optHeaderTag.get() == HeaderTag.CONSUMER_SUBSCRIPTION_ID) {

switch (headerTag) {
case CONSUMER_SUBSCRIPTION_ID:
try {
UUID.fromString(tagAndValue[1]);
} catch (IllegalArgumentException e) {
throw new InvalidConsumerTagException("header tag value: " + tagAndValue[1] + " is not an UUID");
}
break;

default:
throw new InvalidConsumerTagException("header tag unsupported: " + tagAndValue[0]);
}
result.put(optHeaderTag.get(), tagAndValue[1]);

result.put(headerTag, tagAndValue[1]);
}
return result;
}
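For illustration, how `toHeaderTagMap` behaves after this change — a usage fragment with made-up inputs, not a compilable unit (note also that the "imbalanced" message reports `arr.length` where `tagAndValue.length` was presumably intended):

```java
// Valid: a supported tag with a UUID value.
Map<HeaderTag, String> tags =
        toHeaderTagMap("CONSUMER_SUBSCRIPTION_ID=16120729-4a57-4607-ad3a-d526a4590e75");

// Throws InvalidConsumerTagException: the value is not a UUID.
toHeaderTagMap("CONSUMER_SUBSCRIPTION_ID=not-a-uuid");

// Throws InvalidConsumerTagException ("header tag unsupported"): the switch
// accepts only CONSUMER_SUBSCRIPTION_ID, so clients cannot set the debug tag.
toHeaderTagMap("DEBUG_PUBLISHER_TOPIC_ID=anything");
```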
@@ -1,6 +1,5 @@
package org.zalando.nakadi.domain;


import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
@@ -9,13 +8,13 @@
import java.util.stream.Stream;

public enum HeaderTag {
CONSUMER_SUBSCRIPTION_ID;
CONSUMER_SUBSCRIPTION_ID,
DEBUG_PUBLISHER_TOPIC_ID;

private static final Map<String, HeaderTag> STRING_TO_ENUM = HeaderTag.
stream().
collect(Collectors.toMap(HeaderTag::name, Function.identity()));
private static final Map<String, HeaderTag> STRING_TO_ENUM = HeaderTag.stream()
.collect(Collectors.toMap(HeaderTag::name, Function.identity()));

public static Optional<HeaderTag> fromString(final String headerTag){
public static Optional<HeaderTag> fromString(final String headerTag) {
return Optional.ofNullable(STRING_TO_ENUM.get(headerTag.toUpperCase()));
}

@@ -4,19 +4,19 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.EnumMap;
import java.util.Map;

public class KafkaHeaderTagSerde {

public static void serialize(final Map<HeaderTag, String> consumerTags,
final ProducerRecord<byte[], byte[]> record) {
consumerTags.
forEach((tag, value) -> record.headers().add(tag.name(), value.getBytes(Charsets.UTF_8)));
consumerTags.forEach((tag, value) ->
record.headers().add(tag.name(), value.getBytes(Charsets.UTF_8)));
}

public static Map<HeaderTag, String> deserialize(final ConsumerRecord<byte[], byte[]> record) {
final var result = new HashMap<HeaderTag, String>();
final Map<HeaderTag, String> result = new EnumMap<>(HeaderTag.class);
record.headers().forEach(header -> {
HeaderTag.fromString(header.key()).ifPresent(tag ->
result.put(tag, new String(header.value(), Charsets.UTF_8))
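Why `EnumMap` here and in the other files touched by this commit: with an enum key type it is backed by a plain array indexed by the key's ordinal, so lookups avoid hashing entirely and iteration follows declaration order. A tiny standalone sketch:

```java
import java.util.EnumMap;
import java.util.Map;

public class EnumMapSketch {

    enum HeaderTag { CONSUMER_SUBSCRIPTION_ID, DEBUG_PUBLISHER_TOPIC_ID }

    public static void main(final String[] args) {
        // Array-backed: get/put index by the key's ordinal, no hashCode() calls.
        final Map<HeaderTag, String> tags = new EnumMap<>(HeaderTag.class);
        tags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, "topic-id");
        System.out.println(tags.get(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID)); // topic-id
    }
}
```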
@@ -1,6 +1,5 @@
package org.zalando.nakadi.repository.kafka;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -172,12 +171,10 @@ private CompletableFuture<Exception> sendItem(
item.getOwner().serialize(kafkaRecord);
}

if (consumerTags!= null && !consumerTags.isEmpty()) {
if (null != consumerTags && !consumerTags.isEmpty()) {
KafkaHeaderTagSerde.serialize(consumerTags, kafkaRecord);
}

kafkaRecord.headers().add("X-Kafka-Topic", topicId.getBytes(Charsets.UTF_8));

producer.send(kafkaRecord, ((metadata, exception) -> {
if (null != exception) {
LOG.warn("Failed to publish to kafka topic {}", topicId, exception);
@@ -420,12 +417,10 @@ public List<NakadiRecordResult> sendEvents(final String topic,
nakadiRecord.getOwner().serialize(producerRecord);
}

if( null != consumerTags) {
if (null != consumerTags && !consumerTags.isEmpty()) {
KafkaHeaderTagSerde.serialize(consumerTags, producerRecord);
}

producerRecord.headers().add("X-Kafka-Topic", topic.getBytes(Charsets.UTF_8));

final Producer producer =
kafkaFactory.takeProducer(getProducerKey(topic, nakadiRecord.getMetadata().getPartition()));
producer.send(producerRecord, ((metadata, exception) -> {
@@ -23,7 +23,9 @@ public class TracingService {

private static final long BUCKET_5_KB = 5000L;
private static final long BUCKET_50_KB = 50000L;

public static final String ERROR_DESCRIPTION = "error.description";
public static final String TAG_EVENT_TYPE = "event_type";

public static String getSLOBucketName(final long batchSize) {
if (batchSize > BUCKET_50_KB) {
@@ -12,30 +12,41 @@
public class KafkaHeaderTagSerializerTest {

private static final String SUB_ID = "16120729-4a57-4607-ad3a-d526a4590e75";
private static final String TOPIC_ID = "010b65ff-7343-425d-821e-d64e014925c9";

@Test
public void testConsumerTagSerializer() {
final var consumerTags = Map.of(HeaderTag.CONSUMER_SUBSCRIPTION_ID, SUB_ID);
final Map<HeaderTag, String> consumerTags = Map.of(
HeaderTag.CONSUMER_SUBSCRIPTION_ID, SUB_ID,
HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, TOPIC_ID);

final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
"topic",
"value".getBytes(StandardCharsets.UTF_8));
"topic", "value".getBytes(StandardCharsets.UTF_8));

KafkaHeaderTagSerde.serialize(consumerTags, record);

Assert.assertEquals(SUB_ID,
new String(
record.headers().lastHeader(HeaderTag.CONSUMER_SUBSCRIPTION_ID.name()).
value(),
record.headers().lastHeader(HeaderTag.CONSUMER_SUBSCRIPTION_ID.name()).value(),
Charsets.UTF_8));

Assert.assertEquals(TOPIC_ID,
new String(
record.headers().lastHeader(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID.name()).value(),
Charsets.UTF_8));
}

@Test
public void testConsumerTagDeserializer() {
final ConsumerRecord<byte[], byte[]> record =
new ConsumerRecord<>("topic", 1, 1L, "key".getBytes(), "value".getBytes());
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
"topic", 1, 1L, "key".getBytes(), "value".getBytes());

record.headers().add(HeaderTag.CONSUMER_SUBSCRIPTION_ID.name(), SUB_ID.getBytes(Charsets.UTF_8));
record.headers().add(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID.name(), TOPIC_ID.getBytes(Charsets.UTF_8));

final Map<HeaderTag, String> consumerTags = KafkaHeaderTagSerde.deserialize(record);

final var consumerTags = KafkaHeaderTagSerde.deserialize(record);
Assert.assertEquals(consumerTags.get(HeaderTag.CONSUMER_SUBSCRIPTION_ID), SUB_ID);
Assert.assertEquals(consumerTags.get(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID), TOPIC_ID);
}

}
@@ -28,6 +28,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@@ -104,11 +105,18 @@ private List<NakadiRecordResult> processInternal(final EventType eventType,

final Span publishingSpan = TracingService.buildNewSpan("publishing_to_kafka")
.withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), topic)
.withTag("event_type", eventType.getName())
.withTag(TracingService.TAG_EVENT_TYPE, eventType.getName())
.withTag("type", "binary")
.start();
try (Closeable ignored = TracingService.activateSpan(publishingSpan)) {
return timelineService.getTopicRepository(eventType).sendEvents(topic, records, consumerTags);
// DEBUG
final Map<HeaderTag, String> debugConsumerTags = new EnumMap<>(HeaderTag.class);
if (null != consumerTags) {
debugConsumerTags.putAll(consumerTags);
}
debugConsumerTags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, topic);
// DEBUG
return timelineService.getTopicRepository(eventType).sendEvents(topic, records, debugConsumerTags);
} catch (final IOException ioe) {
throw new InternalNakadiException("Error closing active span scope", ioe);
} finally {
@@ -50,6 +50,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -63,7 +64,6 @@
public class EventPublisher {

private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class);
private static final String TAG_EVENT_TYPE = "event_type";

private final NakadiSettings nakadiSettings;

@@ -286,7 +286,7 @@ private void validate(final List<BatchItem> batch, final EventType eventType, fi
throws EventValidationException, InternalNakadiException, NoSuchEventTypeException {

final Tracer.SpanBuilder validationSpan = TracingService.buildNewSpan("validation")
.withTag(TAG_EVENT_TYPE, eventType.getName());
.withTag(TracingService.TAG_EVENT_TYPE, eventType.getName());

try (Closeable ignored = TracingService.withActiveSpan(validationSpan)) {

@@ -320,6 +320,7 @@ private void submit(
final List<BatchItem> batch, final EventType eventType,
final Map<HeaderTag, String> consumerTags, final boolean delete)
throws EventPublishingException, InternalNakadiException {

final Timeline activeTimeline = timelineService.getActiveTimeline(eventType);
final String topic = activeTimeline.getTopic();

@@ -334,10 +335,17 @@

final Span publishingSpan = TracingService.buildNewSpan("publishing_to_kafka")
.withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), topic)
.withTag(TAG_EVENT_TYPE, eventType.getName())
.withTag(TracingService.TAG_EVENT_TYPE, eventType.getName())
.start();
try (Closeable ignored = TracingService.activateSpan(publishingSpan)) {
topicRepository.syncPostBatch(topic, batch, eventType.getName(), consumerTags, delete);
// DEBUG
final Map<HeaderTag, String> debugConsumerTags = new EnumMap<>(HeaderTag.class);
if (null != consumerTags) {
debugConsumerTags.putAll(consumerTags);
}
debugConsumerTags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, topic);
// DEBUG
topicRepository.syncPostBatch(topic, batch, eventType.getName(), debugConsumerTags, delete);
} catch (final EventPublishingException epe) {
publishingSpan.log(epe.getMessage());
throw epe;
@@ -212,7 +212,8 @@ public void whenPartitionIsUnavailable207IsReportedBinary() throws Exception {

final List<NakadiRecord> records = Collections.singletonList(nakadiRecord);
final List<NakadiRecordResult> publishResult = eventPublisher.publish(eventType, records, null);
Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), eq(null));

Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), any());

Assert.assertNotEquals(NakadiRecordResult.Status.SUCCEEDED, publishResult.get(0).getStatus());
Assert.assertEquals("1", publishResult.get(0).getMetadata().getPartition());
@@ -728,10 +729,10 @@ public void testAvroEventWasSerialized() throws Exception {
.thenReturn("1");

final NakadiRecord nakadiRecord = mockNakadiRecord();

final List<NakadiRecord> records = Collections.singletonList(nakadiRecord);
eventPublisher.publish(eventType, records, null);
Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), eq(null));

Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), any());
}

@Test
@@ -929,7 +930,7 @@ private void mockFailedBinaryWriteToKafka() {
new TimeoutException()));
}
return resps;
}).when(topicRepository).sendEvents(any(), any(), eq(null));
}).when(topicRepository).sendEvents(any(), any(), any());
}

}

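The Mockito changes above follow from the publisher now always building a non-null tag map: the DEBUG blocks add `DEBUG_PUBLISHER_TOPIC_ID` even when the caller passed `null`, so stubs and verifications that matched `eq(null)` would never match again. Roughly:

```java
// Before: the publisher forwarded the caller's (null) consumer tags verbatim.
Mockito.verify(topicRepository).sendEvents(eq(topic), eq(records), eq(null));

// After: the publisher substitutes an EnumMap containing DEBUG_PUBLISHER_TOPIC_ID,
// so the third argument is never null and must be matched with any().
Mockito.verify(topicRepository).sendEvents(eq(topic), eq(records), any());
```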