From bb97a6aea4c7da27aff97dde58a1b8d45b7afedd Mon Sep 17 00:00:00 2001 From: Iulius Hutuleac Date: Fri, 25 Oct 2024 14:51:00 +0200 Subject: [PATCH] add some paging capabilities --- .../response/jackson/JacksonResponseRecordParser.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java index 0bab612..7bc784b 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java @@ -27,6 +27,7 @@ import lombok.RequiredArgsConstructor; import org.apache.kafka.common.Configurable; +import java.util.Date; import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; @@ -75,9 +76,13 @@ Stream getRecords(byte[] body) { private Map getResponseOffset(JsonNode node) { if(responseOffsetPointers.isEmpty()) return emptyMap(); - else - return responseOffsetPointers.entrySet().stream() - .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText())); + else { + Map t = responseOffsetPointers.entrySet().stream() + .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText())); + t.put("last_poll_timestamp", "" + new Date().getTime()); + return t; + } + } private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map responseOffset) {