Skip to content

Commit

Permalink
Fixed bug leading to semaphore leak when filtering messages (#1891)
Browse files Browse the repository at this point in the history
* Fixed bug leading to semaphore leak

* Make filtered message receiver always returning a message

* Make filtered message receiver always returning a message

* Fixed message filtering for batch subscription

* Fixed message filtering for batch subscription

* Fixed message filtering for batch subscription

* Fixed message filtering for batch subscription

* Fixed message filtering for batch subscription

* Fixed message filtering for batch subscription

* CR

* Make checkstyle happy

* Make isFiltered thread safe
  • Loading branch information
szczygiel-m authored Aug 23, 2024
1 parent 8057c70 commit 1b730c2
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.Optional;
import java.util.Set;

import static java.util.stream.Collectors.toList;

/**
* Implementation note: this class is partially mutable and may be accessed from multiple
* threads involved in message lifecycle, it must be thread safe.
Expand Down Expand Up @@ -53,6 +51,8 @@ public class Message implements FilterableMessage {

private long currentMessageBackoff = -1;

private boolean isFiltered = false;

public Message(String id,
String topic,
byte[] content,
Expand Down Expand Up @@ -122,7 +122,7 @@ public boolean isTtlExceeded(long ttlMillis) {

public synchronized void incrementRetryCounter(Collection<URI> succeededUris) {
this.retryCounter++;
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).collect(toList()));
this.succeededUris.addAll(succeededUris.stream().map(URI::toString).toList());
}

public synchronized int getRetryCounter() {
Expand Down Expand Up @@ -206,6 +206,14 @@ public String getSubscription() {
return subscription;
}

public synchronized boolean isFiltered() {
return isFiltered;
}

public synchronized void setFiltered(boolean filtered) {
isFiltered = filtered;
}

public static class Builder {
private String id;
private PartitionOffset partitionOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,19 @@ public void consume(Runnable signalsInterrupt) {
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

if (logger.isDebugEnabled()) {
logger.debug(
"Read message {} partition {} offset {}",
message.getContentType(), message.getPartition(), message.getOffset()
);
if (message.isFiltered()) {
profiler.flushMeasurements(ConsumerRun.FILTERED);
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Read message {} partition {} offset {}",
message.getContentType(), message.getPartition(), message.getOffset()
);
}

Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
sendMessage(convertedMessage, profiler);
}

Message convertedMessage = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
sendMessage(convertedMessage, profiler);
} else {
pendingOffsets.releaseSlot();
profiler.flushMeasurements(ConsumerRun.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ private Optional<Message> readAndTransform(Subscription subscription, String bat
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build();
if (!message.isFiltered()) {
Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build();

trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId));
trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId));

return Optional.of(transformed);
return Optional.of(transformed);
}
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package pl.allegro.tech.hermes.consumers.consumer.profiling;

public enum ConsumerRun {
EMPTY, DELIVERED, DISCARDED, RETRIED
EMPTY, DELIVERED, DISCARDED, RETRIED, FILTERED
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@

public interface MessageReceiver {

/**
* Retrieves the next available message from the queue.
*
* <p>Depending on the context, the returned {@link Optional} can contain:
* <ul>
* <li>A {@link Message} that contains a valid message ready to be sent.</li>
* <li>A {@link Message} with the `isFiltered` flag set, indicating that the message
* has been filtered and should be skipped during processing or sending.</li>
* <li>{@code null}, indicating that there are no messages currently available in the queue.</li>
* </ul>
*
* @return an {@link Optional} containing the next {@link Message} if available;
* an {@link Optional} containing a filtered message if it should be skipped;
* or an empty {@link Optional} if there are no messages in the queue.
*/
Optional<Message> next();

default void stop() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ public FilteringMessageReceiver(MessageReceiver receiver,

@Override
public Optional<Message> next() {
return receiver.next().map(message ->
allow(message) ? message : null
);
return receiver.next().map(this::filter);
}

private boolean allow(Message message) {
private Message filter(Message message) {
FilterResult result = filterChain.apply(message);
filteredMessageHandler.handle(result, message, subscription);
return !result.isFiltered();
message.setFiltered(result.isFiltered());
return message;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.MessageFilterSpecification;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension;
Expand All @@ -23,10 +24,14 @@
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static com.google.common.collect.ImmutableMap.of;
import static java.util.Arrays.stream;
import static org.awaitility.Awaitility.waitAtMost;
import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy;
import static pl.allegro.tech.hermes.api.TopicWithSchema.topicWithSchema;
import static pl.allegro.tech.hermes.integrationtests.assertions.HermesAssertions.assertThatMetrics;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName;

public class BatchDeliveryTest {
Expand All @@ -39,10 +44,19 @@ public class BatchDeliveryTest {
@RegisterExtension
public static final TestSubscribersExtension subscribers = new TestSubscribersExtension();

static final AvroUser BOB = new AvroUser("Bob", 50, "blue");

static final AvroUser ALICE = new AvroUser("Alice", 20, "magenta");

private static final TestMessage[] SMALL_BATCH = TestMessage.simpleMessages(2);

private static final TestMessage SINGLE_MESSAGE = TestMessage.simple();

private static final TestMessage SINGLE_MESSAGE_FILTERED = BOB.asTestMessage();

private static final MessageFilterSpecification MESSAGE_NAME_FILTER =
new MessageFilterSpecification(of("type", "jsonpath", "path", ".name", "matcher", "^Bob.*"));

@Test
public void shouldDeliverMessagesInBatch() {
// given
Expand All @@ -67,6 +81,43 @@ public void shouldDeliverMessagesInBatch() {
expectSingleBatch(subscriber, SMALL_BATCH);
}

@Test
public void shouldFilterIncomingEventsForBatch() {
// given
TestSubscriber subscriber = subscribers.createSubscriber();
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint())
.withSubscriptionPolicy(buildBatchPolicy()
.withBatchSize(2)
.withBatchTime(3)
.withBatchVolume(1024)
.build())
.withFilter(MESSAGE_NAME_FILTER)
.build());

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), BOB.asJson());
hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson());

// then
expectSingleBatch(subscriber, SINGLE_MESSAGE_FILTERED);
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_filtered_out_total")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(1.0)
)
);
}

