storm学习笔记(二)——Storm组件详解之Tuple、Spout

article/2025/9/20 22:07:12

目录

Tuple元组

结构

生命周期

Spout数据源

结构

开发spout组件


Storm的核心概念包括:Stream、Spout、Bolt、Tuple、Task、Worker、Stream Grouping、Topology

Stream是被处理的数据,Spout是数据源,Bolt是处理数据的容器,Tuple是数据单元,Task是运行Spout和Bolt中的线程,Worker是运行这些线程的进程,Stream Grouping规定了Bolt接受何种类型的数据最为输入,Topology是由Stream Grouping连接起来的Spout和Bolt节点网络。

Tuple元组

结构

Tuple是Storm的主要数据结构,是Storm中使用的最基本单元、数据模型和元组。

Tuple就是一个值列表,Tuple中的值可以是任何类型的,动态类型的Tuple的fields可以不用声明。

默认情况下,Storm中的Tuple支持私有类型、字符串、字节数组等作为他的字段值。

Tuple的字段默认类型有:integer、float、double、long、short、string、byte、binary(byte[ ])。

数据结构如下图:可以理解成一个键值对类型的数据结构。

生命周期

下段java代码展示了Spout(消息源)接口发出Tuple(消息)的整个过程,源码如下:

public interface ISpout extends Serializable{void open(Map conf,TopologyContext context,SpoutOutputCollector collector);void nextTuple();void ack(Object msgId);void fail(Object msgId);void close();
}

首先,Storm调用Spout(消息源)的nextTuple方法来获取下一个Tuple,Spout通过Open方法的参数提供的SpoutOutputCollector将新Tuple发射到其中一个输出消息流。发射Tuple时,Spout提供一个message-id,通过这个ID来追踪该Tuple。然后,Storm跟踪该Tuple的树形结构是否成功创建,从根据message-id调用Spout中的ack函数,以确认Tuple是否被完全处理。如果Tuple超时,则调用Spout的fail方法。由此看出,同一个Tuple不管是acked还是failed都是由创建它的Spout发出并维护的,所以Storm会利用内部的Acker机制保证每个Tuple被可靠地处理。最后,在任务完成后,Spout调用Close方法结束Tuple的使命。

Spout数据源

结构

数据源(消息源)Spout是Storm的Topology中的消息生产者(Tuple的创造者),最源头的接口是IComponent,如下图所示,几个Spout接口都继承自IComponent。

Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。一个可靠的消息源可以重新发射一个Tuple(如果该Tuple没有被Storm成功处理),但是一个不可靠的消息源,Spout一旦发出一个Tuple就把它彻底“遗忘”,也就不可能再发了。

Spout可以发射多个流。要达到这样的效果,使用OutputFieldsDeclarer.declareStream来定义多个流(定义多个Stream),然后使用SpoutOutputCollector来发射指定的流。

Spout的最顶层抽象是ISpout接口,在通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余代码,可以直接继承BaseRichSpout。

开发spout组件

下段代码是开发Spout组件的一个简单的实例:创建普通Java工程,导入storm依赖包到lib文件夹下,buildpath之后即可。

