Skip to content

Commit

Permalink
Merge pull request #1283 from fdelbrayelle/feat/issues/1096_kafka_con…
Browse files Browse the repository at this point in the history
…sumer

Feat/issues/1096 kafka consumer
  • Loading branch information
pascalgrimaud authored May 1, 2022
2 parents 82efa50 + 7e521e2 commit aae5829
Show file tree
Hide file tree
Showing 20 changed files with 512 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ private Constants() {}
public static final String TECHNICAL_INFRASTRUCTURE = getPath(TECHNICAL, INFRASTRUCTURE);
public static final String TECHNICAL_INFRASTRUCTURE_PRIMARY = getPath(TECHNICAL_INFRASTRUCTURE, PRIMARY);
public static final String TECHNICAL_INFRASTRUCTURE_SECONDARY = getPath(TECHNICAL_INFRASTRUCTURE, SECONDARY);
public static final String TECHNICAL_INFRASTRUCTURE_CONFIG = getPath(TECHNICAL_INFRASTRUCTURE, CONFIG);
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private GeneratorAction() {}
public static final String REACT_CYPRESS = "react-cypress";

public static final String SPRINGBOOT_KAFKA = "springboot-kafka";
public static final String SPRINGBOOT_KAFKA_DUMMY_PRODUCER = "springboot-kafka-dummy-producer";
public static final String SPRINGBOOT_KAFKA_DUMMY_PRODUCER_CONSUMER = "springboot-kafka-dummy-producer-consumer";
public static final String SPRINGBOOT_KAFKA_AKHQ = "springboot-kafka-akhq";

public static final String VUE = "vue";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public void init(final Project project) {
kafkaService.init(project);
}

