jstorm storm 入门demo

article/2025/8/30 16:25:35

jstorm和storm比较

jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架,使用简单,特点如下:

1,开发非常迅速: 接口简单,容易上手,只要遵守Topology,Spout,Bolt的编程规范即可开发出一个扩展性极好的应用,底层rpc,worker之间冗余,数据分流之类的动作完全不用考虑。
2,扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
3,健壮:当worker失效或机器出现故障时, 自动分配新的worker替换失效worker
4,数据准确性: 可以采用Acker机制,保证数据不丢失。 如果对精度有更多一步要求,采用事务机制,保证数据准确。


为什么要选择jstorm,而不采用twitter的storm呢?jstorm对比storm有如下优点:

1,Nimbus 实现HA
2,彻底解决Storm雪崩问题:底层RPC采用netty + disruptor保证发送速度和接受速度是匹配的
3,新增supervisor、Supervisor shutdown时、提交新任务,worker数不够时,均不自动触发任务rebalance
4,新topology不影响现有任务,新任务无需去抢占老任务的cpu,memory,disk和net
5,减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描减少对ZK的访问量:去掉大量无用的watch;task的心跳时间延长一倍;Task心跳检测无需全ZK扫描
6,Worker 内部全流水线模式:Spout nextTuple和ack/fail运行在不同线程

7,性能:采用ZeroMq, 比storm快30%;采用netty时, 和storm快10%,并且稳定非常多


入门jstorm demo


下面写一个 简单的demo,完成jstorm的初体验(上面是目录结构)。
这个小demo 包含4个类:

  1. com.muhao.demo.WordSpout 是 Topology 的 spout 组件,发送随机的 Word 。
  2. com.muhao.demo.CountBolt 是 Topology 的 bolt  组件,搜集spout发送过来的Word,并做统计,这里为了看到在大量的日志信息中看到实验结果,将实验结果打印到文件中了。
  3. com.muhao.demo.LocalPology 是 Local 模式的Topology 实现,local 模式就是为了方便调试,不需要任何的环境配置,直接可在eclipse中运行。也是入门的最好方式。
  4. com.muhao.demo.RemoteTopology 是要以jar的形式提交到集群上,运行的模式。

具体的class 源码在这里贴出来:

com.muhao.demo.WordSpout 实现:

