diff --git a/README.md b/README.md index 3efb8352..daac5496 100644 --- a/README.md +++ b/README.md @@ -345,7 +345,7 @@ the list of `SourceRecord`s expected by Kafka Connect. > ```java > public interface HttpResponseParser extends Configurable { > -> List parse(HttpResponse response); +> List parse(String endpoint, HttpResponse response); > } > ``` > * Type: `Class` @@ -419,7 +419,7 @@ Parses the HTTP response into a key-value SourceRecord. This process is decompos > ```java > public interface KvSourceRecordMapper extends Configurable { > -> SourceRecord map(KvRecord record); +> SourceRecord map(String endpoint, KvRecord record); > } > ``` > * Type: `Class` diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java index 8ddee5b3..7c6faec2 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.source.SourceTaskContext; import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.model.Partition; import com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactoryConfig; import lombok.extern.slf4j.Slf4j; @@ -90,10 +91,10 @@ public List poll() throws InterruptedException { } private HttpSourceTaskSingleEndpoint getTaskForRecord(SourceRecord record) { - String endpoint = Offset.getEndpointFromPartition(record.sourcePartition()); + String endpoint = Partition.getEndpointFromPartition(record.sourcePartition()); HttpSourceTaskSingleEndpoint task = tasks.get(endpoint); if (task == null) { - throw new ConnectException("No HttpSourceTaskSingleEndpoint found for topic " + endpoint); + throw new ConnectException("No HttpSourceTaskSingleEndpoint found for endpoint " + endpoint); } return task; } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTaskSingleEndpoint.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTaskSingleEndpoint.java index 9dacb55e..b9af2c39 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTaskSingleEndpoint.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTaskSingleEndpoint.java @@ -25,6 +25,7 @@ import com.github.castorm.kafka.connect.http.model.HttpRequest; import com.github.castorm.kafka.connect.http.model.HttpResponse; import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.model.Partition; import com.github.castorm.kafka.connect.http.record.spi.SourceRecordFilterFactory; import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter; import com.github.castorm.kafka.connect.http.request.spi.HttpRequestFactory; @@ -102,7 +103,9 @@ public void start(Map settings) { } private Offset loadOffset(SourceTaskContext context, Map initialOffset) { - Map restoredOffset = ofNullable(context.offsetStorageReader().offset(Offset.getPartition(endpoint))).orElseGet(Collections::emptyMap); + Map restoredOffset = ofNullable( + context.offsetStorageReader().offset( + Partition.getPartition(endpoint))).orElseGet(Collections::emptyMap); return Offset.of(!restoredOffset.isEmpty() ? restoredOffset : initialOffset, endpoint); } @@ -115,7 +118,7 @@ public List poll() throws InterruptedException { HttpResponse response = execute(request); - List records = responseParser.parse(response); + List records = responseParser.parse(endpoint, response); List unseenRecords = recordSorter.sort(records).stream() .filter(recordFilterFactory.create(offset)) diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java index eca4e128..f34c4315 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java @@ -40,39 +40,25 @@ public class Offset { private final Map properties; - String endpoint; - private Offset(String endpoint, Map properties) { + private Offset(Map properties) { this.properties = properties; - this.endpoint = endpoint; } - public static Map getPartition(String endpoint) { - return Map.of("endpoint", endpoint); + public static Offset of(Map properties) { + return new Offset(properties); } - public Map getPartition() { - return Map.of("endpoint", endpoint); - } - - public static String getEndpointFromPartition(Map partition) { - return partition.get("endpoint").toString(); - } - - public static Offset of(Map properties, String endpoint) { - return new Offset(endpoint, properties); - } - - public static Offset of(Map properties, String key, String endpoint) { + public static Offset of(Map properties, String key) { Map props = new HashMap<>(properties); props.put(KEY_KEY, key); - return new Offset(endpoint, props); + return new Offset(props); } - public static Offset of(Map properties, String key, Instant timestamp, String endpoint) { + public static Offset of(Map properties, String key, Instant timestamp) { Map props = new HashMap<>(properties); props.put(KEY_KEY, key); props.put(TIMESTAMP_KEY, timestamp.toString()); - return new Offset(endpoint, props); + return new Offset(props); } public Map toMap() { @@ -86,8 +72,4 @@ public Optional getKey() { public Optional getTimestamp() { return ofNullable((String) properties.get(TIMESTAMP_KEY)).map(Instant::parse); } - - public String getEndpoint() { - return endpoint; - } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Partition.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Partition.java new file mode 100644 index 00000000..b72f887e --- /dev/null +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Partition.java @@ -0,0 +1,14 @@ +package com.github.castorm.kafka.connect.http.model; + +import java.util.Map; + +public class Partition { + public static Map getPartition(String endpoint) { + return Map.of("endpoint", endpoint); + } + + public static String getEndpointFromPartition(Map partition) { + return partition.get("endpoint").toString(); + } + +} diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactory.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactory.java index 7a6ca496..87237646 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactory.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactory.java @@ -42,7 +42,7 @@ public Predicate create(Offset offset) { AtomicBoolean lastSeenReached = new AtomicBoolean(false); return delegate.create(offset).or(record -> { boolean result = lastSeenReached.get(); - if (!result && Offset.of(record.sourceOffset(), offset.getEndpoint()).equals(offset)) { + if (!result && Offset.of(record.sourceOffset()).equals(offset)) { lastSeenReached.set(true); } return result; diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapper.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapper.java index 77507792..3636e767 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapper.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapper.java @@ -21,6 +21,7 @@ */ import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.model.Partition; import com.github.castorm.kafka.connect.http.record.model.KvRecord; import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper; import lombok.RequiredArgsConstructor; @@ -75,16 +76,15 @@ public void configure(Map settings) { } @Override - public SourceRecord map(KvRecord record) { + public SourceRecord map(String endpoint, KvRecord record) { Offset offset = record.getOffset(); Long timestamp = offset.getTimestamp().map(Instant::toEpochMilli).orElseGet(System::currentTimeMillis); - String endpoint = offset.getEndpoint(); Struct key = keyStruct(record.getKey()); Struct value = valueStruct(record.getKey(), record.getValue(), timestamp, endpoint); - Map sourcePartition = offset.getPartition(); - + Map sourcePartition = Partition.getPartition(endpoint); + return new SourceRecord( sourcePartition, offset.toMap(), diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapper.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapper.java index 4d500ca7..b1ccc552 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapper.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapper.java @@ -21,6 +21,7 @@ */ import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.model.Partition; import com.github.castorm.kafka.connect.http.record.model.KvRecord; import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper; import lombok.RequiredArgsConstructor; @@ -64,15 +65,15 @@ public void configure(Map settings) { } @Override - public SourceRecord map(KvRecord record) { + public SourceRecord map(String endpoint, KvRecord record) { Offset offset = record.getOffset(); - Map sourcePartition = offset.getPartition(); + Map sourcePartition = Partition.getPartition(endpoint); return new SourceRecord( sourcePartition, offset.toMap(), - config.getTopicName(offset.getEndpoint()), + config.getTopicName(endpoint), null, keySchema, record.getKey(), diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/spi/KvSourceRecordMapper.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/spi/KvSourceRecordMapper.java index c1a8c1b8..b8919b48 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/spi/KvSourceRecordMapper.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/record/spi/KvSourceRecordMapper.java @@ -29,7 +29,7 @@ @FunctionalInterface public interface KvSourceRecordMapper extends Configurable { - SourceRecord map(KvRecord record); + SourceRecord map(String endpoint, KvRecord record); default void configure(Map map) { // Do nothing diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java index d41a6acf..82e60948 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java @@ -54,9 +54,9 @@ public void configure(Map configs) { } @Override - public List parse(HttpResponse response) { + public List parse(String endpoint, HttpResponse response) { return recordParser.parse(response).stream() - .map(recordMapper::map) + .map(x -> recordMapper.map(endpoint, x)) .collect(toList()); } } diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java index b6e16e0e..91d0ad84 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java @@ -54,10 +54,10 @@ public void configure(Map settings) { } @Override - public List parse(HttpResponse response) { + public List parse(String endpoint, HttpResponse response) { switch (policy.resolve(response)) { case PROCESS: - return delegate.parse(response); + return delegate.parse(endpoint, response); case SKIP: return emptyList(); case FAIL: diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java index 46c93f1e..ddb00e76 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java @@ -74,17 +74,14 @@ private KvRecord map(JacksonRecord record) { .orElseGet(() -> ofNullable(offsets.get("key")).map(String.class::cast)) .orElseGet(() -> generateConsistentKey(record.getBody())); - String endpoint = ofNullable(offsets.get("endpoint")).map(String.class::cast) - .orElseThrow(() -> new IllegalStateException("Record parsed without endpoint from ElasticSearch ")); - Optional timestamp = ofNullable(record.getTimestamp()) .map(Optional::of) .orElseGet(() -> ofNullable(offsets.get("timestamp")).map(String.class::cast)) .map(timestampParser::parse); Offset offset = timestamp - .map(ts -> Offset.of(offsets, key, ts, endpoint)) - .orElseGet(() -> Offset.of(offsets, key, endpoint)); + .map(ts -> Offset.of(offsets, key, ts)) + .orElseGet(() -> Offset.of(offsets, key)); return KvRecord.builder() .key(key) diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java index df15b37d..9b908d4e 100644 --- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java +++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java @@ -30,7 +30,7 @@ @FunctionalInterface public interface HttpResponseParser extends Configurable { - List parse(HttpResponse response); + List parse(String endpoint, HttpResponse response); default void configure(Map map) { // Do nothing diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java index 832b213e..380c5397 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java @@ -133,7 +133,7 @@ public HttpRequest createRequest(Offset offset) { } public static class TestResponseParser implements HttpResponseParser { - public List parse(HttpResponse response) { + public List parse(String endpoint, HttpResponse response) { return null; } } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java index 5f935d43..a636fcfa 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java @@ -195,7 +195,7 @@ void givenTaskStarted_whenPoll_thenThrottled() throws InterruptedException, IOEx task.start(emptyMap()); given(requestFactory.createRequest(offset)).willReturn(request); given(client.execute(request)).willReturn(response); - given(responseParser.parse(response)).willReturn(asList(record(offsetMap))); + given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap))); given(recordFilterFactory.create(offset)).willReturn(__ -> true); task.poll(); @@ -211,7 +211,7 @@ void givenTaskStarted_whenPoll_thenResultsReturned() throws InterruptedException task.start(emptyMap()); given(requestFactory.createRequest(offset)).willReturn(request); given(client.execute(request)).willReturn(response); - given(responseParser.parse(response)).willReturn(asList(record(offsetMap))); + given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap))); given(recordSorter.sort(asList(record(offsetMap)))).willReturn(asList(record(offsetMap))); given(recordFilterFactory.create(offset)).willReturn(__ -> true); @@ -226,7 +226,7 @@ void givenTaskStarted_whenPoll_thenResultsSorted() throws InterruptedException, task.start(emptyMap()); given(requestFactory.createRequest(offset)).willReturn(request); given(client.execute(request)).willReturn(response); - given(responseParser.parse(response)).willReturn(asList(record(offsetMap))); + given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap))); given(recordSorter.sort(asList(record(offsetMap)))).willReturn(asList(record(offsetMap(1)), record(offsetMap(2)))); given(recordFilterFactory.create(offset)).willReturn(__ -> true); @@ -241,7 +241,7 @@ void givenTaskStarted_whenPoll_thenFilterFilters() throws InterruptedException, task.start(emptyMap()); given(requestFactory.createRequest(offset)).willReturn(request); given(client.execute(request)).willReturn(response); - given(responseParser.parse(response)).willReturn(asList(record(offsetMap))); + given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap))); given(recordFilterFactory.create(offset)).willReturn(__ -> false); assertThat(task.poll()).isEmpty(); @@ -255,7 +255,7 @@ void givenTaskStarted_whenPollAndCommitRecords_thenOffsetUpdated() throws Interr task.start(emptyMap()); given(requestFactory.createRequest(offset)).willReturn(request); given(client.execute(request)).willReturn(response); - given(responseParser.parse(response)).willReturn(asList(record(offsetMap))); + given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap))); given(recordSorter.sort(asList(record(offsetMap)))) .willReturn(asList(record(offsetMap(1)), record(offsetMap(2)), record(offsetMap(3)))); given(recordFilterFactory.create(offset)).willReturn(__ -> true); diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java index be168042..3729e2ad 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/StreamkapElasticConnectorTest.java @@ -33,6 +33,7 @@ import org.testcontainers.utility.DockerImageName; import com.github.castorm.kafka.connect.http.model.Offset; +import com.github.castorm.kafka.connect.http.model.Partition; import lombok.extern.slf4j.Slf4j; @@ -119,8 +120,8 @@ private List runTasks(Map config, int nbTasks, int records.addAll(polledRecords); for (SourceRecord record : polledRecords) { assertEquals(record.topic(), - Offset.getEndpointFromPartition( - record.sourcePartition()).replaceAll("[^a-zA-Z0-9_]", "_")); + Partition.getEndpointFromPartition(record.sourcePartition()) + .replaceAll("[^a-zA-Z0-9_]", "_")); task.commitRecord(record, null); } task.commit(); diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactoryTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactoryTest.java index 093b7be1..e60b3f20 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactoryTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetRecordFilterFactoryTest.java @@ -59,7 +59,7 @@ void givenNotSeen_whenCreateAndTest_thenFalse() { @Test void givenSeen_whenCreateAndTest_thenTrue() { - Predicate predicate = factory.create(Offset.of(ImmutableMap.of("i", 5), "dummy-endpoint")); + Predicate predicate = factory.create(Offset.of(ImmutableMap.of("i", 5))); predicate.test(record(5)); diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetTimestampRecordFilterFactoryTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetTimestampRecordFilterFactoryTest.java index c1d7d3ee..1d33234a 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetTimestampRecordFilterFactoryTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/OffsetTimestampRecordFilterFactoryTest.java @@ -44,12 +44,12 @@ class OffsetTimestampRecordFilterFactoryTest { @Test void givenOffset_whenTestEarlier_thenFalse() { - assertThat(factory.create(Offset.of(emptyMap(), key, now, endpoint)).test(record(now.minus(1, MINUTES)))).isFalse(); + assertThat(factory.create(Offset.of(emptyMap(), key, now)).test(record(now.minus(1, MINUTES)))).isFalse(); } @Test void givenOffset_whenTestLater_thenTrue() { - assertThat(factory.create(Offset.of(emptyMap(), key, now, endpoint)).test(record(now.plus(1, MINUTES)))).isTrue(); + assertThat(factory.create(Offset.of(emptyMap(), key, now)).test(record(now.plus(1, MINUTES)))).isTrue(); } interface Fixture { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapperTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapperTest.java index 9c9a1a8a..08d9f881 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapperTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/SchemedKvSourceRecordMapperTest.java @@ -50,64 +50,64 @@ class SchemedKvSourceRecordMapperTest { @BeforeEach void setUp() { - given(config.getTopicName("endpoint")).willReturn("topic"); + given(config.getTopicName("dummy-endpoint")).willReturn("topic"); mapper = new SchemedKvSourceRecordMapper(__ -> config); mapper.configure(emptyMap()); } @Test void givenTopic_whenMap_thenTopicMapped() { - assertThat(mapper.map(record).topic()).isEqualTo("topic"); + assertThat(mapper.map("dummy-endpoint", record).topic()).isEqualTo("topic"); } @Test void givenKey_whenMap_thenKeyMapped() { - assertThat(((Struct) mapper.map(record.withKey("key")).key()).get("_streamkap_key")).isEqualTo("key"); + assertThat(((Struct) mapper.map("dummy-endpoint", record.withKey("key")).key()).get("_streamkap_key")).isEqualTo("key"); } @Test void givenValue_whenMap_thenValueMapped() { - assertThat(((Struct) mapper.map(record.withValue("value")).value()).get("_streamkap_value")).isEqualTo("value"); + assertThat(((Struct) mapper.map("dummy-endpoint", record.withValue("value")).value()).get("_streamkap_value")).isEqualTo("value"); } @Test void givenKey_whenMap_thenValueKeyMapped() { - assertThat(((Struct) mapper.map(record.withKey("key")).value()).get("_streamkap_key")).isEqualTo("key"); + assertThat(((Struct) mapper.map("dummy-endpoint", record.withKey("key")).value()).get("_streamkap_key")).isEqualTo("key"); } @Test void givenOffsetTimestamp_whenMap_thenValueTimestampMapped() { - assertThat(((Struct) mapper.map(record.withOffset(offset)).value()).get("_streamkap_timestamp")).isEqualTo(now.toEpochMilli()); + assertThat(((Struct) mapper.map("dummy-endpoint", record.withOffset(offset)).value()).get("_streamkap_timestamp")).isEqualTo(now.toEpochMilli()); } @Test void givenOffset_whenMap_thenOffsetMapped() { - assertThat(mapper.map(record.withOffset(offset)).sourceOffset()).isEqualTo(offset.toMap()); + assertThat(mapper.map("dummy-endpoint", record.withOffset(offset)).sourceOffset()).isEqualTo(offset.toMap()); } @Test void givenTimestamp_whenMap_thenTimestampMapped() { - assertThat(mapper.map(record.withOffset(offset)).timestamp()).isEqualTo(now.toEpochMilli()); + assertThat(mapper.map("dummy-endpoint", record.withOffset(offset)).timestamp()).isEqualTo(now.toEpochMilli()); } @Test void whenMap_thenNoPartitionMapped() { - assertThat(mapper.map(record).kafkaPartition()).isNull(); + assertThat(mapper.map("dummy-endpoint", record).kafkaPartition()).isNull(); } @Test void whenMap_thenKeySchemaMapped() { - assertThat(mapper.map(record).keySchema()).isNotNull(); + assertThat(mapper.map("dummy-endpoint", record).keySchema()).isNotNull(); } @Test void whenMap_thenValueSchemaMapped() { - assertThat(mapper.map(record).valueSchema()).isNotNull(); + assertThat(mapper.map("dummy-endpoint", record).valueSchema()).isNotNull(); } interface Fixture { Instant now = now(); - Offset offset = Offset.of(ImmutableMap.of("k", "v"), "key", now, "endpoint"); + Offset offset = Offset.of(ImmutableMap.of("k", "v"), "key", now); KvRecord record = KvRecord.builder().value("not-null").offset(offset).build(); } } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java index f3045065..55fb7bda 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/record/StringKvSourceRecordMapperTest.java @@ -49,54 +49,54 @@ class StringKvSourceRecordMapperTest { @BeforeEach void setUp() { - given(config.getTopicName("endpoint")).willReturn("topic"); + given(config.getTopicName("dummy-endpoint")).willReturn("topic"); mapper = new StringKvSourceRecordMapper(__ -> config); mapper.configure(emptyMap()); } @Test void givenTopic_whenMap_thenTopicMapped() { - assertThat(mapper.map(record).topic()).isEqualTo("topic"); + assertThat(mapper.map("dummy-endpoint", record).topic()).isEqualTo("topic"); } @Test void givenKey_whenMap_thenIdMapped() { - assertThat(mapper.map(record.withKey("value")).key()).isEqualTo("value"); + assertThat(mapper.map("dummy-endpoint", record.withKey("value")).key()).isEqualTo("value"); } @Test void givenValue_whenMap_thenBodyMapped() { - assertThat(mapper.map(record.withValue("value")).value()).isEqualTo("value"); + assertThat(mapper.map("dummy-endpoint", record.withValue("value")).value()).isEqualTo("value"); } @Test void givenOffset_whenMap_thenOffsetMapped() { - assertThat(mapper.map(record.withOffset(offset)).sourceOffset()).isEqualTo(offset.toMap()); + assertThat(mapper.map("dummy-endpoint", record.withOffset(offset)).sourceOffset()).isEqualTo(offset.toMap()); } @Test void givenTimestamp_whenMap_thenTimestampMapped() { - assertThat(mapper.map(record.withOffset(offset)).timestamp()).isEqualTo(now.toEpochMilli()); + assertThat(mapper.map("dummy-endpoint", record.withOffset(offset)).timestamp()).isEqualTo(now.toEpochMilli()); } @Test void whenMap_thenNoPartitionMapped() { - assertThat(mapper.map(record).kafkaPartition()).isNull(); + assertThat(mapper.map("dummy-endpoint", record).kafkaPartition()).isNull(); } @Test void whenMap_thenKeySchemaMapped() { - assertThat(mapper.map(record).keySchema()).isNotNull(); + assertThat(mapper.map("dummy-endpoint", record).keySchema()).isNotNull(); } @Test void whenMap_thenValueSchemaMapped() { - assertThat(mapper.map(record).valueSchema()).isNotNull(); + assertThat(mapper.map("dummy-endpoint", record).valueSchema()).isNotNull(); } interface Fixture { Instant now = now(); - Offset offset = Offset.of(ImmutableMap.of("k", "v"), "key", now, "endpoint"); + Offset offset = Offset.of(ImmutableMap.of("k", "v"), "key", now); KvRecord record = KvRecord.builder().value("not-null").offset(offset).build(); } } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java index d9283ce6..304900b5 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java @@ -70,7 +70,7 @@ public List parse(HttpResponse response) { public static class TestRecordMapper implements KvSourceRecordMapper { @Override - public SourceRecord map(KvRecord record) { return null; } + public SourceRecord map(String endpoint, KvRecord record) { return null; } } private static KvHttpResponseParserConfig config(Map settings) { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserTest.java index 8b885f92..996dc4b5 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserTest.java @@ -68,7 +68,7 @@ void givenEmptyList_whenParse_thenEmpty() { given(recordParser.parse(response)).willReturn(emptyList()); - assertThat(parser.parse(response)).isEmpty(); + assertThat(parser.parse("dummy-endpoint", response)).isEmpty(); } @Test @@ -76,18 +76,18 @@ void givenList_whenParse_thenItemsMapped() { given(recordParser.parse(response)).willReturn(singletonList(record)); - parser.parse(response); + parser.parse("dummy-endpoint", response); - then(recordFactory).should().map(record); + then(recordFactory).should().map("dummy-endpoint", record); } @Test void givenEmptyList_whenParse_thenItemsMappedReturned() { given(recordParser.parse(response)).willReturn(singletonList(record)); - given(recordFactory.map(record)).willReturn(sourceRecord); + given(recordFactory.map("dummy-endpoint", record)).willReturn(sourceRecord); - assertThat(parser.parse(response)).containsExactly(sourceRecord); + assertThat(parser.parse("dummy-endpoint", response)).containsExactly(sourceRecord); } interface Fixture { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java index b7af6b10..5e118380 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java @@ -59,7 +59,7 @@ void whenPolicy_thenInitialized() { public static class TestResponseParser implements HttpResponseParser { @Override - public List parse(HttpResponse response) { + public List parse(String endpoint, HttpResponse response) { return null; } } diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserTest.java index 27dd2b29..08339d9c 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserTest.java @@ -71,7 +71,7 @@ void givenPolicyFail_whenParse_thenIllegalState() { given(policy.resolve(response)).willReturn(FAIL); - assertThat(catchThrowable(() -> parser.parse(response))).isInstanceOf(IllegalStateException.class); + assertThat(catchThrowable(() -> parser.parse("dummy-endpoint", response))).isInstanceOf(IllegalStateException.class); } @Test @@ -79,9 +79,9 @@ void givenPolicyFail_whenParse_thenDontDelegate() { given(policy.resolve(response)).willReturn(FAIL); - catchThrowable(() -> parser.parse(response)); + catchThrowable(() -> parser.parse("dummy-endpoint", response)); - then(delegate).should(never()).parse(any()); + then(delegate).should(never()).parse(any(), any()); } @Test @@ -89,9 +89,9 @@ void givenPolicyProcess_whenParse_thenDelegate() { given(policy.resolve(response)).willReturn(PROCESS); - parser.parse(response); + parser.parse("dummy-endpoint", response); - then(delegate).should().parse(response); + then(delegate).should().parse("dummy-endpoint", response); } @Test @@ -99,9 +99,9 @@ void givenPolicyProcess_whenParse_thenResponseFromDelegate() { given(policy.resolve(response)).willReturn(PROCESS); - given(delegate.parse(response)).willReturn(ImmutableList.of(record)); + given(delegate.parse("dummy-endpoint", response)).willReturn(ImmutableList.of(record)); - assertThat(parser.parse(response)).containsExactly(record); + assertThat(parser.parse("dummy-endpoint", response)).containsExactly(record); } @Test @@ -109,9 +109,9 @@ void givenPolicySkip_whenParse_thenDontDelegate() { given(policy.resolve(response)).willReturn(SKIP); - parser.parse(response); + parser.parse("dummy-endpoint", response); - then(delegate).should(never()).parse(any()); + then(delegate).should(never()).parse(any(), any()); } @Test @@ -119,7 +119,7 @@ void givenPolicySkip_whenParse_thenEmptyList() { given(policy.resolve(response)).willReturn(SKIP); - assertThat(parser.parse(response)).isEmpty(); + assertThat(parser.parse("dummy-endpoint", response)).isEmpty(); } interface Fixture { diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/timer/FixedIntervalTimerTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/timer/FixedIntervalTimerTest.java index 395ccd2b..85b9d7b8 100644 --- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/timer/FixedIntervalTimerTest.java +++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/timer/FixedIntervalTimerTest.java @@ -79,7 +79,7 @@ private void givenConfiguredTimer(long intervalMillis, long lastPollMillis) { } interface Fixture { - Offset offset = Offset.of(emptyMap(), "key", now(), "endpoint"); + Offset offset = Offset.of(emptyMap(), "key", now()); long intervalMillis = 300000L; long lastPollMillis = System.currentTimeMillis(); long maxExecutionTimeMillis = 500L;