jstorm和storm比较
jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架,使用简单,特点如下:
1,开发非常迅速: 接口简单,容易上手,只要遵守Topology,Spout,Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
2,扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
3,健壮:当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
4,数据准确性: 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。
为什么要选择jstorm,而不采用twitter的storm呢?jstorm对比storm有如下优点:
1,Nimbus 实现HA
2,彻底解决Storm雪崩问题:底层RPC采用netty + disruptor保证发送速度和接受速度是匹配的
3,新增supervisor、Supervisor shutdown时、提交新任务,worker数不够时,均不自动触发任务rebalance
4,新topology不影响现有任务,新任务无需去抢占老任务的cpu,memory,disk和net
5,减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描
6,Worker 内部全流水线模式:Spout nextTuple和ack/fail运行在不同线程
7,性能:采用ZeroMq, 比storm快30%;采用netty时, 和storm快10%,并且稳定非常多
入门jstorm demo
下面写一个 简单的demo,完成jstorm的初体验(上面是目录结构)。
这个小demo 包含4个类:
- com.muhao.demo.WordSpout 是 Topology 的 spout 组件,发送随机的 Word 。
- com.muhao.demo.CountBolt 是 Topology 的 bolt 组件,搜集spout发送过来的Word,并做统计,这里为了看到在大量的日志信息中看到实验结果,将实验结果打印到文件中了。
- com.muhao.demo.LocalPology 是 Local 模式的Topology 实现,local 模式就是为了方便调试,不需要任何的环境配置,直接可在eclipse中运行。也是入门的最好方式。
- com.muhao.demo.RemoteTopology 是要以jar的形式提交到集群上,运行的模式。
具体的class 源码在这里贴出来:
com.muhao.demo.WordSpout 实现:
package com.muhao.demo;import java.util.Map;import org.apache.commons.lang.math.RandomUtils;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;public class WordSpout implements IRichSpout {// 一定要 生成 一个 serialVersionUID,因为这些class 都是要经过序列化的private static final long serialVersionUID = -4515102038086645770L;private String[] strs= {"one","two","three","four","five","six"};SpoutOutputCollector collector;public WordSpout() {super();System.out.println("WordSpout()****************************");}/*** 定义发射的字段类型,是第一个要是执行的方法。* @param declarer*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}/*** 打开 stream 流资源,只会执行一次* @param conf* @param context* @param collector*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {System.out.println("*****************open(Map conf, TopologyContext context, SpoutOutputCollector collector)");this.collector=collector;}/*** 循环执行,向外发送 Tuple */@Overridepublic void nextTuple() {int index=RandomUtils.nextInt(6);collector.emit(new Values(strs[index]));System.out.println("***************nextTuple() : "+strs[index]);}@Overridepublic void close() {}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}
com.muhao.demo.CountBolt 实现:
package com.muhao.demo;import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.apache.log4j.Logger;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;public class CountBolt implements IRichBolt{// 一定要 生成 一个 serialVersionUID,因为这些class 都是要经过序列化的private static final long serialVersionUID = 8740926838799779884L;Map<String,Integer> map=new HashMap<>();private FileWriter writer;public CountBolt() {System.out.println("CountBolt:**********************************");}/*** prepare 在这里仅仅是启动了一个文件写的定时线程,每2秒将结果写到文件中,并清空map.*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);try {//以文件追加的方式打开文件writer = new FileWriter("e://my_log.txt",true);} catch (IOException e) {e.printStackTrace();}//开启定时线程pool.scheduleAtFixedRate(()->{try {writer.write("\r\n");writer.write("***************************************");System.out.println("***************************************");for(Entry<String, Integer> entry:map.entrySet()) {writer.write(entry.getKey()+" : "+entry.getValue());writer.write("\r\n");System.out.println(entry.getKey()+" : "+entry.getValue());}writer.flush();map.clear();} catch (IOException e) {e.printStackTrace();}}, 2000L, 2000L, TimeUnit.MILLISECONDS);}/*** 收到 spout 发送来的 Word 进行统计*/@Overridepublic void execute(Tuple input) {System.out.println("**********execute(Tuple input)");String word=input.getString(0);if(map.get(word)==null) {map.put(word, 1);}else {map.put(word, map.get(word)+1);}}/*** Topology 被 shutdown时会被触发的动作,我们可以用来做一些清理工作*/@Overridepublic void cleanup() {System.out.println("*******************public void cleanup()");for(Entry<String, Integer> entry:map.entrySet()) {System.out.println(entry.getKey()+" : "+entry.getValue());}map.clear();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}
com.muhao.demo.LocalPology 实现:
package com.muhao.demo;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;public class LocalPology {public static void main(String[] args) throws InterruptedException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("out-word", new WordSpout(),1);builder.setBolt("word-count", new CountBolt(),1).shuffleGrouping("out-word");//本地模式:本地提交LocalCluster cluster = new LocalCluster(); Config conf = new Config();conf.setNumWorkers(2);conf.setDebug(true);conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);cluster.submitTopology("firstTopo", conf, builder.createTopology()); //一定要等待足够的时间,否则程序没来得及运行就已经结束了,程序启动需要消耗时间Thread.sleep(30000); cluster.killTopology("firstTopo"); cluster.shutdown(); }
}
com.muhao.demo.RemoteTopology 实现:
package com.muhao.demo;import java.util.HashMap;
import java.util.Map;import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;/*** 抄自 http://www.jstorm.io/QuickStart_cn/Example.html */
public class RemoteTopology {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {Map conf = new HashMap();//topology所有自定义的配置均放入这个MapTopologyBuilder builder = new TopologyBuilder();//创建topology的生成器int spoutParal = 1;//获取spout的并发设置SpoutDeclarer spout = builder.setSpout("out-word",new WordSpout(), spoutParal);//创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格int boltParal = 1;//获取bolt的并发设置BoltDeclarer totalBolt = builder.setBolt("word-count", new CountBolt(),boltParal).shuffleGrouping("out-word");//创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,//即每个spout随机轮询发送tuple到下一级bolt中int ackerParal = 1;Config.setNumAckers(conf, ackerParal);//设置表示acker的并发数int workerNum = 2;conf.put(Config.TOPOLOGY_WORKERS, workerNum);//表示整个topology将使用几个workerconf.put(Config.STORM_CLUSTER_MODE, "distributed");//设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行StormSubmitter.submitTopology("streamName", conf,builder.createTopology());//提交topology}}
pom.xml 依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.muhao.test</groupId><artifactId>z_jstorm_demo</artifactId><version>0.0.1-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><jstorm.version>2.2.1</jstorm.version><storm.version>storm-0.9.2-incubating</storm.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><!-- <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency> --><!-- 下面是jstorm运行依赖 --><dependency><groupId>com.alibaba.jstorm</groupId><artifactId>jstorm-core</artifactId><version>${jstorm.version}</version><scope>provided</scope></dependency><!-- 下面是storm运行依赖 --><!-- <dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.0.1</version><scope>provided</scope></dependency> --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>1.11</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>16.0.1</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version></dependency><dependency><groupId>io.dropwizard.metrics</groupId><artifactId>metrics-core</artifactId><version>3.1.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><!--<forkCount>1</forkCount> --><forkMode>pertest</forkMode><argLine>-Xms1024m -Xmx4096m</argLine></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.7.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.muhao.demo.RemoteTopology</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>
在本地运行 LocalPology main 方法结果如下:
写入的文件内容:
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************six : 84
four : 64
one : 91
three : 64
two : 84
five : 58***************************************six : 11973
four : 12195
one : 12220
two : 11924
five : 11975
three : 11966***************************************six : 11702
four : 11670
one : 11727
three : 11595
five : 11573
two : 11596***************************************six : 11126
four : 11083
one : 11116
three : 10981
five : 11014
two : 10849***************************************six : 10798
four : 10803
one : 10853
five : 10729
two : 10604
three : 10682***************************************
***************************************
***************************************
***************************************
可见 在开始的 20 秒左右的时间程序没有启动,没有 运行 spout 和 bolt 程序,所以设置 Sleep 时间要长一些。
在项目根目录执行 mvn package 打包jar 上传 jstorm集群运行
由于没有手动 shutdown,所以只有通过文件查看结果,和面结果一样,就不显示了。
storm运行相同(呵呵)代码
jstorm 说的是原先在storm运行的代码不用动一行就可以直接运行,在原来的基础上在pom.xml 中将jstorm的依赖改成对storm的依赖,但是还需要在具体的类中更改引用的class:要将backtype.storm.XXX 改成 org.apache.storm.XXX 。 因为 jstorm : backtype.storm ,而 storm : org.apache.storm 。并且由于jstorm写的 类 和 storm 写的 类都是相同的,比如 :jstorm 的 backtype.storm.tuple.Tuple; 和 storm 的 org.apache.storm.tuple.Tuple 。
具体的代码在我的博客中也上传了。