
Commit 251e17c
for ES, when using aliases, the index name returned does not match the name of the alias, so offsets are wrong

acristu committed Jan 25, 2024
1 parent 84d5e4b commit 251e17c
Showing 25 changed files with 100 additions and 101 deletions.
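The root cause, in brief: Kafka Connect commits and restores offsets keyed by the source-partition map. Before this commit the partition was built from the endpoint carried inside each `Offset`, and for Elasticsearch that value came from the response, so queries against an alias stored offsets under the backing index name while the restart lookup used the configured alias. The sketch below reproduces the mismatch in isolation; the alias and index names are made up, and `partitionFor` mirrors the `Partition.getPartition` helper this commit introduces.

```java
import java.util.Map;

public class AliasPartitionMismatch {

    // Mirrors Partition.getPartition(String) from this commit.
    static Map<String, ?> partitionFor(String endpoint) {
        return Map.of("endpoint", endpoint);
    }

    public static void main(String[] args) {
        String configuredAlias = "logs";          // endpoint the connector is configured with (hypothetical)
        String indexFromResponse = "logs-000001"; // index name Elasticsearch reports for that alias (hypothetical)

        // Before the fix: offsets were committed under a partition derived from the response...
        Map<String, ?> committedUnder = partitionFor(indexFromResponse);
        // ...while on restart they were looked up under the configured endpoint.
        Map<String, ?> lookedUpUnder = partitionFor(configuredAlias);

        System.out.println(committedUnder.equals(lookedUpUnder)); // false -> committed offsets are never found
    }
}
```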
4 changes: 2 additions & 2 deletions README.md
@@ -345,7 +345,7 @@ the list of `SourceRecord`s expected by Kafka Connect.
 > ```java
 > public interface HttpResponseParser extends Configurable {
 >
->     List<SourceRecord> parse(HttpResponse response);
+>     List<SourceRecord> parse(String endpoint, HttpResponse response);
 > }
 > ```
 > * Type: `Class`
@@ -419,7 +419,7 @@ Parses the HTTP response into a key-value SourceRecord. This process is decompos
 > ```java
 > public interface KvSourceRecordMapper extends Configurable {
 >
->     SourceRecord map(KvRecord record);
+>     SourceRecord map(String endpoint, KvRecord record);
 > }
 > ```
 > * Type: `Class`

@@ -36,6 +36,7 @@
 import org.apache.kafka.connect.source.SourceTaskContext;
 
 import com.github.castorm.kafka.connect.http.model.Offset;
+import com.github.castorm.kafka.connect.http.model.Partition;
 import com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactoryConfig;
 
 import lombok.extern.slf4j.Slf4j;
@@ -90,10 +91,10 @@ public List<SourceRecord> poll() throws InterruptedException {
     }
 
     private HttpSourceTaskSingleEndpoint getTaskForRecord(SourceRecord record) {
-        String endpoint = Offset.getEndpointFromPartition(record.sourcePartition());
+        String endpoint = Partition.getEndpointFromPartition(record.sourcePartition());
         HttpSourceTaskSingleEndpoint task = tasks.get(endpoint);
         if (task == null) {
-            throw new ConnectException("No HttpSourceTaskSingleEndpoint found for topic " + endpoint);
+            throw new ConnectException("No HttpSourceTaskSingleEndpoint found for endpoint " + endpoint);
         }
         return task;
     }

@@ -25,6 +25,7 @@
 import com.github.castorm.kafka.connect.http.model.HttpRequest;
 import com.github.castorm.kafka.connect.http.model.HttpResponse;
 import com.github.castorm.kafka.connect.http.model.Offset;
+import com.github.castorm.kafka.connect.http.model.Partition;
 import com.github.castorm.kafka.connect.http.record.spi.SourceRecordFilterFactory;
 import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter;
 import com.github.castorm.kafka.connect.http.request.spi.HttpRequestFactory;
@@ -102,7 +103,9 @@ public void start(Map<String, String> settings) {
     }
 
     private Offset loadOffset(SourceTaskContext context, Map<String, String> initialOffset) {
-        Map<String, Object> restoredOffset = ofNullable(context.offsetStorageReader().offset(Offset.getPartition(endpoint))).orElseGet(Collections::emptyMap);
+        Map<String, Object> restoredOffset = ofNullable(
+                context.offsetStorageReader().offset(
+                        Partition.getPartition(endpoint))).orElseGet(Collections::emptyMap);
         return Offset.of(!restoredOffset.isEmpty() ? restoredOffset : initialOffset, endpoint);
     }
 
@@ -115,7 +118,7 @@ public List<SourceRecord> poll() throws InterruptedException {

         HttpResponse response = execute(request);
 
-        List<SourceRecord> records = responseParser.parse(response);
+        List<SourceRecord> records = responseParser.parse(endpoint, response);
 
         List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
                 .filter(recordFilterFactory.create(offset))
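For context, `OffsetStorageReader.offset(partition)` only returns a committed offset for an exactly matching partition map, which is why `loadOffset` above must build the partition from the configured endpoint rather than from anything found in a response. A sketch of that restore path against this commit's API (the helper class is hypothetical):

```java
import static java.util.Optional.ofNullable;

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTaskContext;

import com.github.castorm.kafka.connect.http.model.Partition;

// Hypothetical helper: restores the offset committed for one endpoint, falling
// back to an empty map when nothing has been committed under that partition yet.
class OffsetRestoreSketch {

    static Map<String, Object> restore(SourceTaskContext context, String endpoint) {
        return ofNullable(context.offsetStorageReader().offset(Partition.getPartition(endpoint)))
                .orElseGet(Collections::emptyMap);
    }
}
```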

@@ -40,39 +40,25 @@ public class Offset

     private final Map<String, ?> properties;
 
-    String endpoint;
-    private Offset(String endpoint, Map<String, ?> properties) {
+    private Offset(Map<String, ?> properties) {
         this.properties = properties;
-        this.endpoint = endpoint;
     }
 
-    public static Map<String, ?> getPartition(String endpoint) {
-        return Map.of("endpoint", endpoint);
+    public static Offset of(Map<String, ?> properties) {
+        return new Offset(properties);
     }
 
-    public Map<String, ?> getPartition() {
-        return Map.of("endpoint", endpoint);
-    }
-
-    public static String getEndpointFromPartition(Map<String, ?> partition) {
-        return partition.get("endpoint").toString();
-    }
-
-    public static Offset of(Map<String, ?> properties, String endpoint) {
-        return new Offset(endpoint, properties);
-    }
 
-    public static Offset of(Map<String, ?> properties, String key, String endpoint) {
+    public static Offset of(Map<String, ?> properties, String key) {
         Map<String, Object> props = new HashMap<>(properties);
         props.put(KEY_KEY, key);
-        return new Offset(endpoint, props);
+        return new Offset(props);
     }
 
-    public static Offset of(Map<String, ?> properties, String key, Instant timestamp, String endpoint) {
+    public static Offset of(Map<String, ?> properties, String key, Instant timestamp) {
         Map<String, Object> props = new HashMap<>(properties);
         props.put(KEY_KEY, key);
         props.put(TIMESTAMP_KEY, timestamp.toString());
-        return new Offset(endpoint, props);
+        return new Offset(props);
     }
 
     public Map<String, ?> toMap() {
@@ -86,8 +72,4 @@ public Optional<String> getKey() {
     public Optional<Instant> getTimestamp() {
         return ofNullable((String) properties.get(TIMESTAMP_KEY)).map(Instant::parse);
     }
-
-    public String getEndpoint() {
-        return endpoint;
-    }
 }

@@ -0,0 +1,14 @@
+package com.github.castorm.kafka.connect.http.model;
+
+import java.util.Map;
+
+public class Partition {
+    public static Map<String, ?> getPartition(String endpoint) {
+        return Map.of("endpoint", endpoint);
+    }
+
+    public static String getEndpointFromPartition(Map<String, ?> partition) {
+        return partition.get("endpoint").toString();
+    }
+
+}
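Both directions of the mapping now go through this new helper, keeping the partition layout a single-key map. A quick round-trip check (the endpoint value is made up):

```java
import java.util.Map;

import com.github.castorm.kafka.connect.http.model.Partition;

public class PartitionRoundTrip {

    public static void main(String[] args) {
        // Build the source partition for a configured endpoint (e.g. an ES alias).
        Map<String, ?> partition = Partition.getPartition("my-alias");
        // Recover the endpoint from a record's source partition.
        System.out.println(Partition.getEndpointFromPartition(partition)); // my-alias
    }
}
```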

@@ -42,7 +42,7 @@ public Predicate<SourceRecord> create(Offset offset) {
         AtomicBoolean lastSeenReached = new AtomicBoolean(false);
         return delegate.create(offset).or(record -> {
             boolean result = lastSeenReached.get();
-            if (!result && Offset.of(record.sourceOffset(), offset.getEndpoint()).equals(offset)) {
+            if (!result && Offset.of(record.sourceOffset()).equals(offset)) {
                 lastSeenReached.set(true);
             }
             return result;

@@ -21,6 +21,7 @@
  */
 
 import com.github.castorm.kafka.connect.http.model.Offset;
+import com.github.castorm.kafka.connect.http.model.Partition;
 import com.github.castorm.kafka.connect.http.record.model.KvRecord;
 import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper;
 import lombok.RequiredArgsConstructor;
@@ -75,16 +76,15 @@ public void configure(Map<String, ?> settings) {
     }
 
     @Override
-    public SourceRecord map(KvRecord record) {
+    public SourceRecord map(String endpoint, KvRecord record) {
 
         Offset offset = record.getOffset();
         Long timestamp = offset.getTimestamp().map(Instant::toEpochMilli).orElseGet(System::currentTimeMillis);
-        String endpoint = offset.getEndpoint();
 
         Struct key = keyStruct(record.getKey());
         Struct value = valueStruct(record.getKey(), record.getValue(), timestamp, endpoint);
-        Map<String, ?> sourcePartition = offset.getPartition();
+        Map<String, ?> sourcePartition = Partition.getPartition(endpoint);
 
         return new SourceRecord(
                 sourcePartition,
                 offset.toMap(),

@@ -21,6 +21,7 @@
  */
 
 import com.github.castorm.kafka.connect.http.model.Offset;
+import com.github.castorm.kafka.connect.http.model.Partition;
 import com.github.castorm.kafka.connect.http.record.model.KvRecord;
 import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper;
 import lombok.RequiredArgsConstructor;
@@ -64,15 +65,15 @@ public void configure(Map<String, ?> settings) {
     }
 
     @Override
-    public SourceRecord map(KvRecord record) {
+    public SourceRecord map(String endpoint, KvRecord record) {
 
         Offset offset = record.getOffset();
-        Map<String, ?> sourcePartition = offset.getPartition();
+        Map<String, ?> sourcePartition = Partition.getPartition(endpoint);
 
         return new SourceRecord(
                 sourcePartition,
                 offset.toMap(),
-                config.getTopicName(offset.getEndpoint()),
+                config.getTopicName(endpoint),
                 null,
                 keySchema,
                 record.getKey(),

@@ -29,7 +29,7 @@
 @FunctionalInterface
 public interface KvSourceRecordMapper extends Configurable {
 
-    SourceRecord map(KvRecord record);
+    SourceRecord map(String endpoint, KvRecord record);
 
     default void configure(Map<String, ?> map) {
         // Do nothing
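Custom implementations of this SPI now receive the endpoint explicitly instead of reading it from the `Offset`. A minimal sketch of a third-party mapper against the new signature (the class and topic name are hypothetical, not one of the connector's built-in mappers):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import com.github.castorm.kafka.connect.http.model.Partition;
import com.github.castorm.kafka.connect.http.record.model.KvRecord;
import com.github.castorm.kafka.connect.http.record.spi.KvSourceRecordMapper;

// Emits key and value as plain strings, partitioning records by the
// configured endpoint as this commit's convention requires.
public class PassthroughKvSourceRecordMapper implements KvSourceRecordMapper {

    @Override
    public SourceRecord map(String endpoint, KvRecord record) {
        return new SourceRecord(
                Partition.getPartition(endpoint), // source partition keyed by endpoint
                record.getOffset().toMap(),       // source offset from the parsed record
                "my-topic",                       // hypothetical topic name
                Schema.STRING_SCHEMA, record.getKey(),
                Schema.STRING_SCHEMA, record.getValue());
    }
}
```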

@@ -54,9 +54,9 @@ public void configure(Map<String, ?> configs) {
     }
 
     @Override
-    public List<SourceRecord> parse(HttpResponse response) {
+    public List<SourceRecord> parse(String endpoint, HttpResponse response) {
         return recordParser.parse(response).stream()
-                .map(recordMapper::map)
+                .map(x -> recordMapper.map(endpoint, x))
                 .collect(toList());
     }
 }

@@ -54,10 +54,10 @@ public void configure(Map<String, ?> settings) {
     }
 
     @Override
-    public List<SourceRecord> parse(HttpResponse response) {
+    public List<SourceRecord> parse(String endpoint, HttpResponse response) {
         switch (policy.resolve(response)) {
             case PROCESS:
-                return delegate.parse(response);
+                return delegate.parse(endpoint, response);
             case SKIP:
                 return emptyList();
             case FAIL:

@@ -74,17 +74,14 @@ private KvRecord map(JacksonRecord record) {
                 .orElseGet(() -> ofNullable(offsets.get("key")).map(String.class::cast))
                 .orElseGet(() -> generateConsistentKey(record.getBody()));
 
-        String endpoint = ofNullable(offsets.get("endpoint")).map(String.class::cast)
-                .orElseThrow(() -> new IllegalStateException("Record parsed without endpoint from ElasticSearch "));
-
         Optional<Instant> timestamp = ofNullable(record.getTimestamp())
                 .map(Optional::of)
                 .orElseGet(() -> ofNullable(offsets.get("timestamp")).map(String.class::cast))
                 .map(timestampParser::parse);
 
         Offset offset = timestamp
-                .map(ts -> Offset.of(offsets, key, ts, endpoint))
-                .orElseGet(() -> Offset.of(offsets, key, endpoint));
+                .map(ts -> Offset.of(offsets, key, ts))
+                .orElseGet(() -> Offset.of(offsets, key));
 
         return KvRecord.builder()
                 .key(key)

@@ -30,7 +30,7 @@
 @FunctionalInterface
 public interface HttpResponseParser extends Configurable {
 
-    List<SourceRecord> parse(HttpResponse response);
+    List<SourceRecord> parse(String endpoint, HttpResponse response);
 
     default void configure(Map<String, ?> map) {
         // Do nothing

@@ -133,7 +133,7 @@ public HttpRequest createRequest(Offset offset) {
     }
 
     public static class TestResponseParser implements HttpResponseParser {
-        public List<SourceRecord> parse(HttpResponse response) {
+        public List<SourceRecord> parse(String endpoint, HttpResponse response) {
             return null;
         }
     }

@@ -195,7 +195,7 @@ void givenTaskStarted_whenPoll_thenThrottled() throws InterruptedException, IOEx
         task.start(emptyMap());
         given(requestFactory.createRequest(offset)).willReturn(request);
         given(client.execute(request)).willReturn(response);
-        given(responseParser.parse(response)).willReturn(asList(record(offsetMap)));
+        given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap)));
         given(recordFilterFactory.create(offset)).willReturn(__ -> true);
 
         task.poll();
@@ -211,7 +211,7 @@ void givenTaskStarted_whenPoll_thenResultsReturned() throws InterruptedException
         task.start(emptyMap());
         given(requestFactory.createRequest(offset)).willReturn(request);
         given(client.execute(request)).willReturn(response);
-        given(responseParser.parse(response)).willReturn(asList(record(offsetMap)));
+        given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap)));
         given(recordSorter.sort(asList(record(offsetMap)))).willReturn(asList(record(offsetMap)));
         given(recordFilterFactory.create(offset)).willReturn(__ -> true);
 
@@ -226,7 +226,7 @@ void givenTaskStarted_whenPoll_thenResultsSorted() throws InterruptedException,
         task.start(emptyMap());
         given(requestFactory.createRequest(offset)).willReturn(request);
         given(client.execute(request)).willReturn(response);
-        given(responseParser.parse(response)).willReturn(asList(record(offsetMap)));
+        given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap)));
         given(recordSorter.sort(asList(record(offsetMap)))).willReturn(asList(record(offsetMap(1)), record(offsetMap(2))));
         given(recordFilterFactory.create(offset)).willReturn(__ -> true);
 
@@ -241,7 +241,7 @@ void givenTaskStarted_whenPoll_thenFilterFilters() throws InterruptedException,
         task.start(emptyMap());
         given(requestFactory.createRequest(offset)).willReturn(request);
         given(client.execute(request)).willReturn(response);
-        given(responseParser.parse(response)).willReturn(asList(record(offsetMap)));
+        given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap)));
         given(recordFilterFactory.create(offset)).willReturn(__ -> false);
 
         assertThat(task.poll()).isEmpty();
@@ -255,7 +255,7 @@ void givenTaskStarted_whenPollAndCommitRecords_thenOffsetUpdated() throws Interr
         task.start(emptyMap());
         given(requestFactory.createRequest(offset)).willReturn(request);
         given(client.execute(request)).willReturn(response);
-        given(responseParser.parse(response)).willReturn(asList(record(offsetMap)));
+        given(responseParser.parse("dummy-endpoint", response)).willReturn(asList(record(offsetMap)));
         given(recordSorter.sort(asList(record(offsetMap))))
                 .willReturn(asList(record(offsetMap(1)), record(offsetMap(2)), record(offsetMap(3))));
         given(recordFilterFactory.create(offset)).willReturn(__ -> true);

@@ -33,6 +33,7 @@
 import org.testcontainers.utility.DockerImageName;
 
 import com.github.castorm.kafka.connect.http.model.Offset;
+import com.github.castorm.kafka.connect.http.model.Partition;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -119,8 +120,8 @@ private List<SourceRecord> runTasks(Map<String, String> config, int nbTasks, int
             records.addAll(polledRecords);
             for (SourceRecord record : polledRecords) {
                 assertEquals(record.topic(),
-                        Offset.getEndpointFromPartition(
-                                record.sourcePartition()).replaceAll("[^a-zA-Z0-9_]", "_"));
+                        Partition.getEndpointFromPartition(record.sourcePartition())
+                                .replaceAll("[^a-zA-Z0-9_]", "_"));
                 task.commitRecord(record, null);
             }
             task.commit();

@@ -59,7 +59,7 @@ void givenNotSeen_whenCreateAndTest_thenFalse() {
     @Test
     void givenSeen_whenCreateAndTest_thenTrue() {
 
-        Predicate<SourceRecord> predicate = factory.create(Offset.of(ImmutableMap.of("i", 5), "dummy-endpoint"));
+        Predicate<SourceRecord> predicate = factory.create(Offset.of(ImmutableMap.of("i", 5)));
 
         predicate.test(record(5));
 

@@ -44,12 +44,12 @@ class OffsetTimestampRecordFilterFactoryTest {

     @Test
     void givenOffset_whenTestEarlier_thenFalse() {
-        assertThat(factory.create(Offset.of(emptyMap(), key, now, endpoint)).test(record(now.minus(1, MINUTES)))).isFalse();
+        assertThat(factory.create(Offset.of(emptyMap(), key, now)).test(record(now.minus(1, MINUTES)))).isFalse();
     }
 
     @Test
     void givenOffset_whenTestLater_thenTrue() {
-        assertThat(factory.create(Offset.of(emptyMap(), key, now, endpoint)).test(record(now.plus(1, MINUTES)))).isTrue();
+        assertThat(factory.create(Offset.of(emptyMap(), key, now)).test(record(now.plus(1, MINUTES)))).isTrue();
     }
 
     interface Fixture {