package storm;import java.util.Map;
import java.util.Random;
import java.util.stream.Collector;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/** 用于产生数据源* 本例中 数据源是不断生成的一个1-100内的随机数*/
public class NumberSpout extends BaseRichSpout {private SpoutOutputCollector collector;/** 这是Spout类中最重要的一个方法。用于发射Tuple* */@Overridepublic void nextTuple() {// TODO Auto-generated method stubwhile(true){int randomNum = new Random().nextInt(100);//Values可以理解为是Tuple的值,是一个集合类型,值可以是一个,也可以是多个Values value = new Values(randomNum);//emit方法用于发射元组collector.emit(value);try {Thread.sleep(500);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}/** 当一个Task被初始化时会调用此open方法。* 一般都会在此方法中初始化发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext。**/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {// 对collector进行初始化,因为nextTuple()方法利用collector发射元组this.collector = collector;}/** 此方法用于声名当前spout的Tuple发送流,* 流的定义是通过OutputFieldsDeclare.declareStream方法完成的* 其中的参数包括了发送的域Fields。*/@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// Fields可以理解为时Tuple的键declarer.declare(new Fields("number"));}}

代码说明:从100以内的整数中随机产生一个数作为Tuple的值,然后通过_collector发送到Topology。Spout的最重要方法是nextTuple。nextTuple方法发射一个新的元组到Topology,如果没有新元组发射,则直接返回。

注意:任务Spout的nextTuple方法都不要实现成阻塞的,因为Storm是在相同的线程中调用Spout的方法。

此外,Spout的另外两个重要方法是ack和fail方法,当Spout发射的元组被拓扑成功处理时,调用ack方法;当处理失败时,调用fail方法。ack和fail方法仅可被可靠的Spout调用。


http://chatgpt.dhexx.cn/article/1quIdlGN.shtml

相关文章

java 纳秒 格式化_Java日期时间API系列35-----Jdk8中java.time包中的新的日期时间API类应用,微秒和纳秒等更精确的时间格式化和解析。...

通过Java日期时间API系列1-----Jdk7及以前的日期时间类中得知,Java8以前除了java.sql.Timestamp扩充纳秒,其他类最大只精确到毫秒;Java8 time包所有相关类都支持纳秒。下面是示意图: 图中的nano 是 一秒钟包含的纳秒值,0到999999999。毫秒,微秒和纳秒都是通过这个值计算得…

c语言计时程序 纳秒,前端Tips#4 - 用 process.hrtime 获取纳秒级的计时精度

视频讲解 文字讲解 如果去测试代码运行的时长,你会选择哪个时间函数? 一般第一时间想到的函数是 Date.now 或 Date.getTime。 1、先讲结论 之所以这么选,是基于 精度 和 时钟同步 两方面考虑的。 2、知识讲解 首先看一下 Date.now 的缺点 返回…

纳秒时代

1978年在英特尔公司的历史中是很不平凡的一年。这一年它满10岁了,员工数首次超过1万人。这一年,它卖掉了竞争激烈的电子表(digital watch)业务。最重要的是,在这一年6月,它推出了具有跨时代意义的8086芯片。…

linux内核纳秒精度时间,Linux时钟精度:毫秒?微妙?纳秒?

最近被内核时钟精度弄的很是郁闷。具体情况如下: 扫盲:1秒1000毫秒1000000微妙1000000000纳秒 首先:linux有一个很重要的概念——节拍,它的单位是(次/秒)。2.6内核这个值是1000,系统中用一个HZ的宏表征这个值。同时有全…

java 日期 纳秒_java8 ZonedDateTime 日期精度到纳秒

1秒 10E3毫秒 10E6 微妙 10E9 纳秒 使用java8 Instant 内部实际System.currentTimeMillis() 在模型上 可输出纳秒数据 重点是模型 时间戳转日期 public static ZonedDateTime ofInstant(Instant instant, ZoneId zone) { Objects.requireNonNull(instant, "instant&qu…

第九章:NAT(网络地址转换协议)

文章目录 一、NAT1、NAT介绍①公有网络地址②私有网络地址 2、NAT工作原理3、NAT功能 二、NAT的实现方式1、静态转换(static Translation)实验对比 2、动态转换2.1 ACL(访问控制列表)2.2 配置动态NAT实验效果 3、端口多路复用3.1 …

【NAT网络地址转换(私网公网地址、静态NAT、动态NAT、NAPT、Easy IP、NAT Server)】-20211215、20211216

目录 一、NAT产生背景 1.产生背景 2.私网地址、公网地址​ 私网IP地址,既可以一定上缓解ip的不足,在私网里,ip地址可以随意使用。 公网地址,在需要访问公网时,运用网络地址转换NAT技术,可以实现。 二…

什么是私网,公网?

我们常说的内网和外网,通常是相对于防火墙而言的,在防火墙内部叫做内网,反之就是外网。 在一定程度上外网等同于公网,内网等同于私网。公网地址 公网地址是指在因特网上直接可达的地址,如果你有一个公网地址&#xff0…

NAT——公私网地址转换

NAT—网络地址转换 NAT NAT又称为网络地址转换,用于实现私有网络和公有网络之间的转换 私有和公有网络地址 公有网络地址是指互联网上全球唯一的IP地址 私有网络地址是指内部网络或者主机的IP地址 IANA(互联网数字分配机制)规定将下列的IP地…

公网地址和私网地址问题

服务器映射用于将内网服务器的私网地址映射为公网地址,供Internet用户访问。选择“静态映射”类型可以将每一台服务器映射成一个独立的公网IP地址。“服务器负载均衡”类型可以将多台服务器映射成同一个公网地址,Internet用户在访问这个公网地址时&#…

私网地址与Internet地址

一、A、B、C三类地址 可用地址范围备注A类1.0.0.1-126.255.255.254B类128.1.0.1-191.255.255.254C类192.0.1.1-223.255.255.254D类224.0.0.1-239.255.255.254D类为多播地址 说明: 1. 每一个地址都是用网络位主机位组成的。 2. 全0的和全1的网络位和主机位都要去掉…

计算机网络 网络层 私网地址和公网地址及子网划分

公网地址 公有地址分配和管理由Inter NIC(Internet Network Information Center 因特网信息中心)负责。各级ISP使用的公网地址都需要向Inter NIC提出申请,有Inter NIC统一发放,这样就能确保地址块不冲突。 私网地址(不…

为什么百度查到的ip和ipconfig查到的不一样;详解公网Ip和私网ip;详解网络分类ABC;

IP可以分为Public IP 和 Private IP,出现这种规划的原因在于IPv4所能表示的IP太少而电脑太多以至于不够用,然而只有Public IP才能直接连接上网络,所以对于那些公司,学校,政府机构等场所,就可以集中使用私有的IP进行管理…

挑战华为社招:掌握数据库其实很容易

前言 我的一个朋友,开发四年了,没跳过槽,四年时间也不过是从最开始的10K涨到了15K,经常和我吐槽工资低。去年8月份左右开始了他“骑驴找马”的行动,从各种地方找学习资料、刷面试题。值得庆幸的是,他出去找工作时疫情还不严重,异常顺利的面进了蚂蚁,薪资更是翻了几倍。…

javaspringboot面试,挑战华为社招

前言 redis简单来说 就是一个数据库,不过与传统数据库不同的是 redis 的数据是存在内存中的,所以存写速度非常快,因此 redis 被广泛应用于缓存方向。另外,redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务…

挑战华为社招:不止面试题,笔记源码统统都有,最强技术实现

前言 说起来开始进行面试是11月倒数第二周,上午9点,我还在去公司的公交上,突然收到蚂蚁的面试电话,其实算不上真正的面试。面试官只是和我聊了下他们在做的事情(主要是做双十一这里大促的稳定性保障,偏中间…

挑战华为社招:字节跳动上千道精选面试题还不刷起来

前言 成为优秀的架构师是大部分初中级工程师的阶段性目标。优秀的架构师往往具备七种核心能力:编程能力、调试能力、编译部署能力、性能优化能力、业务架构能力、在线运维能力、项目管理能力和规划能力。 这几种能力之间的关系大概如下图。编程能力、调试能力和编…

华为社招面试

工作第三年,在某招聘软件上填写简历后接到华为HR面试邀请,面试部门为运营商路由器,网上查看岗位相关要求之后发现与自己十分不匹配,不过机会难得,所以决定抱着学习的态度去参加面试。 2018年3月3号周六前往华为北研所Q…

华为社招技术二面,总结复盘

点击上方“Java基基”,选择“设为星标” 做积极的人,而不是积极废人! 源码精品专栏 原创 | Java 2020 超神之路,很肝~中文详细注释的开源项目RPC 框架 Dubbo 源码解析网络应用框架 Netty 源码解析消息中间件 RocketMQ 源码解析数…