Skip to content

Commit

Permalink
Merge pull request #6 from SWM-WeLike2Coding/feat/newTickerAlert
Browse files Browse the repository at this point in the history
feat: 카프카 컨슈머를 통해 메시지를 전달받고 slack에 전달하는 기능 구현
  • Loading branch information
kjungw1025 authored Aug 30, 2024
2 parents 3be5d98 + c7991fd commit d67051f
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 1 deletion.
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ dependencies {

// spring cloud
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'

// kafka
implementation 'org.springframework.kafka:spring-kafka'

// lombok
compileOnly 'org.projectlombok:lombok'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class ElSwhereNotificationServiceApplication {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.wl2c.elswherenotificationservice.client.slack.api;

import com.wl2c.elswherenotificationservice.client.slack.dto.RequestMessageDto;
import jakarta.validation.Valid;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient(name = "slack-client", url = "${slack.api.url}")
public interface SlackClient {

@PostMapping
ResponseEntity<String> sendAlert(@Valid @RequestBody RequestMessageDto requestMessageDto);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.wl2c.elswherenotificationservice.client.slack.dto;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;

@Getter
public class RequestMessageDto {

@Schema(description = "전달할 메시지 내용")
private final String text;

@JsonCreator
public RequestMessageDto(@JsonProperty("text") String text) {
this.text = text;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.wl2c.elswherenotificationservice.notification.controller;
package com.wl2c.elswherenotificationservice.domain.health;

import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.wl2c.elswherenotificationservice.domain.slack.model.dto;

import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Getter
@NoArgsConstructor
@ToString
public class NewTickerMessage {

@NotNull
private Long productId;

@NotNull
private String productName;

@NotNull
private String equity;

@Builder
private NewTickerMessage(Long productId,
String productName,
String equity) {
this.productId = productId;
this.productName = productName;
this.equity = equity;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.wl2c.elswherenotificationservice.domain.slack.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.wl2c.elswherenotificationservice.client.slack.api.SlackClient;
import com.wl2c.elswherenotificationservice.client.slack.dto.RequestMessageDto;
import com.wl2c.elswherenotificationservice.domain.slack.model.dto.NewTickerMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class NewTickerMessageReceiver {

private final SlackClient slackClient;

@KafkaListener(topics = "new-ticker-alert", groupId = "new-ticker-alert-consumer", containerFactory = "kafkaConsumerContainerFactory")
public void receive(String stringMessage) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
NewTickerMessage newTickerMessage = objectMapper.readValue(stringMessage, NewTickerMessage.class);
log.info("new-ticker-alert Message Consumed : " + stringMessage);

ResponseEntity<String> response = slackClient.sendAlert(createMessage(newTickerMessage));
log.info("new-ticker-alert Response : " + response.getStatusCode());
}

private RequestMessageDto createMessage(NewTickerMessage newTickerMessage) {
String stringBuilder = "새로운 티커 정보 반영 필요\n" +
"상품 ID : " +
newTickerMessage.getProductId() +
"\n\n" +
"상품명 : " +
newTickerMessage.getProductName() +
"\n\n" +
"기초자산 : " +
newTickerMessage.getEquity();

return new RequestMessageDto(stringBuilder);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.wl2c.elswherenotificationservice.global.config.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(customErrorHandler());
return factory;
}

private ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);

return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
}

private DefaultErrorHandler customErrorHandler() {

return new DefaultErrorHandler((consumerRecord, e) -> {
log.error("[Error] topic = {}, key = {}, value = {}, error message = {}",
consumerRecord.topic(),
consumerRecord.key(),
consumerRecord.value(),
e.getMessage());
}, new FixedBackOff(1000L, 10));
}
}

0 comments on commit d67051f

Please sign in to comment.