JStorm Storm 上手demo

article/2025/8/30 16:27:36
折线之间的内容整理自: http://blog.csdn.net/suifeng3051/article/details/38369689
--------------------------------------------------------------------------------------------------------------------------------------------
在全面介绍Storm之前,我们先通过一个简单的Demo让大家整体感受一下什么是Storm。
Storm运行模式:
  1. 本地模式(Local Mode): 即Topology(相当于一个任务,后续会详细讲解)  运行在本地机器的单一JVM上,这个模式主要用来开发、调试。
  2. 远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的,因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。
写一个HelloWord Storm

     我们现在创建这么一个应用,统计文本文件中的单词个数,详细学习过Hadoop的朋友都应该写过。那么我们需要具体创建这样一个Topology,用一个spout负责读取文本文件,用第一个bolt来解析成单词,用第二个bolt来对解析出的单词计数,整体结构如图所示:

可以从这里下载源码:http://download.csdn.net/detail/xunzaosiyecao/9818483


写一个可运行的Demo很简单,我们只需要三步:
  1. 创建一个Spout读取数据
  2. 创建bolt处理数据
  3. 创建一个Topology提交到集群
下面我们就写一下,以下代码拷贝到eclipse(依赖的jar包到官网下载即可)即可运行。
1.创建一个Spout作为数据源
Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。
package storm.demo.spout;  import java.io.BufferedReader;  
import java.io.FileNotFoundException;  
import java.io.FileReader;  
import java.util.Map;  
import backtype.storm.spout.SpoutOutputCollector;  
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichSpout;  
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Fields;  
import backtype.storm.tuple.Values;  
public class WordReader implements IRichSpout {  private static final long serialVersionUID = 1L;  private SpoutOutputCollector collector;  private FileReader fileReader;  private boolean completed = false;  public boolean isDistributed() {  return false;  }  /** * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置, * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt * **/  @Override  public void open(Map conf, TopologyContext context,  SpoutOutputCollector collector) {  try {  //获取创建Topology时指定的要读取的文件路径  this.fileReader = new FileReader(conf.get("wordsFile").toString());  } catch (FileNotFoundException e) {  throw new RuntimeException("Error reading file ["  + conf.get("wordFile") + "]");  }  //初始化发射器  this.collector = collector;  }  /** * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt) * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下 * **/  @Override  public void nextTuple() {  if (completed) {  try {  Thread.sleep(1000);  } catch (InterruptedException e) {  // Do nothing  }  return;  }  String str;  // Open the reader  BufferedReader reader = new BufferedReader(fileReader);  try {  // Read all lines  while ((str = reader.readLine()) != null) {  /** * 发射每一行,Values是一个ArrayList的实现 */  this.collector.emit(new Values(str), str);  }  } catch (Exception e) {  throw new RuntimeException("Error reading tuple", e);  } finally {  completed = true;  }  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {  declarer.declare(new Fields("line"));  }  @Override  public void close() {  // TODO Auto-generated method stub  }  @Override  public void activate() {  // TODO Auto-generated method stub  }  @Override  public void deactivate() {  // TODO Auto-generated method stub  }  @Override  public void ack(Object msgId) {  System.out.println("OK:" + msgId);  }  @Override  public void fail(Object msgId) {  System.out.println("FAIL:" + msgId);  }  @Override  public Map<String, Object> getComponentConfiguration() {  // TODO Auto-generated method stub  return null;  }  
}  

2.创建两个bolt来处理Spout发射出的数据
Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。
Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用。
第一个bolt:WordNormalizer
package storm.demo.bolt;  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Map;  
import backtype.storm.task.OutputCollector;  
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichBolt;  
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Fields;  
import backtype.storm.tuple.Tuple;  
import backtype.storm.tuple.Values;  
public class WordNormalizer implements IRichBolt {  private OutputCollector collector;  @Override  public void prepare(Map stormConf, TopologyContext context,  OutputCollector collector) {  this.collector = collector;  }  /**这是bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用 * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理) * **/  @Override  public void execute(Tuple input) {  String sentence = input.getString(0);  String[] words = sentence.split(" ");  for (String word : words) {  word = word.trim();  if (!word.isEmpty()) {  word = word.toLowerCase();  // Emit the word  List a = new ArrayList();  a.add(input);  collector.emit(a, new Values(word));  }  }  //确认成功处理一个tuple  collector.ack(input);  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {  declarer.declare(new Fields("word"));  }  @Override  public void cleanup() {  // TODO Auto-generated method stub  }  @Override  public Map<String, Object> getComponentConfiguration() {  // TODO Auto-generated method stub  return null;  }  
}  
第二个bolt: WordCounter

package storm.demo.bolt;  
import java.util.HashMap;  
import java.util.Map;  
import backtype.storm.task.OutputCollector;  
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichBolt;  
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Tuple;  public class WordCounter implements IRichBolt {  Integer id;  String name;  Map<String, Integer> counters;  private OutputCollector collector;  @Override  public void prepare(Map stormConf, TopologyContext context,  OutputCollector collector) {  this.counters = new HashMap<String, Integer>();  this.collector = collector;  this.name = context.getThisComponentId();  this.id = context.getThisTaskId();  }  @Override  public void execute(Tuple input) {  String str = input.getString(0);  if (!counters.containsKey(str)) {  counters.put(str, 1);  } else {  Integer c = counters.get(str) + 1;  counters.put(str, c);  }  // 确认成功处理一个tuple  collector.ack(input);  }  /** * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里 * 因为这只是个Demo,我们用它来打印我们的计数器 * */  @Override  public void cleanup() {  System.out.println("-- Word Counter [" + name + "-" + id + "] --");  for (Map.Entry<String, Integer> entry : counters.entrySet()) {  System.out.println(entry.getKey() + ": " + entry.getValue());  }  counters.clear();  }  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer) {  // TODO Auto-generated method stub  }  @Override  public Map<String, Object> getComponentConfiguration() {  // TODO Auto-generated method stub  return null;  }  
}  
3.在main函数中创建一个Topology
在这里我们要创建一个Topology和一个LocalCluster对象,还有一个Config对象做一些配置。   
package storm.demo;  import storm.demo.bolt.WordCounter;  
import storm.demo.bolt.WordNormalizer;  
import storm.demo.spout.WordReader;  
import backtype.storm.Config;  
import backtype.storm.LocalCluster;  
import backtype.storm.topology.TopologyBuilder;  
import backtype.storm.tuple.Fields;  
public class WordCountTopologyMain {  public static void main(String[] args) throws InterruptedException {  //定义一个Topology  TopologyBuilder builder = new TopologyBuilder();  builder.setSpout("word-reader",new WordReader());  builder.setBolt("word-normalizer", new WordNormalizer())  .shuffleGrouping("word-reader");  builder.setBolt("word-counter", new WordCounter(),2)  .fieldsGrouping("word-normalizer", new Fields("word"));  //配置  Config conf = new Config();  conf.put("wordsFile", "d:/text.txt");  conf.setDebug(false);  //提交Topology  conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  //创建一个本地模式cluster  LocalCluster cluster = new LocalCluster();  cluster.submitTopology("Getting-Started-Toplogie", conf,  builder.createTopology());  Thread.sleep(1000);  cluster.shutdown();  }  
}  
运行这个函数我们即可看到后台打印出来的单词个数。(ps:因为是Local模式,运行开始可能会打印很多错误log,这个先不用管)

---------------------------------------------------------------------------------------------------------------------------------------------

折线之间的内容整理自:http://blog.csdn.net/suifeng3051/article/details/38369689

以上是Storm的上手例子,那么JStorm 应该如何写呢?

我们用的是JStorm,但上面的可以不修改一行就可以在JStorm上跑起来。

 <!-- Storm Dependency --><!-- <dependency><groupId>storm</groupId><artifactId>storm</artifactId><version>0.7.1</version></dependency>--><!-- JStorm Dependency --><dependency><groupId>com.alibaba.jstorm</groupId><artifactId>jstorm-core</artifactId><version>2.1.1</version></dependency>
修改代码中pom文件的依赖项即可,其余的不需要修改。

小注:

如果不清楚如何使读取config下word.txt,可以修改TopologyMain类,将其中的

//conf.put("wordsFile", args[0]);
//在conf添加路径wordsFile的时候,可以将路径写死,弄成一个固定值
//比如:我这里将word.txt放到了/usr/local/jstorm-2.2.1/wait_deploy/路径下
conf.put("wordsFile", "/usr/local/jstorm-2.2.1/wait_deploy/word.txt");
 如果是要运行在JStrom上,使用mvn打包命令:

# 打包时跳过测试
mvn clean package  -Dmaven.test.skip=true
将打包后的文件提交到JStorm即可

例如我这里打包文件名为:Getting-Started-0.0.1-SNAPSHOT.jar,提交命令:

//提交jar 
//jar包名称:Getting-Started-0.0.1-SNAPSHOT.jar
//入口类:TopologyMain
//入口类需要参数的话,需要在入口类后面添加需要的参数
jstorm jar Getting-Started-0.0.1-SNAPSHOT.jar TopologyMain
#提交jar 

jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
  • xxxx.jar 为打包后的jar
  • com.alibaba.xxxx.xx 为入口类,即提交任务的类
  • parameter即为提交参数

demo中部分函数及参数注释:
setBolt方法中的参数parallelism_hint代表这样一个Spout或Bolt有多少个实例,即对应多少个线程,一个实例对应一个线程。


注意理解spout及bolt:

spout:自定义获取待处理流的地方

bolt:自定义处理流的地方



JStorm的安装可以参考官网:https://github.com/alibaba/jstorm/wiki/JStorm-Chinese-Documentation

下午写JStorm的demo花了一下午的时间,主要原因是:知道storm代码不需要修改就能跑在jstorm上,但上网搜资料的还是搜索jstorm的案例,但网上大部分jstrom的demo都是跑不起来的,或者需要自己升级版本的。jstorm官网的Example,拉到本地后,也是各种报错。

要写jstorm的代码,搜索storm,参考storm部分即可。


作者:jiankunking 出处:http://blog.csdn.net/jiankunking





http://chatgpt.dhexx.cn/article/5rcMMrTr.shtml

相关文章

PyTorch从零开始实现Transformer

文章目录 自注意力Transformer块编码器解码器块解码器整个Transformer参考来源全部代码&#xff08;可直接运行&#xff09; 自注意力 计算公式 代码实现 class SelfAttention(nn.Module):def __init__(self, embed_size, heads):super(SelfAttention, self).__init__()self.e…

jstorm storm 入门demo

jstorm和storm比较 jstorm 是阿里巴巴开源的基于storm采用Java重写的一套分布式实时流计算框架&#xff0c;使用简单&#xff0c;特点如下&#xff1a; 1&#xff0c;开发非常迅速: 接口简单&#xff0c;容易上手&#xff0c;只要遵守Topology&#xff0c;Spout&#xff0c;Bo…

JStorm介绍

一、简介 JStorm是一个分布式实时计算引擎。JStorm是一个类似于Hadoop MapReduce的系统&#xff0c;用户按照指定的接口实现一个任务&#xff0c;然后将这个任务交给JStorm系统&#xff0c;JStorm将这个任务跑起来&#xff0c;并按7*24小时运行。如果中间一个worker发生了意外…

马氏距离实例详解

介绍 马氏距离是由印度统计学家马哈拉诺比斯&#xff08;P. C. Mahalanobis&#xff09;提出的&#xff0c;表示数据的协方差距离。它是一种有效的计算两个未知样本集的相似度的方法。与欧氏距离不同的是它考虑到各种特性之间的联系&#xff08;例如&#xff1a;一条关于身高的…

距离度量:闵氏、欧式、马氏、余弦、汉明等

目录 1. 闵氏距离&#xff08;Minkowski Distance&#xff09; 2. 欧式距离&#xff08;Euclidean Distance&#xff09; 3. 标准化欧式距离&#xff08;Standardized Euclidean distance&#xff09; 4. 马氏距离&#xff08;Mahalanobis Distance&#xff09; 5. 余弦距…

马氏距离(Mahalanobis Distance)推导及几何意义

看了一些博客对马氏距离的解释&#xff0c;似乎没有讲到本质的地方&#xff0c;本文从欧氏距离存在的问题开始入手&#xff0c;一步步推导出马氏距离&#xff0c;并得出结论&#xff1a;原始空间中的马氏距离等于坐标旋转变换及缩放后的空间中的欧氏距离。 假设数据集 X ∈ R N…

直观理解--马氏距离

首先我们很了解欧氏距离了&#xff0c;就是用来计算欧式空间&#xff08;就是我们常见的坐标系&#xff09;中两个点的距离的。 比如点 x ( x 1 , … , x n ) x (x_1,…,x_n) x(x1​,…,xn​) 和 y ( y 1 , … , y n ) y (y_1,…,y_n) y(y1​,…,yn​) 的欧氏距离为&…

Mahalanobis距离(马氏距离)的“哲学”解释

讲解教授&#xff1a;赵辉 (FROM : UESTC) 课程&#xff1a;《模式识别》 整理&#xff1a;PO主 基础知识&#xff1a; 假设空间中两点x&#xff0c;y&#xff0c;定义&#xff1a; 欧几里得距离&#xff0c; Mahalanobis距离&#xff0c; 不难发现&#xff0c;如果去掉…

六大距离:欧式距离、街道距离、马氏距离等

文章目录 1 简 介2 距离特征2.1 Euclidean距离2.2 Cosine距离2.3 manhattan距离2.4 chebyshev距离2.5 minkowski距离2.6 mahalanobis距离 3 代 码实现 1 简 介 数值向量是数据建模问题中最为常见的一类特征&#xff0c;例如&#xff1a; 在一些涉及图片&#xff0c;文本信息等…

马氏距离例题详解(全网最详细)

马氏距离例题详解 定义 马哈拉诺比斯距离是由印度统计学家马哈拉诺比斯 (英语)提出的&#xff0c;表示数据的协方差距离。它是一种有效的计算两个未知样本集的相似度的方法。与欧氏距离不同的是它考虑到各种特性之间的联系&#xff08;例如&#xff1a;一条关于身高的信息会带…

浅谈马氏距离【Mahalonobis Distance】

浅谈马氏距离【Mahalonobis Distance】 1. Introduction2. 欧式距离对于多元数据会存在一些什么问题&#xff1f;3 .什么是马氏距离4.马氏距离背后的数学和intuition5. 利用python来计算马氏距离6. Case1: 使用马氏距离进行多元异常值检测7. Case 2: 对分类问题应用马氏距离8. …

距离度量之马氏距离

马氏距离 用来度量一个样本点&#xff30;与数据分布为&#xff24;的集合的距离。 假设样本点为&#xff1a; 数据集分布的均值为&#xff1a; 协方差矩阵为&#xff33;。 则这个样本点&#xff30;与数据集合的马氏距离为&#xff1a; 马氏距离也可以衡量两个来自同一…

马氏距离(Mahalanobis Distance)介绍与实例

本文介绍马氏距离&#xff08;Mahalanobis Distance&#xff09;&#xff0c;通过本文&#xff0c;你将了解到马氏距离的含义、马氏距离与欧式距离的比较以及一个通过马氏距离进行异常检测的例子&#xff08;基于Python的sklearn包&#xff09;。 目的 计算两个样本间的距离时…

马氏距离-Mahalanobis Distance

一、学习目的 在训练one-shoting learning 的神经网路的时候&#xff0c;由于采用的是欧式距离&#xff0c;欧氏距离虽然很有用&#xff0c;但也有明显的缺点。它将样品的不同属性&#xff08;即各指标或各变量&#xff09;之间的差别等同看待&#xff0c;这一点有时不能满足实…

欧氏距离与马氏距离

Preface 之前在写《Multi-view CNNs for 3D Objects Recognition》的阅读笔记的时候&#xff0c;文章中的一个创新点便是将MVCNN网络提取到的3D Objects的形状特征描述符&#xff0c;投影到马氏距离&#xff08;Mahalanobis Distance&#xff09;上&#xff0c;“这样的话&…

马氏距离 Mahalanobis Distance

马氏距离 Mahalanobis Distance 1. 马氏距离定义2. 马氏距离实际意义2.1 欧氏距离近就一定相似&#xff1f;2.2 归一化后欧氏距离就一定相似&#xff1f;2.3 算上维度的方差就够了&#xff1f; 3. 马氏距离的几何意义4. 马氏距离的推导5. 马氏距离限制 Reference: 马氏距离(Mah…

马氏距离概念

马氏距离 一、基本概念&#xff1a; 方差&#xff1a;方差是标准差的平方&#xff0c;而标准差的意义是数据集中各个点到均值点距离的平均值。反应的是数据的离散程度。 协方差&#xff1a;标准差与方差是描述一维数据的&#xff0c;当存在多维数据时&#xff0c;我们通常需要知…

马氏距离通俗理解

基础知识&#xff1a; 假设空间中两点x&#xff0c;y&#xff0c;定义&#xff1a; 欧几里得距离&#xff0c; Mahalanobis距离&#xff0c; 不难发现&#xff0c;如果去掉马氏距离中的协方差矩阵&#xff0c;就退化为欧氏距离。那么我们就需要探究这个多出来的因子究竟有什么含…

马氏距离Mahalanobis Distance实例

简介 如果按照欧氏距离去理解马氏距离&#xff0c;一定会迷惑一段时间。因为欧氏距离可以计算两个点之间的距离&#xff0c;而马氏距离是计算一个点距离一个聚类的距离。如果想通过马氏距离去计算某两个点之间的距离是行不通的。下面按照一般的套路介绍一下欧氏距离与马氏距离…

马氏距离详解

马氏距离详解 一、理性认知二、感性认知第一个例子第二个例子 三、实例认知四、公式推导推导过程 致谢 一、理性认知 马氏距离(Mahalanobis distance)是由印度统计学家马哈拉诺比斯(P. C. Mahalanobis)提出的&#xff0c;表示点与一个分布之间的距离。它是一种有效的计算两个未…