Skip to content
This repository was archived by the owner on May 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #53 from zalando-incubator/fix-is-available-again
Browse files Browse the repository at this point in the history
Fix cursor manager updating of offsets
  • Loading branch information
jhorstmann authored Jul 19, 2016
2 parents f64f77f + 0b802ca commit 8221f8b
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 43 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.10.19</version>
<version>2.0.86-beta</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/org/zalando/fahrschein/CursorManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.fahrschein;

import com.google.common.collect.Ordering;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Partition;
import org.zalando.fahrschein.domain.Subscription;
Expand All @@ -14,8 +15,13 @@
* Manages cursor offsets for one consumer. One consumer can handle several distinct events.
*/
public interface CursorManager {

Ordering<String> OFFSET_ORDERING = Ordering.natural().nullsFirst().onResultOf((String offset) -> "BEGIN".equals(offset) ? null : Long.parseLong(offset));

void onSuccess(String eventName, Cursor cursor) throws IOException;

void onError(String eventName, Cursor cursor, Throwable throwable) throws IOException;

Collection<Cursor> getCursors(String eventName) throws IOException;

default void addSubscription(Subscription subscription) {
Expand All @@ -32,21 +38,25 @@ default void fromNewestAvailableOffsets(String eventName, List<Partition> partit
}

/**
* Initializes offsets to start streaming at the oldes available offset (BEGIN).
* Initializes offsets to start streaming at the oldest available offset (BEGIN).
*/
default void fromOldestAvailableOffset(String eventName, List<Partition> partitions) throws IOException {
for (Partition partition : partitions) {
onSuccess(eventName, new Cursor(partition.getPartition(), partition.getOldestAvailableOffset()));
onSuccess(eventName, new Cursor(partition.getPartition(), "BEGIN"));
}
}

/**
* Updates offsets in case the currently stored offset is no longer available. Streaming will start at the oldest available offset (BEGIN) to minimize the amount of events skipped.
*/
default void updatePartitions(String eventName, List<Partition> partitions) throws IOException {

final Map<String, Cursor> cursorsByPartition = getCursors(eventName).stream().collect(Collectors.toMap(Cursor::getPartition, c -> c));

for (Partition partition : partitions) {
final Cursor cursor = cursorsByPartition.get(partition.getPartition());
if (cursor == null || !partition.isAvailable(cursor.getOffset())) {
onSuccess(eventName, new Cursor(partition.getPartition(), partition.getOldestAvailableOffset()));
if (cursor == null || (!"BEGIN".equals(cursor.getOffset()) && OFFSET_ORDERING.compare(cursor.getOffset(), partition.getOldestAvailableOffset()) < 0)) {
onSuccess(eventName, new Cursor(partition.getPartition(), "BEGIN"));
}
}
}
Expand Down
17 changes: 0 additions & 17 deletions src/main/java/org/zalando/fahrschein/domain/Partition.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.zalando.fahrschein.domain;

import com.google.gag.annotation.remark.Hack;

public class Partition {
private final String partition;
private final String oldestAvailableOffset;
Expand All @@ -25,19 +23,4 @@ public String getNewestAvailableOffset() {
return newestAvailableOffset;
}

@Hack("This method relies on offsets being numeric and montonically increasing")
public boolean isAvailable(final String offset) {
try {
final long requestedOffset = Long.parseLong(offset);
final long oldestAvailableOffset = Long.parseLong(this.oldestAvailableOffset);
final long newestAvailableOffset = Long.parseLong(this.newestAvailableOffset);

return requestedOffset >= oldestAvailableOffset;
} catch (NumberFormatException e) {
// Assume it is available and wait for the problem response from nakadi
return true;
}
}


}
76 changes: 76 additions & 0 deletions src/test/java/org/zalando/fahrschein/CursorManagerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.zalando.fahrschein;

import org.junit.Before;
import org.junit.Test;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Partition;

import javax.annotation.Nullable;
import java.io.IOException;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CursorManagerTest {

private final CursorManager cursorManager = mock(CursorManager.class);

@Before
public void setupMock() throws IOException {
doCallRealMethod().when(cursorManager).updatePartitions(any(), any());
doCallRealMethod().when(cursorManager).fromNewestAvailableOffsets(any(), any());
doCallRealMethod().when(cursorManager).fromOldestAvailableOffset(any(), any());
}

private void run(@Nullable String initialOffset, String oldestAvailableOffset, String newestAvailableOffset, @Nullable String expectedOffset) throws IOException {
when(cursorManager.getCursors("test")).thenReturn(initialOffset == null ? emptyList() : singletonList(new Cursor("0", initialOffset)));
cursorManager.updatePartitions("test", singletonList(new Partition("0", oldestAvailableOffset, newestAvailableOffset)));
if (expectedOffset != null) {
verify(cursorManager).onSuccess("test", new Cursor("0", expectedOffset));
} else {
verify(cursorManager, never()).onSuccess(any(), any());
}
}

@Test
public void shouldNotUpdatePartitionWhenOffsetStillAvailable() throws IOException {
run("20", "10", "30", null);
}

@Test
public void shouldUpdatePartitionWhenNoCursorAndLastConsumedOffsetNoLongerAvailable() throws IOException {
run(null, "10", "20", "BEGIN");
}

@Test
public void shouldUpdatePartitionWhenLastConsumedOffsetNoLongerAvailable() throws IOException {
run("5", "10", "20", "BEGIN");
}

@Test
public void shouldUpdatePartitionToBeginWhenNoCursorAndPartitionIsEmpty() throws IOException {
run(null, "0", "BEGIN", "BEGIN");
}

@Test
public void shouldNotUpdatePartitionWhenCursorIsAreadyAtBegin() throws IOException {
run("BEGIN", "0", "BEGIN", null);
}

@Test
public void shouldUpdatePartitionToNewestAvailableWhenNoCursorAndPartitionIsExpired() throws IOException {
run(null, "2", "1", "BEGIN");
}

@Test
public void shouldUpdatePartitionToNewestAvailableWhenPartitionIsExpired() throws IOException {
run("10", "22", "21", "BEGIN");
}

}
21 changes: 0 additions & 21 deletions src/test/java/org/zalando/fahrschein/domain/PartitionTest.java

This file was deleted.

0 comments on commit 8221f8b

Please sign in to comment.