package com.muhao.demo;import java.util.Map;import org.apache.commons.lang.math.RandomUtils;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;public class WordSpout implements IRichSpout {// 一定要 生成 一个 serialVersionUID,因为这些class 都是要经过序列化的private static final long serialVersionUID = -4515102038086645770L;private String[] strs= {"one","two","three","four","five","six"};SpoutOutputCollector collector;public WordSpout() {super();System.out.println("WordSpout()****************************");}/*** 定义发射的字段类型,是第一个要是执行的方法。* @param declarer*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}/*** 打开 stream 流资源,只会执行一次* @param conf* @param context* @param collector*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {System.out.println("*****************open(Map conf, TopologyContext context, SpoutOutputCollector collector)");this.collector=collector;}/*** 循环执行,向外发送 Tuple */@Overridepublic void nextTuple() {int index=RandomUtils.nextInt(6);collector.emit(new Values(strs[index]));System.out.println("***************nextTuple() : "+strs[index]);}@Overridepublic void close() {}@Overridepublic void activate() {}@Overridepublic void deactivate() {}@Overridepublic void ack(Object msgId) {}@Overridepublic void fail(Object msgId) {}@Overridepublic Map<String, Object> getComponentConfiguration() {return null;}}

com.muhao.demo.CountBolt 实现:

package com.muhao.demo;import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.apache.log4j.Logger;import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;public class CountBolt implements IRichBolt{// 一定要 生成 一个 serialVersionUID,因为这些class 都是要经过序列化的private static final long serialVersionUID = 8740926838799779884L;Map<String,Integer> map=new HashMap<>();private FileWriter writer;public CountBolt() {System.out.println("CountBolt:**********************************");}/*** prepare 在这里仅仅是启动了一个文件写的定时线程,每2秒将结果写到文件中,并清空map.*/@Overridepublic void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);try {//以文件追加的方式打开文件writer = new FileWriter("e://my_log.txt",true);} catch (IOException e) {e.printStackTrace();}//开启定时线程pool.scheduleAtFixedRate(()->{try {writer.write("\r\n");writer.write("***************************************");System.out.println("***************************************");for(Entry<String, Integer> entry:map.entrySet()) {writer.write(entry.getKey()+" : "+entry.getValue());writer.write("\r\n");System.out.println(entry.getKey()+" : "+entry.getValue());}writer.flush();map.clear();} catch (IOException e) {e.printStackTrace();}}, 2000L, 2000L, TimeUnit.MILLISECONDS);}/*** 收到 spout 发送来的 Word 进行统计*/@Overridepublic void execute(Tuple input) {System.out.println("**********execute(Tuple input)");String word=input.getString(0);if(map.get(word)==null) {map.put(word, 1);}else {map.put(word, map.get(word)+1);}}/*** Topology 被 shutdown时会被触发的动作,我们可以用来做一些清理工作*/@Overridepublic void cleanup() {System.out.println("*******************public void cleanup()");for(Entry<String, Integer> entry:map.entrySet()) {System.out.println(entry.getKey()+" : "+entry.getValue());}map.clear();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}

com.muhao.demo.LocalPology 实现:

package com.muhao.demo;import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;public class LocalPology {public static void main(String[] args) throws InterruptedException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("out-word", new WordSpout(),1);builder.setBolt("word-count", new CountBolt(),1).shuffleGrouping("out-word");//本地模式:本地提交LocalCluster cluster = new LocalCluster();  Config conf = new Config();conf.setNumWorkers(2);conf.setDebug(true);conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);cluster.submitTopology("firstTopo", conf, builder.createTopology());  //一定要等待足够的时间,否则程序没来得及运行就已经结束了,程序启动需要消耗时间Thread.sleep(30000);  cluster.killTopology("firstTopo");  cluster.shutdown();  }
}

com.muhao.demo.RemoteTopology 实现:

package com.muhao.demo;import java.util.HashMap;
import java.util.Map;import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;/*** 抄自 http://www.jstorm.io/QuickStart_cn/Example.html */
public class RemoteTopology {public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {Map conf = new HashMap();//topology所有自定义的配置均放入这个MapTopologyBuilder builder = new TopologyBuilder();//创建topology的生成器int spoutParal = 1;//获取spout的并发设置SpoutDeclarer spout = builder.setSpout("out-word",new WordSpout(), spoutParal);//创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格int boltParal = 1;//获取bolt的并发设置BoltDeclarer totalBolt = builder.setBolt("word-count", new CountBolt(),boltParal).shuffleGrouping("out-word");//创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,//即每个spout随机轮询发送tuple到下一级bolt中int ackerParal =  1;Config.setNumAckers(conf, ackerParal);//设置表示acker的并发数int workerNum = 2;conf.put(Config.TOPOLOGY_WORKERS, workerNum);//表示整个topology将使用几个workerconf.put(Config.STORM_CLUSTER_MODE, "distributed");//设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行StormSubmitter.submitTopology("streamName", conf,builder.createTopology());//提交topology}}

pom.xml 依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.muhao.test</groupId><artifactId>z_jstorm_demo</artifactId><version>0.0.1-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><jstorm.version>2.2.1</jstorm.version><storm.version>storm-0.9.2-incubating</storm.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><!-- <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.0</version></dependency> --><!-- 下面是jstorm运行依赖 --><dependency><groupId>com.alibaba.jstorm</groupId><artifactId>jstorm-core</artifactId><version>${jstorm.version}</version><scope>provided</scope></dependency><!-- 下面是storm运行依赖 --><!-- <dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.0.1</version><scope>provided</scope></dependency> --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>test</scope></dependency><dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>1.11</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>16.0.1</version></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version></dependency><dependency><groupId>io.dropwizard.metrics</groupId><artifactId>metrics-core</artifactId><version>3.1.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><configuration><!--<forkCount>1</forkCount> --><forkMode>pertest</forkMode><argLine>-Xms1024m -Xmx4096m</argLine></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.7.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.muhao.demo.RemoteTopology</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

在本地运行 LocalPology main 方法结果如下:


写入的文件内容:

***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************
***************************************six : 84
four : 64
one : 91
three : 64
two : 84
five : 58***************************************six : 11973
four : 12195
one : 12220
two : 11924
five : 11975
three : 11966***************************************six : 11702
four : 11670
one : 11727
three : 11595
five : 11573
two : 11596***************************************six : 11126
four : 11083
one : 11116
three : 10981
five : 11014
two : 10849***************************************six : 10798
four : 10803
one : 10853
five : 10729
two : 10604
three : 10682***************************************
***************************************
***************************************
***************************************

可见 在开始的 20 秒左右的时间程序没有启动,没有 运行 spout 和 bolt 程序,所以设置 Sleep 时间要长一些。

在项目根目录执行 mvn package 打包jar 上传 jstorm集群运行

由于没有手动 shutdown,所以只有通过文件查看结果,和面结果一样,就不显示了。

storm运行相同(呵呵)代码

jstorm 说的是原先在storm运行的代码不用动一行就可以直接运行,在原来的基础上在pom.xml 中将jstorm的依赖改成对storm的依赖,但是还需要在具体的类中更改引用的class:要将backtype.storm.XXX 改成 org.apache.storm.XXX 。 因为 jstorm : backtype.storm ,而 storm : org.apache.storm 。并且由于jstorm写的 类 和 storm 写的 类都是相同的,比如 :jstorm 的 backtype.storm.tuple.Tuple; 和 storm 的 org.apache.storm.tuple.Tuple 。

具体的代码在我的博客中也上传了。





http://chatgpt.dhexx.cn/article/29KixCLb.shtml

相关文章

JStorm介绍

一、简介 JStorm是一个分布式实时计算引擎。JStorm是一个类似于Hadoop MapReduce的系统&#xff0c;用户按照指定的接口实现一个任务&#xff0c;然后将这个任务交给JStorm系统&#xff0c;JStorm将这个任务跑起来&#xff0c;并按7*24小时运行。如果中间一个worker发生了意外…

马氏距离实例详解

介绍 马氏距离是由印度统计学家马哈拉诺比斯&#xff08;P. C. Mahalanobis&#xff09;提出的&#xff0c;表示数据的协方差距离。它是一种有效的计算两个未知样本集的相似度的方法。与欧氏距离不同的是它考虑到各种特性之间的联系&#xff08;例如&#xff1a;一条关于身高的…

距离度量:闵氏、欧式、马氏、余弦、汉明等

目录 1. 闵氏距离&#xff08;Minkowski Distance&#xff09; 2. 欧式距离&#xff08;Euclidean Distance&#xff09; 3. 标准化欧式距离&#xff08;Standardized Euclidean distance&#xff09; 4. 马氏距离&#xff08;Mahalanobis Distance&#xff09; 5. 余弦距…

马氏距离(Mahalanobis Distance)推导及几何意义

看了一些博客对马氏距离的解释&#xff0c;似乎没有讲到本质的地方&#xff0c;本文从欧氏距离存在的问题开始入手&#xff0c;一步步推导出马氏距离&#xff0c;并得出结论&#xff1a;原始空间中的马氏距离等于坐标旋转变换及缩放后的空间中的欧氏距离。 假设数据集 X ∈ R N…

直观理解--马氏距离

首先我们很了解欧氏距离了&#xff0c;就是用来计算欧式空间&#xff08;就是我们常见的坐标系&#xff09;中两个点的距离的。 比如点 x ( x 1 , … , x n ) x (x_1,…,x_n) x(x1​,…,xn​) 和 y ( y 1 , … , y n ) y (y_1,…,y_n) y(y1​,…,yn​) 的欧氏距离为&…

Mahalanobis距离(马氏距离)的“哲学”解释

讲解教授&#xff1a;赵辉 (FROM : UESTC) 课程&#xff1a;《模式识别》 整理&#xff1a;PO主 基础知识&#xff1a; 假设空间中两点x&#xff0c;y&#xff0c;定义&#xff1a; 欧几里得距离&#xff0c; Mahalanobis距离&#xff0c; 不难发现&#xff0c;如果去掉…

六大距离:欧式距离、街道距离、马氏距离等

文章目录 1 简 介2 距离特征2.1 Euclidean距离2.2 Cosine距离2.3 manhattan距离2.4 chebyshev距离2.5 minkowski距离2.6 mahalanobis距离 3 代 码实现 1 简 介 数值向量是数据建模问题中最为常见的一类特征&#xff0c;例如&#xff1a; 在一些涉及图片&#xff0c;文本信息等…

马氏距离例题详解(全网最详细)

马氏距离例题详解 定义 马哈拉诺比斯距离是由印度统计学家马哈拉诺比斯 (英语)提出的&#xff0c;表示数据的协方差距离。它是一种有效的计算两个未知样本集的相似度的方法。与欧氏距离不同的是它考虑到各种特性之间的联系&#xff08;例如&#xff1a;一条关于身高的信息会带…

浅谈马氏距离【Mahalonobis Distance】

浅谈马氏距离【Mahalonobis Distance】 1. Introduction2. 欧式距离对于多元数据会存在一些什么问题&#xff1f;3 .什么是马氏距离4.马氏距离背后的数学和intuition5. 利用python来计算马氏距离6. Case1: 使用马氏距离进行多元异常值检测7. Case 2: 对分类问题应用马氏距离8. …

距离度量之马氏距离

马氏距离 用来度量一个样本点&#xff30;与数据分布为&#xff24;的集合的距离。 假设样本点为&#xff1a; 数据集分布的均值为&#xff1a; 协方差矩阵为&#xff33;。 则这个样本点&#xff30;与数据集合的马氏距离为&#xff1a; 马氏距离也可以衡量两个来自同一…

马氏距离(Mahalanobis Distance)介绍与实例

本文介绍马氏距离&#xff08;Mahalanobis Distance&#xff09;&#xff0c;通过本文&#xff0c;你将了解到马氏距离的含义、马氏距离与欧式距离的比较以及一个通过马氏距离进行异常检测的例子&#xff08;基于Python的sklearn包&#xff09;。 目的 计算两个样本间的距离时…

马氏距离-Mahalanobis Distance

一、学习目的 在训练one-shoting learning 的神经网路的时候&#xff0c;由于采用的是欧式距离&#xff0c;欧氏距离虽然很有用&#xff0c;但也有明显的缺点。它将样品的不同属性&#xff08;即各指标或各变量&#xff09;之间的差别等同看待&#xff0c;这一点有时不能满足实…

欧氏距离与马氏距离

Preface 之前在写《Multi-view CNNs for 3D Objects Recognition》的阅读笔记的时候&#xff0c;文章中的一个创新点便是将MVCNN网络提取到的3D Objects的形状特征描述符&#xff0c;投影到马氏距离&#xff08;Mahalanobis Distance&#xff09;上&#xff0c;“这样的话&…

马氏距离 Mahalanobis Distance

马氏距离 Mahalanobis Distance 1. 马氏距离定义2. 马氏距离实际意义2.1 欧氏距离近就一定相似&#xff1f;2.2 归一化后欧氏距离就一定相似&#xff1f;2.3 算上维度的方差就够了&#xff1f; 3. 马氏距离的几何意义4. 马氏距离的推导5. 马氏距离限制 Reference: 马氏距离(Mah…

马氏距离概念

马氏距离 一、基本概念&#xff1a; 方差&#xff1a;方差是标准差的平方&#xff0c;而标准差的意义是数据集中各个点到均值点距离的平均值。反应的是数据的离散程度。 协方差&#xff1a;标准差与方差是描述一维数据的&#xff0c;当存在多维数据时&#xff0c;我们通常需要知…

马氏距离通俗理解

基础知识&#xff1a; 假设空间中两点x&#xff0c;y&#xff0c;定义&#xff1a; 欧几里得距离&#xff0c; Mahalanobis距离&#xff0c; 不难发现&#xff0c;如果去掉马氏距离中的协方差矩阵&#xff0c;就退化为欧氏距离。那么我们就需要探究这个多出来的因子究竟有什么含…

马氏距离Mahalanobis Distance实例

简介 如果按照欧氏距离去理解马氏距离&#xff0c;一定会迷惑一段时间。因为欧氏距离可以计算两个点之间的距离&#xff0c;而马氏距离是计算一个点距离一个聚类的距离。如果想通过马氏距离去计算某两个点之间的距离是行不通的。下面按照一般的套路介绍一下欧氏距离与马氏距离…

马氏距离详解

马氏距离详解 一、理性认知二、感性认知第一个例子第二个例子 三、实例认知四、公式推导推导过程 致谢 一、理性认知 马氏距离(Mahalanobis distance)是由印度统计学家马哈拉诺比斯(P. C. Mahalanobis)提出的&#xff0c;表示点与一个分布之间的距离。它是一种有效的计算两个未…

马氏距离(Mahalanobis Distance)

马氏距离(Mahalanobis Distance)是度量学习中一种常用的距离指标,同欧氏距离、曼哈顿距离、汉明距离等一样被用作评定数据之间的相似度指标。但却可以应对高维线性分布的数据中各维度间非独立同分布的问题。 什么是马氏距离 马氏距离(Mahalanobis Distance)是一种距离的度量,…

马氏距离详解(数学原理、适用场景、应用示例代码)

看了很多关于马氏距离&#xff08;Mahalanobis Distance&#xff09;的介绍&#xff0c;但是总感觉有一些地方不太清晰&#xff0c;所以结合数学公式、机器学习中的应用案例&#xff0c;从头梳理一下。 马氏距离实际上是欧氏距离在多变量下的“加强版”&#xff0c;用于测量点…