Skip to content

Commit

Permalink
Batch deletion of eventlogs
Browse files Browse the repository at this point in the history
* repository: add batch deletion method to interface (with default implementation)
* repository: implement batch deletion (uses updateBatch)
* repository: refactoring for persist
* transmission: use batch deletion
  • Loading branch information
ePaul committed Jul 26, 2023
1 parent 9d642c2 commit cce3009
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
import java.util.HashMap;
import java.util.Map;


import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

public class EventLogRepositoryImpl implements EventLogRepository {

public static final String INSERT_STATEMENT = "INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)";
private NamedParameterJdbcTemplate jdbcTemplate;
private int lockSize;

Expand Down Expand Up @@ -61,11 +67,19 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {

@Override
public void delete(EventLog eventLog) {
Map<String, Object> namedParameterMap = new HashMap<>();
namedParameterMap.put("id", eventLog.getId());
jdbcTemplate.update(
"DELETE FROM nakadi_events.event_log where id = :id",
namedParameterMap
delete(Collections.singleton(eventLog));
}

@Override
public void delete(Collection<EventLog> eventLogs) {
MapSqlParameterSource[] namedParameterMaps = eventLogs.stream()
.map(eventLog ->
new MapSqlParameterSource().addValue("id", eventLog.getId())
).toArray(MapSqlParameterSource[]::new);

jdbcTemplate.batchUpdate(
"DELETE FROM nakadi_events.event_log where id = :id",
namedParameterMaps
);
}

Expand All @@ -76,30 +90,26 @@ public void persist(EventLog eventLog) {

@Override
public void persist(Collection<EventLog> eventLogs) {
MapSqlParameterSource[] namedParameterMaps = eventLogs.stream()
.map(eventLog -> {
Timestamp now = toSqlTimestamp(Instant.now());
MapSqlParameterSource namedParameterMap = new MapSqlParameterSource();
namedParameterMap.addValue("eventType", eventLog.getEventType());
namedParameterMap.addValue("eventBodyData", eventLog.getEventBodyData());
namedParameterMap.addValue("flowId", eventLog.getFlowId());
namedParameterMap.addValue("created", now);
namedParameterMap.addValue("lastModified", now);
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
namedParameterMap.addValue("compactionKey", eventLog.getCompactionKey());
return namedParameterMap;
})
jdbcTemplate.batchUpdate(INSERT_STATEMENT, mapToParameterSources(eventLogs));
}

private MapSqlParameterSource[] mapToParameterSources(Collection<EventLog> eventLogs) {
return eventLogs.stream()
.map(this::toSqlParameterSource)
.toArray(MapSqlParameterSource[]::new);
}

jdbcTemplate.batchUpdate(
"INSERT INTO " +
" nakadi_events.event_log " +
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" +
"VALUES " +
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)",
namedParameterMaps
);
private MapSqlParameterSource toSqlParameterSource(EventLog eventLog) {
Timestamp now = toSqlTimestamp(Instant.now());
return new MapSqlParameterSource()
.addValue("eventType", eventLog.getEventType())
.addValue("eventBodyData", eventLog.getEventBodyData())
.addValue("flowId", eventLog.getFlowId())
.addValue("created", now)
.addValue("lastModified", now)
.addValue("lockedBy", eventLog.getLockedBy())
.addValue("lockedUntil", eventLog.getLockedUntil())
.addValue("compactionKey", eventLog.getCompactionKey());
}

private Timestamp toSqlTimestamp(Instant now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ public interface EventLogRepository {

void delete(EventLog eventLog);

default void delete(Collection<EventLog> eventLogs) {
eventLogs.forEach(this::delete);
}

void persist(EventLog eventLog);

default void persist(Collection<EventLog> eventLogs) {
for (EventLog eventLog : eventLogs) {
persist(eventLog);
}
eventLogs.forEach(this::persist);
}

void deleteAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void tryToPublishBatch(List<BatchItem> batch) throws Exception {
.filter(rawEvent -> !failedEids.contains(convertToUUID(rawEvent.getId())));
}

successfulEvents.forEach(eventLogRepository::delete);
eventLogRepository.delete(successfulEvents.collect(Collectors.toList()));
}

private List<String> collectEids(EventPublishingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.zalando.fahrschein.EventPublishingException;
import org.zalando.fahrschein.domain.BatchItemResponse;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
Expand All @@ -16,20 +19,23 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

import static com.jayway.jsonpath.JsonPath.read;
import static java.time.Instant.now;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
Expand All @@ -47,13 +53,16 @@ public class EventTransmissionServiceTest {
private MockNakadiPublishingClient publishingClient;
private ObjectMapper mapper;
private EventLogRepository repo;
@Captor
private ArgumentCaptor<Collection<EventLog>> eventLogColCaptor;

@BeforeEach
public void setUp() {
repo = mock(EventLogRepository.class);
publishingClient = spy(new MockNakadiPublishingClient());
mapper = spy(new ObjectMapper().registerModules(new JavaTimeModule()));
service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60);
MockitoAnnotations.openMocks(this);
}

@Test
Expand Down Expand Up @@ -135,9 +144,8 @@ public void testErrorInPayloadDeserializationIsHandledGracefully() throws IOExce
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev1, ev3));
}

@Test
Expand All @@ -164,9 +172,8 @@ public void testUnknownErrorInTransmissionIsHandledGracefully() throws Exception
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo, never()).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev3));
}

@Test
Expand Down Expand Up @@ -196,9 +203,8 @@ public void testEventPublishingExceptionIsHandledGracefully() throws Exception {
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev1, ev3));
}

@Test
Expand Down Expand Up @@ -260,4 +266,12 @@ private TypeReference<LinkedHashMap<String, Object>> anyLinkedHashmapTypeReferen
return any();
}

private List<EventLog> verifyDeletionAndGetAllDeletedEvents() {
verify(repo, Mockito.atLeastOnce()).delete(eventLogColCaptor.capture());
verify(repo, never()).delete(any(EventLog.class));
return eventLogColCaptor.getAllValues()
.stream()
.flatMap(c -> c.stream())
.collect(Collectors.toList());
}
}

0 comments on commit cce3009

Please sign in to comment.