Skip to content

Commit

Permalink
Support identity placeholder in MQTT binding topics
Browse files Browse the repository at this point in the history
  • Loading branch information
epieffe committed Dec 29, 2024
1 parent 3e2a862 commit def41f5
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public MqttRouteConfig resolveSubscribe(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic))
.filter(r -> r.authorized(authorization) && r.matchesSubscribe(topic, authorization))
.findFirst()
.orElse(null);
}
Expand All @@ -128,7 +128,7 @@ public MqttRouteConfig resolvePublish(
String topic)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic))
.filter(r -> r.authorized(authorization) && r.matchesPublish(topic, authorization))
.findFirst()
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,29 @@
import java.util.stream.Collectors;

import io.aklivity.zilla.runtime.binding.mqtt.config.MqttConditionConfig;
import io.aklivity.zilla.runtime.engine.config.GuardedConfig;

public final class MqttConditionMatcher
{
private final List<Matcher> sessionMatchers;
private final List<Matcher> subscribeMatchers;
private final List<Matcher> publishMatchers;
private final List<String> subscribeTopics;
private final List<String> publishTopics;
private final List<GuardedConfig> guarded;

public MqttConditionMatcher(
MqttConditionConfig condition)
MqttConditionConfig condition,
List<GuardedConfig> guarded)
{
this.sessionMatchers =
condition.sessions != null && !condition.sessions.isEmpty() ?
asWildcardMatcher(condition.sessions.stream().map(s -> s.clientId).collect(Collectors.toList())) : null;
this.subscribeMatchers =
this.subscribeTopics =
condition.subscribes != null && !condition.subscribes.isEmpty() ?
asTopicMatcher(condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList())) : null;
this.publishMatchers =
condition.subscribes.stream().map(s -> s.topic).collect(Collectors.toList()) : null;
this.publishTopics =
condition.publishes != null && !condition.publishes.isEmpty() ?
asTopicMatcher(condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList())) : null;
condition.publishes.stream().map(s -> s.topic).collect(Collectors.toList()) : null;
this.guarded = guarded;
}

public boolean matchesSession(
Expand All @@ -62,14 +66,15 @@ public boolean matchesSession(
}

public boolean matchesSubscribe(
String topic)
String topic,
long authorization)
{
boolean match = false;
if (subscribeMatchers != null)
if (subscribeTopics != null)
{
for (Matcher matcher : subscribeMatchers)
for (String subscribeTopic : subscribeTopics)
{
match = matcher.reset(topic).matches();
match = topicMatches(topic, subscribeTopic, authorization);
if (match)
{
break;
Expand All @@ -80,14 +85,15 @@ public boolean matchesSubscribe(
}

public boolean matchesPublish(
String topic)
String topic,
long authorization)
{
boolean match = false;
if (publishMatchers != null)
if (publishTopics != null)
{
for (Matcher matcher : publishMatchers)
for (String publishTopic : publishTopics)
{
match = matcher.reset(topic).matches();
match = topicMatches(topic, publishTopic, authorization);
if (match)
{
break;
Expand All @@ -97,6 +103,27 @@ public boolean matchesPublish(
return match;
}

private boolean topicMatches(String topic, String pattern, long authorization)
{
for (GuardedConfig g : guarded)
{
String identity = g.identity.apply(authorization);
if (identity != null)
{
pattern = pattern.replace(String.format("{guarded[%s].identity}", g.name), identity);
}
}
return topic.matches(pattern
.replace("{", "\\{")
.replace("}", "\\}")
.replace("[", "\\[")
.replace("]", "\\]")
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*"));
}

private static List<Matcher> asWildcardMatcher(
List<String> wildcards)
{
Expand All @@ -115,20 +142,4 @@ private static List<Matcher> asWildcardMatcher(

return matchers;
}

private static List<Matcher> asTopicMatcher(
List<String> wildcards)
{
List<Matcher> matchers = new ArrayList<>();
for (String wildcard : wildcards)
{
matchers.add(Pattern.compile(wildcard
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "[^/]*")
.replace("#", ".*")).matcher(""));

}
return matchers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public MqttRouteConfig(
this.id = route.id;
this.when = route.when.stream()
.map(MqttConditionConfig.class::cast)
.map(MqttConditionMatcher::new)
.map(conf -> new MqttConditionMatcher(conf, route.guarded))
.collect(toList());
this.with = (MqttWithConfig) route.with;
this.authorized = route.authorized;
Expand All @@ -63,14 +63,16 @@ boolean matchesSession(
}

boolean matchesSubscribe(
String topic)
String topic,
long authorization)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic));
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesSubscribe(topic, authorization));
}

boolean matchesPublish(
String topic)
String topic,
long authorization)
{
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic));
return when.isEmpty() || when.stream().anyMatch(m -> m.matchesPublish(topic, authorization));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
{
"title": "Topic",
"type": "string",
"pattern": "^(\\/?([\\w{}-]*|\\+)(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?)$"
"pattern": "^(\\/?([\\w{}\\[\\]\\.-]*|\\+)(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?)$"
}
}
}
Expand All @@ -101,7 +101,7 @@
"topic": {
"title": "Topic",
"type": "string",
"pattern": "^(\\/?([\\w{}-]*|\\+)(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}-]*|\\+))*(\\/#)?)$"
"pattern": "^(\\/?([\\w{}\\[\\]\\.-]*|\\+)(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?|#|\\/|\\$SYS(\\/((?![-_])[\\w{}\\[\\]\\.-]*|\\+))*(\\/#)?)$"
}
}
}
Expand Down

0 comments on commit def41f5

Please sign in to comment.