Skip to content

Commit

Permalink
Batch deletion of eventlogs
Browse files Browse the repository at this point in the history
* repository: add batch deletion method to interface (with default implementation)
* repository: implement batch deletion (uses updateBatch)
* transmission: use batch deletion
  • Loading branch information
ePaul committed Jul 28, 2023
1 parent 9d642c2 commit 4e41d31
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,19 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {

@Override
public void delete(EventLog eventLog) {
Map<String, Object> namedParameterMap = new HashMap<>();
namedParameterMap.put("id", eventLog.getId());
jdbcTemplate.update(
"DELETE FROM nakadi_events.event_log where id = :id",
namedParameterMap
delete(Collections.singleton(eventLog));
}

@Override
public void delete(Collection<EventLog> eventLogs) {
MapSqlParameterSource[] namedParameterMaps = eventLogs.stream()
.map(eventLog ->
new MapSqlParameterSource().addValue("id", eventLog.getId())
).toArray(MapSqlParameterSource[]::new);

jdbcTemplate.batchUpdate(
"DELETE FROM nakadi_events.event_log where id = :id",
namedParameterMaps
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ public interface EventLogRepository {

void delete(EventLog eventLog);

default void delete(Collection<EventLog> eventLogs) {
eventLogs.forEach(this::delete);
}

void persist(EventLog eventLog);

default void persist(Collection<EventLog> eventLogs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void tryToPublishBatch(List<BatchItem> batch) throws Exception {
.filter(rawEvent -> !failedEids.contains(convertToUUID(rawEvent.getId())));
}

successfulEvents.forEach(eventLogRepository::delete);
eventLogRepository.delete(successfulEvents.collect(Collectors.toList()));
}

private List<String> collectEids(EventPublishingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.zalando.fahrschein.EventPublishingException;
import org.zalando.fahrschein.domain.BatchItemResponse;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
Expand All @@ -16,20 +19,23 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

import static com.jayway.jsonpath.JsonPath.read;
import static java.time.Instant.now;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
Expand All @@ -47,13 +53,16 @@ public class EventTransmissionServiceTest {
private MockNakadiPublishingClient publishingClient;
private ObjectMapper mapper;
private EventLogRepository repo;
@Captor
private ArgumentCaptor<Collection<EventLog>> eventLogColCaptor;

@BeforeEach
public void setUp() {
repo = mock(EventLogRepository.class);
publishingClient = spy(new MockNakadiPublishingClient());
mapper = spy(new ObjectMapper().registerModules(new JavaTimeModule()));
service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60);
MockitoAnnotations.openMocks(this);
}

@Test
Expand Down Expand Up @@ -135,9 +144,8 @@ public void testErrorInPayloadDeserializationIsHandledGracefully() throws IOExce
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev1, ev3));
}

@Test
Expand All @@ -164,9 +172,8 @@ public void testUnknownErrorInTransmissionIsHandledGracefully() throws Exception
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo, never()).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev3));
}

@Test
Expand Down Expand Up @@ -196,9 +203,8 @@ public void testEventPublishingExceptionIsHandledGracefully() throws Exception {
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev1, ev3));
}

@Test
Expand Down Expand Up @@ -260,4 +266,12 @@ private TypeReference<LinkedHashMap<String, Object>> anyLinkedHashmapTypeReferen
return any();
}

private List<EventLog> verifyDeletionAndGetAllDeletedEvents() {
verify(repo, Mockito.atLeastOnce()).delete(eventLogColCaptor.capture());
verify(repo, never()).delete(any(EventLog.class));
return eventLogColCaptor.getAllValues()
.stream()
.flatMap(c -> c.stream())
.collect(Collectors.toList());
}
}

0 comments on commit 4e41d31

Please sign in to comment.