Skip to content

Commit

Permalink
Merge pull request #171 from zalando-nakadi/batch-deletion
Browse files Browse the repository at this point in the history
Batch deletion of eventlogs
  • Loading branch information
ePaul authored Aug 2, 2023
2 parents 9d642c2 + 23b471f commit 5a9b887
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,19 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {

@Override
public void delete(EventLog eventLog) {
Map<String, Object> namedParameterMap = new HashMap<>();
namedParameterMap.put("id", eventLog.getId());
jdbcTemplate.update(
"DELETE FROM nakadi_events.event_log where id = :id",
namedParameterMap
delete(Collections.singleton(eventLog));
}

@Override
public void delete(Collection<EventLog> eventLogs) {
    // Build one parameter source per event, each binding its id to the :id placeholder.
    final MapSqlParameterSource[] batchParams = new MapSqlParameterSource[eventLogs.size()];
    int index = 0;
    for (final EventLog entry : eventLogs) {
        batchParams[index++] = new MapSqlParameterSource().addValue("id", entry.getId());
    }

    // Issue all deletes as a single JDBC batch round-trip.
    jdbcTemplate.batchUpdate(
            "DELETE FROM nakadi_events.event_log where id = :id",
            batchParams
    );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.zalando.nakadiproducer.eventlog.impl;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.Is.is;

import javax.transaction.Transactional;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT;

import java.util.List;

@Transactional
public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {

Expand Down Expand Up @@ -44,14 +48,45 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
public void setUp() {
// Start every test from a clean event_log table…
eventLogRepository.deleteAll();

// …containing exactly one known event with flow id "FLOW_ID".
persistTestEvent("FLOW_ID");
}

private void persistTestEvent(String flowId) {
final EventLog eventLog = EventLog.builder()
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
.eventType(WAREHOUSE_EVENT_TYPE)
.compactionKey(COMPACTION_KEY)
.flowId("FLOW_ID").build();
.flowId(flowId)
.build();
eventLogRepository.persist(eventLog);
}

@Test
public void testDeleteMultipleEvents() {
    // setUp() already persisted one event; add four more so five rows exist.
    persistTestEvent("second_Flow-ID");
    persistTestEvent("third flow-ID");
    persistTestEvent("fourth flow-ID");
    persistTestEvent("fifth flow-ID");

    final List<EventLog> allEvents = findAllEventsInDB();
    assertThat(allEvents, hasSize(5));
    final EventLog survivor = allEvents.remove(0);

    // Exercise batch deletion: remove four of the five events in one call.
    eventLogRepository.delete(allEvents);

    final List<EventLog> leftOver = findAllEventsInDB();
    assertThat(leftOver, hasSize(1));
    assertThat(leftOver.get(0).getId(), is(survivor.getId()));
    assertThat(leftOver.get(0).getFlowId(), is(survivor.getFlowId()));
}

// Reads every row of the event_log table, mapped onto EventLog beans by column name.
private List<EventLog> findAllEventsInDB() {
    final String selectAll = "SELECT * FROM nakadi_events.event_log";
    return jdbcTemplate.query(selectAll, new BeanPropertyRowMapper<>(EventLog.class));
}

@Test
public void testFindEventInRepositoryById() {
Integer id = jdbcTemplate.queryForObject(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ public interface EventLogRepository {

void delete(EventLog eventLog);

// Naive fallback: deletes entries one by one; implementations may override for batching.
default void delete(Collection<EventLog> eventLogs) {
    for (final EventLog eventLog : eventLogs) {
        delete(eventLog);
    }
}

void persist(EventLog eventLog);

default void persist(Collection<EventLog> eventLogs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void tryToPublishBatch(List<BatchItem> batch) throws Exception {
.filter(rawEvent -> !failedEids.contains(convertToUUID(rawEvent.getId())));
}

successfulEvents.forEach(eventLogRepository::delete);
eventLogRepository.delete(successfulEvents.collect(Collectors.toList()));
}

private List<String> collectEids(EventPublishingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.zalando.fahrschein.EventPublishingException;
import org.zalando.fahrschein.domain.BatchItemResponse;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
Expand All @@ -16,20 +19,23 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

import static com.jayway.jsonpath.JsonPath.read;
import static java.time.Instant.now;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
Expand All @@ -47,13 +53,16 @@ public class EventTransmissionServiceTest {
private MockNakadiPublishingClient publishingClient;
private ObjectMapper mapper;
private EventLogRepository repo;
@Captor
private ArgumentCaptor<Collection<EventLog>> eventLogColCaptor;

@BeforeEach
public void setUp() {
// Fresh mock/spy collaborators per test so verifications don't leak between tests.
repo = mock(EventLogRepository.class);
publishingClient = spy(new MockNakadiPublishingClient());
mapper = spy(new ObjectMapper().registerModules(new JavaTimeModule()));
service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60);
// Initializes @Captor fields (eventLogColCaptor).
// NOTE(review): openMocks returns an AutoCloseable that is never closed here —
// consider storing it and closing in an @AfterEach; confirm against Mockito docs.
MockitoAnnotations.openMocks(this);
}

@Test
Expand Down Expand Up @@ -135,9 +144,8 @@ public void testErrorInPayloadDeserializationIsHandledGracefully() throws IOExce
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev1, ev3));
}

@Test
Expand All @@ -164,9 +172,8 @@ public void testUnknownErrorInTransmissionIsHandledGracefully() throws Exception
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo, never()).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev3));
}

@Test
Expand Down Expand Up @@ -196,9 +203,8 @@ public void testEventPublishingExceptionIsHandledGracefully() throws Exception {
assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003"));

// and only the successful ones have been deleted.
verify(repo).delete(ev1);
verify(repo, never()).delete(ev2);
verify(repo).delete(ev3);
List<EventLog> deletedEvents = verifyDeletionAndGetAllDeletedEvents();
assertThat(deletedEvents, containsInAnyOrder(ev1, ev3));
}

@Test
Expand Down Expand Up @@ -260,4 +266,12 @@ private TypeReference<LinkedHashMap<String, Object>> anyLinkedHashmapTypeReferen
return any();
}

/**
 * Verifies that deletion happened only via the Collection-based overload
 * (at least once, never the single-EventLog variant) and returns every
 * event captured across all batch calls, flattened into one list.
 */
private List<EventLog> verifyDeletionAndGetAllDeletedEvents() {
    verify(repo, Mockito.atLeastOnce()).delete(eventLogColCaptor.capture());
    verify(repo, never()).delete(any(EventLog.class));
    // Method reference instead of `c -> c.stream()` — same behavior, clearer idiom.
    return eventLogColCaptor.getAllValues()
            .stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
}
}

0 comments on commit 5a9b887

Please sign in to comment.