Merge pull request eclipse-ditto#1815 from eclipse-ditto/feature/1583-apply-filter-in-historical-events

eclipse-ditto#1583 apply RQL based filtering when streaming "historical" thing events
thjaeckle authored Jan 8, 2024
2 parents 7a507f6 + 7cfe527 commit c30d459
Showing 12 changed files with 291 additions and 30 deletions.
@@ -61,6 +61,7 @@ public final class SubscribeForPersistedEvents extends AbstractStreamingSubscrip
@Nullable private final Instant fromHistoricalTimestamp;
@Nullable private final Instant toHistoricalTimestamp;
@Nullable private final String prefix;
@Nullable private final String filter;

private SubscribeForPersistedEvents(final EntityId entityId,
final JsonPointer resourcePath,
@@ -69,6 +70,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final String prefix,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

super(TYPE, entityId, resourcePath, dittoHeaders);
@@ -77,6 +79,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
this.fromHistoricalTimestamp = fromHistoricalTimestamp;
this.toHistoricalTimestamp = toHistoricalTimestamp;
this.prefix = prefix;
this.filter = filter != null ? filter.toString() : null;
}

/**
@@ -89,7 +92,9 @@ private SubscribeForPersistedEvents(final EntityId entityId,
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, long, long, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
final long fromHistoricalRevision,
@@ -103,6 +108,38 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
null,
null,
null,
null,
dittoHeaders);
}

/**
 * Creates a new {@code SubscribeForPersistedEvents} command based on "from" and "to" {@code long} revisions.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalRevision the revision to start the streaming from.
* @param toHistoricalRevision the revision to stop the streaming at.
 * @param filter the optional RQL filter to apply for persisted events before publishing to the stream.
 * @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
final long fromHistoricalRevision,
final long toHistoricalRevision,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
fromHistoricalRevision,
toHistoricalRevision,
null,
null,
null,
filter,
dittoHeaders);
}
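A minimal usage sketch of the new revision-based overload (illustrative only, not part of this commit; the thing ID, resource path and RQL expression are made-up examples):

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.things.model.ThingId;

// Subscribe to all persisted events of a thing, publishing only those matching the RQL filter.
final SubscribeForPersistedEvents subscribe = SubscribeForPersistedEvents.of(
        ThingId.of("org.eclipse.ditto:my-thing"),        // entityId (example)
        JsonPointer.empty(),                             // resourcePath: the whole thing
        0L,                                              // fromHistoricalRevision
        Long.MAX_VALUE,                                   // toHistoricalRevision
        "gt(features/temperature/properties/value,23)",   // RQL filter (example expression)
        DittoHeaders.empty());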

@@ -116,7 +153,9 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Instant, Instant, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Instant fromHistoricalTimestamp,
@@ -130,6 +169,72 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
null,
dittoHeaders);
}

/**
 * Creates a new {@code SubscribeForPersistedEvents} command based on "from" and "to" {@code Instant} timestamps.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
 * @param filter the optional RQL filter to apply for persisted events before publishing to the stream.
 * @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
0L,
Long.MAX_VALUE,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
filter,
dittoHeaders);
}
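A corresponding sketch for the timestamp-based overload (again illustrative and using the same imports as the sketch above; timestamps and filter are made-up examples):

// Subscribe to events persisted within a time window, filtered by an RQL expression.
final SubscribeForPersistedEvents subscribeByTime = SubscribeForPersistedEvents.of(
        ThingId.of("org.eclipse.ditto:my-thing"),         // entityId (example)
        JsonPointer.empty(),                              // resourcePath: the whole thing
        java.time.Instant.parse("2024-01-01T00:00:00Z"),  // fromHistoricalTimestamp
        java.time.Instant.parse("2024-01-08T00:00:00Z"),  // toHistoricalTimestamp
        "exists(attributes/location)",                    // RQL filter (example expression)
        DittoHeaders.empty());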

/**
 * Creates a new {@code SubscribeForPersistedEvents} command based on "from" and "to" {@code Long} revisions and/or {@code Instant} timestamps.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalRevision the revision to start the streaming from.
* @param toHistoricalRevision the revision to stop the streaming at.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Long, Long, Instant, Instant, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Long fromHistoricalRevision,
@Nullable final Long toHistoricalRevision,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
null != fromHistoricalRevision ? fromHistoricalRevision : 0L,
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
null,
dittoHeaders);
}

@@ -142,16 +247,19 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
* @param toHistoricalRevision the revision to stop the streaming at.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
 * @param filter the optional RQL filter to apply for persisted events before publishing to the stream.
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Long fromHistoricalRevision,
@Nullable final Long toHistoricalRevision,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
@@ -161,6 +269,7 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
filter,
dittoHeaders);
}

@@ -182,6 +291,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
jsonObject.getValue(JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
jsonObject.getValue(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
jsonObject.getValue(JsonFields.PREFIX).orElse(null),
jsonObject.getValue(JsonFields.FILTER).orElse(null),
dittoHeaders
);
}
@@ -195,7 +305,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
*/
public SubscribeForPersistedEvents setPrefix(@Nullable final String prefix) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, getDittoHeaders());
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, getDittoHeaders());
}

/**
@@ -244,6 +354,14 @@ public Optional<String> getPrefix() {
return Optional.ofNullable(prefix);
}

/**
 * @return the optional RQL filter to apply for persisted events before publishing to the stream.
* @since 3.5.0
*/
public Optional<String> getFilter() {
return Optional.ofNullable(filter);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
final JsonSchemaVersion schemaVersion,
@@ -263,6 +381,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
jsonObjectBuilder.set(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toHistoricalTimestamp.toString(), predicate);
}
getPrefix().ifPresent(thePrefix -> jsonObjectBuilder.set(JsonFields.PREFIX, thePrefix));
getFilter().ifPresent(theFilter -> jsonObjectBuilder.set(JsonFields.FILTER, theFilter));
}

@Override
@@ -273,13 +392,13 @@ public String getTypePrefix() {
@Override
public SubscribeForPersistedEvents setDittoHeaders(final DittoHeaders dittoHeaders) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, dittoHeaders);
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, dittoHeaders);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix);
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter);
}

@Override
@@ -297,7 +416,8 @@ public boolean equals(@Nullable final Object obj) {
toHistoricalRevision == that.toHistoricalRevision &&
Objects.equals(fromHistoricalTimestamp, that.fromHistoricalTimestamp) &&
Objects.equals(toHistoricalTimestamp, that.toHistoricalTimestamp) &&
Objects.equals(prefix, that.prefix);
Objects.equals(prefix, that.prefix) &&
Objects.equals(filter, that.filter);
}

@Override
@@ -313,6 +433,7 @@ public String toString() {
+ ", fromHistoricalTimestamp=" + fromHistoricalTimestamp
+ ", toHistoricalTimestamp=" + toHistoricalTimestamp
+ ", prefix=" + prefix
+ ", filter=" + filter
+ "]";
}

@@ -339,6 +460,9 @@ private JsonFields() {

static final JsonFieldDefinition<String> PREFIX =
JsonFactory.newStringFieldDefinition("prefix", REGULAR, V_2);

public static final JsonFieldDefinition<String> FILTER =
JsonFactory.newStringFieldDefinition("filter", REGULAR, V_2);
}

}
@@ -43,6 +43,7 @@ public final class SubscribeForPersistedEventsTest {
private static final long KNOWN_TO_REV = 42L;
private static final String KNOWN_FROM_TS = "2022-10-25T14:00:00Z";
private static final String KNOWN_TO_TS = "2022-10-25T15:00:00Z";
private static final String KNOWN_FILTER = "exists(thingId)";

private static final String JSON_ALL_FIELDS = JsonFactory.newObjectBuilder()
.set(Command.JsonFields.TYPE, SubscribeForPersistedEvents.TYPE)
@@ -53,6 +54,7 @@ public final class SubscribeForPersistedEventsTest {
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP, KNOWN_FROM_TS)
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, KNOWN_TO_TS)
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
.build()
.toString();

@@ -63,6 +65,7 @@ public final class SubscribeForPersistedEventsTest {
.set(StreamingSubscriptionCommand.JsonFields.JSON_RESOURCE_PATH, KNOWN_RESOURCE_PATH)
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_REVISION, KNOWN_FROM_REV)
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
.build().toString();

@Test
@@ -88,6 +91,7 @@ public void toJsonWithAllFieldsSet() {
KNOWN_TO_REV,
Instant.parse(KNOWN_FROM_TS),
Instant.parse(KNOWN_TO_TS),
KNOWN_FILTER,
DittoHeaders.empty()
);

@@ -102,6 +106,7 @@ public void toJsonWithOnlyRequiredFieldsSet() {
JsonPointer.of(KNOWN_RESOURCE_PATH),
KNOWN_FROM_REV,
KNOWN_TO_REV,
KNOWN_FILTER,
DittoHeaders.empty());
final String json = command.toJsonString();
assertThat(json).isEqualTo(JSON_MINIMAL);
@@ -116,6 +121,7 @@ public void fromJsonWithAllFieldsSet() {
KNOWN_TO_REV,
Instant.parse(KNOWN_FROM_TS),
Instant.parse(KNOWN_TO_TS),
KNOWN_FILTER,
DittoHeaders.empty()
);
assertThat(SubscribeForPersistedEvents.fromJson(JsonObject.of(JSON_ALL_FIELDS), DittoHeaders.empty()))
@@ -130,6 +136,7 @@ public void fromJsonWithOnlyRequiredFieldsSet() {
JsonPointer.of(KNOWN_RESOURCE_PATH),
KNOWN_FROM_REV,
KNOWN_TO_REV,
KNOWN_FILTER,
DittoHeaders.empty()));
}

@@ -25,10 +25,22 @@
import javax.annotation.Nullable;
import javax.jms.JMSRuntimeException;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig;
@@ -48,17 +60,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;

/**
* Supervisor for {@link ConnectionPersistenceActor} which means it will create, start and watch it as child actor.
* <p>
@@ -158,6 +159,11 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}

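    // The connectivity service does not apply the new RQL event filter: this override accepts
    // every persisted event, since the "filter" field is only supported for thing events (see
    // the JSON schema change below).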
@Override
protected boolean applyPersistedEventFilter(final Event<?> event, final SubscribeForPersistedEvents subscribe) {
return true;
}

@Override
protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal<?> signal) {
return super.shouldBecomeTwinSignalProcessingAwaiting(signal) &&
@@ -21,6 +21,10 @@
"type": "string",
"format": "date-time",
"description": "The timestamp to stop the streaming at."
},
"filter": {
"type": "string",
"description": "An RQL expression defining which events to filter for in the stream. Only supported for thing events."
}
}
}