diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index 2ecae37a5e..5dba7b1dd6 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -242,7 +242,8 @@ public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() { items.add(item); } - kafkaTopicRepository.syncPostBatch(topicId, items, null, null, false); + kafkaTopicRepository.syncPostBatch(topicId, items, null, null, + Optional.empty(), false); for (int i = 0; i < 10; i++) { assertThat(items.get(i).getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.SUBMITTED)); @@ -261,7 +262,8 @@ public void whenSendBatchWithItemHeadersThenCheckBatchStatus() { item.setOwner(new EventOwnerHeader("unit", "Nakadi")); items.add(item); } - kafkaTopicRepository.syncPostBatch(topicId, items, null, null, false); + kafkaTopicRepository.syncPostBatch(topicId, items, null, null, + Optional.empty(), false); for (int i = 0; i < 10; i++) { assertThat(items.get(i).getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.SUBMITTED)); diff --git a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java index 371a97b622..a96250dc80 100644 --- a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java +++ b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java @@ -8,6 +8,7 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.zalando.nakadi.cache.EventTypeCache; @@ -93,11 +94,13 @@ public EventPublishingController(final EventPublisher publisher, public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName, @RequestBody final String eventsAsString, final HttpServletRequest request, - final Client client) + final Client client, + final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0") + int publishTimeout) throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException, InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException { - return postEventsWithMetrics(eventTypeName, eventsAsString, - toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), request, client, false); + return postEventsWithMetrics(PublishRequest.asPublish(eventTypeName, eventsAsString, + client, toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), publishTimeout)); } @RequestMapping( @@ -108,15 +111,18 @@ public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName, ) public ResponseEntity postBinaryEvents(@PathVariable final String eventTypeName, final HttpServletRequest request, - final Client client) + final Client client, + final @RequestHeader(value = "X-TIMEOUT", + required = false, defaultValue = "0") + int publishTimeout) throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException, InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException { // TODO: check that event type schema type is AVRO! try { - return postBinaryEvents(eventTypeName, request.getInputStream(), - toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), client, false); + return postBinaryEvents(PublishRequest.asPublish(eventTypeName, request.getInputStream(), + client, toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), publishTimeout)); } catch (IOException e) { throw new InternalNakadiException("failed to parse batch", e); } @@ -134,46 +140,46 @@ public ResponseEntity deleteBinaryEvents(@PathVariable final String eventTypeNam throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException, InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException { try { - return postBinaryEvents(eventTypeName, request.getInputStream(), null, client, true); + return postBinaryEvents(PublishRequest.asDelete(eventTypeName, request.getInputStream(), + client)); } catch (IOException e) { throw new InternalNakadiException("failed to parse batch", e); } } - private ResponseEntity postBinaryEvents(final String eventTypeName, - final InputStream batch, - final Map consumerTags, - final Client client, - final boolean delete) { + private ResponseEntity postBinaryEvents(final PublishRequest request) { TracingService.setOperationName("publish_events") - .setTag("event_type", eventTypeName) + .setTag("event_type", request.getEventTypeName()) .setTag("ัontent-type", "application/avro-binary") - .setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId()); + .setTag(Tags.SPAN_KIND_PRODUCER, request.getClient().getClientId()); - if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) { + if (blacklistService.isProductionBlocked(request.getEventTypeName(), + request.getClient().getClientId())) { throw new BlockedException("Application or event type is blocked"); } - final EventType eventType = eventTypeCache.getEventType(eventTypeName); + final EventType eventType = eventTypeCache.getEventType(request.getEventTypeName()); - if (delete && eventType.getCleanupPolicy() == CleanupPolicy.DELETE) { + if (request.isDeleteRequest() && + eventType.getCleanupPolicy() == CleanupPolicy.DELETE) { throw new InvalidEventTypeException("It is not allowed to delete events from non compacted event type"); } authValidator.authorizeEventTypeWrite(eventType); - final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName); + final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(request.getEventTypeName()); try { final long startingNanos = System.nanoTime(); try { - final CountingInputStream countingInputStream = new CountingInputStream(batch); + final CountingInputStream countingInputStream = new CountingInputStream(request.getEventsRaw()); final List nakadiRecords = nakadiRecordMapper.fromBytesBatch(countingInputStream); final List recordResults; - if (delete) { + if (request.isDeleteRequest()) { recordResults = binaryPublisher.delete(nakadiRecords, eventType); } else { - recordResults = binaryPublisher.publish(eventType, nakadiRecords, consumerTags); + recordResults = binaryPublisher.publish(eventType, nakadiRecords, + request.getConsumerTags(), request.getDesiredPublishingTimeout()); } if (recordResults.isEmpty()) { throw new InternalNakadiException("unexpected empty record result list, " + @@ -188,7 +194,8 @@ private ResponseEntity postBinaryEvents(final String eventTypeName, TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes)); reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount); - reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client); + reportSLOs(startingNanos, totalSizeBytes, eventCount, result, + request.getEventTypeName(), request.getClient()); if (result.getStatus() == EventPublishingStatus.FAILED) { TracingService.setErrorFlag(); @@ -214,26 +221,22 @@ public ResponseEntity deleteEvents(@PathVariable final String eventTypeName, @RequestBody final String eventsAsString, final HttpServletRequest request, final Client client) { - return postEventsWithMetrics(eventTypeName, eventsAsString, null, request, client, true); + return postEventsWithMetrics(PublishRequest.asDelete(eventTypeName, eventsAsString, + client)); } - private ResponseEntity postEventsWithMetrics(final String eventTypeName, - final String eventsAsString, - final Map consumerTags, - final HttpServletRequest request, - final Client client, - final boolean delete) { + private ResponseEntity postEventsWithMetrics(final PublishRequest request) { TracingService.setOperationName("publish_events") - .setTag("event_type", eventTypeName) - .setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId()); + .setTag("event_type", request.getEventTypeName()) + .setTag(Tags.SPAN_KIND_PRODUCER, request.getClient().getClientId()); - if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) { + if (blacklistService. + isProductionBlocked(request.getEventTypeName(), request.getClient().getClientId())) { throw new BlockedException("Application or event type is blocked"); } - final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName); + final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(request.getEventTypeName()); try { - final ResponseEntity response = postEventInternal( - eventTypeName, eventsAsString, consumerTags, eventTypeMetrics, client, request, delete); + final ResponseEntity response = postEventInternal(request, eventTypeMetrics); eventTypeMetrics.incrementResponseCount(response.getStatusCode().value()); return response; } catch (final NoSuchEventTypeException exception) { @@ -245,32 +248,29 @@ private ResponseEntity postEventsWithMetrics(final String eventTypeName, } } - private ResponseEntity postEventInternal(final String eventTypeName, - final String eventsAsString, - final Map consumerTags, - final EventTypeMetrics eventTypeMetrics, - final Client client, - final HttpServletRequest request, - final boolean delete) + private ResponseEntity postEventInternal(final PublishRequest request, + final EventTypeMetrics eventTypeMetrics) throws AccessDeniedException, ServiceTemporarilyUnavailableException, InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException { final long startingNanos = System.nanoTime(); try { final EventPublishResult result; - final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length; + final int totalSizeBytes = request.getEventsRaw().getBytes(Charsets.UTF_8).length; TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes)); - if (delete) { - result = publisher.delete(eventsAsString, eventTypeName); + if (request.isDeleteRequest()) { + result = publisher.delete(request.getEventsRaw(), request.getEventTypeName()); } else { - result = publisher.publish(eventsAsString, eventTypeName, consumerTags); + result = publisher.publish(request.getEventsRaw(), request.getEventTypeName(), + request.getConsumerTags(), request.getDesiredPublishingTimeout()); } // FIXME: there should be a more direct way to get the input batch size final int eventCount = result.getResponses().size(); reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount); - reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client); + reportSLOs(startingNanos, totalSizeBytes, eventCount, result, + request.getEventTypeName(), request.getClient()); TracingService.setTag("number_of_events", eventCount); diff --git a/api-publishing/src/main/java/org/zalando/nakadi/PublishRequest.java b/api-publishing/src/main/java/org/zalando/nakadi/PublishRequest.java new file mode 100644 index 0000000000..5259dd46d1 --- /dev/null +++ b/api-publishing/src/main/java/org/zalando/nakadi/PublishRequest.java @@ -0,0 +1,91 @@ +package org.zalando.nakadi; + +import org.zalando.nakadi.domain.HeaderTag; +import org.zalando.nakadi.exceptions.runtime.InvalidPublishingParamException; +import org.zalando.nakadi.security.Client; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +public class PublishRequest { + private final String eventTypeName; + private final T eventsRaw; + private final Client client; + private final Map consumerTags; + private final Optional desiredPublishingTimeout; + private final boolean isDeleteRequest; + + public PublishRequest(final String eventTypeName, + final T eventsRaw, + final Client client, + final Map consumerTags, + final int desiredPublishingTimeout, + final boolean isDeleteRequest) { + this.eventTypeName = eventTypeName; + this.eventsRaw = eventsRaw; + this.client = client; + this.consumerTags = consumerTags; + //TODO: better way to get max timeout instead of hardcoding + if (desiredPublishingTimeout < 0 || desiredPublishingTimeout > 30_000) { + throw new InvalidPublishingParamException("X-TIMEOUT cannot be less than 0 or greater than 30000 ms"); + } + //0 means either nothing was supplied or 0 was supplied, in both cases it means we will leave + //the timeout to be current default + this.desiredPublishingTimeout = Optional.of(desiredPublishingTimeout).filter(v -> v != 0); + this.isDeleteRequest = isDeleteRequest; + } + + public String getEventTypeName() { + return eventTypeName; + } + + public T getEventsRaw() { + return eventsRaw; + } + + public Client getClient() { + return client; + } + + public Map getConsumerTags() { + return consumerTags; + } + + public Optional getDesiredPublishingTimeout() { + return desiredPublishingTimeout; + } + + public boolean isDeleteRequest() { + return isDeleteRequest; + } + + @Override + public String toString() { + return "PublishRequest{" + + "eventTypeName='" + eventTypeName + '\'' + + ", eventsAsString='" + eventsRaw + '\'' + + ", client=" + client + + ", consumerTags=" + consumerTags + + ", desiredPublishingTimeout=" + desiredPublishingTimeout + + ", isDeleteRequest=" + isDeleteRequest + + '}'; + } + + public static PublishRequest asPublish(final String eventTypeName, + final T eventsRaw, + final Client client, + final Map consumerTags, + final int desiredPublishingTimeout) { + return new PublishRequest<>(eventTypeName, eventsRaw, client, + consumerTags, desiredPublishingTimeout, false); + } + + public static PublishRequest asDelete(final String eventTypeName, + final T eventsRaw, + final Client client) { + return new PublishRequest<>(eventTypeName, eventsRaw, client, + Collections.emptyMap(), 0, true); + } + +} diff --git a/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java b/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java index 48c9e79e49..c2c0c9505e 100644 --- a/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java +++ b/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java @@ -45,6 +45,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import static org.hamcrest.MatcherAssert.assertThat; @@ -69,6 +70,7 @@ public class EventPublishingControllerTest { public static final String TOPIC = "my-topic"; private static final String EVENT_BATCH = "[{\"payload\": \"My Event Payload\"}]"; + private static final String X_TIMEOUT = "X-TIMEOUT"; private static final String X_CONSUMER_TAG = "X-CONSUMER-TAG"; private MetricRegistry metricRegistry; @@ -117,7 +119,7 @@ public void whenResultIsSubmittedThen200() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(String.class), eq(TOPIC), any()); + .publish(any(String.class), eq(TOPIC), any(), any()); postBatch(TOPIC, EVENT_BATCH, null) .andExpect(status().isOk()) @@ -125,18 +127,45 @@ public void whenResultIsSubmittedThen200() throws Exception { } @Test + public void whenInvalidTimeoutHeaderThen400() + throws Exception { + final var response = Problem.builder(). + withStatus(Status.BAD_REQUEST). + withTitle(Status.BAD_REQUEST.getReasonPhrase()). + withDetail("X-TIMEOUT cannot be less than 0 or greater than 30000 ms").build(); + + postBatch(TOPIC, EVENT_BATCH, Map.of(X_TIMEOUT, "-1")) + .andExpect(content().contentType("application/problem+json")) + .andExpect(status().isBadRequest()) + .andExpect(content().string(TestUtils.JSON_TEST_HELPER.matchesObject(response))); + + postBatch(TOPIC, EVENT_BATCH, Map.of(X_TIMEOUT, "30001")) + .andExpect(content().contentType("application/problem+json")) + .andExpect(status().isBadRequest()) + .andExpect(content().string(TestUtils.JSON_TEST_HELPER.matchesObject(response))); + } + + @Test + public void whenTimeoutHeaderNotIntThen400() throws Exception { + postBatch(TOPIC, EVENT_BATCH, Map.of(X_TIMEOUT, "1.5")) + .andExpect(content().contentType("application/problem+json")) + .andExpect(status().isBadRequest()); + } + + @Test public void whenInvalidPostBodyThen400() throws Exception { Mockito.doThrow(new JSONException("Error")) .when(publisher) - .publish(any(String.class), eq(TOPIC), any()); + .publish(any(String.class), eq(TOPIC), any(), any()); postBatch(TOPIC, "invalid json array", null).andExpect(status().isBadRequest()); } @Test public void whenEventPublishTimeoutThen503() throws Exception { - Mockito.when(publisher.publish(any(), any(), any())).thenThrow(new EventTypeTimeoutException("")); + Mockito.when(publisher.publish(any(), any(), any(), any())). + thenThrow(new EventTypeTimeoutException("")); postBatch(TOPIC, EVENT_BATCH, null) .andExpect(content().contentType("application/problem+json")) @@ -150,7 +179,7 @@ public void whenResultIsAbortedThen422() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(String.class), eq(TOPIC), any()); + .publish(any(String.class), eq(TOPIC), any(), any()); postBatch(TOPIC, EVENT_BATCH, null) .andExpect(status().isUnprocessableEntity()) @@ -164,7 +193,7 @@ public void whenResultIsAbortedThen207() throws Exception { Mockito .doReturn(result) .when(publisher) - .publish(any(String.class), eq(TOPIC), any()); + .publish(any(String.class), eq(TOPIC), any(), any()); postBatch(TOPIC, EVENT_BATCH, null) .andExpect(status().isMultiStatus()) @@ -176,7 +205,7 @@ public void whenEventTypeNotFoundThen404() throws Exception { Mockito .doThrow(new NoSuchEventTypeException("topic not found")) .when(publisher) - .publish(any(String.class), eq(TOPIC), any()); + .publish(any(String.class), eq(TOPIC), any(), any()); postBatch(TOPIC, EVENT_BATCH, null) .andExpect(content().contentType("application/problem+json")) @@ -190,8 +219,9 @@ public void whenConsumerTagDuplicateThen422() throws Exception { withTitle(Status.UNPROCESSABLE_ENTITY.getReasonPhrase()). withDetail("duplicate header tag: CONSUMER_SUBSCRIPTION_ID").build(); - postBatch(TOPIC, EVENT_BATCH, "consumer_subscription_id=16120729-4a57-4607-ad3a-d526a4590e75, " + - "consumer_subscription_id = 16120729-4a57-4607-ad3a-d526a4590e76") + postBatch(TOPIC, EVENT_BATCH, Map.of(X_CONSUMER_TAG, + "consumer_subscription_id=16120729-4a57-4607-ad3a-d526a4590e75, " + + "consumer_subscription_id = 16120729-4a57-4607-ad3a-d526a4590e76")) .andExpect(content().contentType("application/problem+json")) .andExpect(status().isUnprocessableEntity()) .andExpect(content().string(TestUtils.JSON_TEST_HELPER.matchesObject(response))); @@ -204,7 +234,7 @@ public void whenConsumerTagEmptyThen422() throws Exception { withTitle(Status.UNPROCESSABLE_ENTITY.getReasonPhrase()). withDetail("X-CONSUMER-TAG: is empty!").build(); - postBatch(TOPIC, EVENT_BATCH, "") + postBatch(TOPIC, EVENT_BATCH, Map.of(X_CONSUMER_TAG, "")) .andExpect(content().contentType("application/problem+json")) .andExpect(status().isUnprocessableEntity()) .andExpect(content().string(TestUtils.JSON_TEST_HELPER.matchesObject(response))); @@ -217,7 +247,8 @@ public void whenConsumerTagSubscriptionIdIsNotUUIDThen422() throws Exception { withTitle(Status.UNPROCESSABLE_ENTITY.getReasonPhrase()). withDetail("header tag value: 123f is not an UUID").build(); - postBatch(TOPIC, EVENT_BATCH, "consumer_subscription_id=123f") + postBatch(TOPIC, EVENT_BATCH, + Map.of(X_CONSUMER_TAG, "consumer_subscription_id=123f")) .andExpect(content().contentType("application/problem+json")) .andExpect(status().isUnprocessableEntity()) .andExpect(content().string(TestUtils.JSON_TEST_HELPER.matchesObject(response))); @@ -230,7 +261,8 @@ public void whenConsumerTagHasInvalidLengthThen422() throws Exception { withTitle(Status.UNPROCESSABLE_ENTITY.getReasonPhrase()). withDetail("header tag parameter is imbalanced, expected: 2 but provided 1").build(); - postBatch(TOPIC, EVENT_BATCH, "consumer_subscription_id=,") + postBatch(TOPIC, EVENT_BATCH, + Map.of(X_CONSUMER_TAG, "consumer_subscription_id=,")) .andExpect(content().contentType("application/problem+json")) .andExpect(status().isUnprocessableEntity()) .andExpect(content().string(TestUtils.JSON_TEST_HELPER.matchesObject(response))); @@ -244,7 +276,7 @@ public void publishedEventsAreReportedPerEventType() throws Exception { .doReturn(success) .doThrow(InternalNakadiException.class) .when(publisher) - .publish(any(), any(), any()); + .publish(any(), any(), any(), any()); postBatch(TOPIC, EVENT_BATCH, null); postBatch(TOPIC, EVENT_BATCH, null); @@ -264,7 +296,7 @@ public void publishedEventsKPIReported() throws Exception { .doReturn(success) .doThrow(InternalNakadiException.class) .when(publisher) - .publish(any(), any(), any()); + .publish(any(), any(), any(), any()); Mockito.when(kpiPublisher.hash(any())).thenReturn("hashed-application-name"); @@ -308,14 +340,14 @@ private List responses(final int number, final EventPublishin } private ResultActions postBatch(final String eventType, final String batch, - final String consumerTagHeader) throws Exception { + final Map headers) throws Exception { final String url = "/event-types/" + eventType + "/events"; final MockHttpServletRequestBuilder requestBuilder = post(url) .contentType(APPLICATION_JSON) .content(batch); - if(consumerTagHeader!= null){ - requestBuilder.header(X_CONSUMER_TAG, consumerTagHeader); + if(headers!= null){ + headers.forEach(requestBuilder::header); } return mockMvc.perform(requestBuilder); } diff --git a/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/InvalidPublishingParamException.java b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/InvalidPublishingParamException.java new file mode 100644 index 0000000000..8d3b833b0e --- /dev/null +++ b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/InvalidPublishingParamException.java @@ -0,0 +1,7 @@ +package org.zalando.nakadi.exceptions.runtime; + +public class InvalidPublishingParamException extends NakadiBaseException { + public InvalidPublishingParamException(final String message) { + super(message); + } +} diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/TopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/TopicRepository.java index 63c31ad7ac..ac4d57117b 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/TopicRepository.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/TopicRepository.java @@ -51,11 +51,12 @@ public String getPartition() { boolean topicExists(String topic) throws TopicRepositoryException; void syncPostBatch(String topicId, List batch, String eventTypeName, - Map consumerTags, boolean delete) + Map consumerTags, Optional publishTimeout, boolean delete) throws EventPublishingException; List sendEvents(String topic, List nakadiRecords, - Map consumerTags); + Map consumerTags, + Optional publishTimeout); void repartition(String topic, int partitionsNumber) throws CannotAddPartitionToTopicException, TopicConfigException; diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 5b63ac2c6d..c56f44889d 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -291,7 +291,9 @@ public boolean topicExists(final String topic) throws TopicRepositoryException { @Override public void syncPostBatch( final String topicId, final List batch, final String eventType, - final Map consumerTags, final boolean delete) + final Map consumerTags, + final Optional publishTimeout, + final boolean delete) throws EventPublishingException { try { final Map> sendFutures = new HashMap<>(); @@ -315,7 +317,7 @@ public void syncPostBatch( final Tracer.SpanBuilder waitForBatchSentSpan = TracingService.buildNewSpan("wait_for_batch_sent") .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), topicId); try (Closeable ignore = TracingService.withActiveSpan(waitForBatchSentSpan)) { - multiFuture.get(createSendTimeout(), TimeUnit.MILLISECONDS); + multiFuture.get(createSendWaitTimeout(publishTimeout), TimeUnit.MILLISECONDS); } catch (final IOException io) { throw new InternalNakadiException("Error closing active span scope", io); } @@ -343,8 +345,10 @@ private static String getProducerKey(final String topicId, final String partitio return String.format("%s:%s", topicId, partition); } - private long createSendTimeout() { - return nakadiSettings.getKafkaSendTimeoutMs() + kafkaSettings.getRequestTimeoutMs(); + private long createSendWaitTimeout(final Optional customTimeout) { + final long timeOut = nakadiSettings.getKafkaSendTimeoutMs() + kafkaSettings.getRequestTimeoutMs(); + return customTimeout.filter(t -> t <= timeOut). + map(Integer::longValue).orElse(timeOut); } private void failUnpublished(final String topicId, final String eventType, final List batch, @@ -405,7 +409,8 @@ private void logFailedEvents(final String topicId, final String eventType, final */ public List sendEvents(final String topic, final List nakadiRecords, - final Map consumerTags) { + final Map consumerTags, + final Optional publishTimeout) { final CountDownLatch latch = new CountDownLatch(nakadiRecords.size()); final Map responses = new ConcurrentHashMap<>(); try { @@ -417,7 +422,7 @@ public List sendEvents(final String topic, nakadiRecord.getOwner().serialize(producerRecord); } - if( null != consumerTags) { + if (null != consumerTags) { KafkaHeaderTagSerde.serialize(consumerTags, producerRecord); } @@ -444,7 +449,9 @@ public List sendEvents(final String topic, } })); } - final boolean recordsPublished = latch.await(createSendTimeout(), TimeUnit.MILLISECONDS); + + final boolean recordsPublished = latch. + await(createSendWaitTimeout(publishTimeout), TimeUnit.MILLISECONDS); return prepareResponse(nakadiRecords, responses, recordsPublished ? null : new TimeoutException("timeout waiting for events to be sent to kafka")); } catch (final InterruptException | InterruptedException e) { @@ -459,6 +466,7 @@ public List sendEvents(final String topic, } } + private List prepareResponse( final List nakadiRecords, final Map recordStatuses, diff --git a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index 04dff26c4f..5d22b113d6 100644 --- a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -2,6 +2,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -17,6 +18,7 @@ import org.mockito.Captor; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.springframework.core.io.DefaultResourceLoader; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.HeaderTag; @@ -33,7 +35,10 @@ import org.zalando.nakadi.domain.TopicPartition; import org.zalando.nakadi.exceptions.runtime.EventPublishingException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; +import org.zalando.nakadi.kpi.event.NakadiAccessLog; import org.zalando.nakadi.mapper.NakadiRecordMapper; +import org.zalando.nakadi.service.LocalSchemaRegistry; +import org.zalando.nakadi.util.MDCUtils; import org.zalando.nakadi.utils.TestUtils; import org.zalando.nakadi.view.Cursor; @@ -46,6 +51,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; @@ -57,6 +63,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -153,7 +160,8 @@ public void testRecordHeaderSetWhilePublishing() { new PartitionInfo(myTopic, 1, NODE, null, null))); try { - kafkaTopicRepository.syncPostBatch(myTopic, batch, "random", null, false); + kafkaTopicRepository.syncPostBatch(myTopic, batch, "random", null, + Optional.empty(), false); fail(); } catch (final EventPublishingException e) { final ProducerRecord recordSent = captureProducerRecordSent(); @@ -181,7 +189,8 @@ public void testConsumerTagSetWhilePublishing() { new PartitionInfo(myTopic, 1, NODE, null, null))); try { - kafkaTopicRepository.syncPostBatch(myTopic, batch, "random", consumerTags, false); + kafkaTopicRepository.syncPostBatch(myTopic, batch, "random", consumerTags, + Optional.empty(), false); fail(); } catch (final EventPublishingException e) { final ProducerRecord recordSent = captureProducerRecordSent(); @@ -308,6 +317,89 @@ public void canLoadPartitionEndStatistics() { assertThat(newHashSet(stats), equalTo(expected)); } + @Test + public void whenPostEventWithCustomTimeOutThenUpdateItemStatusAvro() throws Exception { + final org.springframework.core.io.Resource eventTypeRes = + new DefaultResourceLoader().getResource("avro-schema/"); + final LocalSchemaRegistry localSchemaRegistry = new LocalSchemaRegistry(eventTypeRes); + final Instant now = Instant.now(); + final NakadiMetadata metadata = new NakadiMetadata(); + metadata.setOccurredAt(now); + metadata.setEid("9702cf96-9bdb-48b7-9f4c-92643cb6d9fc"); + metadata.setFlowId(MDCUtils.getFlowId()); + metadata.setEventType("nakadi.access.log"); + metadata.setPartition("0"); + metadata.setReceivedAt(now); + metadata.setSchemaVersion("1.0.0"); + metadata.setPublishedBy("adyachkov"); + + final SpecificRecord event = NakadiAccessLog.newBuilder() + .setMethod("POST") + .setPath("/event-types") + .setQuery("") + .setUserAgent("test-user-agent") + .setApp("nakadi") + .setAppHashed("hashed-app") + .setContentEncoding("--") + .setAcceptEncoding("-") + .setStatusCode(201) + .setResponseTimeMs(10) + .setRequestLength(123) + .setResponseLength(321) + .build(); + + final NakadiRecord nakadiRecord = new NakadiRecordMapper(localSchemaRegistry) + .fromAvroRecord(metadata, event); + + when(nakadiSettings.getKafkaSendTimeoutMs()).thenReturn((long) 30000); + Mockito + .doReturn(mock(Future.class)) + .when(kafkaProducer) + .send(any(), any()); + + final var currentTime = System.currentTimeMillis(); + final long customTimeoutMs = 200; + final var result = kafkaTopicRepository.sendEvents(EXPECTED_PRODUCER_RECORD.topic(), + List.of(nakadiRecord), null, + Optional.of((int) customTimeoutMs)); + final var elapsedTime = System.currentTimeMillis() - currentTime; + assertThat(elapsedTime, greaterThanOrEqualTo(customTimeoutMs)); + assertThat(result.get(0).getStatus().toString(), equalTo("ABORTED")); + assertThat(result.get(0).getException().getMessage(), + equalTo("timeout waiting for events to be sent to kafka")); + } + + @Test + public void whenPostEventWithCustomTimeOutThenUpdateItemStatus() { + final long customTimeOutMs = 200; + final BatchItem item = new BatchItem( + "{}", + BatchItem.EmptyInjectionConfiguration.build(1, true), + new BatchItem.InjectionConfiguration[BatchItem.Injection.values().length], + Collections.emptyList()); + item.setPartition("1"); + final List batch = new ArrayList<>(); + batch.add(item); + + when(nakadiSettings.getKafkaSendTimeoutMs()).thenReturn((long) 30000); + Mockito + .doReturn(mock(Future.class)) + .when(kafkaProducer) + .send(any(), any()); + + + final var currentTime = System.currentTimeMillis(); + try { + kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, + Optional.of((int) customTimeOutMs), false); + } catch (final EventPublishingException e) { + final var elapsedTime = System.currentTimeMillis() - currentTime; + assertThat(elapsedTime, greaterThanOrEqualTo(customTimeOutMs)); + assertThat(item.getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); + assertThat(item.getResponse().getDetail(), equalTo("timed out")); + } + } + @Test public void whenPostEventTimesOutThenUpdateItemStatus() { final BatchItem item = new BatchItem( @@ -328,7 +420,8 @@ public void whenPostEventTimesOutThenUpdateItemStatus() { .send(any(), any()); try { - kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, false); + kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, + Optional.empty(), false); fail(); } catch (final EventPublishingException e) { assertThat(item.getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); @@ -355,7 +448,8 @@ public void whenPostEventOverflowsBufferThenUpdateItemStatus() { .send(any(), any()); try { - kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, false); + kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, + Optional.empty(), false); fail(); } catch (final EventPublishingException e) { assertThat(item.getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.FAILED)); @@ -392,7 +486,8 @@ public void whenKafkaPublishCallbackWithExceptionThenEventPublishingException() }); try { - kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, false); + kafkaTopicRepository.syncPostBatch(EXPECTED_PRODUCER_RECORD.topic(), batch, "random", null, + Optional.empty(), false); fail(); } catch (final EventPublishingException e) { assertThat(firstItem.getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.SUBMITTED)); @@ -447,7 +542,8 @@ public void testSendNakadiRecordsOk() { return null; }); - final List result = kafkaTopicRepository.sendEvents(topic, nakadiRecords, null); + final List result = kafkaTopicRepository.sendEvents(topic, nakadiRecords, + null, Optional.empty()); Assert.assertEquals(3, result.size()); result.forEach(r -> Assert.assertEquals(NakadiRecordResult.Status.SUCCEEDED, r.getStatus())); } @@ -475,7 +571,7 @@ public void testSendNakadiRecordsHalfPublished() throws IOException { }); final List result = - kafkaTopicRepository.sendEvents(topic, nakadiRecords, null); + kafkaTopicRepository.sendEvents(topic, nakadiRecords, null, Optional.empty()); Assert.assertEquals(4, result.size()); Assert.assertEquals(exception, result.get(0).getException()); Assert.assertEquals(NakadiRecordResult.Status.FAILED, result.get(0).getStatus()); @@ -509,7 +605,7 @@ public void testSendNakadiRecordsHalfSubmitted() throws IOException { }); final List result = - kafkaTopicRepository.sendEvents(topic, nakadiRecords, null); + kafkaTopicRepository.sendEvents(topic, nakadiRecords, null, Optional.empty()); Assert.assertEquals(4, result.size()); Assert.assertEquals(null, result.get(0).getException()); Assert.assertEquals(NakadiRecordResult.Status.SUCCEEDED, result.get(0).getStatus()); diff --git a/core-services/src/main/java/org/zalando/nakadi/controller/advice/NakadiProblemExceptionHandler.java b/core-services/src/main/java/org/zalando/nakadi/controller/advice/NakadiProblemExceptionHandler.java index 476cb3f2b7..333e590677 100644 --- a/core-services/src/main/java/org/zalando/nakadi/controller/advice/NakadiProblemExceptionHandler.java +++ b/core-services/src/main/java/org/zalando/nakadi/controller/advice/NakadiProblemExceptionHandler.java @@ -19,6 +19,7 @@ import org.zalando.nakadi.exceptions.runtime.IllegalClientIdException; import org.zalando.nakadi.exceptions.runtime.InternalNakadiException; import org.zalando.nakadi.exceptions.runtime.InvalidLimitException; +import org.zalando.nakadi.exceptions.runtime.InvalidPublishingParamException; import org.zalando.nakadi.exceptions.runtime.InvalidVersionNumberException; import org.zalando.nakadi.exceptions.runtime.NakadiBaseException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; @@ -120,7 +121,8 @@ public ResponseEntity handleInternalNakadiException(final InternalNakad return create(Problem.valueOf(INTERNAL_SERVER_ERROR, exception.getMessage()), request); } - @ExceptionHandler({InvalidLimitException.class, InvalidVersionNumberException.class}) + @ExceptionHandler({InvalidLimitException.class, InvalidVersionNumberException.class, + InvalidPublishingParamException.class}) public ResponseEntity handleBadRequestResponses(final NakadiBaseException exception, final NativeWebRequest request) { LOG.debug(exception.getMessage()); diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java index d6367912ff..7b7be9bd66 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventPublisher.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeoutException; @Service @@ -74,14 +75,17 @@ public BinaryEventPublisher( } public List publish(final EventType eventType, final List records, - final Map consumerTags) { - return processInternal(eventType, records, prePublishingChecks, consumerTags); + final Map consumerTags, + final Optional publishTimeout) { + return processInternal(eventType, records, prePublishingChecks, + consumerTags, publishTimeout); } private List processInternal(final EventType eventType, final List records, final List checks, - final Map consumerTags) { + final Map consumerTags, + final Optional publishTimeout) { for (final Check check : checks) { final List res = check.execute(eventType, records); if (res != null && !res.isEmpty()) { @@ -108,7 +112,8 @@ private List processInternal(final EventType eventType, .withTag("type", "binary") .start(); try (Closeable ignored = TracingService.activateSpan(publishingSpan)) { - return timelineService.getTopicRepository(eventType).sendEvents(topic, records, consumerTags); + return timelineService.getTopicRepository(eventType). + sendEvents(topic, records, consumerTags, publishTimeout); } catch (final IOException ioe) { throw new InternalNakadiException("Error closing active span scope", ioe); } finally { @@ -141,10 +146,12 @@ public List delete(final List events, final Ev PartitioningException { LOG.debug("Deleting {} binary events from {}, with {} checks", events.size(), eventType.getName(), preDeletingChecks.size()); - return processInternal(eventType, events, preDeletingChecks, null); + return processInternal(eventType, events, preDeletingChecks, + null, Optional.empty()); } public List publishInternal(final EventType eventType, final List events) { - return processInternal(eventType, events, internalPublishingChecks, null); + return processInternal(eventType, events, internalPublishingChecks, + null, Optional.empty()); } } diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java index a6ee6a7fd9..96b1fe06f2 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/EventPublisher.java @@ -102,7 +102,8 @@ public EventPublisher(final TimelineService timelineService, public EventPublishResult publish(final String events, final String eventTypeName, - final Map consumerTags) + final Map consumerTags, + final Optional desiredTimeout) throws NoSuchEventTypeException, InternalNakadiException, EnrichmentException, @@ -111,7 +112,7 @@ public EventPublishResult publish(final String events, PublishEventOwnershipException, ServiceTemporarilyUnavailableException, PartitioningException { - return processInternal(events, eventTypeName, consumerTags, true, false); + return processInternal(events, eventTypeName, consumerTags, desiredTimeout, true, false); } // no auth checks @@ -126,7 +127,8 @@ public EventPublishResult publishInternal(final String events, PublishEventOwnershipException, ServiceTemporarilyUnavailableException, PartitioningException { - return processInternal(events, eventTypeName, consumerTags, false, false); + return processInternal(events, eventTypeName, consumerTags, Optional.empty(), + false, false); } public EventPublishResult delete(final String events, final String eventTypeName) @@ -138,12 +140,15 @@ public EventPublishResult delete(final String events, final String eventTypeName PublishEventOwnershipException, ServiceTemporarilyUnavailableException, PartitioningException { - return processInternal(events, eventTypeName, null, true, true); + + return processInternal(events, eventTypeName, null, Optional.empty(), + true, true); } EventPublishResult processInternal(final String events, final String eventTypeName, final Map consumerTags, + final Optional publishTimeout, final boolean useAuthz, final boolean delete) throws NoSuchEventTypeException, InternalNakadiException, EventTypeTimeoutException, @@ -166,7 +171,7 @@ EventPublishResult processInternal(final String events, if (!delete) { enrich(batch, eventType); } - submit(batch, eventType, consumerTags, delete); + submit(batch, eventType, consumerTags, publishTimeout, delete); return ok(batch); } catch (final EventValidationException e) { @@ -317,7 +322,8 @@ private void validate(final List batch, final EventType eventType, fi private void submit( final List batch, final EventType eventType, - final Map consumerTags, final boolean delete) + final Map consumerTags, + final Optional publishTimeout, final boolean delete) throws EventPublishingException, InternalNakadiException { final Timeline activeTimeline = timelineService.getActiveTimeline(eventType); final String topic = activeTimeline.getTopic(); @@ -335,7 +341,7 @@ private void submit( .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), topic) .start(); try (Closeable ignored = TracingService.activateSpan(publishingSpan)) { - topicRepository.syncPostBatch(topic, batch, eventType.getName(), consumerTags, delete); + topicRepository.syncPostBatch(topic, batch, eventType.getName(), consumerTags, publishTimeout, delete); } catch (final EventPublishingException epe) { publishingSpan.log(epe.getMessage()); throw epe; diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/JsonEventProcessor.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/JsonEventProcessor.java index 9748e48913..8b3b5430de 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/JsonEventProcessor.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/JsonEventProcessor.java @@ -9,6 +9,7 @@ import org.springframework.stereotype.Component; import java.util.List; +import java.util.Optional; @Component @Qualifier("json-publisher") @@ -33,7 +34,8 @@ public JsonEventProcessor( public void sendEvents(final String etName, final List events) { try { // sending events batch with disabled authz check - eventPublisher.processInternal(new JSONArray(events).toString(), etName, null, false, false); + eventPublisher.processInternal(new JSONArray(events).toString(), etName, null, Optional.empty(), + false, false); } catch (final RuntimeException ex) { LOG.error("Failed to send single batch for unknown reason", ex); } diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java index 6762bc7a44..c9847a32a1 100644 --- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java +++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java @@ -122,10 +122,12 @@ public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); - verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), eq(false)); + verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), + any(), eq(false)); } @Test @@ -135,7 +137,8 @@ public void whenPublishThenExtractorForOwnerCreated() throws Exception { mockSuccessfulValidation(eventType); Mockito.when(eventOwnerExtractorFactory.createExtractor(eq(eventType))).thenReturn(null); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); // invoked once for a batch Mockito.verify(eventOwnerExtractorFactory, Mockito.times(1)).createExtractor(eq(eventType)); @@ -151,7 +154,8 @@ public void whenPublishWithExtractorThenAuthorizationCheckedPerEvent() throws Ex Mockito.when(eventOwnerExtractorFactory.createExtractor(eq(eventType))).thenReturn( EventOwnerExtractorFactory.createStaticExtractor("retailer", "nakadi")); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); Mockito.verify(authzValidator, Mockito.times(3)).authorizeEventWrite(any(BatchItem.class)); } @@ -165,7 +169,8 @@ public void whenPublishAuthorizationIsTakenIntoAccount() throws Exception { .when(authzValidator) .authorizeEventTypeWrite(Mockito.eq(et)); - publisher.publish(buildDefaultBatch(1).toString(), et.getName(), null); + publisher.publish(buildDefaultBatch(1).toString(), et.getName(), null, + Optional.empty()); } @Test @@ -176,10 +181,12 @@ public void whenEventHasEidThenSetItInTheResponse() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getResponses().get(0).getEid(), equalTo(event.getJSONObject("metadata").optString("eid"))); - verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), eq(false)); + verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), + any(), eq(false)); } @Test @@ -191,7 +198,8 @@ public void whenPublishEventTypeIsLockedAndReleased() throws Exception { final Closeable etCloser = mock(Closeable.class); Mockito.when(timelineSync.workWithEventType(any(String.class), anyLong())).thenReturn(etCloser); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); verify(timelineSync, times(1)).workWithEventType(eq(eventType.getName()), eq(TIMELINE_WAIT_TIMEOUT_MS)); verify(etCloser, times(1)).close(); @@ -200,7 +208,8 @@ public void whenPublishEventTypeIsLockedAndReleased() throws Exception { @Test(expected = EventTypeTimeoutException.class) public void whenPublishAndTimelineLockTimedOutThenException() throws Exception { Mockito.when(timelineSync.workWithEventType(any(String.class), anyLong())).thenThrow(new TimeoutException()); - publisher.publish(buildDefaultBatch(0).toString(), "blahET", null); + publisher.publish(buildDefaultBatch(0).toString(), "blahET", null, + Optional.empty()); } @Test @@ -211,12 +220,14 @@ public void whenValidationFailsThenResultIsAborted() throws Exception { mockFaultValidation(eventType, "error"); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); verify(partitionResolver, times(0)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); - verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), anyBoolean()); + verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), + any(), anyBoolean()); } @Test @@ -227,7 +238,8 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception mockFaultValidation(eventType, "error"); - EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -245,7 +257,8 @@ public void whenValidationFailsThenSubsequentItemsAreAborted() throws Exception // test with event header being set mockSuccessfulOwnerExtraction(eventType); - result = publisher.publish(batch.toString(), eventType.getName(), null); + result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); second = result.getResponses().get(1); @@ -260,12 +273,14 @@ public void whenEventIsTooLargeThenResultIsAborted() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); verify(partitionResolver, times(0)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); - verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), anyBoolean()); + verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), + any(), anyBoolean()); } @Test @@ -281,7 +296,8 @@ public void whenEventIsTooLargeThenAllItemsAreAborted() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -314,7 +330,8 @@ public void whenEnrichmentMakesEventTooLargeThenAllItemsAreAborted() throws Exce mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -341,12 +358,14 @@ public void whenEventIsExactlyMaxSizeThenResultIsSuccess() throws Exception { mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); verify(enrichment, times(1)).enrich(any(), any()); verify(partitionResolver, times(1)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); - verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), eq(false)); + verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), + any(), eq(false)); } @Test @@ -356,12 +375,14 @@ public void whenEventIsOneByteOverMaxSizeThenResultIsAborted() throws Exception mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); verify(partitionResolver, times(0)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); - verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), anyBoolean()); + verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), + any(), anyBoolean()); } @Test @@ -371,12 +392,14 @@ public void whenEventIsOneByteOverMaxSizeWithMultiByteCharsThenResultIsAborted() mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(enrichment, times(0)).enrich(any(), any()); verify(partitionResolver, times(0)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); - verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), anyBoolean()); + verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), + any(), anyBoolean()); } @Test @@ -386,12 +409,14 @@ public void whenEventIsExactlyMaxSizeWithMultiByteCharsThenResultIsSuccess() thr mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.SUBMITTED)); verify(enrichment, times(1)).enrich(any(), any()); verify(partitionResolver, times(1)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); - verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), eq(false)); + verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), + any(), eq(false)); } @Test @@ -405,7 +430,7 @@ public void whenPartitionFailsThenResultIsAborted() throws Exception { mockFaultPartition(); final EventPublishResult result = publisher.publish(createStringFromBatchItems(batch), - eventType.getName(), null); + eventType.getName(), null, Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); } @@ -422,7 +447,7 @@ public void whenPartitionFailsThenSubsequentItemsAreAborted() throws Exception { mockFaultPartition(); final EventPublishResult result = publisher.publish(createStringFromBatchItems(batch), - eventType.getName(), null); + eventType.getName(), null, Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -448,10 +473,12 @@ public void whenPublishingFailsThenResultIsFailed() throws Exception { mockSuccessfulValidation(eventType); mockFailedPublishing(); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.FAILED)); - verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), eq(false)); + verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), + any(), eq(false)); } @Test @@ -463,13 +490,15 @@ public void whenEnrichmentFailsThenResultIsAborted() throws Exception { mockSuccessfulValidation(eventType); mockFaultEnrichment(); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); verify(cache, atLeastOnce()).getValidator(eventType.getName()); verify(partitionResolver, times(1)).resolvePartition(any(EventType.class), any(BatchItem.class), any()); verify(enrichment, times(1)).enrich(any(), any()); - verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), anyBoolean()); + verify(topicRepository, times(0)).syncPostBatch(any(), any(), any(), any(), + any(), anyBoolean()); } @Test @@ -483,7 +512,8 @@ public void whenSinglePartitioningKeyThenEventKeyIsSet() throws Exception { final JSONObject event = new JSONObject("{\"my_field\": \"my_key\"}"); final JSONArray batch = new JSONArray(List.of(event)); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); final List publishedBatch = capturePublishedBatch(); final BatchItem publishedItem = publishedBatch.get(0); @@ -502,7 +532,8 @@ public void whenMultiplePartitioningKeyThenEventKeyIsComposite() throws Exceptio final JSONObject event = new JSONObject("{\"my_field\": \"my_key\", \"other_field\": \"other_value\"}"); final JSONArray batch = new JSONArray(List.of(event)); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); final List publishedBatch = capturePublishedBatch(); final BatchItem publishedItem = publishedBatch.get(0); @@ -524,7 +555,8 @@ public void whenCompactedThenUsesPartitionCompactionKey() throws Exception { " \"my_field\": \"my_key\"}"); final JSONArray batch = new JSONArray(List.of(event)); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); final List publishedBatch = capturePublishedBatch(); final BatchItem publishedItem = publishedBatch.get(0); @@ -543,7 +575,8 @@ public void whenNotAHashPartitioningStrategyThenEventKeyIsNotSet() throws Except final JSONArray batch = buildDefaultBatch(1); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); final List publishedBatch = capturePublishedBatch(); final BatchItem publishedItem = publishedBatch.get(0); @@ -554,7 +587,8 @@ public void whenNotAHashPartitioningStrategyThenEventKeyIsNotSet() throws Except @SuppressWarnings("unchecked") private List capturePublishedBatch() { final ArgumentCaptor batchCaptor = ArgumentCaptor.forClass(List.class); - verify(topicRepository, atLeastOnce()).syncPostBatch(any(), batchCaptor.capture(), any(), any(), eq(false)); + verify(topicRepository, atLeastOnce()).syncPostBatch(any(), batchCaptor.capture(), any(), any(), + any(), eq(false)); return (List) batchCaptor.getValue(); } @@ -566,7 +600,8 @@ public void whenEnrichmentFailsThenSubsequentItemsAreAborted() throws Exception mockSuccessfulValidation(eventType); mockFaultEnrichment(); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -595,7 +630,8 @@ public void whenEventAuthorizationFailsThenSubsequentItemsAreAborted() throws Ex .when(authzValidator) .authorizeEventWrite(any(BatchItem.class)); - final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); assertThat(result.getStatus(), equalTo(EventPublishingStatus.ABORTED)); @@ -617,7 +653,8 @@ public void testWrite() throws Exception { final EventType eventType = EventTypeTestBuilder.builder().build(); Mockito.when(cache.getEventType(eventType.getName())).thenReturn(eventType); mockSuccessfulValidation(eventType); - final EventPublishResult result = publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), null); + final EventPublishResult result = publisher.publish(buildDefaultBatch(0).toString(), eventType.getName(), null, + Optional.empty()); Assert.assertEquals(result.getStatus(), EventPublishingStatus.SUBMITTED); } @@ -670,8 +707,10 @@ public void testAvroEventWasSerialized() throws Exception { .fromAvroRecord(metadata, event); final List records = Collections.singletonList(nakadiRecord); - eventPublisher.publish(eventType, records, null); - Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), eq(null)); + eventPublisher.publish(eventType, records, + null, Optional.empty()); + Mockito.verify(topicRepository).sendEvents(ArgumentMatchers.eq(topic), ArgumentMatchers.eq(records), + eq(null), any()); } @Test @@ -685,7 +724,8 @@ public void testUniqueEventTypePartitions() throws Exception { final Set uniqueEventTypePartitions = publisher.getUniqueEventTypePartitions(); uniqueEventTypePartitions.clear(); - publisher.publish(batch.toString(), eventType.getName(), null); + publisher.publish(batch.toString(), eventType.getName(), null, + Optional.empty()); Assert.assertEquals(1, uniqueEventTypePartitions.size()); final String expectedEntry = String.format("%s:%s", eventType.getName(), 0); @@ -696,7 +736,8 @@ private void mockFailedPublishing() { Mockito .doThrow(EventPublishingException.class) .when(topicRepository) - .syncPostBatch(any(), any(), any(), any(), anyBoolean()); + .syncPostBatch(any(), any(), any(), any(), + any(), anyBoolean()); } private void mockFaultPartition() throws PartitioningException { diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventsProcessorTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventsProcessorTest.java index d02c80d90e..ea896d53c0 100644 --- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventsProcessorTest.java +++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventsProcessorTest.java @@ -22,7 +22,7 @@ public void shouldSendEventWhenSubmitted() throws InterruptedException { eventsProcessor.queueEvent("test_et_name", event); TestUtils.waitFor(() -> { try { - Mockito.verify(eventPublisher).processInternal(any(), any(), any(), eq(false), eq(false)); + Mockito.verify(eventPublisher).processInternal(any(), any(), any(), any(), eq(false), eq(false)); } catch (final Exception e) { throw new AssertionError(e); } diff --git a/docker-compose.yml b/docker-compose.yml index adff770255..91a2813581 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -71,3 +71,4 @@ services: KAFKA_CLIENT_PASSWORDS: nakadi_password, producer_password KAFKA_CFG_SASL_ENABLED_MECHANISMS: PLAIN KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_CFG_MESSAGE_MAX_BYTES: 2098152