@Test
public void shouldDeliverBatchInGivenTimePeriod() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,69 @@ public void shouldReportMetricForFilteredSubscription() {
);
}

@Test
public void shouldNotIncreaseInflightForFilteredSubscription() {
// given
TestSubscriber subscriber = subscribers.createSubscriber(503);
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
final Subscription subscription = hermes.initHelper().createSubscription(
subscription(topic, "subscription")
.withEndpoint(subscriber.getEndpoint())
.withSubscriptionPolicy(
subscriptionPolicy()
.withMessageTtl(3600)
.withInflightSize(1)
.build()
)
.withFilter(filterMatchingHeaderByPattern("Trace-Id", "^vte.*"))
.build()
);
TestMessage unfiltered = TestMessage.of("msg", "unfiltered");
TestMessage filtered = TestMessage.of("msg", "filtered");

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId"));
hermes.api().publishUntilSuccess(topic.getQualifiedName(), filtered.body(), header("Trace-Id", "otherTraceId"));

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_inflight")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(0.0)
)
);

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12"));
hermes.api().publishUntilSuccess(topic.getQualifiedName(), unfiltered.body(), header("Trace-Id", "vte12"));

// then
waitAtMost(Duration.ofSeconds(10)).untilAsserted(() ->
hermes.api().getConsumersMetrics()
.expectStatus()
.isOk()
.expectBody(String.class)
.value((body) -> assertThatMetrics(body)
.contains("hermes_consumers_subscription_inflight")
.withLabels(
"group", topic.getName().getGroupName(),
"subscription", subscription.getName(),
"topic", topic.getName().getName()
)
.withValue(1.0)
)
);
}

@Test
public void shouldReportMetricsForSuccessfulBatchDelivery() {
// given
Expand Down

0 comments on commit 1b730c2

Please sign in to comment.