diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/EventStream.java b/api-consumption/src/main/java/org/zalando/nakadi/service/EventStream.java index 0e60b4085..686d55110 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/EventStream.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/EventStream.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zalando.nakadi.domain.ConsumedEvent; +import org.zalando.nakadi.domain.HeaderTag; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.service.timeline.HighLevelConsumer; @@ -84,7 +85,8 @@ public void streamEvents(final Runnable checkAuthorization) { if (consumedEvents.isEmpty()) { final List eventsFromKafka = eventConsumer.readEvents(); for (final ConsumedEvent evt : eventsFromKafka) { - if (eventStreamChecks.isConsumptionBlocked(evt) || !evt.getConsumerTags().isEmpty()) { + if (evt.getConsumerTags().containsKey(HeaderTag.CONSUMER_SUBSCRIPTION_ID) + || eventStreamChecks.isConsumptionBlocked(evt)) { continue; } consumedEvents.add(evt); diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java index 6fdce8322..d2bd542d1 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/StreamingContext.java @@ -311,10 +311,7 @@ public boolean isConsumptionBlocked(final ConsumedEvent event) { return true; } } - if (event.getConsumerTags().isEmpty()) { - return eventStreamChecks.isConsumptionBlocked(event); - } - return !checkConsumptionAllowedFromConsumerTags(event) + return !isConsumptionAllowedFromConsumerTags(event) || eventStreamChecks.isConsumptionBlocked(event); } @@ -324,8 +321,9 @@ private boolean isMisplacedEvent(final ConsumedEvent event) { try { final String actualEventTypeName = kafkaRecordDeserializer.getEventTypeName(event.getEvent()); if (!expectedEventTypeName.equals(actualEventTypeName)) { - LOG.warn("Consumed event for event type '{}', but expected '{}' (at position {})", - actualEventTypeName, expectedEventTypeName, event.getPosition()); + LOG.warn("Consumed event for event type '{}', but expected '{}' (at position {}), topic id: {}", + actualEventTypeName, expectedEventTypeName, event.getPosition(), + event.getConsumerTags().get(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID)); return true; } } catch (final IOException e) { @@ -338,11 +336,10 @@ private boolean isMisplacedEvent(final ConsumedEvent event) { return false; } - private boolean checkConsumptionAllowedFromConsumerTags(final ConsumedEvent event) { - return event.getConsumerTags(). - getOrDefault(HeaderTag.CONSUMER_SUBSCRIPTION_ID, - subscription.getId()). - equals(subscription.getId()); + private boolean isConsumptionAllowedFromConsumerTags(final ConsumedEvent event) { + return event.getConsumerTags() + .getOrDefault(HeaderTag.CONSUMER_SUBSCRIPTION_ID, subscription.getId()) + .equals(subscription.getId()); } public CursorTokenService getCursorTokenService() { diff --git a/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java b/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java index c37ddd3e1..1c0ec63c5 100644 --- a/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java +++ b/api-consumption/src/test/java/org/zalando/nakadi/service/subscription/StreamingContextTest.java @@ -230,6 +230,7 @@ public void testIncorrectEventTypeDestination() throws IOException { final var incorrectEventTypeName = "incorrect_event_type"; final var supportedEventTypeName = "correct_event_type"; final var subscription = new Subscription(); + subscription.setId(UUID.randomUUID().toString()); subscription.setEventTypes(Set.of(supportedEventTypeName)); final var et = new EventType(); et.setCategory(EventCategory.BUSINESS); diff --git a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java index 371a97b62..5d4d28102 100644 --- a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java +++ b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java @@ -42,9 +42,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; -import java.util.HashMap; +import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -338,8 +339,8 @@ public static Map toHeaderTagMap(final String consumerString) throw new InvalidConsumerTagException(X_CONSUMER_TAG + ": is empty!"); } - final var arr = consumerString.split(","); - final Map result = new HashMap<>(); + final Map result = new EnumMap<>(HeaderTag.class); + final String[] arr = consumerString.split(","); for (final String entry : arr) { final var tagAndValue = entry.replaceAll("\\s", "").split("="); if (tagAndValue.length != 2) { @@ -347,22 +348,30 @@ public static Map toHeaderTagMap(final String consumerString) "expected: 2 but provided " + arr.length); } - final var optHeaderTag = HeaderTag.fromString(tagAndValue[0]); + final Optional optHeaderTag = HeaderTag.fromString(tagAndValue[0]); if (optHeaderTag.isEmpty()) { throw new InvalidConsumerTagException("invalid header tag: " + tagAndValue[0]); } - if (result.containsKey(optHeaderTag.get())) { - throw new InvalidConsumerTagException("duplicate header tag: " - + optHeaderTag.get()); + + final HeaderTag headerTag = optHeaderTag.get(); + if (result.containsKey(headerTag)) { + throw new InvalidConsumerTagException("duplicate header tag: " + headerTag); } - if (optHeaderTag.get() == HeaderTag.CONSUMER_SUBSCRIPTION_ID) { + + switch (headerTag) { + case CONSUMER_SUBSCRIPTION_ID: try { UUID.fromString(tagAndValue[1]); } catch (IllegalArgumentException e) { throw new InvalidConsumerTagException("header tag value: " + tagAndValue[1] + " is not an UUID"); } + break; + + default: + throw new InvalidConsumerTagException("header tag unsupported: " + tagAndValue[0]); } - result.put(optHeaderTag.get(), tagAndValue[1]); + + result.put(headerTag, tagAndValue[1]); } return result; } diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/HeaderTag.java b/core-common/src/main/java/org/zalando/nakadi/domain/HeaderTag.java index bf135bdec..8e1c5889f 100644 --- a/core-common/src/main/java/org/zalando/nakadi/domain/HeaderTag.java +++ b/core-common/src/main/java/org/zalando/nakadi/domain/HeaderTag.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.domain; - import java.util.Arrays; import java.util.Map; import java.util.Optional; @@ -9,13 +8,13 @@ import java.util.stream.Stream; public enum HeaderTag { - CONSUMER_SUBSCRIPTION_ID; + CONSUMER_SUBSCRIPTION_ID, + DEBUG_PUBLISHER_TOPIC_ID; - private static final Map STRING_TO_ENUM = HeaderTag. - stream(). - collect(Collectors.toMap(HeaderTag::name, Function.identity())); + private static final Map STRING_TO_ENUM = HeaderTag.stream() + .collect(Collectors.toMap(HeaderTag::name, Function.identity())); - public static Optional fromString(final String headerTag){ + public static Optional fromString(final String headerTag) { return Optional.ofNullable(STRING_TO_ENUM.get(headerTag.toUpperCase())); } diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/KafkaHeaderTagSerde.java b/core-common/src/main/java/org/zalando/nakadi/domain/KafkaHeaderTagSerde.java index a26ed5c55..4c776c7b7 100644 --- a/core-common/src/main/java/org/zalando/nakadi/domain/KafkaHeaderTagSerde.java +++ b/core-common/src/main/java/org/zalando/nakadi/domain/KafkaHeaderTagSerde.java @@ -4,19 +4,19 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; -import java.util.HashMap; +import java.util.EnumMap; import java.util.Map; public class KafkaHeaderTagSerde { public static void serialize(final Map consumerTags, final ProducerRecord record) { - consumerTags. - forEach((tag, value) -> record.headers().add(tag.name(), value.getBytes(Charsets.UTF_8))); + consumerTags.forEach((tag, value) -> + record.headers().add(tag.name(), value.getBytes(Charsets.UTF_8))); } public static Map deserialize(final ConsumerRecord record) { - final var result = new HashMap(); + final Map result = new EnumMap<>(HeaderTag.class); record.headers().forEach(header -> { HeaderTag.fromString(header.key()).ifPresent(tag -> result.put(tag, new String(header.value(), Charsets.UTF_8)) diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 921bc4a80..a69ca2696 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -1,6 +1,5 @@ package org.zalando.nakadi.repository.kafka; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -172,12 +171,10 @@ private CompletableFuture sendItem( item.getOwner().serialize(kafkaRecord); } - if (consumerTags!= null && !consumerTags.isEmpty()) { + if (null != consumerTags && !consumerTags.isEmpty()) { KafkaHeaderTagSerde.serialize(consumerTags, kafkaRecord); } - kafkaRecord.headers().add("X-Kafka-Topic", topicId.getBytes(Charsets.UTF_8)); - producer.send(kafkaRecord, ((metadata, exception) -> { if (null != exception) { LOG.warn("Failed to publish to kafka topic {}", topicId, exception); @@ -420,12 +417,10 @@ public List sendEvents(final String topic, nakadiRecord.getOwner().serialize(producerRecord); } - if( null != consumerTags) { + if (null != consumerTags && !consumerTags.isEmpty()) { KafkaHeaderTagSerde.serialize(consumerTags, producerRecord); } - producerRecord.headers().add("X-Kafka-Topic", topic.getBytes(Charsets.UTF_8)); - final Producer producer = kafkaFactory.takeProducer(getProducerKey(topic, nakadiRecord.getMetadata().getPartition())); producer.send(producerRecord, ((metadata, exception) -> { diff --git a/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java b/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java index b7e9291f3..11913ef02 100644 --- a/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java +++ b/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java @@ -23,7 +23,9 @@ public class TracingService { private static final long BUCKET_5_KB = 5000L; private static final long BUCKET_50_KB = 50000L; + public static final String ERROR_DESCRIPTION = "error.description"; + public static final String TAG_EVENT_TYPE = "event_type"; public static String getSLOBucketName(final long batchSize) { if (batchSize > BUCKET_50_KB) { diff --git a/core-common/src/test/java/org/zalando/nakadi/domain/KafkaHeaderTagSerializerTest.java b/core-common/src/test/java/org/zalando/nakadi/domain/KafkaHeaderTagSerializerTest.java index 9dc05ceb3..4da07c7fc 100644 --- a/core-common/src/test/java/org/zalando/nakadi/domain/KafkaHeaderTagSerializerTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/domain/KafkaHeaderTagSerializerTest.java @@ -12,30 +12,41 @@ public class KafkaHeaderTagSerializerTest { private static final String SUB_ID = "16120729-4a57-4607-ad3a-d526a4590e75"; + private static final String TOPIC_ID = "010b65ff-7343-425d-821e-d64e014925c9"; @Test public void testConsumerTagSerializer() { - final var consumerTags = Map.of(HeaderTag.CONSUMER_SUBSCRIPTION_ID, SUB_ID); + final Map consumerTags = Map.of( + HeaderTag.CONSUMER_SUBSCRIPTION_ID, SUB_ID, + HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, TOPIC_ID); + final ProducerRecord record = new ProducerRecord<>( - "topic", - "value".getBytes(StandardCharsets.UTF_8)); + "topic", "value".getBytes(StandardCharsets.UTF_8)); + KafkaHeaderTagSerde.serialize(consumerTags, record); Assert.assertEquals(SUB_ID, new String( - record.headers().lastHeader(HeaderTag.CONSUMER_SUBSCRIPTION_ID.name()). - value(), + record.headers().lastHeader(HeaderTag.CONSUMER_SUBSCRIPTION_ID.name()).value(), + Charsets.UTF_8)); + + Assert.assertEquals(TOPIC_ID, + new String( + record.headers().lastHeader(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID.name()).value(), Charsets.UTF_8)); } @Test public void testConsumerTagDeserializer() { - final ConsumerRecord record = - new ConsumerRecord<>("topic", 1, 1L, "key".getBytes(), "value".getBytes()); + final ConsumerRecord record = new ConsumerRecord<>( + "topic", 1, 1L, "key".getBytes(), "value".getBytes()); + record.headers().add(HeaderTag.CONSUMER_SUBSCRIPTION_ID.name(), SUB_ID.getBytes(Charsets.UTF_8)); + record.headers().add(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID.name(), TOPIC_ID.getBytes(Charsets.UTF_8)); + + final Map consumerTags = KafkaHeaderTagSerde.deserialize(record); - final var consumerTags = KafkaHeaderTagSerde.deserialize(record); Assert.assertEquals(consumerTags.get(HeaderTag.CONSUMER_SUBSCRIPTION_ID), SUB_ID); + Assert.assertEquals(consumerTags.get(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID), TOPIC_ID); } - } diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java index d6367912f..31a84de46 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java @@ -28,6 +28,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -104,11 +105,18 @@ private List processInternal(final EventType eventType, final Span publishingSpan = TracingService.buildNewSpan("publishing_to_kafka") .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), topic) - .withTag("event_type", eventType.getName()) + .withTag(TracingService.TAG_EVENT_TYPE, eventType.getName()) .withTag("type", "binary") .start(); try (Closeable ignored = TracingService.activateSpan(publishingSpan)) { - return timelineService.getTopicRepository(eventType).sendEvents(topic, records, consumerTags); + // DEBUG + final Map debugConsumerTags = new EnumMap<>(HeaderTag.class); + if (null != consumerTags) { + debugConsumerTags.putAll(consumerTags); + } + debugConsumerTags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, topic); + // DEBUG + return timelineService.getTopicRepository(eventType).sendEvents(topic, records, debugConsumerTags); } catch (final IOException ioe) { throw new InternalNakadiException("Error closing active span scope", ioe); } finally { diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java index 43aff2d6a..8566702c3 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java @@ -50,6 +50,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -63,7 +64,6 @@ public class EventPublisher { private static final Logger LOG = LoggerFactory.getLogger(EventPublisher.class); - private static final String TAG_EVENT_TYPE = "event_type"; private final NakadiSettings nakadiSettings; @@ -286,7 +286,7 @@ private void validate(final List batch, final EventType eventType, fi throws EventValidationException, InternalNakadiException, NoSuchEventTypeException { final Tracer.SpanBuilder validationSpan = TracingService.buildNewSpan("validation") - .withTag(TAG_EVENT_TYPE, eventType.getName()); + .withTag(TracingService.TAG_EVENT_TYPE, eventType.getName()); try (Closeable ignored = TracingService.withActiveSpan(validationSpan)) { @@ -320,6 +320,7 @@ private void submit( final List batch, final EventType eventType, final Map consumerTags, final boolean delete) throws EventPublishingException, InternalNakadiException { + final Timeline activeTimeline = timelineService.getActiveTimeline(eventType); final String topic = activeTimeline.getTopic(); @@ -334,10 +335,17 @@ private void submit( final Span publishingSpan = TracingService.buildNewSpan("publishing_to_kafka") .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), topic) - .withTag(TAG_EVENT_TYPE, eventType.getName()) + .withTag(TracingService.TAG_EVENT_TYPE, eventType.getName()) .start(); try (Closeable ignored = TracingService.activateSpan(publishingSpan)) { - topicRepository.syncPostBatch(topic, batch, eventType.getName(), consumerTags, delete); + // DEBUG + final Map debugConsumerTags = new EnumMap<>(HeaderTag.class); + if (null != consumerTags) { + debugConsumerTags.putAll(consumerTags); + } + debugConsumerTags.put(HeaderTag.DEBUG_PUBLISHER_TOPIC_ID, topic); + // DEBUG + topicRepository.syncPostBatch(topic, batch, eventType.getName(), debugConsumerTags, delete); } catch (final EventPublishingException epe) { publishingSpan.log(epe.getMessage()); throw epe; diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java index cc2621725..5f3f5376e 100644 --- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java +++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java @@ -212,7 +212,8 @@ public void whenPartitionIsUnavailable207IsReportedBinary() throws Exception { final List records = Collections.singletonList(nakadiRecord); final List publishResult = eventPublisher.publish(eventType, records, null); - Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), eq(null)); + + Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), any()); Assert.assertNotEquals(NakadiRecordResult.Status.SUCCEEDED, publishResult.get(0).getStatus()); Assert.assertEquals("1", publishResult.get(0).getMetadata().getPartition()); @@ -728,10 +729,10 @@ public void testAvroEventWasSerialized() throws Exception { .thenReturn("1"); final NakadiRecord nakadiRecord = mockNakadiRecord(); - final List records = Collections.singletonList(nakadiRecord); eventPublisher.publish(eventType, records, null); - Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), eq(null)); + + Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), any()); } @Test @@ -929,7 +930,7 @@ private void mockFailedBinaryWriteToKafka() { new TimeoutException())); } return resps; - }).when(topicRepository).sendEvents(any(), any(), eq(null)); + }).when(topicRepository).sendEvents(any(), any(), any()); } }