Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#88 | Initial idea to support pagination #90

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@
import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;

@Getter
class HttpSourceConnectorConfig extends AbstractConfig {
Expand All @@ -55,6 +56,11 @@ class HttpSourceConnectorConfig extends AbstractConfig {
private static final String RECORD_SORTER = "http.record.sorter";
private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
private static final String OFFSET_INITIAL = "http.offset.initial";
private static final String HANDLE_PAGINATION = "http.request.pagination.handle";
private static final String NEXT_PAGE_URL_MODE = "http.request.pagination.mode";
private static final String NEXT_PAGE_BASE_URL = "http.request.pagination.baseurl";
private static final String OVERWRITE = "overwrite";
private static final String APPEND = "append";

private final TimerThrottler throttler;
private final HttpRequestFactory requestFactory;
Expand All @@ -63,6 +69,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
private final SourceRecordFilterFactory recordFilterFactory;
private final SourceRecordSorter recordSorter;
private final Map<String, String> initialOffset;
private final Boolean handlePagination;
private final Boolean appendNextPageUrl;
private final String baseUrl;

HttpSourceConnectorConfig(Map<String, ?> originals) {
super(config(), originals);
Expand All @@ -74,6 +83,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
initialOffset = breakDownMap(getString(OFFSET_INITIAL));
handlePagination = getBoolean(HANDLE_PAGINATION);
appendNextPageUrl = getString(NEXT_PAGE_URL_MODE).equalsIgnoreCase(APPEND);
baseUrl = getString(NEXT_PAGE_BASE_URL);
}

public static ConfigDef config() {
Expand All @@ -84,6 +96,9 @@ public static ConfigDef config() {
.define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class")
.define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
.define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset");
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
.define(HANDLE_PAGINATION, BOOLEAN, false, LOW, "Handle Pagination")
.define(NEXT_PAGE_URL_MODE, STRING, OVERWRITE, ConfigDef.ValidString.in(APPEND, OVERWRITE), LOW, "Append or overwrite the next page URL")
.define(NEXT_PAGE_BASE_URL, STRING, "", LOW, "Base URL in case of append mode");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Function;

import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
Expand All @@ -67,6 +66,16 @@ public class HttpSourceTask extends SourceTask {

private SourceRecordFilterFactory recordFilterFactory;

private Boolean handlePagination;

private Boolean appendNextPageUrl;

private String baseUrl;

private String modifiedUrl;

private HttpRequest request = null;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this here?


private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());

@Getter
Expand All @@ -92,6 +101,10 @@ public void start(Map<String, String> settings) {
recordSorter = config.getRecordSorter();
recordFilterFactory = config.getRecordFilterFactory();
offset = loadOffset(config.getInitialOffset());
handlePagination = !Objects.isNull(config.getHandlePagination()) && config.getHandlePagination();
appendNextPageUrl = !Objects.isNull(config.getAppendNextPageUrl()) && config.getAppendNextPageUrl();
baseUrl = config.getBaseUrl();
modifiedUrl = null;
}

private Offset loadOffset(Map<String, String> initialOffset) {
Expand All @@ -104,11 +117,37 @@ public List<SourceRecord> poll() throws InterruptedException {

throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));

HttpRequest request = requestFactory.createRequest(offset);
// HttpRequest request = requestFactory.createRequest(offset);

List<SourceRecord> records = new ArrayList<>();

if(handlePagination && !Objects.isNull(modifiedUrl)) {
request = HttpRequest.builder()
.method(request.getMethod())
.url(modifiedUrl)
.headers(request.getHeaders())
.body(request.getBody())
.build();
Comment on lines +125 to +130
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are creating a request here. Why not do it in the entity meant for that: RequestFactory?

You might want to create a RequestFactory specific for this use case though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to create a different request factory to take care of pagination. But I am stuck at a point. I am not able to understand how to pass the next page URL from the response to this request factory to create a new request.

} else {
request = requestFactory.createRequest(offset);
}

HttpResponse response = execute(request);

List<SourceRecord> records = responseParser.parse(response);
records.addAll(responseParser.parse(response));

if(handlePagination) {
Optional<String> nextPageUrl = responseParser.getNextPageUrl(response);
log.info("Next page URL: {}", nextPageUrl.orElse("no value"));
if( isNextPageUrlPresent(nextPageUrl) ) {
modifiedUrl = appendNextPageUrl
? baseUrl + nextPageUrl.orElse("")
: nextPageUrl.orElse(null);
} else {
modifiedUrl = null;
}
}
Comment on lines +139 to +149
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be extracted somewhere else; it doesn't seem to be at the same level of abstraction as the rest of the code in this method. It is too detailed and specific about pagination.



List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
.filter(recordFilterFactory.create(offset))
Expand All @@ -129,6 +168,13 @@ private HttpResponse execute(HttpRequest request) {
}
}

/**
 * Tells whether a next-page URL extracted from a response is usable.
 *
 * @param nextPageUrl next-page URL as extracted from the response, possibly empty
 * @return {@code true} when a value is present and it is not the text "null" (compared ignoring case)
 */
private Boolean isNextPageUrlPresent(Optional<String> nextPageUrl) {
    // A present Optional never wraps null, so only the textual "null" has to be filtered out.
    return nextPageUrl
            .filter(url -> !"null".equalsIgnoreCase(url))
            .isPresent();
}


private static List<Map<String, ?>> extractOffsets(List<SourceRecord> recordsToSend) {
return recordsToSend.stream()
.map(SourceRecord::sourceOffset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -59,4 +60,8 @@ public List<SourceRecord> parse(HttpResponse response) {
.map(recordMapper::map)
.collect(toList());
}

/**
 * Extracts the next-page URL from the given response by delegating to {@code recordParser}.
 *
 * @param response HTTP response to inspect
 * @return whatever {@code recordParser} reports for this response
 */
@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
    return recordParser.getNextPageUrl(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -65,4 +66,17 @@ public List<SourceRecord> parse(HttpResponse response) {
throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse("")));
}
}

/**
 * Looks up the next-page URL, subject to the outcome the policy resolves for the response.
 *
 * @param response HTTP response to inspect
 * @return the delegate's result for PROCESS, an empty Optional for SKIP
 * @throws IllegalStateException for FAIL or any other outcome
 */
@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
    switch (policy.resolve(response)) {
        case PROCESS:
            return delegate.getNextPageUrl(response);
        case SKIP:
            return Optional.empty();
        case FAIL:
        default:
            // FAIL and any unrecognised outcome are reported the same way.
            Object statusCode = response.getCode();
            String responseBody = ofNullable(response.getBody()).map(String::new).orElse("");
            String failure = String.format("Policy failed for response code: %s, body: %s", statusCode, responseBody);
            throw new IllegalStateException(failure);
    }
}
Comment on lines +70 to +81
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the PolicyHttpResponseParser is to enforce a policy for whether a response should be parsed, skipped, or failed based on HTTP response status codes.

What's the reasoning for fitting this functionality here? Isn't it unrelated to the purpose of the class?

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public List<KvRecord> parse(HttpResponse response) {
.collect(toList());
}

/**
 * Reads the next-page URL from a response by handing its body to {@code responseParser}.
 *
 * @param response HTTP response whose body is inspected
 * @return the value produced by {@code responseParser.getNextPageUrl} for that body
 */
@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return responseParser.getNextPageUrl(response.getBody());
}

private KvRecord map(JacksonRecord record) {

Map<String, Object> offsets = record.getOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void configure(Map<String, ?> settings) {
keyPointer = config.getKeyPointer();
valuePointer = config.getValuePointer();
offsetPointers = config.getOffsetPointers();
timestampPointer = config.getTimestampPointer();

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

@Getter
Expand All @@ -49,12 +50,14 @@ public class JacksonRecordParserConfig extends AbstractConfig {
private static final String ITEM_KEY_POINTER = "http.response.record.key.pointer";
private static final String ITEM_TIMESTAMP_POINTER = "http.response.record.timestamp.pointer";
private static final String ITEM_OFFSET_VALUE_POINTER = "http.response.record.offset.pointer";
private static final String NEXT_PAGE_POINTER = "http.response.next.page.pointer";

private final JsonPointer recordsPointer;
private final List<JsonPointer> keyPointer;
private final JsonPointer valuePointer;
private final Optional<JsonPointer> timestampPointer;
private final Map<String, JsonPointer> offsetPointers;
private final Optional<JsonPointer> nextPagePointer;

JacksonRecordParserConfig(Map<String, ?> originals) {
super(config(), originals);
Expand All @@ -65,6 +68,7 @@ public class JacksonRecordParserConfig extends AbstractConfig {
offsetPointers = breakDownMap(getString(ITEM_OFFSET_VALUE_POINTER)).entrySet().stream()
.map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
.collect(toMap(Entry::getKey, Entry::getValue));
nextPagePointer = ofNullable(getString(NEXT_PAGE_POINTER)).map(JsonPointer::compile);
}

public static ConfigDef config() {
Expand All @@ -73,6 +77,7 @@ public static ConfigDef config() {
.define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer")
.define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers")
.define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer")
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers");
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers")
.define(NEXT_PAGE_POINTER, STRING, "/next", LOW, "Pointer for next page");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably not fair to assume something as arbitrary as "/next" as a default.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.common.Configurable;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -45,6 +46,10 @@ public class JacksonResponseRecordParser implements Configurable {

private JsonPointer recordsPointer;

private Optional<JsonPointer> nextPagePointer;

private JsonNode jsonBody;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is meant to be stateless; we shouldn't be keeping this body around here.


/**
 * Creates a parser wired with a new {@code JacksonRecordParser} and a
 * {@code JacksonSerializer} wrapping a fresh {@code ObjectMapper}.
 */
public JacksonResponseRecordParser() {
this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
}
Expand All @@ -61,14 +66,21 @@ public void configure(Map<String, ?> settings) {

Stream<JacksonRecord> getRecords(byte[] body) {

JsonNode jsonBody = serializer.deserialize(body);
this.jsonBody = serializer.deserialize(body);

Map<String, Object> responseOffset = getResponseOffset(jsonBody);

return serializer.getArrayAt(jsonBody, recordsPointer)
.map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset));
}

Optional<String> getNextPageUrl(byte[] body) {
return nextPagePointer.map(pointer ->
serializer.checkIfNonNull(this.jsonBody, pointer)
? serializer.getObjectAt(this.jsonBody, pointer).asText()
: null);
Comment on lines +79 to +81
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would belong better one level down at the serializer, by providing something like this:

Optional<JsonNode> getObjectAtIfPresent(JsonNode node, JsonPointer pointer)

That would mean you wouldn't need to expose checkIfNonNull, as it would be leaving that responsibility closer to where the decision is made.

}

/**
 * Supplies the response-level offset that is passed along with every record of a response.
 * The node is not consulted: the result is always an empty map.
 *
 * @param node deserialized response body (currently unused)
 * @return an empty map
 */
private Map<String, Object> getResponseOffset(JsonNode node) {
return emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ Stream<JsonNode> getArrayAt(JsonNode node, JsonPointer pointer) {
return array.isArray() ? stream(array.spliterator(), false) : Stream.of(array);
}

/**
 * Tells whether the node addressed by {@code pointer} holds an actual value.
 *
 * @param node    JSON tree to look into
 * @param pointer location to resolve inside {@code node}
 * @return {@code false} when nothing exists at the pointer or the value there is an explicit
 *         JSON {@code null}; {@code true} otherwise
 */
boolean checkIfNonNull(JsonNode node, JsonPointer pointer) {
    JsonNode target = node.at(pointer);
    // An explicit JSON null is not a missing node, yet asText() on it yields the text "null",
    // so it must be rejected here for the method to live up to its name.
    return !target.isMissingNode() && !target.isNull();
}


/**
 * Resolves the node a pointer refers to inside a response body.
 *
 * @param body           deserialized response body
 * @param recordsPointer pointer to resolve
 * @return {@code body} itself when the pointer equals {@code JSON_ROOT}, otherwise the result of
 *         {@code body.requiredAt(recordsPointer)}
 */
private static JsonNode getRequiredAt(JsonNode body, JsonPointer recordsPointer) {
    if (JSON_ROOT.equals(recordsPointer)) {
        return body;
    }
    return body.requiredAt(recordsPointer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

@FunctionalInterface
public interface HttpResponseParser extends Configurable {

List<SourceRecord> parse(HttpResponse response);

default void configure(Map<String, ?> map) {
// Do nothing
}

/**
 * Extracts the URL of the next page advertised by a response, if any.
 *
 * <p>This is a default method so that {@code parse} remains the only abstract method, as the
 * {@code @FunctionalInterface} annotation on this interface requires. Parsers without
 * pagination support simply inherit the empty result.
 *
 * @param response HTTP response to inspect
 * @return the next-page URL, or an empty Optional when the parser does not provide one
 */
default Optional<String> getNextPageUrl(HttpResponse response) {
    return Optional.empty();
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The responsibility of this parser is to translate responses to records, this additional responsibility is not cohesive with it.

In other words, we are breaking single responsibility here. If the new functionality were cohesive with the existing one, maybe we could consider raising the abstraction level of this interface to absorb it, but I don't think it is.

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

@FunctionalInterface
public interface KvRecordHttpResponseParser extends Configurable {

List<KvRecord> parse(HttpResponse response);

default void configure(Map<String, ?> map) {
// Do nothing
}

/**
 * Extracts the URL of the next page advertised by a response, if any.
 *
 * <p>This is a default method so that {@code parse} remains the only abstract method, as the
 * {@code @FunctionalInterface} annotation on this interface requires. Parsers without
 * pagination support simply inherit the empty result.
 *
 * @param response HTTP response to inspect
 * @return the next-page URL, or an empty Optional when the parser does not provide one
 */
default Optional<String> getNextPageUrl(HttpResponse response) {
    return Optional.empty();
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as HttpResponseParser.



}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.config;
import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.configWithout;
Expand Down Expand Up @@ -136,6 +137,11 @@ public static class TestResponseParser implements HttpResponseParser {
public List<SourceRecord> parse(HttpResponse response) {
return null;
}

// Test stub: never reports a next page.
@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return Optional.empty();
}
}

public static class TestRecordSorter implements SourceRecordSorter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -65,6 +66,11 @@ public static class TestResponseParser implements KvRecordHttpResponseParser {
public List<KvRecord> parse(HttpResponse response) {
return null;
}

// Test stub: never reports a next page.
@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return Optional.empty();
}
}

public static class TestRecordMapper implements KvSourceRecordMapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -62,6 +63,11 @@ public static class TestResponseParser implements HttpResponseParser {
public List<SourceRecord> parse(HttpResponse response) {
return null;
}

// Test stub: never reports a next page.
@Override
public Optional<String> getNextPageUrl(HttpResponse response) {
return Optional.empty();
}
}

public static class TestPolicy implements HttpResponsePolicy {
Expand Down