本篇文章主要介绍了"104-storm 整合 kafka之保存MySQL数据库",主要涉及到Exception,zookeeper方面的内容,对于MySql感兴趣的同学可以参考一下:
整合KafkaStorm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程...
整合Kafka+Storm,消息通过各种方式进入到Kafka消息中间件,比如通过使用Flume来收集的日志数据,然后暂由Kafka中的路由暂存,然后在由实时计算程序Storm做实时分析,这时候我们需要讲Storm中的Spout中读取Kafka中的消息,然后交由具体的Bolt组件分析处理。实际上在 apache-storm-0.9.3这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置。1、配置Maven依赖包
org.apache.kafka
kafka_2.10
0.8.2.0
org.slf4j
slf4j-log4j12
org.apache.storm
storm-core
0.9.3
provided
org.slf4j
log4j-over-slf4j
org.slf4j
slf4j-api
org.apache.storm
storm-kafka
0.9.3
storm程序能接收到数据,并进行处理,但是会发现数据被重复处理这是因为在bolt中没有对数据进行确认,需要调用ack或者fail方法, 修改完成之后即可。2、编写Storm程序package com.yun.storm;
import java.util.UUID;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
/**
* Storm读取Kafka消息中间件数据
*
* @author shenfl
*
*/
public class KafkaLogProcess {
private static final String BOLT_ID = LogFilterBolt.class.getName();
private static final String SPOUT_ID = KafkaSpout.class.getName();
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//表示kafka使用的zookeeper的地址
String brokerZkStr = "192.168.2.20:2181";
ZkHosts zkHosts = new ZkHosts(brokerZkStr);
//表示的是kafak中存储数据的主题名称
String topic = "mytopic";
//指定zookeeper中的一个根目录,里面存储kafkaspout读取数据的位置等信息
String zkRoot = "/kafkaspout";
String id = UUID.randomUUID().toString();
SpoutConfig spoutconf = new SpoutConfig(zkHosts, topic, zkRoot, id);
builder.setSpout(SPOUT_ID , new KafkaSpout(spoutconf));
builder.setBolt(BOLT_ID,new LogFilterBolt()).shuffleGrouping(SPOUT_ID);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(), new Config(),builder.createTopology() );
}
}
package com.yun.storm;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
* 处理来自KafkaSpout的tuple,并保存到数据库中
*
* @author shenfl
*
*/
public class LogFilterBolt extends BaseRichBolt {
private OutputCollector collector;
/**
*
*/
private static final long serialVersionUID = 1L;
Pattern p = Pattern.compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]");
/**
* 每个LogFilterBolt实例仅初始化一次
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
// 接收KafkaSpout的数据
byte[] bytes = input.getBinaryByField("bytes");
String value = new String(bytes).replaceAll("[\n\r]", "");
// 解析数据并入库
Matcher m = p.matcher(value);
if (m.find()) {
String url = m.group(1);
String usetime = m.group(2);
String currentTime = m.group(3);
System.out.println(url + "->" + usetime + "->" + currentTime);
}
this.collector.ack(input);
} catch (Exception e) {
this.collector.fail(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
3、解析日志入库3.1 引入Maven依赖包
commons-dbutils
commons-dbutils
1.6
mysql
mysql-connector-java
5.1.29
3.2 编写MyDbUtils工具类(1)创建数据表
create database jfyun;
CREATE TABLE `log_info` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,
`usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,
`time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1803 DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci
(2)MyDbUtils的程序