一、需求:统计网站访问量(实时统计)
技术选型:该需求的特点是数据量大、需要计算、要求实时,因此选用实时流式计算框架 Storm。
1)spout:数据源,接入本地日志文件,逐行发送数据;
2)splitbolt:业务逻辑处理,切分每行数据,取出 session_id,并对本任务的访问量做局部累加;
3)sumbolt:汇总各 splitbolt 的局部计数,全局累加求和。
1、PvCountSpout类
package com.demo.pvcount;import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.util.Map;import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class PvCountSpout implements IRichSpout{private SpoutOutputCollector collector;private BufferedReader br;private String line;@Overridepublic void nextTuple() {//发送读取的数据的每一行try {while((line = br.readLine())!= null) {//发送数据到splitboltcollector.emit(new Values(line));//设置延迟Thread.sleep(500);}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {this.collector = collector;//读取文件try {br = new BufferedReader(new InputStreamReader(new FileInputStream("e:/weblog.log")));} catch (FileNotFoundException e) {e.printStackTrace();}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {//声明declarer.declare(new Fields("logs"));}//处理tuple成功 回调的方法 @Overridepublic void ack(Object arg0) {}//如果spout在失效的模式中 调用此方法来激活 @Overridepublic void activate() {}//在spout程序关闭前执行 不能保证一定被执行 kill -9 是不执行 storm kill 是不执行 @Overridepublic void close() {}//在spout失效期间,nextTuple不会被调用 @Overridepublic void deactivate() {}//处理tuple失败回调的方法 @Overridepublic void fail(Object arg0) {}//配置 @Overridepublic Map<String, Object> getComponentConfiguration() {return null;} }
2、PvCountSplitBolt类
package com.demo.pvcount;import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class PvCountSplitBolt implements IRichBolt{private OutputCollector collector;//一个bolt即将关闭时调用 不能保证一定被调用 资源清理 @Overridepublic void cleanup() {}private int pvnum = 0;//业务逻辑 分布式 集群 并发度 线程 (接收tuple然后进行处理) @Overridepublic void execute(Tuple input) {//1.获取数据String line = input.getStringByField("logs");//2.切分数据String[] fields = line.split("\t");String session_id = fields[1];//3.局部累加if (session_id != null) {//累加pvnum++;//输出collector.emit(new Values(Thread.currentThread().getId(),pvnum));}}//初始化调用 @Overridepublic void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {this.collector = collector;}//声明 @Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {//声明输出declarer.declare(new Fields("threadid","pvnum"));}//配置 @Overridepublic Map<String, Object> getComponentConfiguration() {return null;} }
3、PvCountSumBolt类
package com.demo.pvcount;import java.util.HashMap; import java.util.Iterator; import java.util.Map;import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; public class PvCountSumBolt implements IRichBolt{private HashMap<Long, Integer> hashMap = new HashMap<>();@Overridepublic void cleanup() {}//全局累加求和 业务逻辑 @Overridepublic void execute(Tuple input) {//1.获取数据Long threadid = input.getLongByField("threadid");Integer pvnum = input.getIntegerByField("pvnum");//2.创建集合 存储(threadid,pvnum) 15 20 hashMap.put(threadid, pvnum);//3.累加求和(拿到集合中所有value值)Iterator<Integer> iterator = hashMap.values().iterator();//4.清空之前的数据int sumnum = 0;while (iterator.hasNext()) {sumnum += iterator.next();}System.err.println(Thread.currentThread().getName() + "总访问量为->" + sumnum);}@Overridepublic void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {}@Overridepublic void declareOutputFields(OutputFieldsDeclarer arg0) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;} }
4、PvCountDriver类
package com.demo.pvcount;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields;public class PvCountDriver {public static void main(String[] args) {// 1.hadoop->Job storm->topology 创建拓扑TopologyBuilder builder = new TopologyBuilder();// 2.指定设置builder.setSpout("PvCountSpout", new PvCountSpout(), 1);builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4).fieldsGrouping("PvCountSpout", new Fields("logs"));builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1).fieldsGrouping("PvCountSplitBolt", new Fields("pvnum"));// 3.创建配置信息Config conf = new Config();conf.setNumWorkers(2);// 4.提交任务LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());} }
5、PvCountDriver_Shuffle类
package com.demo.pvcount;import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder;public class PvCountDriver_Shuffle {public static void main(String[] args) {// 1.hadoop->Job storm->topology 创建拓扑TopologyBuilder builder = new TopologyBuilder();// 2.指定设置builder.setSpout("PvCountSpout", new PvCountSpout(), 1);builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4).shuffleGrouping("PvCountSpout");builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 2).shuffleGrouping("PvCountSplitBolt");// 3.创建配置信息Config conf = new Config();conf.setNumWorkers(2);// 4.提交任务LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());} }
6、weblog.log文件(每行包含网址、session_id、访问时间三列,列之间以制表符 \t 分隔)
storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 10:40:49 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 08:40:50 storm.apache.org BBYH61456DEL89RG5VV9UYU7 2018-08-07 10:40:49 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 09:40:49 storm.apache.org CCYH6Y4V4SCVXTG6DPB4VH9U123 2018-08-07 10:40:49 storm.apache.org CCYH6Y4V4SCVXTG6DPB4VH9U123 2018-08-07 12:40:49 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 08:40:52 storm.apache.org CCYH6Y4V4SCVXTG6DPB4VH9U123 2018-08-07 08:40:50 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 09:40:49...
...
... storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 08:40:53 storm.apache.org BBYH61456DEL89RG5VV9UYU7 2018-08-07 12:40:49 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 08:40:51 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 10:40:49 storm.apache.org HUNTERH6YCGFJYERTT834R52FDXV9U34 2018-08-07 08:40:53 storm.apache.org BBYH61456DEL89RG5VV9UYU7 2018-08-07 08:40:50 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 08:40:53 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 10:40:49
7、运行第 4 步 PvCountDriver 类中的 main 方法,控制台输出如下图所示:
此时向 weblog.log 文件末尾追加几条数据,控制台打印的总访问量会相应增加同样的条数。
至此,简单实现了网站访问量实时统计。