By integrating Spring Boot, Kafka, and Storm, the KafkaProducerStorm project queries data from the database and sends it to Kafka, while the KafkaConsumerStorm project consumes that data from Kafka, hands it to Storm for filtering, and inserts the records that meet the condition into the database (project source code: click here).
The KafkaProducerStorm project is responsible for sending data to Kafka.

1. KafkaProducerUtil class

The KafkaProducerUtil class provides sendMessage(String msg, String url, String topicName) for sending a message to Kafka; the Kafka-related settings are kept in application.properties. The full code is as follows:
package com.KafakaProducer.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;

@Component
public final class KafkaProducerUtil {

    // Send a single message to Kafka
    public static boolean sendMessage(String msg, String url, String topicName) {
        KafkaProducer<String, String> producer = null;
        boolean flag = false;
        try {
            Properties props = init(url);
            producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>(topicName, msg));
            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) { // avoid a NullPointerException if construction failed
                producer.close();
            }
        }
        return flag;
    }

    // Initialize the producer configuration
    private static Properties init(String url) {
        Properties props = new Properties();
        props.put("bootstrap.servers", url);
        // acks=0: the producer does not wait for any acknowledgement from Kafka.
        // acks=1: the leader writes the message to its local log but does not wait for the followers.
        // acks=all: the leader waits for all in-sync replicas to acknowledge; the message is not lost
        //           unless the whole cluster goes down. This is the strongest durability guarantee.
        props.put("acks", "all");
        // With a value greater than 0 the client retries failed sends.
        props.put("retries", 0);
        // Messages destined for the same partition are batched into fewer network requests,
        // which improves efficiency on both the client and the broker.
        props.put("batch.size", 16384);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return props;
    }
}
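The Kafka address and topic name come from application.properties (as mentioned above) and are exposed through the ApplicationConfiguration bean that the controller below injects. That class is not shown in the post; the following is only a minimal sketch, and the property keys kafka.servers and kafka.topicName are assumptions, not the project's actual keys:

package com.KafakaProducer.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Minimal sketch of a configuration bean backed by application.properties.
 * The property keys below are assumptions.
 */
@Component
public class ApplicationConfiguration {

    // e.g. kafka.servers=127.0.0.1:9092
    @Value("${kafka.servers}")
    private String servers;

    // e.g. kafka.topicName=testTopic
    @Value("${kafka.topicName}")
    private String topicName;

    public String getServers() {
        return servers;
    }

    public String getTopicName() {
        return topicName;
    }
}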
2. InfoController class

The controller handles the incoming requests and returns the data. It offers two methods: one looks up a single record by id and sends that record to Kafka; the other queries all records in the database and sends the resulting list to Kafka one item at a time. Before a record is sent to Kafka it is converted to a string via the toString method defined in the entity class, shown below:
@Override
public String toString() {
    return JSON.toJSONString(this);
}
Note that this toString serializes the fields in alphabetical order by default; to enforce a specific order, annotate the entity class with @JSONType(orders={"id","name","age"}). A sketch of such an entity is shown below.
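The entity class itself is not included in the post, so the following is only a hypothetical sketch; the field names id, name, and age are taken from the sample data shown at the end:

package com.KafakaProducer.entity;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONType;

/**
 * Hypothetical sketch of the Info entity; the real class is not shown in the post.
 */
@JSONType(orders = {"id", "name", "age"}) // serialize in this order instead of alphabetically
public class Info {

    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        // serialize the whole object as a JSON string before sending it to Kafka
        return JSON.toJSONString(this);
    }
}

The InfoController class is as follows: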
package com.KafakaProducer.controller;

import java.io.IOException;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.KafakaProducer.config.ApplicationConfiguration;
import com.KafakaProducer.dao.InfoDao;
import com.KafakaProducer.entity.Info;
import com.KafakaProducer.producer.KafkaProducerUtil;
import com.KafakaProducer.service.InfoService;

@RestController
@RequestMapping(value = "/api")
public class InfoController {

    @Autowired
    private InfoService infoService;

    @Autowired
    private InfoDao infoDao;

    @Autowired
    ApplicationConfiguration app;

    // Look up a single record by id and send it to Kafka
    @RequestMapping(value = "/findById", method = RequestMethod.GET)
    public Info findById(@RequestParam(value = "id", required = true) int id) throws IOException {
        System.out.println("Looking up record by id!");
        Info result = infoDao.findById(id);
        KafkaProducerUtil.sendMessage(result.toString(), app.getServers(), app.getTopicName());
        return infoService.findById(id);
    }

    // Query all records and send each one to Kafka
    @PostMapping(value = "/findAll")
    public List<Info> findAll() {
        List<Info> result = infoService.findAll();
        // Send each record in the list to Kafka, one message per record
        for (int i = 0; i < result.size(); i++) {
            KafkaProducerUtil.sendMessage(result.get(i).toString(), app.getServers(), app.getTopicName());
        }
        return result; // reuse the list that was already fetched instead of querying again
    }
}
The KafkaConsumerStorm project consumes the data from Kafka, processes it with Storm, and then inserts the result into the database. A storm package is created in the project, containing the Spout, Bolt, and Topology classes that implement the Storm processing.

1. Spout class

The Spout is the component through which Storm acquires data. The key method to implement is nextTuple: by placing the Kafka consumer code there, the consumed records can be passed on to the Bolt for processing. The code is as follows:
package com.KafkaConsumer.storm;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.KafkaConsumer.config.ApplicationConfiguration;
import com.KafkaConsumer.constant.Constants;
import com.KafkaConsumer.entity.InfoGet;
import com.KafkaConsumer.util.GetSpringBean;
import com.alibaba.fastjson.JSON;

public class Spout extends BaseRichSpout {

    private static final long serialVersionUID = -2548451744178936478L;

    private static final Logger logger = LoggerFactory.getLogger(Spout.class);

    private SpoutOutputCollector collector;
    private KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private ApplicationConfiguration app;

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
        app = GetSpringBean.getBean(ApplicationConfiguration.class);
        kafkaInit();
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        for (;;) {
            try {
                msgList = consumer.poll(100);
                if (null != msgList && !msgList.isEmpty()) {
                    String msg = "";
                    List<InfoGet> list = new ArrayList<InfoGet>();
                    for (ConsumerRecord<String, String> record : msgList) {
                        // raw message
                        msg = record.value();
                        if (null == msg || "".equals(msg.trim())) {
                            continue;
                        }
                        try {
                            list.add(JSON.parseObject(msg, InfoGet.class));
                        } catch (Exception e) {
                            logger.error("Malformed record! data: {}", msg);
                            continue;
                        }
                    }
                    logger.info("Data emitted by the Spout: " + list);
                    // hand the batch over to the Bolt
                    this.collector.emit(new Values(JSON.toJSONString(list)));
                    consumer.commitAsync();
                } else {
                    TimeUnit.SECONDS.sleep(3);
                    logger.info("No records fetched...");
                }
            } catch (Exception e) {
                logger.error("Error while processing the message queue!", e);
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e1) {
                    logger.error("Sleep interrupted!", e1);
                }
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(Constants.FIELD));
    }

    /**
     * Initialize the Kafka consumer.
     */
    private void kafkaInit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", app.getServers());
        props.put("max.poll.records", app.getMaxPollRecords());
        props.put("enable.auto.commit", app.getAutoCommit());
        props.put("group.id", app.getGroupId());
        props.put("auto.offset.reset", app.getCommitRule());
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        String topic = app.getTopicName();
        this.consumer.subscribe(Arrays.asList(topic));
        logger.info("Initializing consumer for topic [" + topic + "]...");
    }
}
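The Constants class referenced here (and in the Bolt and Topology below) is not shown in the post; the sketch below is an assumption, and the actual string values may differ:

package com.KafkaConsumer.constant;

/**
 * Hypothetical sketch of the Constants class shared by Spout, Bolt and Topology.
 * Only the constant names appear in the original code; the values are assumptions.
 */
public class Constants {

    // name of the single field emitted by the Spout and read by the Bolt
    public static final String FIELD = "msg";

    // component IDs used when building the topology
    public static final String KAFKA_SPOUT = "kafkaSpout";
    public static final String INSERT_BOLT = "insertBolt";

    private Constants() {
    }
}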
2. Bolt class

The Bolt is where the data is actually processed. It applies a simple check and inserts the records that pass it into the database. A Bolt's business logic lives in its execute method; here it inspects the age field of each record: records with age less than 10 are discarded, and the remaining ones are inserted into the database. The code is as follows:
package com.KafkaConsumer.storm;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.KafkaConsumer.constant.Constants;
import com.KafkaConsumer.entity.InfoGet;
import com.KafkaConsumer.service.InfoGetService;
import com.KafkaConsumer.util.GetSpringBean;
import com.alibaba.fastjson.JSON;

public class Bolt extends BaseRichBolt {

    private static final long serialVersionUID = 6542256546124282695L;

    private static final Logger logger = LoggerFactory.getLogger(Bolt.class);

    private InfoGetService infoGetService;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
        infoGetService = GetSpringBean.getBean(InfoGetService.class);
    }

    @Override
    public void execute(Tuple tuple) {
        String msg = tuple.getStringByField(Constants.FIELD);
        try {
            List<InfoGet> listUser = JSON.parseArray(msg, InfoGet.class);
            // drop records whose age is less than 10
            if (listUser != null && listUser.size() > 0) {
                Iterator<InfoGet> iterator = listUser.iterator();
                while (iterator.hasNext()) {
                    InfoGet user = iterator.next();
                    if (user.getAge() < 10) {
                        logger.warn("Record dropped by the Bolt: {}", user);
                        iterator.remove();
                    }
                }
                if (listUser.size() > 0) {
                    infoGetService.insert(listUser);
                }
            }
        } catch (Exception e) {
            logger.error("Bolt failed to process the data! data: {}", msg, e);
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
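The InfoGet entity that the Spout and Bolt deserialize into is not shown in the post either. A minimal sketch, assuming it mirrors the producer's id/name/age fields and also serializes itself to JSON in toString (which would match the console output shown at the end), could be:

package com.KafkaConsumer.entity;

import com.alibaba.fastjson.JSON;

/**
 * Hypothetical sketch of the InfoGet entity on the consumer side; the real class is not shown.
 */
public class InfoGet {

    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        // JSON form, consistent with the log output shown at the end of the post
        return JSON.toJSONString(this);
    }
}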
3. Topology class

The Topology class is the Storm entry point: it builds and submits the topology, wiring up the spout and bolt with the appropriate settings, as follows:
package com.KafkaConsumer.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.KafkaConsumer.constant.Constants;

@Component
public class Topology {

    private final Logger logger = LoggerFactory.getLogger(Topology.class);

    public void runStorm(String[] args) {
        // define the topology
        TopologyBuilder builder = new TopologyBuilder();
        // one executor (thread) for the spout (the default is one)
        builder.setSpout(Constants.KAFKA_SPOUT, new Spout(), 1);
        // shuffleGrouping: tuples are distributed to the bolt tasks at random
        // one executor (thread) and one task for the bolt
        builder.setBolt(Constants.INSERT_BOLT, new Bolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT);
        Config conf = new Config();
        // one acker
        conf.setNumAckers(1);
        // one worker
        conf.setNumWorkers(1);
        try {
            // with arguments: submit to the cluster, using the first argument as the topology name
            // without arguments: run in local mode
            if (args != null && args.length > 0) {
                logger.info("Running in remote mode");
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // start a local cluster
                logger.info("Running in local mode");
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Topology", conf, builder.createTopology());
            }
        } catch (Exception e) {
            logger.error("Storm failed to start! Exiting.", e);
            System.exit(1);
        }
        logger.info("Storm started successfully...");
    }
}
4. App.java

This is the Spring Boot main class. The Storm Topology is launched from it, so Storm starts together with the Spring Boot application, as follows:
package com.KafkaConsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import com.KafkaConsumer.storm.Topology;
import com.KafkaConsumer.util.GetSpringBean;

/**
 * Spring Boot entry point; it also launches the Storm topology.
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        // start the embedded Tomcat and initialize the Spring context and its beans
        ConfigurableApplicationContext context = SpringApplication.run(App.class, args);
        // expose the Spring context to the Storm components via GetSpringBean
        GetSpringBean springBean = new GetSpringBean();
        springBean.setApplicationContext(context);
        // fetch the Topology bean and start Storm
        Topology app = context.getBean(Topology.class);
        app.runStorm(args);
    }
}
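GetSpringBean is the helper that lets the Storm-created Spout and Bolt look up Spring beans, since Storm instantiates those objects itself rather than through Spring. Its source is not included in the post; a minimal sketch consistent with how it is used above (an instance setApplicationContext call in main plus a static getBean) might be:

package com.KafkaConsumer.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * Hypothetical sketch of the GetSpringBean helper. It keeps a static reference to the
 * Spring ApplicationContext so that Storm-created objects can fetch beans from it.
 */
public class GetSpringBean implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static <T> T getBean(Class<T> clazz) {
        return context.getBean(clazz);
    }
}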
- Start the KafkaProducerStorm project and test the findAll method: send the request from Postman and all records in the database are returned. Open Kafka Tool and, under the corresponding topic, you can see that the data has been written to Kafka.
- Start the KafkaConsumerStorm project; the console shows the following processing result:
Data emitted by the Spout: [{"age":11,"id":1,"name":"aa"}, {"age":22,"id":2,"name":"bb"}, {"age":33,"id":3,"name":"cc"}, {"age":4,"id":4,"name":"dd"}, {"age":8,"id":5,"name":"ee"}]
Record dropped by the Bolt: {"age":4,"id":4,"name":"dd"}
Record dropped by the Bolt: {"age":8,"id":5,"name":"ee"}
Data inserted successfully!