目录
Tuple元组
结构
生命周期
Spout数据源
结构
开发spout组件
Storm的核心概念包括:Stream、Spout、Bolt、Tuple、Task、Worker、Stream Grouping、Topology
Stream是被处理的数据,Spout是数据源,Bolt是处理数据的容器,Tuple是数据单元,Task是运行Spout和Bolt中的线程,Worker是运行这些线程的进程,Stream Grouping规定了Bolt接受何种类型的数据最为输入,Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。
Tuple元组
结构
Tuple是Storm的主要数据结构,是Storm中使用的最基本单元、数据模型和元组。
Tuple就是一个值列表,Tuple中的值可以是任何类型的,动态类型的Tuple的fields可以不用声明。
默认情况下,Storm中的Tuple支持私有类型、字符串、字节数组等作为他的字段值。
Tuple的字段默认类型有:integer、float、double、long、short、string、byte、binary(byte[ ])。
数据结构如下图:可以理解成一个键值对类型的数据结构。
生命周期
下段java代码展示了Spout(消息源)接口发出Tuple(消息)的整个过程,源码如下:
public interface ISpout extends Serializable{void open(Map conf,TopologyContext context,SpoutOutputCollector collector);void nextTuple();void ack(Object msgId);void fail(Object msgId);void close();
}
首先,Storm调用Spout(消息源)的nextTuple方法来获取下一个Tuple,Spout通过Open方法的参数提供的SpoutOutputCollector将新Tuple发射到其中一个输出消息流。发射Tuple时,Spout提供一个message-id,通过这个ID来追踪该Tuple。然后,Storm跟踪该Tuple的树形结构是否成功创建,从根据message-id调用Spout中的ack函数,以确认Tuple是否被完全处理。如果Tuple超时,则调用Spout的fail方法。由此看出,同一个Tuple不管是acked还是failed都是由创建它的Spout发出并维护的,所以Storm会利用内部的Acker机制保证每个Tuple被可靠地处理。最后,在任务完成后,Spout调用Close方法结束Tuple的使命。
Spout数据源
结构
数据源(消息源)Spout是Storm的Topology中的消息生产者(Tuple的创造者),最源头的接口是IComponent,如下图所示,几个Spout接口都继承自IComponent。
Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。一个可靠的消息源可以重新发射一个Tuple(如果该Tuple没有被Storm成功处理),但是一个不可靠的消息源,Spout一旦发出一个Tuple就把它彻底“遗忘”,也就不可能再发了。
Spout可以发射多个流。要达到这样的效果,使用OutputFieldsDeclarer.declareStream来定义多个流(定义多个Stream),然后使用SpoutOutputCollector来发射指定的流。
Spout的最顶层抽象是ISpout接口,在通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余代码,可以直接继承BaseRichSpout。
开发spout组件
下段代码是开发Spout组件的一个简单的实例:创建普通Java工程,导入storm依赖包到lib文件夹下,buildpath之后即可。
package storm;import java.util.Map;
import java.util.Random;
import java.util.stream.Collector;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/** 用于产生数据源* 本例中 数据源是不断生成的一个1-100内的随机数*/
public class NumberSpout extends BaseRichSpout {private SpoutOutputCollector collector;/** 这是Spout类中最重要的一个方法。用于发射Tuple* */@Overridepublic void nextTuple() {// TODO Auto-generated method stubwhile(true){int randomNum = new Random().nextInt(100);//Values可以理解为是Tuple的值,是一个集合类型,值可以是一个,也可以是多个Values value = new Values(randomNum);//emit方法用于发射元组collector.emit(value);try {Thread.sleep(500);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}/** 当一个Task被初始化时会调用此open方法。* 一般都会在此方法中初始化发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext。**/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// 对collector进行初始化,因为nextTuple()方法利用collector发射元组this.collector = collector;}/** 此方法用于声名当前spout的Tuple发送流,* 流的定义是通过OutputFieldsDeclare.declareStream方法完成的* 其中的参数包括了发送的域Fields。*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// Fields可以理解为时Tuple的键declarer.declare(new Fields("number"));}}
代码说明:从100以内的整数中随机产生一个数作为Tuple的值,然后通过_collector发送到Topology。Spout的最重要方法是nextTuple。nextTuple方法发射一个新的元组到Topology,如果没有新元组发射,则直接返回。
注意:任务Spout的nextTuple方法都不要实现成阻塞的,因为Storm是在相同的线程中调用Spout的方法。
此外,Spout的另外两个重要方法是ack和fail方法,当Spout发射的元组被拓扑成功处理时,调用ack方法;当处理失败时,调用fail方法。ack和fail方法仅可被可靠的Spout调用。