Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #315 from zalando-nakadi/add-documentation
Browse files Browse the repository at this point in the history
Add documentation
  • Loading branch information
MALPI authored May 4, 2022
2 parents 78a5ca6 + af9983d commit 8a6c370
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import java.util.Formatter;
import java.util.Locale;

@SuppressWarnings("serial")
/**
* Thrown in case the client wasn't able to publish the given batch of events to Nakadi.
*
* The response will contain an array of {@code BatchItemResponse}.
*/
public class EventPublishingException extends IOException {
private final BatchItemResponse[] responses;

Expand Down
46 changes: 35 additions & 11 deletions fahrschein/src/main/java/org/zalando/fahrschein/NakadiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,43 +26,67 @@
import static org.zalando.fahrschein.Preconditions.checkArgument;
import static org.zalando.fahrschein.Preconditions.checkState;

/**
* General implementation of the Nakadi Client used within this Library.
*/
public class NakadiClient {
private static final Logger LOG = LoggerFactory.getLogger(NakadiClient.class);

private static final TypeReference<List<Partition>> LIST_OF_PARTITIONS = new TypeReference<List<Partition>>() {
};

private final URI baseUri;
private final RequestFactory clientHttpRequestFactory;
private final RequestFactory requestFactory;
private final ObjectMapper internalObjectMapper;
private final ObjectMapper objectMapper;
private final CursorManager cursorManager;

public static NakadiClientBuilder builder(URI baseUri, RequestFactory clientHttpRequestFactory) {
return new NakadiClientBuilder(baseUri, clientHttpRequestFactory);
/**
* Returns a new Builder that will make use of the given {@code RequestFactory}.
*
* @param baseUri that we will send requests to
* @param requestFactory that we use for the execution of our HTTP Requests.
* @return A builder to initialize the client. Can be further modified later.
*/
public static NakadiClientBuilder builder(URI baseUri, RequestFactory requestFactory) {
return new NakadiClientBuilder(baseUri, requestFactory);
}

NakadiClient(URI baseUri, RequestFactory clientHttpRequestFactory, ObjectMapper objectMapper, CursorManager cursorManager) {
NakadiClient(URI baseUri, RequestFactory requestFactory, ObjectMapper objectMapper, CursorManager cursorManager) {
this.baseUri = baseUri;
this.clientHttpRequestFactory = clientHttpRequestFactory;
this.requestFactory = requestFactory;
this.objectMapper = objectMapper;
this.internalObjectMapper = DefaultObjectMapper.INSTANCE;
this.cursorManager = cursorManager;
}

/**
* Resolves a list of partitions for the given eventName.
* @param eventName that we want to resolve the partitions for.
* @return {@code List<Partition>} or {@code null} in
* @throws IOException in case of network issues.
*/
public List<Partition> getPartitions(String eventName) throws IOException {
final URI uri = baseUri.resolve(String.format("/event-types/%s/partitions", eventName));
final Request request = clientHttpRequestFactory.createRequest(uri, "GET");
final Request request = requestFactory.createRequest(uri, "GET");
try (final Response response = request.execute()) {
try (final InputStream is = response.getBody()) {
return internalObjectMapper.readValue(is, LIST_OF_PARTITIONS);
}
}
}

/**
* Writes the given events to the endpoint provided by the eventName.
* @param eventName where the event should be written to
* @param events that should be written
* @param <T> Type of the Event
* @throws IOException in case we fail to reach Nakadi
* @throws EventPublishingException In case Nakadi returns an Erroneous response
*/
public <T> void publish(String eventName, List<T> events) throws EventPublishingException, IOException {
final URI uri = baseUri.resolve(String.format("/event-types/%s/events", eventName));
final Request request = clientHttpRequestFactory.createRequest(uri, "POST");
final Request request = requestFactory.createRequest(uri, "POST");

request.getHeaders().setContentType(ContentType.APPLICATION_JSON);

Expand Down Expand Up @@ -106,7 +130,7 @@ public void deleteSubscription(String subscriptionId) throws IOException {
checkArgument(!subscriptionId.isEmpty(), "Subscription ID cannot be empty.");

final URI uri = baseUri.resolve(String.format("/subscriptions/%s", subscriptionId));
final Request request = clientHttpRequestFactory.createRequest(uri, "DELETE");
final Request request = requestFactory.createRequest(uri, "DELETE");

request.getHeaders().setContentType(ContentType.APPLICATION_JSON);

Expand All @@ -127,7 +151,7 @@ Subscription subscribe(String applicationName, Set<String> eventNames, String co
final SubscriptionRequest subscription = new SubscriptionRequest(applicationName, eventNames, consumerGroup, readFrom, initialCursors, authorization);

final URI uri = baseUri.resolve("/subscriptions");
final Request request = clientHttpRequestFactory.createRequest(uri, "POST");
final Request request = requestFactory.createRequest(uri, "POST");

request.getHeaders().setContentType(ContentType.APPLICATION_JSON);

Expand All @@ -148,11 +172,11 @@ Subscription subscribe(String applicationName, Set<String> eventNames, String co
public StreamBuilder.SubscriptionStreamBuilder stream(Subscription subscription) {
checkState(cursorManager instanceof ManagedCursorManager, "Subscription api requires a ManagedCursorManager");

return new StreamBuilders.SubscriptionStreamBuilderImpl(baseUri, clientHttpRequestFactory, cursorManager, objectMapper, subscription);
return new StreamBuilders.SubscriptionStreamBuilderImpl(baseUri, requestFactory, cursorManager, objectMapper, subscription);
}

public StreamBuilder.LowLevelStreamBuilder stream(String eventName) {
return new StreamBuilders.LowLevelStreamBuilderImpl(baseUri, clientHttpRequestFactory, cursorManager, objectMapper, eventName);
return new StreamBuilders.LowLevelStreamBuilderImpl(baseUri, requestFactory, cursorManager, objectMapper, eventName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ static RequestFactory wrapClientHttpRequestFactory(RequestFactory delegate, @Nul
return requestFactory;
}

/**
* Creates a new instance of {@code NakadiClient}. In case no {@code ObjectMapper} is provided, it's going to make
* use of {@code DefaultObjectMapper} that is making use of
* {@code PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES}.
* In case no {@code CursorManager} is provided it's going to make use of {@code ManagedCursorManager}.
*
* @return A fresh instance of {@code NakadiClient}
*/
public NakadiClient build() {
final RequestFactory clientHttpRequestFactory = wrapClientHttpRequestFactory(this.clientHttpRequestFactory, authorizationProvider);
final CursorManager cursorManager = this.cursorManager != null ? this.cursorManager : new ManagedCursorManager(baseUri, clientHttpRequestFactory, true);
Expand Down

0 comments on commit 8a6c370

Please sign in to comment.