Skip to content
This repository was archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1586 from zalando/207-partition-reporting
Browse files Browse the repository at this point in the history
Expose partition in BatchItemResponse (useful for 207s)
  • Loading branch information
fghibellini authored Feb 5, 2024
2 parents 2502380 + 8f72a5c commit ab6690e
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Service
Expand All @@ -22,6 +23,7 @@ public EventPublishResult mapPublishingResultToView(final List<NakadiRecordResul
.setStep(EventPublishingStep.PUBLISHING)
.setPublishingStatus(status)
.setEid(recordMetadata.getMetadata().getEid())
.setPartition(Optional.ofNullable(recordMetadata.getMetadata().getPartition()))
.setDetail((recordMetadata.getException() != null) ?
recordMetadata.getException().getMessage() : ""));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public static EmptyInjectionConfiguration build(final int position, final boolea
private final InjectionConfiguration[] injections;
private String[] injectionValues;
private final List<Integer> skipCharacters;
private String partition;
private String eventKey;
private List<String> partitionKeys;
private int eventSize;
Expand Down Expand Up @@ -108,11 +107,11 @@ public int getEventSize() {

/**
 * Returns the partition this batch item was (or would be) routed to.
 * The value is now held on the wrapped {@code BatchItemResponse} rather
 * than in a local field.
 *
 * @return the partition id, or {@code null} when no partition has been resolved yet
 */
@Nullable
public String getPartition() {
    // Diff artifact removed: the stale `return partition;` referenced a deleted
    // field and made the delegation below unreachable.
    return this.response.getPartition().orElse(null);
}

/**
 * Records the partition for this item on the underlying {@code BatchItemResponse}.
 *
 * @param partition the resolved partition id; may be {@code null} when unknown
 */
public void setPartition(final String partition) {
    // Diff artifact removed: the stale `this.partition = partition;` assigned
    // to a field that no longer exists; only the delegation remains.
    this.response.setPartition(Optional.ofNullable(partition));
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package org.zalando.nakadi.domain;

import java.util.Optional;

public class BatchItemResponse {
private volatile EventPublishingStatus publishingStatus = EventPublishingStatus.ABORTED;
private volatile String detail = "";
private EventPublishingStep step = EventPublishingStep.NONE;
private String eid = "";
private Optional<String> partition = Optional.empty();

/**
 * Returns the event identifier (eid) of this batch item; defaults to the
 * empty string when it was never set.
 *
 * @return the eid, never {@code null}
 */
public String getEid() {
    return this.eid;
}

/**
 * Returns the partition the item was routed to, if one was resolved.
 * Empty until a partition has been assigned.
 *
 * NOTE(review): {@code Optional} is used here as a field/getter type;
 * Optional is designed for computed return values, not storage — consider
 * a nullable String field instead. Interface kept as-is for callers
 * (e.g. BatchItem) that already depend on the Optional-returning getter.
 */
public Optional<String> getPartition() {
    return partition;
}

/**
 * Sets the partition this item was routed to.
 *
 * NOTE(review): taking {@code Optional} as a parameter is generally
 * discouraged (callers must wrap); a nullable String overload would be
 * friendlier. Signature kept for compatibility with existing callers.
 *
 * @param partition the partition, or {@code Optional.empty()} when unknown
 * @return this response, enabling fluent call chaining
 */
public BatchItemResponse setPartition(final Optional<String> partition) {
    this.partition = partition;
    return this;
}

public BatchItemResponse setEid(final String eid) {
this.eid = eid;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.zalando.nakadi.domain.EventTypeBase;
import org.zalando.nakadi.domain.NakadiMetadata;
import org.zalando.nakadi.domain.NakadiRecord;
import org.zalando.nakadi.domain.NakadiRecordResult;
import org.zalando.nakadi.domain.Timeline;
import org.zalando.nakadi.enrichment.Enrichment;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
Expand All @@ -39,6 +40,7 @@
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.LocalSchemaRegistry;
import org.zalando.nakadi.service.publishing.check.Check;
import org.zalando.nakadi.service.publishing.check.PartitioningCheck;
import org.zalando.nakadi.service.timeline.TimelineService;
import org.zalando.nakadi.service.timeline.TimelineSync;
import org.zalando.nakadi.util.MDCUtils;
Expand All @@ -47,10 +49,12 @@
import org.zalando.nakadi.validation.ValidationError;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -67,6 +71,7 @@
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -94,6 +99,7 @@ public class EventPublisherTest {
protected final Enrichment enrichment = mock(Enrichment.class);
protected final AuthorizationValidator authzValidator = mock(AuthorizationValidator.class);
protected final TimelineService timelineService = Mockito.mock(TimelineService.class);

protected final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1, 2,
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, TIMELINE_WAIT_TIMEOUT_MS, NAKADI_EVENT_MAX_BYTES,
NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "", "",
Expand Down Expand Up @@ -128,6 +134,90 @@ public void whenPublishIsSuccessfulThenResultIsSubmitted() throws Exception {
verify(topicRepository, times(1)).syncPostBatch(any(), any(), any(), any(), eq(false));
}

/**
 * When the write to Kafka fails, the publish result must be FAILED (surfaced
 * upstream as a 207) and every BatchItemResponse must still carry the
 * partition that was resolved for it before the failure.
 */
@Test
public void whenPartitionIsUnavailable207IsReported() throws Exception {
    final EventType eventType = buildDefaultEventType();
    final JSONArray batch = buildDefaultBatch(1);

    mockSuccessfulValidation(eventType);
    Mockito.when(partitionResolver.resolvePartition(any(EventType.class), any(BatchItem.class), any()))
            .thenReturn("3");
    mockFailedWriteToKafka();

    final EventPublishResult result = publisher.publish(batch.toString(), eventType.getName(), null);

    assertThat(result.getStatus(), equalTo(EventPublishingStatus.FAILED));
    // List.forEach is sufficient; opening a Stream just to iterate is redundant.
    result.getResponses().forEach(bi -> Assert.assertEquals(
            "Reported BatchItemResponse has a partition specified",
            Optional.of("3"),
            bi.getPartition()));
}

/**
 * Builds a binary NakadiRecord for the "nakadi.access.log" Avro event type,
 * using the schemas shipped under the "avro-schema/" test resource directory.
 *
 * @return a fully populated record ready to be handed to the binary publisher
 * @throws IOException if the local schema registry cannot read its resources
 */
private NakadiRecord mockNakadiRecord() throws IOException {
    final org.springframework.core.io.Resource schemaDir =
            new DefaultResourceLoader().getResource("avro-schema/");
    final LocalSchemaRegistry schemaRegistry = new LocalSchemaRegistry(schemaDir);

    final Instant timestamp = Instant.now();
    final NakadiMetadata meta = new NakadiMetadata();
    meta.setOccurredAt(timestamp);
    meta.setEid("9702cf96-9bdb-48b7-9f4c-92643cb6d9fc");
    meta.setFlowId(MDCUtils.getFlowId());
    meta.setEventType("nakadi.access.log");
    meta.setPartition("0");
    meta.setReceivedAt(timestamp);
    meta.setSchemaVersion("1.0.0");
    meta.setPublishedBy("adyachkov");

    final SpecificRecord accessLogEvent = NakadiAccessLog.newBuilder()
            .setMethod("POST")
            .setPath("/event-types")
            .setQuery("")
            .setUserAgent("test-user-agent")
            .setApp("nakadi")
            .setAppHashed("hashed-app")
            .setContentEncoding("--")
            .setAcceptEncoding("-")
            .setStatusCode(201)
            .setResponseTimeMs(10)
            .setRequestLength(123)
            .setResponseLength(321)
            .build();

    return new NakadiRecordMapper(schemaRegistry).fromAvroRecord(meta, accessLogEvent);
}

/**
 * Binary-path counterpart of the 207 test: when the binary publish aborts,
 * the returned NakadiRecordResult must not be SUCCEEDED and must still
 * expose the partition resolved for the record.
 */
@Test
public void whenPartitionIsUnavailable207IsReportedBinary() throws Exception {
    final EventType eventType = buildDefaultEventType();
    final String topic = UUID.randomUUID().toString();
    final String eventTypeName = eventType.getName();
    Mockito.when(cache.getEventType(eventTypeName)).thenReturn(eventType);
    Mockito.when(timelineService.getActiveTimeline(eventType))
            .thenReturn(new Timeline(eventTypeName, 0, null, topic, null));
    Mockito.when(partitionResolver.resolvePartition(any(EventType.class), any(NakadiMetadata.class), any()))
            .thenReturn("1");

    mockFailedBinaryWriteToKafka();

    final NakadiRecord nakadiRecord = mockNakadiRecord();

    // The same check instance serves as pre-, main- and post-publishing check.
    final var partitionCheck = new PartitioningCheck(cache, partitionResolver);
    final BinaryEventPublisher eventPublisher = new BinaryEventPublisher(
            timelineService, timelineSync, nakadiSettings,
            List.of(partitionCheck), List.of(partitionCheck), List.of(partitionCheck));

    final List<NakadiRecord> records = Collections.singletonList(nakadiRecord);
    final List<NakadiRecordResult> publishResult = eventPublisher.publish(eventType, records, null);
    // Use the statically imported eq(...) consistently (the file already uses
    // it bare elsewhere) instead of mixing in explicit ArgumentMatchers.eq(...).
    Mockito.verify(topicRepository).sendEvents(eq(topic), eq(records), eq(null));

    Assert.assertNotEquals(NakadiRecordResult.Status.SUCCEEDED, publishResult.get(0).getStatus());
    Assert.assertEquals("1", publishResult.get(0).getMetadata().getPartition());
}

@Test
public void whenPublishThenExtractorForOwnerCreated() throws Exception {
final EventType eventType = buildDefaultEventType();
Expand Down Expand Up @@ -624,9 +714,6 @@ public void testWrite() throws Exception {

@Test
public void testAvroEventWasSerialized() throws Exception {
final org.springframework.core.io.Resource eventTypeRes =
new DefaultResourceLoader().getResource("avro-schema/");
final LocalSchemaRegistry localSchemaRegistry = new LocalSchemaRegistry(eventTypeRes);
final var dummyCheck = Mockito.mock(Check.class);
final BinaryEventPublisher eventPublisher = new BinaryEventPublisher(
timelineService, timelineSync, nakadiSettings,
Expand All @@ -640,34 +727,7 @@ public void testAvroEventWasSerialized() throws Exception {
Mockito.when(partitionResolver.resolvePartition(any(EventType.class), any(NakadiMetadata.class), any()))
.thenReturn("1");

final Instant now = Instant.now();
final NakadiMetadata metadata = new NakadiMetadata();
metadata.setOccurredAt(now);
metadata.setEid("9702cf96-9bdb-48b7-9f4c-92643cb6d9fc");
metadata.setFlowId(MDCUtils.getFlowId());
metadata.setEventType("nakadi.access.log");
metadata.setPartition("0");
metadata.setReceivedAt(now);
metadata.setSchemaVersion("1.0.0");
metadata.setPublishedBy("adyachkov");

final SpecificRecord event = NakadiAccessLog.newBuilder()
.setMethod("POST")
.setPath("/event-types")
.setQuery("")
.setUserAgent("test-user-agent")
.setApp("nakadi")
.setAppHashed("hashed-app")
.setContentEncoding("--")
.setAcceptEncoding("-")
.setStatusCode(201)
.setResponseTimeMs(10)
.setRequestLength(123)
.setResponseLength(321)
.build();

final NakadiRecord nakadiRecord = new NakadiRecordMapper(localSchemaRegistry)
.fromAvroRecord(metadata, event);
final NakadiRecord nakadiRecord = mockNakadiRecord();

final List<NakadiRecord> records = Collections.singletonList(nakadiRecord);
eventPublisher.publish(eventType, records, null);
Expand Down Expand Up @@ -835,4 +895,41 @@ private String createStringFromBatchItems(final List<BatchItem> batch) {
sb.setCharAt(sb.length() - 1, ']');
return sb.toString();
}

/**
 * Stubs TopicRepository#syncPostBatch to mark every batch item FAILED with
 * detail "timed out" and then throw EventPublishingException, simulating a
 * publish timeout against Kafka.
 */
@SuppressWarnings("unchecked") // args[1] is known to be List<BatchItem> from the stubbed signature
private void mockFailedWriteToKafka() {
    doAnswer(invocation -> {
        final Object[] args = invocation.getArguments();
        final String topicId = (String) args[0];
        final List<BatchItem> invocationBatch = (List<BatchItem>) args[1];
        final String et = (String) args[2];

        // List.forEach is sufficient; opening a Stream just to iterate is redundant.
        invocationBatch.forEach(
                item -> item.updateStatusAndDetail(EventPublishingStatus.FAILED, "timed out"));
        throw new EventPublishingException(
                "Timeout publishing message to kafka",
                new TimeoutException(),
                topicId,
                et);
    }).when(topicRepository).syncPostBatch(any(), any(), any(), any(), eq(false));
}

/**
 * Stubs TopicRepository#sendEvents to return an ABORTED NakadiRecordResult
 * (caused by a TimeoutException) for every record instead of publishing.
 */
@SuppressWarnings("unchecked") // args[1] is known to be List<NakadiRecord> from the stubbed signature
private void mockFailedBinaryWriteToKafka() {
    doAnswer(invocation -> {
        // args[0] (the topic id) was previously extracted into an unused local;
        // it is not needed to build the per-record results.
        final List<NakadiRecord> nakadiRecords = (List<NakadiRecord>) invocation.getArguments()[1];

        // Presized ArrayList instead of LinkedList: random growth/traversal
        // characteristics of LinkedList buy nothing here.
        final List<NakadiRecordResult> responses = new ArrayList<>(nakadiRecords.size());
        for (final NakadiRecord record : nakadiRecords) {
            responses.add(new NakadiRecordResult(
                    record.getMetadata(),
                    NakadiRecordResult.Status.ABORTED,
                    NakadiRecordResult.Step.PUBLISHING,
                    new TimeoutException()));
        }
        return responses;
    }).when(topicRepository).sendEvents(any(), any(), eq(null));
}

}

0 comments on commit ab6690e

Please sign in to comment.