diff --git a/pom.xml b/pom.xml index db35ded..0334701 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,7 @@ pom Quarkus - AsyncAPI Root - - + quarkus-asyncapi quarkus-asyncapi-scanner quarkus-asyncapi-common docs diff --git a/quarkus-asyncapi/deployment/src/main/java/io/quarkiverse/asyncapi/generator/AsyncApiCodeGenerator.java b/quarkus-asyncapi/deployment/src/main/java/io/quarkiverse/asyncapi/generator/AsyncApiCodeGenerator.java index def3dea..a447bf7 100644 --- a/quarkus-asyncapi/deployment/src/main/java/io/quarkiverse/asyncapi/generator/AsyncApiCodeGenerator.java +++ b/quarkus-asyncapi/deployment/src/main/java/io/quarkiverse/asyncapi/generator/AsyncApiCodeGenerator.java @@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.Config; -import com.asyncapi.v2._6_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.AsyncAPI; import io.quarkiverse.asyncapi.config.AsyncAPISupplier; import io.quarkiverse.asyncapi.config.AsyncAPIUtils; diff --git a/quarkus-asyncapi/deployment/src/test/resources/asyncapi.yml b/quarkus-asyncapi/deployment/src/test/resources/asyncapi.yml index d4e9459..d8571e1 100644 --- a/quarkus-asyncapi/deployment/src/test/resources/asyncapi.yml +++ b/quarkus-asyncapi/deployment/src/test/resources/asyncapi.yml @@ -1,51 +1,66 @@ -asyncapi: '2.0.0' +asyncapi: 3.0.0 id: 'urn:com:kafka:server' info: title: Kafka Application - version: '1.0.0' + version: 1.0.0 description: Kafka Application license: name: Apache 2.0 - url: https://www.apache.org/licenses/LICENSE-2.0 + url: 'https://www.apache.org/licenses/LICENSE-2.0' servers: production: - url: localhost:9092 - description: Development server + host: 'localhost:9092' protocol: kafka - protocolVersion: '1.0.0' + protocolVersion: 1.0.0 + description: Development server x-trigger-version: v0.9.1-0.20190603184501-d845e1d612f8 x-activity-version: v0.9.1-0.20190603184501-d845e1d612f8 channels: - /message: - description: A message channel - subscribe: - summary: Get messages - message: + message: + address: message + messages: + publish.message: $ref: '#/components/messages/message' - traits: - - bindings: - flogo-kafka: - partitions: "0" - offset: 0 - publish: - summary: Send messages - message: + subscribe.message: $ref: '#/components/messages/message' - traits: - - bindings: - flogo-kafka: - partitions: "0" - offset: 0 - /dup: - description: A duplicate message channel - subscribe: - summary: Get messages - message: + description: A message channel + dup: + address: /dup + messages: + publish.message: $ref: '#/components/messages/message' - publish: - summary: Send messages - message: + subscribe.message: $ref: '#/components/messages/message' + description: A duplicate message channel +operations: + /message.publish: + action: send + channel: + $ref: '#/channels/message' + summary: Send messages + messages: + - $ref: '#/channels/message/messages/publish.message' + /message.subscribe: + action: receive + channel: + $ref: '#/channels/message' + summary: Get messages + messages: + - $ref: '#/channels/~1message/messages/subscribe.message' + /dup.publish: + action: send + channel: + $ref: '#/channels/dup' + summary: Send messages + messages: + - $ref: '#/channels/dup/messages/publish.message' + /dup.subscribe: + action: receive + channel: + $ref: '#/channels/dup' + summary: Get messages + messages: + - $ref: '#/channels/dup/messages/subscribe.message' components: messages: message: @@ -54,7 +69,7 @@ components: summary: A message contentType: application/json payload: - $ref: "#/components/schemas/message" + $ref: '#/components/schemas/message' schemas: message: type: object diff --git a/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapihttp.yml b/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapihttp.yml index 85a4828..d5495a4 100644 --- a/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapihttp.yml +++ b/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapihttp.yml @@ -1,33 +1,41 @@ -asyncapi: '2.0.0' +asyncapi: 3.0.0 id: 'urn:com:http:server' info: title: Http Application - version: '1.0.0' + version: 1.0.0 description: Http Application license: name: Apache 2.0 - url: https://www.apache.org/licenses/LICENSE-2.0 + url: 'https://www.apache.org/licenses/LICENSE-2.0' servers: production: - url: localhost:8080 - description: Development server + host: 'localhost:8080' protocol: http + description: Development server channels: first: - description: A message channel - subscribe: - summary: Get messages - message: + address: first + messages: + publish.message: $ref: '#/components/messages/message' - publish: - summary: Send messages - message: + subscribe.message: $ref: '#/components/messages/message' - traits: - - bindings: - flogo-kafka: - partitions: "0" - offset: 0 + description: A message channel +operations: + first.publish: + action: send + channel: + $ref: '#/channels/first' + summary: Send messages + messages: + - $ref: '#/channels/first/messages/publish.message' + first.subscribe: + action: receive + channel: + $ref: '#/channels/first' + summary: Get messages + messages: + - $ref: '#/channels/first/messages/subscribe.message' components: messages: message: @@ -35,12 +43,12 @@ components: summary: A message contentType: application/json payload: - $ref: "#/components/schemas/message" + $ref: '#/components/schemas/message' schemas: message: type: object properties: - name: + name: type: string age: type: integer diff --git a/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapikafka.yml b/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapikafka.yml index 19ff616..5572436 100644 --- a/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapikafka.yml +++ b/quarkus-asyncapi/integration-tests/simple/src/test/asyncapi/asyncapikafka.yml @@ -1,41 +1,45 @@ -asyncapi: '2.0.0' +asyncapi: 3.0.0 id: 'urn:com:kafka:server' info: title: Kafka Application - version: '1.0.0' + version: 1.0.0 description: Kafka Application license: name: Apache 2.0 - url: https://www.apache.org/licenses/LICENSE-2.0 + url: 'https://www.apache.org/licenses/LICENSE-2.0' servers: production: - url: localhost:9092 - description: Development server + host: 'localhost:9092' protocol: kafka - protocolVersion: '1.0.0' + protocolVersion: 1.0.0 + description: Development server x-trigger-version: v0.9.1-0.20190603184501-d845e1d612f8 x-activity-version: v0.9.1-0.20190603184501-d845e1d612f8 channels: - /message: - description: A message channel - subscribe: - summary: Get messages - message: + message: + address: message + messages: + publish.message: $ref: '#/components/messages/message' - traits: - - bindings: - flogo-kafka: - partitions: "0" - offset: 0 - publish: - summary: Send messages - message: + subscribe.message: $ref: '#/components/messages/message' - traits: - - bindings: - flogo-kafka: - partitions: "0" - offset: 0 + description: A message channel +operations: + /message.publish: + action: receive + channel: + $ref: '#/channels/message' + summary: Send messages + + messages: + - $ref: '#/channels/message/messages/publish.message' + /message.subscribe: + action: send + channel: + $ref: '#/channels/message' + summary: Get messages + messages: + - $ref: '#/channels/~1message/messages/subscribe.message' components: messages: message: @@ -43,12 +47,12 @@ components: summary: A message contentType: application/json payload: - $ref: "#/components/schemas/message" + $ref: '#/components/schemas/message' schemas: message: type: object properties: - name: + name: type: string age: type: integer diff --git a/quarkus-asyncapi/integration-tests/simple/src/test/java/io/quarkiverse/asyncapi/config/AsyncAPIResourceGeneratorTest.java b/quarkus-asyncapi/integration-tests/simple/src/test/java/io/quarkiverse/asyncapi/config/AsyncAPIResourceGeneratorTest.java index 76e1a51..7dd17d7 100644 --- a/quarkus-asyncapi/integration-tests/simple/src/test/java/io/quarkiverse/asyncapi/config/AsyncAPIResourceGeneratorTest.java +++ b/quarkus-asyncapi/integration-tests/simple/src/test/java/io/quarkiverse/asyncapi/config/AsyncAPIResourceGeneratorTest.java @@ -9,7 +9,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.junit.jupiter.api.Test; -import com.asyncapi.v2._6_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.AsyncAPI; import io.quarkus.test.junit.QuarkusTest; @@ -19,10 +19,10 @@ public class AsyncAPIResourceGeneratorTest { @Inject AsyncAPIRegistry registry; - @ConfigProperty(name = "mp.messaging.incoming./message.connector") + @ConfigProperty(name = "mp.messaging.incoming.message.connector") Optional kafkaIncomingConnector; - @ConfigProperty(name = "mp.messaging.outgoing./message_out.connector") + @ConfigProperty(name = "mp.messaging.outgoing.message_out.connector") Optional kafkaOutgoingConnector; @ConfigProperty(name = "mp.messaging.incoming.first.connector") @@ -31,10 +31,10 @@ public class AsyncAPIResourceGeneratorTest { @ConfigProperty(name = "mp.messaging.outgoing.first_out.connector") Optional httpOutgoingConnector; - @ConfigProperty(name = "mp.messaging.incoming./message.topic") + @ConfigProperty(name = "mp.messaging.incoming.message.topic") Optional incomingTopic; - @ConfigProperty(name = "mp.messaging.outgoing./message_out.topic") + @ConfigProperty(name = "mp.messaging.outgoing.message_out.topic") Optional outgoingTopic; @ConfigProperty(name = "mp.messaging.incoming.first.path") @@ -50,8 +50,8 @@ void testKafkaGenerator() { assertThat(asyncAPI.get().getId()).isEqualTo("urn:com:kafka:server"); assertThat(kafkaIncomingConnector.get()).isEqualTo("smallrye-kafka"); assertThat(kafkaOutgoingConnector.get()).isEqualTo("smallrye-kafka"); - assertThat(incomingTopic.get()).isEqualTo("/message"); - assertThat(outgoingTopic.get()).isEqualTo("/message"); + assertThat(incomingTopic.get()).isEqualTo("message"); + assertThat(outgoingTopic.get()).isEqualTo("message"); } @Test diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPIRegistry.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPIRegistry.java index a957ad3..d746fcf 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPIRegistry.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPIRegistry.java @@ -2,7 +2,7 @@ import java.util.Optional; -import com.asyncapi.v2._6_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.AsyncAPI; /** Holder of Async API instances */ public interface AsyncAPIRegistry { diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPISupplier.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPISupplier.java index a0b8446..bacfc84 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPISupplier.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncAPISupplier.java @@ -1,6 +1,6 @@ package io.quarkiverse.asyncapi.config; -import com.asyncapi.v2._6_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.AsyncAPI; public interface AsyncAPISupplier { String id(); diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncConfigSource.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncConfigSource.java index a03a668..80db893 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncConfigSource.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/AsyncConfigSource.java @@ -3,13 +3,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.asyncapi.v2._6_0.model.AsyncAPI; -import com.asyncapi.v2._6_0.model.channel.ChannelItem; -import com.asyncapi.v2._6_0.model.server.Server; +import com.asyncapi.v3._0_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.channel.Channel; +import com.asyncapi.v3._0_0.model.operation.Operation; +import com.asyncapi.v3._0_0.model.server.Server; import io.quarkiverse.asyncapi.config.channels.ChannelConfigurer; import io.quarkiverse.asyncapi.config.channels.ChannelConfigurerFactory; @@ -20,6 +22,7 @@ public class AsyncConfigSource extends MapBackedConfigSource { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(AsyncConfigSource.class); + private static final String CHANNEL_PREFIX = "#/channels/"; public AsyncConfigSource(AsyncAPISupplier asyncAPISupplier) { super(asyncAPISupplier.id(), getMapFromAsyncApi(asyncAPISupplier.asyncAPI())); @@ -27,14 +30,27 @@ public AsyncConfigSource(AsyncAPISupplier asyncAPISupplier) { private static Map getMapFromAsyncApi(AsyncAPI asyncAPI) { Map result = new HashMap<>(); - for (Server server : asyncAPI.getServers().values()) { + for (Object serverObject : asyncAPI.getServers().values()) { + Server server = (Server) serverObject; ChannelConfigurer configurer = ChannelConfigurerFactory.get(server); configurer.commonConfig(server, result); - for (Entry entry : asyncAPI.getChannels().entrySet()) { - configurer.channelConfig(entry.getKey(), entry.getValue(), server, result); + for (Entry entry : asyncAPI.getOperations().entrySet()) { + Operation operation = (Operation) entry.getValue(); + getChannelFromOperation(asyncAPI.getChannels(), operation) + .ifPresent(channel -> configurer.channelConfig(server, channel, operation, result)); } } logger.debug("Produced config source {}", result); return result; } + + private static Optional getChannelFromOperation(Map channels, Operation oper) { + String channelRef = oper.getChannel().getRef(); + if (channelRef.startsWith(CHANNEL_PREFIX)) { + String channelName = channelRef.substring(CHANNEL_PREFIX.length()); + logger.debug("Checking channel name {}", channelName); + return Optional.ofNullable((Channel) channels.get(channelName)); + } + return Optional.empty(); + } } diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/JacksonAsyncAPISupplier.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/JacksonAsyncAPISupplier.java index f9bcf76..2a8d663 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/JacksonAsyncAPISupplier.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/JacksonAsyncAPISupplier.java @@ -3,7 +3,7 @@ import java.io.IOException; import java.io.UncheckedIOException; -import com.asyncapi.v2._6_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.AsyncAPI; public abstract class JacksonAsyncAPISupplier implements AsyncAPISupplier { diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/MapAsyncAPIRegistry.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/MapAsyncAPIRegistry.java index 708b1bd..7a12872 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/MapAsyncAPIRegistry.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/MapAsyncAPIRegistry.java @@ -4,7 +4,7 @@ import java.util.Map; import java.util.Optional; -import com.asyncapi.v2._6_0.model.AsyncAPI; +import com.asyncapi.v3._0_0.model.AsyncAPI; public class MapAsyncAPIRegistry implements AsyncAPIRegistry { private final Map map; diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/AbstractChannelConfigurer.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/AbstractChannelConfigurer.java index 6d2bf70..8af3d1e 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/AbstractChannelConfigurer.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/AbstractChannelConfigurer.java @@ -2,9 +2,10 @@ import java.util.Map; -import com.asyncapi.v2._6_0.model.channel.ChannelItem; -import com.asyncapi.v2._6_0.model.channel.operation.Operation; -import com.asyncapi.v2._6_0.model.server.Server; +import com.asyncapi.v3._0_0.model.channel.Channel; +import com.asyncapi.v3._0_0.model.operation.Operation; +import com.asyncapi.v3._0_0.model.operation.OperationAction; +import com.asyncapi.v3._0_0.model.server.Server; public abstract class AbstractChannelConfigurer implements ChannelConfigurer { @@ -34,19 +35,16 @@ public String protocol() { } @Override - public void channelConfig(String channelName, ChannelItem item, Server server, Map result) { - - if (item.getSubscribe() != null) { - String incomingChannel = channelName; + public void channelConfig(Server server, Channel channel, Operation operation, Map result) { + if (operation.getAction() == OperationAction.RECEIVE) { + String incomingChannel = channel.getAddress(); result.put(incomingProperty(incomingChannel, CONNECTOR), connectorId); - addIncomingChannel(incomingChannel, channelName, item.getPublish(), server, result); - } - if (item.getPublish() != null) { - String outgoingChannel = channelName + "_out"; + addIncomingChannel(incomingChannel, channel.getAddress(), operation, server, result); + } else if (operation.getAction() == OperationAction.SEND) { + String outgoingChannel = channel.getAddress() + "_out"; result.put(outgoingProperty(outgoingChannel, CONNECTOR), connectorId); - addOutgoingChannel(outgoingChannel, channelName, item.getPublish(), server, result); + addOutgoingChannel(outgoingChannel, channel.getAddress(), operation, server, result); } - } public void commonConfig(Server server, Map result) { diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurer.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurer.java index b558665..b0231f6 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurer.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurer.java @@ -2,14 +2,15 @@ import java.util.Map; -import com.asyncapi.v2._6_0.model.channel.ChannelItem; -import com.asyncapi.v2._6_0.model.server.Server; +import com.asyncapi.v3._0_0.model.channel.Channel; +import com.asyncapi.v3._0_0.model.operation.Operation; +import com.asyncapi.v3._0_0.model.server.Server; public interface ChannelConfigurer { String protocol(); - void channelConfig(String channelName, ChannelItem item, Server server, Map result); + void channelConfig(Server server, Channel channel, Operation operation, Map result); void commonConfig(Server server, Map result); } diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurerFactory.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurerFactory.java index 38bf9ef..c8bfb4e 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurerFactory.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/ChannelConfigurerFactory.java @@ -6,7 +6,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -import com.asyncapi.v2._6_0.model.server.Server; +import com.asyncapi.v3._0_0.model.server.Server; public class ChannelConfigurerFactory { diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/HttpChannelConfigurer.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/HttpChannelConfigurer.java index 7adfa80..c67984b 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/HttpChannelConfigurer.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/HttpChannelConfigurer.java @@ -2,8 +2,8 @@ import java.util.Map; -import com.asyncapi.v2._6_0.model.channel.operation.Operation; -import com.asyncapi.v2._6_0.model.server.Server; +import com.asyncapi.v3._0_0.model.operation.Operation; +import com.asyncapi.v3._0_0.model.server.Server; public class HttpChannelConfigurer extends AbstractChannelConfigurer { diff --git a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/KafkaChannelConfigurer.java b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/KafkaChannelConfigurer.java index 32c8fba..42b6ff9 100644 --- a/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/KafkaChannelConfigurer.java +++ b/quarkus-asyncapi/runtime/src/main/java/io/quarkiverse/asyncapi/config/channels/KafkaChannelConfigurer.java @@ -2,8 +2,8 @@ import java.util.Map; -import com.asyncapi.v2._6_0.model.channel.operation.Operation; -import com.asyncapi.v2._6_0.model.server.Server; +import com.asyncapi.v3._0_0.model.operation.Operation; +import com.asyncapi.v3._0_0.model.server.Server; public class KafkaChannelConfigurer extends AbstractChannelConfigurer { @@ -33,7 +33,13 @@ protected void addIncomingChannel(String smallryeChannel, String channelName, Op @Override public void commonConfig(Server server, Map result) { - String serverUri = server.getUrl(); - result.compute("kafka.bootstrap.servers", (k, v) -> v == null ? serverUri : v + "," + serverUri); + StringBuilder sb = new StringBuilder(server.getHost()); + if (server.getPathname() != null) { + if (!server.getPathname().startsWith("/")) { + sb.append('/'); + } + sb.append(server.getPathname()); + } + result.compute("kafka.bootstrap.servers", (k, v) -> v == null ? sb.toString() : v + "," + sb); } }