Skip to content
This repository has been archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Some more cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
jhorstmann committed Dec 5, 2016
1 parent 8655707 commit 518b167
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 15 deletions.
5 changes: 1 addition & 4 deletions src/main/java/org/zalando/fahrschein/CursorManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -16,8 +15,6 @@
*/
public interface CursorManager {

Comparator<String> OFFSET_COMPARATOR = Comparator.nullsFirst(Comparator.comparing((String offset) -> "BEGIN".equals(offset) ? null : Long.parseLong(offset)));

void onSuccess(String eventName, Cursor cursor) throws IOException;

void onError(String eventName, Cursor cursor, Throwable throwable) throws IOException;
Expand Down Expand Up @@ -59,7 +56,7 @@ default void updatePartitions(String eventName, List<Partition> partitions) thro

for (Partition partition : partitions) {
final Cursor cursor = cursorsByPartition.get(partition.getPartition());
if (cursor == null || (!"BEGIN".equals(cursor.getOffset()) && OFFSET_COMPARATOR.compare(cursor.getOffset(), partition.getOldestAvailableOffset()) < 0)) {
if (cursor == null || (!"BEGIN".equals(cursor.getOffset()) && OffsetComparator.INSTANCE.compare(cursor.getOffset(), partition.getOldestAvailableOffset()) < 0)) {
onSuccess(eventName, new Cursor(partition.getPartition(), "BEGIN"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;

import static java.util.Collections.singletonList;
import static org.zalando.fahrschein.Preconditions.checkState;

public class ManagedCursorManager implements CursorManager {

Expand Down Expand Up @@ -82,6 +83,7 @@ public ManagedCursorManager(URI baseUri, ClientHttpRequestFactory clientHttpRequ

@Override
public void addSubscription(Subscription subscription) {
checkState(subscription.getEventTypes().size() == 1);
final String eventName = subscription.getEventTypes().iterator().next();

LOG.debug("Adding subscription [{}] to event [{}]", subscription.getId(), eventName);
Expand All @@ -91,6 +93,7 @@ public void addSubscription(Subscription subscription) {

@Override
public void addStreamId(Subscription subscription, String streamId) {
checkState(subscription.getEventTypes().size() == 1);
final String eventName = subscription.getEventTypes().iterator().next();

LOG.debug("Adding stream id [{}] for subscription [{}] to event [{}]", streamId, subscription.getId(), eventName);
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/zalando/fahrschein/OffsetComparator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.zalando.fahrschein;

import java.util.Comparator;

class OffsetComparator implements Comparator<String> {
static final Comparator<String> INSTANCE = new OffsetComparator();

private OffsetComparator() {

}

@Override
public int compare(String o1, String o2) {
if ("BEGIN".equals(o1)) {
return "BEGIN".equals(o2) ? 0 : -1;
} else {
int l1 = o1.length();
int l2 = o2.length();
if (l1 < l2) {
return -1;
} else if (l1 > l2) {
return 1;
} else {
return o1.compareTo(o2);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static java.util.Arrays.asList;

public class ProblemHandlingClientHttpRequest implements ClientHttpRequest {
class ProblemHandlingClientHttpRequest implements ClientHttpRequest {

private static final MediaType APPLICATION_PROBLEM_JSON = new MediaType("application", "problem+json");
private static final Set<MediaType> PROBLEM_CONTENT_TYPES = new HashSet<>(asList(APPLICATION_PROBLEM_JSON, MediaType.APPLICATION_JSON));
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/zalando/fahrschein/redis/Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public String[] deserialize(final byte[] bytes) {
return null;
}

List<String> result = new ArrayList<>();
StringTokenizer st = new StringTokenizer(new String(bytes, UTF8), DELIMITER_STRING);
final List<String> result = new ArrayList<>();
final StringTokenizer st = new StringTokenizer(new String(bytes, UTF8), DELIMITER_STRING);
while (st.hasMoreTokens()) {
result.add(st.nextToken());
}
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/org/zalando/fahrschein/CursorManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public void shouldNotUpdatePartitionWhenOffsetStillAvailable() throws IOExceptio
run("20", "10", "30", null);
}

@Test
public void shouldNotUpdatePartitionWhenOffsetStillAvailableAndMore() throws IOException {
run("234", "12", "2345", null);
}

@Test
public void shouldUpdatePartitionWhenNoCursorAndLastConsumedOffsetNoLongerAvailable() throws IOException {
run(null, "10", "20", "BEGIN");
Expand Down Expand Up @@ -73,4 +78,9 @@ public void shouldUpdatePartitionToNewestAvailableWhenPartitionIsExpired() throw
run("10", "22", "21", "BEGIN");
}

@Test
public void shouldUpdatePartitionToNewestAvailableWhenPartitionIsExpiredLongAgo() throws IOException {
run("10", "1234", "2345", "BEGIN");
}

}
26 changes: 18 additions & 8 deletions src/test/java/org/zalando/fahrschein/salesorder/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.zalando.fahrschein.ZignAccessTokenProvider;
import org.zalando.fahrschein.domain.Lock;
import org.zalando.fahrschein.domain.Partition;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.fahrschein.jdbc.JdbcCursorManager;
import org.zalando.fahrschein.jdbc.JdbcPartitionManager;
import org.zalando.fahrschein.salesorder.domain.SalesOrderPlaced;
Expand Down Expand Up @@ -63,30 +64,39 @@ public static void main(String[] args) throws IOException, InterruptedException
objectMapper.registerModule(new MoneyModule());
objectMapper.registerModule(new ParameterNamesModule());

AtomicInteger counter = new AtomicInteger();

final Listener<SalesOrderPlaced> listener = events -> {
if (Math.random() < 0.0001) {
// For testing reconnection logic
throw new EventProcessingException("Random failure");
} else {
for (SalesOrderPlaced salesOrderPlaced : events) {
LOG.debug("Received sales order [{}]", salesOrderPlaced.getSalesOrder().getOrderNumber());
final int count = counter.incrementAndGet();
if (count % 1000 == 0) {
LOG.info("Received [{}] sales orders", count);
}
LOG.info("Received sales order [{}]", salesOrderPlaced.getSalesOrder().getOrderNumber());
}
}
};

simpleListen(objectMapper, listener);
subscriptionListen(objectMapper, listener);

//simpleListen(objectMapper, listener);

//persistentListen(objectMapper, listener);

//multiInstanceListen(objectMapper, listener);
}

private static void subscriptionListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {

final NakadiClient nakadiClient = NakadiClient.builder(NAKADI_URI)
.withAccessTokenProvider(new ZignAccessTokenProvider())
.build();

final Subscription subscription = nakadiClient.subscribe("fahrschein-demo", SALES_ORDER_SERVICE_ORDER_PLACED, "fahrschein-demo");

nakadiClient.stream(subscription)
.withObjectMapper(objectMapper)
.listen(SalesOrderPlaced.class, listener);
}

private static void simpleListen(ObjectMapper objectMapper, Listener<SalesOrderPlaced> listener) throws IOException {
final InMemoryCursorManager cursorManager = new InMemoryCursorManager();

Expand Down

0 comments on commit 518b167

Please sign in to comment.