Skip to content

Commit

Permalink
feat: generalize Kafka properties (#16)
Browse files Browse the repository at this point in the history
* feat: generalize kafka properties. use spring-kafka props inst.of custom, move props to template params

* fix wrong param name in example

* fix: generation for mqtt and amqp (#14)

* fix api calls for mqtt and amqp

* fix port variables

* feat: add annotations to the model generation (#12)

* WIP, update to spec 2.0

* WIP, update to spec 2.0, remove definition of topic via extension

* working version for api 2.0

* add model generation

* Add support of Kafka generation

* update readme

* fix value ref, add jackson lib

* add validation annotations to schema

* fix no type case

* add annotation for jackson

* add description and examples to javadoc

* Update readme, move missed features block

* switch publish to subscribe and vice versa, because of wrong understanding of spec

* Fix consumed key, add key to publish operation

* move jackson and validation dependencies from kafka block

* fix README formatting

* mark annotations as done in README

* add support of multi-line description

* fix json in examples

* chore(release): 0.5.0 (#15)

chore(release): 0.5.0

* fix api calls for mqtt and amqp

* fix port variables

* fix yet another api operation call

Co-authored-by: asyncapi-bot <[email protected]>

* chore(release): 0.5.1 (#19)

chore(release): 0.5.1

* feat: generalize kafka properties. use spring-kafka props inst.of custom, move props to template params

* fix wrong param name in example

Co-authored-by: asyncapi-bot <[email protected]>
  • Loading branch information
Tenischev and asyncapi-bot authored Apr 26, 2020
1 parent 04af4a5 commit 5c1062f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 24 deletions.
12 changes: 11 additions & 1 deletion .tp-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,15 @@
"supportedProtocols": ["kafka", "amqp", "mqtt"],
"nonRenderableFiles": [
"**/*.jar"
]
],
"parameters": {
"listenerPollTimeout": {
"description": "Only for Kafka. Timeout to use when polling the consumer.",
"required": false
},
"listenerConcurrency": {
"description": "Only for Kafka. Number of threads to run in the listener containers.",
"required": false
}
}
}
19 changes: 16 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,35 @@ components:
Options:

-V, --version output the version number
-t, --templates <templateDir> directory where templates are located (defaults to internal templates directory)
-o, --output <outputDir> directory where to put the generated files (defaults to current directory)
-p, --param <name=value> additional param to pass to templates
-h, --help output usage information
```

#### Supported parameters

|Name|Description|Required|Default|
|---|---|---|---|
|listenerPollTimeout|Only for Kafka. Timeout in ms to use when polling the consumer.|No|`3000`|
|listenerConcurrency|Only for Kafka. Number of threads to run in the listener containers.|No|`3`|

#### Examples

The shortest possible syntax:
```bash
ag asyncapi.yaml @asyncapi/java-spring-template
```

Specify where to put the result:
Specify where to put the result and define poll timeout:
```bash
ag -o ./src asyncapi.yaml @asyncapi/java-spring-template
ag -o ./src asyncapi.yaml -p listenerPollTimeout=5000 @asyncapi/java-spring-template
```

If you don't have the AsyncAPI Generator installed, you can install it like this:

```
npm install -g @asyncapi/generator
```
### Run it

Go to the root folder of the generated code and run this command (you need the JDK1.8):
Expand Down
15 changes: 0 additions & 15 deletions partials/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,6 @@
@Configuration
{% if hasPublish %}@EnableKafka{% endif %}
public class Config {
{%- if hasSubscribe or hasPublish %}
@Value("${kafka.bootstrap-servers:localhost:9092}")
private String bootstrapServers;
{% endif %}
{%- if hasPublish %}
@Value("${kafka.subscribe.pool-timeout:3000}")
private long poolTimeout;

@Value("${kafka.subscribe.amount-of-listeners:3}")
private Integer amountOfListeners;
{% endif %}
{%- if hasSubscribe %}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
Expand All @@ -57,7 +46,6 @@ public ProducerFactory<Integer, String> producerFactory() {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS,
Expand All @@ -78,8 +66,6 @@ public Map<String, Object> producerConfigs() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(amountOfListeners);
factory.getContainerProperties().setPollTimeout(poolTimeout);
return factory;
}
Expand All @@ -91,7 +77,6 @@ public ConsumerFactory<Integer, String> consumerFactory() {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS,
Expand Down
11 changes: 6 additions & 5 deletions template/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ mqtt:
{% endfor %}
{% endif %}{% endfor %}
{%- if asyncapi | isProtocol('kafka') %}
kafka:
bootstrap-servers: {% for serverName, server in asyncapi.servers() %}{% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %}{% if not loop.last %},{% endif %}{% endfor %}
spring:
kafka:
bootstrap-servers: {% for serverName, server in asyncapi.servers() %}{% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %}{% if not loop.last %},{% endif %}{% endfor %}
{%- if hasPublish %}
subscribe:
pool-timeout: 3000
amount-of-listeners: 3
listener:
poll-timeout: {% if params.listenerPollTimeout %}{{params.listenerPollTimeout}}{% else %}3000{% endif%}
concurrency: {% if params.listenerConcurrency %}{{params.listenerConcurrency}}{% else %}3{% endif%}
{% endif %}
{% endif %}

0 comments on commit 5c1062f

Please sign in to comment.