Skip to content

Commit

Permalink
fix: concurrent modification when getting event sources (v5) (#2572)
Browse files Browse the repository at this point in the history
  • Loading branch information
csviri authored Nov 13, 2024
1 parent 0efc6d3 commit 40951de
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.javaoperatorsdk.operator.processing.event;


import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Stream;
Expand Down Expand Up @@ -33,7 +35,8 @@ public void add(EventSource eventSource) {
+ " is already registered with name: " + name);
}
sourceByName.put(name, eventSource);
sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource);
sources.computeIfAbsent(keyFor(eventSource), k -> new ConcurrentHashMap<>()).put(name,
eventSource);
}

public EventSource remove(String name) {
Expand Down Expand Up @@ -144,7 +147,6 @@ public <S> List<EventSource<S, P>> getEventSources(Class<S> dependentType) {
if (sourcesForType == null) {
return Collections.emptyList();
}

return sourcesForType.values().stream()
.map(es -> (EventSource<S, P>) es).toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class EventSourceManagerTest {
public void registersEventSource() {
EventSource eventSource = mock(EventSource.class);
when(eventSource.resourceType()).thenReturn(EventSource.class);
when(eventSource.name()).thenReturn("name1");

eventSourceManager.registerEventSource(eventSource);

Expand Down Expand Up @@ -95,6 +96,7 @@ void retrievingEventSourceForClassShouldWork() {

ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class);
when(eventSource.resourceType()).thenReturn(String.class);
when(eventSource.name()).thenReturn("name1");
manager.registerEventSource(eventSource);

var source = manager.getEventSourceFor(String.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.ConcurrentModificationException;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ConfigMap;
Expand Down Expand Up @@ -177,7 +182,43 @@ void getEventSourcesShouldWork() {
assertThat(eventSources.getEventSources(Service.class)).isEmpty();
}


@Test
void testConcurrentAddRemoveAndGet() throws InterruptedException {

final var concurrentExceptionFound = new AtomicBoolean(false);

for (int i = 0; i < 1000 && !concurrentExceptionFound.get(); i++) {
final var eventSources = new EventSources();
var eventSourceList =
IntStream.range(1, 20).mapToObj(n -> eventSourceMockWithName(EventSource.class,
"name" + n, HasMetadata.class)).toList();

IntStream.range(1, 10).forEach(n -> eventSources.add(eventSourceList.get(n - 1)));

var phaser = new Phaser(2);

var t1 = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
IntStream.range(11, 20).forEach(n -> eventSources.add(eventSourceList.get(n - 1)));
});
var t2 = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
try {
eventSources.getEventSources(eventSourceList.get(0).resourceType());
} catch (ConcurrentModificationException e) {
concurrentExceptionFound.set(true);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}

assertThat(concurrentExceptionFound)
.withFailMessage("ConcurrentModificationException thrown")
.isFalse();
}

<T extends EventSource> EventSource eventSourceMockWithName(Class<T> clazz, String name,
Class resourceType) {
Expand Down

0 comments on commit 40951de

Please sign in to comment.