Skip to content
This repository was archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #84 from zalando-incubator/subscription-api-changes
Browse files Browse the repository at this point in the history
Ignore unknown properties and subscription api changes
  • Loading branch information
jhorstmann authored Sep 23, 2016
2 parents 1a7b911 + 1a566de commit 4b2ca3a
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 77 deletions.
4 changes: 4 additions & 0 deletions src/main/java/org/zalando/fahrschein/CursorManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ default void addSubscription(Subscription subscription) {

}

default void addStreamId(Subscription subscription, String streamId) {

}

/**
* Initializes offsets to start streaming from the newest available offset.
*/
Expand Down
96 changes: 79 additions & 17 deletions src/main/java/org/zalando/fahrschein/ManagedCursorManager.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.zalando.fahrschein;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
Expand All @@ -24,49 +24,110 @@
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonList;

public class ManagedCursorManager implements CursorManager {

private static final Logger LOG = LoggerFactory.getLogger(ManagedCursorManager.class);
private static final TypeReference<List<Cursor>> LIST_OF_CURSORS = new TypeReference<List<Cursor>>() {
};

static final class SubscriptionStream {
private final String eventName;
private final String subscriptionId;
private String streamId;

SubscriptionStream(String eventName, String subscriptionId) {
this.eventName = eventName;
this.subscriptionId = subscriptionId;
}

String getEventName() {
return eventName;
}

String getSubscriptionId() {
return subscriptionId;
}

String getStreamId() {
return streamId;
}

void setStreamId(String streamId) {
this.streamId = streamId;
}
}

static final class CursorResponse {
private final List<Cursor> items;

@JsonCreator
CursorResponse(@JsonProperty("items") List<Cursor> items) {
this.items = items;
}

List<Cursor> getItems() {
return items;
}
}

private final URI baseUri;
private final ClientHttpRequestFactory clientHttpRequestFactory;
private final ObjectMapper objectMapper;
private final Map<String, Subscription> subscriptions;
private final Map<String, SubscriptionStream> streams;

public ManagedCursorManager(URI baseUri, ClientHttpRequestFactory clientHttpRequestFactory, ObjectMapper objectMapper) {
this.baseUri = baseUri;
this.clientHttpRequestFactory = clientHttpRequestFactory;
this.objectMapper = objectMapper;
this.subscriptions = new HashMap<>();
this.streams = new HashMap<>();
}

@Override
public void addSubscription(Subscription subscription) {
final String eventName = Iterables.getOnlyElement(subscription.getEventTypes());
subscriptions.put(eventName, subscription);

LOG.debug("Adding subscription [{}] to event [{}]", subscription.getId(), eventName);

streams.put(eventName, new SubscriptionStream(eventName, subscription.getId()));
}

@Override
public void addStreamId(Subscription subscription, String streamId) {
final String eventName = Iterables.getOnlyElement(subscription.getEventTypes());

LOG.debug("Adding stream id [{}] for subscription [{}] to event [{}]", streamId, subscription.getId(), eventName);

streams.get(eventName).setStreamId(streamId);
}

@Override
public void onSuccess(String eventName, Cursor cursor) throws IOException {
final Subscription subscription = subscriptions.get(eventName);
final URI subscriptionUrl = baseUri.resolve(String.format("/subscriptions/%s/cursors", subscription.getId()));

final ClientHttpRequest request = clientHttpRequestFactory.createRequest(subscriptionUrl, HttpMethod.PUT);
final SubscriptionStream stream = streams.get(eventName);
final String subscriptionId = stream.getSubscriptionId();
final URI subscriptionUrl = baseUri.resolve(String.format("/subscriptions/%s/cursors", subscriptionId));

LOG.debug("Committing cursors for subscription [{}] to event [{}] in partition [{}] with offset [{}]", subscriptionId, stream.getEventName(), cursor.getPartition(), cursor.getOffset());

final ClientHttpRequest request = clientHttpRequestFactory.createRequest(subscriptionUrl, HttpMethod.POST);

request.getHeaders().setContentType(MediaType.APPLICATION_JSON);
request.getHeaders().put("X-Nakadi-StreamId", singletonList(stream.getStreamId()));

try (OutputStream os = request.getBody()) {
objectMapper.writeValue(os, Collections.singleton(cursor));
}

try (final ClientHttpResponse response = request.execute()) {

if (response.getStatusCode().value() == HttpStatus.NO_CONTENT.value()) {
LOG.warn("Cursor for event [{}] in partition [{}] with offset [{}] was already committed", eventName, cursor.getPartition(), cursor.getOffset());
} else if (response.getStatusCode().is2xxSuccessful()) {
LOG.debug("Successfully committed cursor for event [{}] in partition [{}] with offset [{}]", eventName, cursor.getPartition(), cursor.getOffset());
final int status = response.getStatusCode().value();
if (status == 204) {
LOG.debug("Successfully committed cursor for subscription [{}] to event [{}] in partition [{}] with offset [{}]", subscriptionId, eventName, cursor.getPartition(), cursor.getOffset());
} else if (status == 200) {
LOG.warn("Cursor for subscription [{}] to event [{}] in partition [{}] with offset [{}] was already committed", subscriptionId, eventName, cursor.getPartition(), cursor.getOffset());
} else {
// Error responses should already have been handled by ProblemHandlingClientHttpRequest, so we still treat this as success
LOG.warn("Unexpected status code [{}] for subscription [{}] to event [{}] in partition [{}] with offset [{}]", status, subscriptionId, eventName, cursor.getPartition(), cursor.getOffset());
}
}
}
Expand All @@ -78,14 +139,15 @@ public void onError(String eventName, Cursor cursor, Throwable throwable) {

@Override
public Collection<Cursor> getCursors(String eventName) throws IOException {
final Subscription subscription = subscriptions.get(eventName);
final URI subscriptionUrl = baseUri.resolve(String.format("/subscriptions/%s/cursors", subscription.getId()));
final SubscriptionStream stream = streams.get(eventName);
final URI subscriptionUrl = baseUri.resolve(String.format("/subscriptions/%s/cursors", stream.getSubscriptionId()));

final ClientHttpRequest request = clientHttpRequestFactory.createRequest(subscriptionUrl, HttpMethod.GET);

try (final ClientHttpResponse response = request.execute()) {
try (InputStream is = response.getBody()) {
return objectMapper.readValue(is, LIST_OF_CURSORS);
final CursorResponse cursorResponse = objectMapper.readValue(is, CursorResponse.class);
return cursorResponse.getItems();
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/org/zalando/fahrschein/NakadiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,15 @@ public <T> void listen(Subscription subscription, Class<T> eventType, Listener<T
final String queryString = streamParameters.toQueryString();
final URI uri = baseUri.resolve(String.format("/subscriptions/%s/events?%s", subscription.getId(), queryString));

listen(uri, eventName, eventType, listener);
final NakadiReader<T> nakadiReader = nakadiReaderFactory.createReader(uri, eventName, Optional.of(subscription), eventType, listener);

nakadiReader.run();
}

public <T> void listen(String eventName, Class<T> eventType, Listener<T> listener, StreamParameters streamParameters) throws IOException {
final String queryString = streamParameters.toQueryString();
final URI uri = baseUri.resolve(String.format("/event-types/%s/events?%s", eventName, queryString));

listen(uri, eventName, eventType, listener);
}

private <T> void listen(URI uri, String eventName, Class<T> eventType, Listener<T> listener) throws IOException {
final NakadiReader<T> nakadiReader = nakadiReaderFactory.createReader(uri, eventName, Optional.<Subscription>empty(), eventType, listener);

nakadiReader.run();
Expand Down
86 changes: 62 additions & 24 deletions src/main/java/org/zalando/fahrschein/NakadiReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

class NakadiReader<T> implements IORunnable {
Expand Down Expand Up @@ -115,6 +116,11 @@ public void close() {
}
}

private static Optional<String> getStreamId(ClientHttpResponse response) {
return Optional.ofNullable(response.getHeaders())
.flatMap(h -> h.getOrDefault("X-Nakadi-StreamId", emptyList()).stream().findFirst());
}

private JsonInput openJsonInput() throws IOException {
final ClientHttpRequest request = clientHttpRequestFactory.createRequest(uri, HttpMethod.GET);
if (!subscription.isPresent()) {
Expand All @@ -126,7 +132,13 @@ private JsonInput openJsonInput() throws IOException {
}
final ClientHttpResponse response = request.execute();
try {
final Optional<String> streamId = getStreamId(response);
final JsonParser jsonParser = jsonFactory.createParser(response.getBody()).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);

if (subscription.isPresent() && streamId.isPresent()) {
cursorManager.addStreamId(subscription.get(), streamId.get());
}

return new JsonInput(response, jsonParser);
} catch (Throwable throwable) {
try {
Expand Down Expand Up @@ -154,6 +166,9 @@ private void processBatch(Batch<T> batch) throws IOException {
private Cursor readCursor(JsonParser jsonParser) throws IOException {
String partition = null;
String offset = null;
String eventType = null;
String cursorToken = null;


expectToken(jsonParser, JsonToken.START_OBJECT);

Expand All @@ -166,6 +181,12 @@ private Cursor readCursor(JsonParser jsonParser) throws IOException {
case "offset":
offset = jsonParser.nextTextValue();
break;
case "event_type":
eventType = jsonParser.nextTextValue();
break;
case "cursor_token":
cursorToken = jsonParser.nextTextValue();
break;
default:
LOG.warn("Unexpected field [{}] in cursor", field);
jsonParser.nextToken();
Expand All @@ -181,7 +202,7 @@ private Cursor readCursor(JsonParser jsonParser) throws IOException {
throw new IllegalStateException("Could not read offset from cursor for partition [" + partition + "]");
}

return new Cursor(partition, offset);
return new Cursor(partition, offset, eventType, cursorToken);
}

private List<T> readEvents(final JsonParser jsonParser) throws IOException {
Expand Down Expand Up @@ -233,36 +254,56 @@ void runInternal() throws IOException, BackoffException {

while (true) {
try {

LOG.debug("Waiting for next batch of events for [{}]", eventName);

expectToken(jsonParser, JsonToken.START_OBJECT);
expectToken(jsonParser, JsonToken.FIELD_NAME);
expectField(jsonParser, "cursor");

final Cursor cursor = readCursor(jsonParser);
metricsCollector.markMessageReceived();

LOG.debug("Cursor for [{}] partition [{}] at offset [{}]", eventName, cursor.getPartition(), cursor.getOffset());
Cursor cursor = null;
List<T> events = null;

while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
final String field = jsonParser.getCurrentName();
switch (field) {
case "cursor": {
cursor = readCursor(jsonParser);
break;
}
case "events": {
events = readEvents(jsonParser);
break;
}
case "info": {
LOG.debug("Skipping stream info in event batch");
jsonParser.nextToken();
jsonParser.skipChildren();
break;
}
default: {
LOG.warn("Unexpected field [{}] in event batch", field);
jsonParser.nextToken();
jsonParser.skipChildren();
break;
}
}
}

metricsCollector.markMessageReceived();
if (cursor == null) {
throw new IOException("Could not read cursor");
}

final JsonToken token = jsonParser.nextToken();
if (token != JsonToken.END_OBJECT) {
expectField(jsonParser, "events");
LOG.debug("Cursor for [{}] partition [{}] at offset [{}]", eventName, cursor.getPartition(), cursor.getOffset());

final List<T> events = readEvents(jsonParser);
if (events == null) {
metricsCollector.markEventsReceived(0);
} else {
metricsCollector.markEventsReceived(events.size());

expectToken(jsonParser, JsonToken.END_OBJECT);

final Batch<T> batch = new Batch<>(cursor, Collections.unmodifiableList(events));

processBatch(batch);

metricsCollector.markMessageSuccessfullyProcessed();
} else {
// it's a keep alive batch
metricsCollector.markEventsReceived(0);
}

errorCount = 0;
Expand All @@ -273,7 +314,7 @@ void runInternal() throws IOException, BackoffException {
if (errorCount > 0) {
LOG.warn("Got [{}] [{}] while reading events for [{}] after [{}] retries", e.getClass().getSimpleName(), e.getMessage(), eventName, errorCount, e);
} else {
LOG.info("Got [{}] [{}] while reading events for [{}]", e.getClass().getSimpleName(), e.getMessage(), eventName);
LOG.info("Got [{}] [{}] while reading events for [{}]", e.getClass().getSimpleName(), e.getMessage(), eventName, e);
}

jsonInput.close();
Expand Down Expand Up @@ -301,11 +342,6 @@ void runInternal() throws IOException, BackoffException {
}
}

private void expectField(JsonParser jsonParser, String expectedFieldName) throws IOException {
final String fieldName = jsonParser.getCurrentName();
checkState(expectedFieldName.equals(fieldName), "Expected [%s] field but got [%s]", expectedFieldName, fieldName);
}

private void expectToken(JsonParser jsonParser, JsonToken expectedToken) throws IOException {
final JsonToken token = jsonParser.nextToken();
if (token == null) {
Expand All @@ -315,7 +351,9 @@ private void expectToken(JsonParser jsonParser, JsonToken expectedToken) throws
throw new EOFException("Stream was closed");
}
}
checkState(token == expectedToken, "Expected [%s] but got [%s]", expectedToken, token);
if (token != expectedToken) {
throw new IOException(String.format("Expected [%s] but got [%s]", expectedToken, token));
}
}

}
Loading

0 comments on commit 4b2ca3a

Please sign in to comment.