diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java index 9ed63e3..72bd16f 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Map; + import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; @@ -14,6 +15,11 @@ public class EventLogRepositoryImpl implements EventLogRepository { + public static final String INSERT_STATEMENT = "INSERT INTO " + + " nakadi_events.event_log " + + " (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" + + "VALUES " + + " (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)"; private NamedParameterJdbcTemplate jdbcTemplate; private int lockSize; @@ -61,11 +67,19 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) { @Override public void delete(EventLog eventLog) { - Map namedParameterMap = new HashMap<>(); - namedParameterMap.put("id", eventLog.getId()); - jdbcTemplate.update( - "DELETE FROM nakadi_events.event_log where id = :id", - namedParameterMap + delete(Collections.singleton(eventLog)); + } + + @Override + public void delete(Collection eventLogs) { + MapSqlParameterSource[] namedParameterMaps = eventLogs.stream() + .map(eventLog -> + new MapSqlParameterSource().addValue("id", eventLog.getId()) + ).toArray(MapSqlParameterSource[]::new); + + jdbcTemplate.batchUpdate( + "DELETE FROM nakadi_events.event_log where id = :id", + namedParameterMaps ); } @@ -76,30 +90,26 @@ public void persist(EventLog eventLog) { @Override public void persist(Collection eventLogs) { - MapSqlParameterSource[] namedParameterMaps = eventLogs.stream() - .map(eventLog -> { - Timestamp now = toSqlTimestamp(Instant.now()); - MapSqlParameterSource namedParameterMap = new MapSqlParameterSource(); - namedParameterMap.addValue("eventType", eventLog.getEventType()); - namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData()); - namedParameterMap.addValue("flowId", eventLog.getFlowId()); - namedParameterMap.addValue("created", now); - namedParameterMap.addValue("lastModified", now); - namedParameterMap.addValue("lockedBy", eventLog.getLockedBy()); - namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil()); - namedParameterMap.addValue("compactionKey", eventLog.getCompactionKey()); - return namedParameterMap; - }) + jdbcTemplate.batchUpdate(INSERT_STATEMENT, mapToParameterSources(eventLogs)); + } + + private MapSqlParameterSource[] mapToParameterSources(Collection eventLogs) { + return eventLogs.stream() + .map(this::toSqlParameterSource) .toArray(MapSqlParameterSource[]::new); + } - jdbcTemplate.batchUpdate( - "INSERT INTO " + - " nakadi_events.event_log " + - " (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" + - "VALUES " + - " (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)", - namedParameterMaps - ); + private MapSqlParameterSource toSqlParameterSource(EventLog eventLog) { + Timestamp now = toSqlTimestamp(Instant.now()); + return new MapSqlParameterSource() + .addValue("eventType", eventLog.getEventType()) + .addValue("eventBodyData", eventLog.getEventBodyData()) + .addValue("flowId", eventLog.getFlowId()) + .addValue("created", now) + .addValue("lastModified", now) + .addValue("lockedBy", eventLog.getLockedBy()) + .addValue("lockedUntil", eventLog.getLockedUntil()) + .addValue("compactionKey", eventLog.getCompactionKey()); } private Timestamp toSqlTimestamp(Instant now) { diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java index 1d3bcbd..920894c 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java @@ -10,12 +10,14 @@ public interface EventLogRepository { void delete(EventLog eventLog); + default void delete(Collection eventLogs) { + eventLogs.forEach(this::delete); + } + void persist(EventLog eventLog); default void persist(Collection eventLogs) { - for (EventLog eventLog : eventLogs) { - persist(eventLog); - } + eventLogs.forEach(this::persist); } void deleteAll(); diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java index f56f730..a12ad38 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java @@ -116,7 +116,7 @@ private void tryToPublishBatch(List batch) throws Exception { .filter(rawEvent -> !failedEids.contains(convertToUUID(rawEvent.getId()))); } - successfulEvents.forEach(eventLogRepository::delete); + eventLogRepository.delete(successfulEvents.collect(Collectors.toList())); } private List collectEids(EventPublishingException e) { diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java index 1517ec6..8ea8a05 100644 --- a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java @@ -6,7 +6,10 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.zalando.fahrschein.EventPublishingException; import org.zalando.fahrschein.domain.BatchItemResponse; import org.zalando.nakadiproducer.eventlog.impl.EventLog; @@ -16,20 +19,23 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; +import java.util.stream.Collectors; import static com.jayway.jsonpath.JsonPath.read; import static java.time.Instant.now; import static java.time.temporal.ChronoUnit.MINUTES; import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; @@ -47,6 +53,8 @@ public class EventTransmissionServiceTest { private MockNakadiPublishingClient publishingClient; private ObjectMapper mapper; private EventLogRepository repo; + @Captor + private ArgumentCaptor> eventLogColCaptor; @BeforeEach public void setUp() { @@ -54,6 +62,7 @@ public void setUp() { publishingClient = spy(new MockNakadiPublishingClient()); mapper = spy(new ObjectMapper().registerModules(new JavaTimeModule())); service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60); + MockitoAnnotations.openMocks(this); } @Test @@ -135,9 +144,8 @@ public void testErrorInPayloadDeserializationIsHandledGracefully() throws IOExce assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003")); // and only the successful ones have been deleted. - verify(repo).delete(ev1); - verify(repo, never()).delete(ev2); - verify(repo).delete(ev3); + List deletedEvents = verifyDeletionAndGetAllDeletedEvents(); + assertThat(deletedEvents, containsInAnyOrder(ev1, ev3)); } @Test @@ -164,9 +172,8 @@ public void testUnknownErrorInTransmissionIsHandledGracefully() throws Exception assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003")); // and only the successful ones have been deleted. - verify(repo, never()).delete(ev1); - verify(repo, never()).delete(ev2); - verify(repo).delete(ev3); + List deletedEvents = verifyDeletionAndGetAllDeletedEvents(); + assertThat(deletedEvents, containsInAnyOrder(ev3)); } @Test @@ -196,9 +203,8 @@ public void testEventPublishingExceptionIsHandledGracefully() throws Exception { assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003")); // and only the successful ones have been deleted. - verify(repo).delete(ev1); - verify(repo, never()).delete(ev2); - verify(repo).delete(ev3); + List deletedEvents = verifyDeletionAndGetAllDeletedEvents(); + assertThat(deletedEvents, containsInAnyOrder(ev1, ev3)); } @Test @@ -260,4 +266,12 @@ private TypeReference> anyLinkedHashmapTypeReferen return any(); } + private List verifyDeletionAndGetAllDeletedEvents() { + verify(repo, Mockito.atLeastOnce()).delete(eventLogColCaptor.capture()); + verify(repo, never()).delete(any(EventLog.class)); + return eventLogColCaptor.getAllValues() + .stream() + .flatMap(c -> c.stream()) + .collect(Collectors.toList()); + } }