Skip to content

Commit

Permalink
add some paging capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Iulius Hutuleac committed Oct 24, 2024
1 parent b9e41cb commit 08d8857
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 30 deletions.
2 changes: 1 addition & 1 deletion kafka-connect-http-infra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>kafka-connect-http-parent</artifactId>
<groupId>com.github.castorm</groupId>
<version>0.8.12-SNAPSHOT</version>
<version>0.8.13</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion kafka-connect-http-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>kafka-connect-http-parent</artifactId>
<groupId>com.github.castorm</groupId>
<version>0.8.12-SNAPSHOT</version>
<version>0.8.13</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
17 changes: 1 addition & 16 deletions kafka-connect-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<artifactId>kafka-connect-http-parent</artifactId>
<groupId>com.github.castorm</groupId>
<version>0.8.12-SNAPSHOT</version>
<version>0.8.13</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -88,21 +88,6 @@
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>sources</classifier> <!-- Use classifier for supplemental artifacts -->
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
private static final String RECORD_SORTER = "http.record.sorter";
private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
private static final String OFFSET_INITIAL = "http.offset.initial";
private static final String NEXT_PAGE_OFFSET = "http.offset.nextpage";

private final TimerThrottler throttler;
private final HttpRequestFactory requestFactory;
Expand All @@ -63,6 +64,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
private final SourceRecordFilterFactory recordFilterFactory;
private final SourceRecordSorter recordSorter;
private final Map<String, String> initialOffset;
private String nextPageOffset;

HttpSourceConnectorConfig(Map<String, ?> originals) {
super(config(), originals);
Expand All @@ -74,6 +76,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
initialOffset = breakDownMap(getString(OFFSET_INITIAL));
nextPageOffset = getString(NEXT_PAGE_OFFSET);
}

public static ConfigDef config() {
Expand All @@ -84,6 +87,7 @@ public static ConfigDef config() {
.define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class")
.define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
.define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset");
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
.define(NEXT_PAGE_OFFSET, STRING, "", HIGH, "Next Page offset");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter;
import com.github.castorm.kafka.connect.http.request.spi.HttpRequestFactory;
import com.github.castorm.kafka.connect.http.response.spi.HttpResponseParser;

import com.github.castorm.kafka.connect.timer.TimerThrottler;
import edu.emory.mathcs.backport.java.util.Collections;
import lombok.Getter;
Expand All @@ -40,10 +41,15 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import com.github.castorm.kafka.connect.common.ConfigUtils;

import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
Expand All @@ -69,6 +75,8 @@ public class HttpSourceTask extends SourceTask {

private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());

private String nextPageOffset;

@Getter
private Offset offset;

Expand Down Expand Up @@ -103,18 +111,40 @@ private Offset loadOffset(Map<String, String> initialOffset) {
public List<SourceRecord> poll() throws InterruptedException {

throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));
offset.setValue(nextPageOffset, "");

boolean hasNextPage = true;

HttpRequest request = requestFactory.createRequest(offset);
List<SourceRecord> allRecords = new ArrayList<>();
while(hasNextPage) {
HttpRequest request = requestFactory.createRequest(offset);

HttpResponse response = execute(request);
log.info("Request for page {}", request.toString());

List<SourceRecord> records = responseParser.parse(response);
HttpResponse response = execute(request);

List<SourceRecord> records = responseParser.parse(response);

if(!records.isEmpty()) {
allRecords.addAll(records);
String nextPage = (String) records.get(0).sourceOffset().get(nextPageOffset);
if(nextPage != null && !nextPage.trim().isEmpty()) {
log.info("Request for next page {}", nextPage);
offset.setValue(nextPageOffset, nextPage);
} else {
hasNextPage = false;
}

} else {
hasNextPage = false;
}
}

List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
List<SourceRecord> unseenRecords = recordSorter.sort(allRecords).stream()
.filter(recordFilterFactory.create(offset))
.collect(toList());

log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), records.size());
log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), allRecords.size());

confirmationWindow = new ConfirmationWindow<>(extractOffsets(unseenRecords));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.Builder.Default;
import lombok.Value;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