public void addDummyProducer(final Project project) {
kafkaService.addDummyProducer(project);
public void addDummyProducerConsumer(final Project project) {
kafkaService.addDummyProducerConsumer(project);
}

public void addAkhq(final Project project) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package tech.jhipster.lite.generator.server.springboot.broker.kafka.domain;

import static tech.jhipster.lite.common.domain.FileUtils.getPath;
import static tech.jhipster.lite.generator.project.domain.Constants.MAIN_DOCKER;
import static tech.jhipster.lite.generator.project.domain.Constants.MAIN_JAVA;
import static tech.jhipster.lite.generator.project.domain.Constants.TECHNICAL_INFRASTRUCTURE_SECONDARY;
import static tech.jhipster.lite.generator.project.domain.Constants.TEST_JAVA;
import static tech.jhipster.lite.generator.project.domain.DefaultConfig.BASE_NAME;
import static tech.jhipster.lite.generator.project.domain.DefaultConfig.DEFAULT_BASE_NAME;
import static tech.jhipster.lite.generator.project.domain.DefaultConfig.PACKAGE_NAME;
import static tech.jhipster.lite.generator.project.domain.Constants.*;
import static tech.jhipster.lite.generator.project.domain.DefaultConfig.*;
import static tech.jhipster.lite.generator.server.springboot.broker.kafka.domain.Akhq.AKHQ_DOCKER_COMPOSE_FILE;
import static tech.jhipster.lite.generator.server.springboot.broker.kafka.domain.Akhq.AKHQ_DOCKER_IMAGE;
import static tech.jhipster.lite.generator.server.springboot.broker.kafka.domain.Kafka.KAFKA_DOCKER_COMPOSE_FILE;
Expand All @@ -29,6 +24,7 @@ public class KafkaDomainService implements KafkaService {
private static final String SOURCE = "server/springboot/broker/kafka";
private static final String DUMMY_TOPIC_NAME = "kafka.topic.dummy";
private static final String DUMMY_PRODUCER_PATH = "dummy/infrastructure/secondary/kafka/producer";
private static final String DUMMY_CONSUMER_PATH = "dummy/infrastructure/primary/kafka/consumer";

private final BuildToolService buildToolService;

Expand Down Expand Up @@ -56,36 +52,48 @@ public void init(final Project project) {
addDockerCompose(project);
addProperties(project);
addTestcontainers(project);
addConfiguration(project);
}

private void addConfiguration(final Project project) {
final String packageNamePath = project.getPackageNamePath().orElse(getPath(DefaultConfig.PACKAGE_PATH));
final String configKafkaPath = TECHNICAL_INFRASTRUCTURE_CONFIG + "/kafka";

projectRepository.template(project, SOURCE, "KafkaProperties.java", getPath(MAIN_JAVA, packageNamePath, configKafkaPath));
projectRepository.template(project, SOURCE, "KafkaPropertiesTest.java", getPath(TEST_JAVA, packageNamePath, configKafkaPath));

projectRepository.template(
project,
SOURCE,
"KafkaConfiguration.java",
getPath(MAIN_JAVA, packageNamePath, TECHNICAL_INFRASTRUCTURE_CONFIG + "/kafka")
);
}

@Override
public void addDummyProducer(final Project project) {
public void addDummyProducerConsumer(final Project project) {
if (!springBootCommonService.getProperty(project, DUMMY_TOPIC_NAME).isPresent()) {
project.addDefaultConfig(PACKAGE_NAME);
project.addDefaultConfig(BASE_NAME);
final String packageNamePath = project.getPackageNamePath().orElse(getPath(DefaultConfig.PACKAGE_PATH));
final String secondaryKafkaPath = TECHNICAL_INFRASTRUCTURE_SECONDARY + "/kafka";

final String topicName = "queue." + project.getBaseName().orElse("jhipster") + ".dummy";
springBootCommonService.addProperties(project, DUMMY_TOPIC_NAME, topicName);
springBootCommonService.addPropertiesTest(project, DUMMY_TOPIC_NAME, topicName);

projectRepository.template(project, SOURCE, "KafkaProducerProperties.java", getPath(MAIN_JAVA, packageNamePath, secondaryKafkaPath));
projectRepository.template(
project,
SOURCE,
"KafkaProducerPropertiesTest.java",
getPath(TEST_JAVA, packageNamePath, secondaryKafkaPath)
);
projectRepository.template(project, SOURCE, "DummyProducer.java", getPath(MAIN_JAVA, packageNamePath, DUMMY_PRODUCER_PATH));
projectRepository.template(project, SOURCE, "DummyProducerTest.java", getPath(TEST_JAVA, packageNamePath, DUMMY_PRODUCER_PATH));
projectRepository.template(project, SOURCE, "DummyProducerIT.java", getPath(TEST_JAVA, packageNamePath, DUMMY_PRODUCER_PATH));
projectRepository.template(project, SOURCE, "KafkaConfiguration.java", getPath(MAIN_JAVA, packageNamePath, secondaryKafkaPath));

projectRepository.template(project, SOURCE, "AbstractConsumer.java", getPath(MAIN_JAVA, packageNamePath, DUMMY_CONSUMER_PATH));
projectRepository.template(project, SOURCE, "DummyConsumer.java", getPath(MAIN_JAVA, packageNamePath, DUMMY_CONSUMER_PATH));
projectRepository.template(project, SOURCE, "DummyConsumerIT.java", getPath(TEST_JAVA, packageNamePath, DUMMY_CONSUMER_PATH));
projectRepository.template(project, SOURCE, "DummyConsumerTest.java", getPath(TEST_JAVA, packageNamePath, DUMMY_CONSUMER_PATH));
}
}

@Override
public void addAkhq(Project project) {
public void addAkhq(final Project project) {
final String akhqDockerImage = dockerService.getImageNameWithVersion(AKHQ_DOCKER_IMAGE).orElseThrow();
project.addConfig("akhqDockerImage", akhqDockerImage);
projectRepository.template(project, SOURCE, AKHQ_DOCKER_COMPOSE_FILE, MAIN_DOCKER, AKHQ_DOCKER_COMPOSE_FILE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public interface KafkaService {
void init(Project project);

void addDummyProducer(Project project);
void addDummyProducerConsumer(Project project);

void addAkhq(Project project);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public void init(final @RequestBody ProjectDTO projectDTO) {
kafkaApplicationService.init(project);
}

@Operation(summary = "Add a dummy Kafka producer")
@ApiResponse(responseCode = "500", description = "An error occurred while adding a dummy Kafka producer")
@PostMapping("/dummy-producer")
@GeneratorStep(id = GeneratorAction.SPRINGBOOT_KAFKA_DUMMY_PRODUCER)
public void addDummyProducer(final @RequestBody ProjectDTO projectDTO) {
@Operation(summary = "Add dummy Kafka producer and consumer")
@ApiResponse(responseCode = "500", description = "An error occurred while adding dummy Kafka producer and consumer")
@PostMapping("/dummy-producer-consumer")
@GeneratorStep(id = GeneratorAction.SPRINGBOOT_KAFKA_DUMMY_PRODUCER_CONSUMER)
public void addDummyProducerConsumer(final @RequestBody ProjectDTO projectDTO) {
final Project project = ProjectDTO.toProject(projectDTO);
kafkaApplicationService.addDummyProducer(project);
kafkaApplicationService.addDummyProducerConsumer(project);
}

@Operation(summary = "Add AKHQ")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package {{packageName}}.dummy.infrastructure.primary.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractConsumer<T> implements Runnable {
private final Logger log = LoggerFactory.getLogger(AbstractConsumer.class);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer<String, T> consumer;
private final String topicName;
private final int pollingTimeout;
protected AbstractConsumer(final String topicName, final int pollingTimeout, final KafkaConsumer<String, T> consumer) {
this.topicName = topicName;
this.pollingTimeout = pollingTimeout;
this.consumer = consumer;
}

@Override
public void run() {
try {
consumer.subscribe(Collections.singleton(topicName));
while (!closed.get()) {
final ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(pollingTimeout));
records.forEach(this::handleMessage);
consumer.commitSync();
}
} catch (final WakeupException e) {
// Ignore exception if closing
if (!closed.get()) {
throw e;
}
} catch (final Exception e) {
log.error("An error occurred while trying to poll records from topic!", e);
} finally {
consumer.close();
}
}

// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}

public boolean isClosed() {
return closed.get();
}

public void setClosed(boolean closed) {
this.closed.set(closed);
}

protected abstract boolean handleMessage(ConsumerRecord<String, T> consumerRecord);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package {{packageName}}.dummy.infrastructure.primary.kafka.consumer;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Service;

@Service
public class DummyConsumer extends AbstractConsumer<String> {
private final Logger log = LoggerFactory.getLogger(DummyConsumer.class);
public DummyConsumer(
@Value("${kafka.topic.dummy}") final String topicName,
@Value("${kafka.polling.timeout}") final int pollingTimeout,
final KafkaConsumer<String, String> kafkaConsumer
) {
super(topicName, pollingTimeout, kafkaConsumer);
}

@PostConstruct
public void init() {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}

@PreDestroy
public void destroy() {
shutdown();
}

@Override
protected boolean handleMessage(final ConsumerRecord<String, String> consumerRecord) {
// /!\ Maybe you could delete the next log calls to avoid disclosing personal user information
final String value = consumerRecord.value();
if (value == null) {
log.error("Null value in record {}", consumerRecord);
return false;
}

log.info("Handling record: {}", value);
// Here is where you can handle your records
return true;
}

@Bean
public void executeKafkaRunner() {
new SimpleAsyncTaskExecutor().execute(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package {{packageName}}.dummy.infrastructure.primary.kafka.consumer;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import {{packageName}}.IntegrationTest;
import {{packageName}}.dummy.infrastructure.secondary.kafka.producer.DummyProducer;

@IntegrationTest
class DummyConsumerIT {
@Autowired
private DummyProducer dummyProducer;
@Autowired
private DummyConsumer dummyConsumer;
@Test
void shouldHandleMessage() {
final String messageToSend = "dummy";
dummyProducer.send(messageToSend);
ConsumerRecord<String, String> record = new ConsumerRecord<>("queue.kafkaapp.dummy", 0, 0, null, messageToSend);
boolean actualResult = dummyConsumer.handleMessage(record);
assertThat(actualResult).isTrue();
}
}
Loading

0 comments on commit aae5829

Please sign in to comment.