Expand All @@ -50,4 +51,13 @@ public class HttpRequest {
public enum HttpMethod {
GET, HEAD, POST, PUT, PATCH
}

/**
 * Renders the request's method, URL and query parameters for logging.
 * Headers and body are intentionally omitted from this representation.
 */
@Override
public String toString() {
    return String.format("HttpRequest{method=%s, url='%s', queryParams=%s}",
            method, url, queryParams);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ public Optional<String> getKey() {
return ofNullable((String) properties.get(KEY_KEY));
}

/**
 * Stores {@code value} under {@code key} in the backing offset properties map.
 * Silently does nothing when {@code key} is null, so callers may pass an
 * unset/optional key without a guard.
 *
 * @param key   property name to set; ignored if null
 * @param value value to associate with the key
 */
// NOTE(review): the unchecked cast assumes the underlying map is mutable and
// accepts arbitrary Object values — verify how `properties` is constructed.
@SuppressWarnings("unchecked")
public void setValue(String key, Object value) {
    if (key != null) {
        ((Map<String, Object>) properties).put(key, value);
    }
}

/**
 * Reads the stored timestamp property, if present, and parses it as an
 * ISO-8601 {@link Instant}.
 */
public Optional<Instant> getTimestamp() {
    String raw = (String) properties.get(TIMESTAMP_KEY);
    return ofNullable(raw).map(Instant::parse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class PolicyHttpResponseParser implements HttpResponseParser {

private HttpResponsePolicy policy;

private Map<String, String> skipOffsets;

public PolicyHttpResponseParser() {
this(PolicyHttpResponseParserConfig::new);
}
Expand All @@ -51,6 +53,12 @@ public void configure(Map<String, ?> settings) {
PolicyHttpResponseParserConfig config = configFactory.apply(settings);
delegate = config.getDelegateParser();
policy = config.getPolicy();
skipOffsets = config.getSkipOffsets();
}

/**
 * Returns the offset entries to reset, as configured via the parser config's
 * skip-offsets property (populated in {@code configure}).
 *
 * @return configured skip offsets; may be empty
 */
@Override
public Map<String, String> getOffsetReset() {
    return skipOffsets;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,35 @@

import java.util.Map;

import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

@Getter
public class PolicyHttpResponseParserConfig extends AbstractConfig {

private static final String PARSER_DELEGATE = "http.response.policy.parser";
private static final String POLICY = "http.response.policy";
private static final String SKIP_OFFSET_POLICY = "http.response.skip.policy.offsets";

private final HttpResponseParser delegateParser;

private final HttpResponsePolicy policy;

private Map<String, String> skipOffsets;

/**
 * Resolves the delegate parser, the response policy and the skip-offsets map
 * from the connector configuration.
 *
 * @param originals raw connector configuration properties
 */
public PolicyHttpResponseParserConfig(Map<String, ?> originals) {
    super(config(), originals);
    // Parser that performs the actual record extraction once the policy admits the response
    delegateParser = getConfiguredInstance(PARSER_DELEGATE, HttpResponseParser.class);
    policy = getConfiguredInstance(POLICY, HttpResponsePolicy.class);
    // NOTE(review): breakDownMap presumably parses "k1=v1,k2=v2" into a map — confirm in ConfigUtils
    skipOffsets = breakDownMap(getString(SKIP_OFFSET_POLICY));
}

public static ConfigDef config() {
return new ConfigDef()
.define(PARSER_DELEGATE, CLASS, KvHttpResponseParser.class, HIGH, "Response Parser Delegate Class")
.define(POLICY, CLASS, StatusCodeHttpResponsePolicy.class, HIGH, "Response Policy Class");
.define(POLICY, CLASS, StatusCodeHttpResponsePolicy.class, HIGH, "Response Policy Class")
.define(SKIP_OFFSET_POLICY, STRING, "", HIGH, "Reset Offsets");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ Stream<JacksonRecord> getRecords(byte[] body) {
}

private Map<String, Object> getResponseOffset(JsonNode node) {
return responseOffsetPointers.entrySet().stream()
if(responseOffsetPointers.isEmpty())
return emptyMap();
else
return responseOffsetPointers.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -32,6 +33,10 @@ public interface HttpResponseParser extends Configurable {

List<SourceRecord> parse(HttpResponse response);

/**
 * Offset entries the task should reset/skip after parsing; implementations
 * that page or re-anchor offsets may override. Defaults to no resets.
 *
 * Fix: interface members are implicitly public — the redundant 'public'
 * modifier is dropped (flagged by Checkstyle's RedundantModifier).
 *
 * @return map of offset keys to reset values; empty by default
 */
default Map<String, String> getOffsetReset() {
    return Collections.emptyMap();
}

/**
 * Optional hook for implementations that need connector settings; the
 * default implementation ignores them.
 *
 * @param map connector configuration properties
 */
default void configure(Map<String, ?> map) {
    // Do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import static com.github.castorm.kafka.connect.http.HttpSourceTaskTest.Fixture.offset;
Expand Down Expand Up @@ -298,14 +299,14 @@ void whenStop_thenNothingHappens() {
interface Fixture {
Instant now = now();
String key = "customKey";
Map<String, Object> offsetMap = ImmutableMap.of("custom", "value", "key", key, "timestamp", now.toString());
Map<String, String> offsetInitialMap = ImmutableMap.of("k2", "v2");
Map<String, Object> offsetMap = new HashMap<>(ImmutableMap.of("custom", "value", "key", key, "timestamp", now.toString()));
Map<String, String> offsetInitialMap = new HashMap<>(ImmutableMap.of("k2", "v2"));
Offset offset = Offset.of(offsetMap);
HttpRequest request = HttpRequest.builder().build();
HttpResponse response = HttpResponse.builder().build();

/**
 * Builds a mutable offset map fixture with the given custom value.
 *
 * Fix: the paste retained the stale pre-change return statement ahead of its
 * replacement — two consecutive returns make the second unreachable (a Java
 * compile error); the stale immutable-map return is removed. The HashMap
 * wrapper is kept so tests can mutate the offset.
 */
static Map<String, Object> offsetMap(Object value) {
    return new HashMap<>(ImmutableMap.of("custom", value, "key", key, "timestamp", now.toString()));
}

static SourceRecord record(Map<String, Object> offset) {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
<slf4j.version>1.7.36</slf4j.version>
<logback.version>1.2.10</logback.version>
<lombok.version>1.18.22</lombok.version>
<kafka.version>3.0.0</kafka.version>
<kafka.version>3.6.1</kafka.version>
<okhttp.version>4.9.3</okhttp.version>
<jackson.version>2.13.1</jackson.version>
<freemarker.version>2.3.31</freemarker.version>
Expand Down

0 comments on commit 08d8857

Please sign in to comment.