storm-executor-spout(2)

article/2025/9/20 21:17:43

2121SC@SDUSC

storm-executor-spout(2)

用一个spout读取Twitter数据。采用拓扑并行化,多个spout从同一个流读取数据的不同部分。如果有多个流要读取,可以在任意组件内(spouts/bolts)访问TopologyContext。利用这一特性,能够把流划分到多个spouts读取。

 public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
//从context对象获取spout大小
int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();
//从这个spout得到任务id
int myIdx = context.getThisTaskIndex();
String[] tracks = ((String) conf.get("track")).split(",");
StringBuffer tracksBuffer = new StringBuffer();
for(int i=0; i< tracks.length;i++){//Check if this spout must read the track wordif( i % spoutsSize == myIdx){tracksBuffer.append(",");tracksBuffer.append(tracks[i]);}
}
if(tracksBuffer.length() == 0) {throw new RuntimeException("没有为spout得到track配置" +
" [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的数量必须高于spout的数量");this.track =tracksBuffer.substring(1).toString();
}
...}

可以把collector对象均匀的分配给多个数据源

在这里插入图片描述
从一个spout连接到已知设备。也可以使用相同的方法连接未知设备,不过需要借助于一个协同系统维护的设备列表。协同系统负责探察列表的变化,并根据变化创建或销毁连接。比如,从web服务器收集日志文件时,web服务器列表可能随着时间变化。当添加一台web服务器时,协同系统探查到变化并为它创建一个新的spout。
在这里插入图片描述

消息队列

第二种方法是,通过一个队列系统接收来自消息分发器的消息,并把消息转发给spout。更进一步的做法是,把队列系统作为spout和数据源之间的中间件,可以利用多队列系统的重播能力增强队列可靠性。不需要知道有关消息分发器的任何事情,而且添加或移除分发器的操作比直接连接简单的多。这个架构的问题在于队列是一个故障点,另外还要为处理流程引入新的环节。

在这里插入图片描述

注:可以通过轮询队列或哈希队列(把队列消息通过哈希发送给spouts或创建多个队列使队列spouts一一对应)在多个spouts之间实现并行性。

在spout的open方法创建一个线程,用来获取消息(使用线程是为了避免锁定nextTuple在主循环的调用):

new Thread(new Runnable() {
@Override
public void run() {try{Jedis client= new Jedis(redisHost, redisPort);List res = client.blpop(Integer.MAX_VALUE, queues);messages.offer(res.get(1));}catch(Exception e){LOG.error("从redis读取队列出错",e);try {Thread.sleep(100);}catch(InterruptedException e1){}}
}
}).start();

这个线程的惟一目的就是,创建redis连接,然后执行blpop命令。每当收到了一个消息,它就被添加到一个内部消息队列,然后会被nextTuple消费。对于spout来说数据源就是redis队列,它不知道消息分发者在哪里也不知道消息的数量。

注:不要在spout创建太多线程,因为每个spout都运行在不同的线程。可以增加拓扑并行性,也就是通过Storm集群在分布式环境创建更多线程。

在nextTuple方法中,要做的惟一的事情就是从内部消息队列获取消息并再次分发它们。

public void nextTuple(){
while(!messages.isEmpty()){collector.emit(new Values(messages.poll()));
}
}

注:可以借助redis在spout实现消息重发,从而实现可靠的拓扑。

小结
不存在适用于所有拓扑的架构模式。如果知道数据源,并且能够控制它们,就可以使用直接连接;然而如果需要添加未知数据源或从多种数据源接收数据,就最好使用消息队列。如果要执行在线过程,你可以使用DRPCSpout或类似的实现。

知识点学习:https://ifeve.com/getting-started-with-storm-4/


http://chatgpt.dhexx.cn/article/2q0m0VLd.shtml

相关文章

php读取excel效率,PhpSpreadsheet VS Box\Spout读取excel性能对比

phpspreadsheet版本:1.5.0 spout版本:2.7.3 在同样的环境下,运行代码,spout的在内存使用和时间花费上都占优,在phpspreadsheet读取失败的文档spout依然能正确完成读取。 spout代码 ini_set(memory_limit, 2G); var_dump(memory_get_usage()); var_dump(microtime()); $loa…

KafkaSpout 浅析

最近在使用storm做一个实时计算的项目,Spout需要从 KAFKA 集群中读取数据&#xff0c;为了提高开发效率&#xff0c;直接使用了Storm提供的KAFKA插件。今天抽空看了一下KafkaSpout的源码&#xff0c;记录下心得体会。 KafkaSpout基于kafka.javaapi.consumer.SimpleConsumer实现…

storm trident的多数据流,多spout

storm trident的多数据流&#xff0c;多spout (STORM)[storm, kafka] storm可以使用接收多个spout作为数据源&#xff0c;core storm与trident均可以&#xff0c;本文主要介绍trident的用法。 在trident中设置多个spout的基本思路是先建立多个spout&#xff0c;然后分别创建…

storm学习笔记(二)——Storm组件详解之Tuple、Spout

目录 Tuple元组 结构 生命周期 Spout数据源 结构 开发spout组件 Storm的核心概念包括&#xff1a;Stream、Spout、Bolt、Tuple、Task、Worker、Stream Grouping、Topology Stream是被处理的数据&#xff0c;Spout是数据源&#xff0c;Bolt是处理数据的容器&#xff0c;T…

java 纳秒 格式化_Java日期时间API系列35-----Jdk8中java.time包中的新的日期时间API类应用,微秒和纳秒等更精确的时间格式化和解析。...

通过Java日期时间API系列1-----Jdk7及以前的日期时间类中得知,Java8以前除了java.sql.Timestamp扩充纳秒,其他类最大只精确到毫秒;Java8 time包所有相关类都支持纳秒。下面是示意图: 图中的nano 是 一秒钟包含的纳秒值,0到999999999。毫秒,微秒和纳秒都是通过这个值计算得…

c语言计时程序 纳秒,前端Tips#4 - 用 process.hrtime 获取纳秒级的计时精度

视频讲解 文字讲解 如果去测试代码运行的时长&#xff0c;你会选择哪个时间函数&#xff1f; 一般第一时间想到的函数是 Date.now 或 Date.getTime。 1、先讲结论 之所以这么选&#xff0c;是基于 精度 和 时钟同步 两方面考虑的。 2、知识讲解 首先看一下 Date.now 的缺点 返回…

纳秒时代

1978年在英特尔公司的历史中是很不平凡的一年。这一年它满10岁了&#xff0c;员工数首次超过1万人。这一年&#xff0c;它卖掉了竞争激烈的电子表&#xff08;digital watch&#xff09;业务。最重要的是&#xff0c;在这一年6月&#xff0c;它推出了具有跨时代意义的8086芯片。…

linux内核纳秒精度时间,Linux时钟精度:毫秒?微妙?纳秒?

最近被内核时钟精度弄的很是郁闷。具体情况如下&#xff1a; 扫盲&#xff1a;1秒1000毫秒1000000微妙1000000000纳秒 首先&#xff1a;linux有一个很重要的概念——节拍&#xff0c;它的单位是(次/秒)。2.6内核这个值是1000&#xff0c;系统中用一个HZ的宏表征这个值。同时有全…

java 日期 纳秒_java8 ZonedDateTime 日期精度到纳秒

1秒 10E3毫秒 10E6 微妙 10E9 纳秒 使用java8 Instant 内部实际System.currentTimeMillis() 在模型上 可输出纳秒数据 重点是模型 时间戳转日期 public static ZonedDateTime ofInstant(Instant instant, ZoneId zone) { Objects.requireNonNull(instant, "instant&qu…

第九章:NAT(网络地址转换协议)

文章目录 一、NAT1、NAT介绍①公有网络地址②私有网络地址 2、NAT工作原理3、NAT功能 二、NAT的实现方式1、静态转换&#xff08;static Translation&#xff09;实验对比 2、动态转换2.1 ACL&#xff08;访问控制列表&#xff09;2.2 配置动态NAT实验效果 3、端口多路复用3.1 …

【NAT网络地址转换(私网公网地址、静态NAT、动态NAT、NAPT、Easy IP、NAT Server)】-20211215、20211216

目录 一、NAT产生背景 1.产生背景 2.私网地址、公网地址​ 私网IP地址&#xff0c;既可以一定上缓解ip的不足&#xff0c;在私网里&#xff0c;ip地址可以随意使用。 公网地址&#xff0c;在需要访问公网时&#xff0c;运用网络地址转换NAT技术&#xff0c;可以实现。 二…

什么是私网,公网?

我们常说的内网和外网&#xff0c;通常是相对于防火墙而言的&#xff0c;在防火墙内部叫做内网&#xff0c;反之就是外网。 在一定程度上外网等同于公网&#xff0c;内网等同于私网。公网地址 公网地址是指在因特网上直接可达的地址&#xff0c;如果你有一个公网地址&#xff0…

NAT——公私网地址转换

NAT—网络地址转换 NAT NAT又称为网络地址转换&#xff0c;用于实现私有网络和公有网络之间的转换 私有和公有网络地址 公有网络地址是指互联网上全球唯一的IP地址 私有网络地址是指内部网络或者主机的IP地址 IANA&#xff08;互联网数字分配机制&#xff09;规定将下列的IP地…

公网地址和私网地址问题

服务器映射用于将内网服务器的私网地址映射为公网地址&#xff0c;供Internet用户访问。选择“静态映射”类型可以将每一台服务器映射成一个独立的公网IP地址。“服务器负载均衡”类型可以将多台服务器映射成同一个公网地址&#xff0c;Internet用户在访问这个公网地址时&#…

私网地址与Internet地址

一、A、B、C三类地址 可用地址范围备注A类1.0.0.1-126.255.255.254B类128.1.0.1-191.255.255.254C类192.0.1.1-223.255.255.254D类224.0.0.1-239.255.255.254D类为多播地址 说明&#xff1a; 1. 每一个地址都是用网络位主机位组成的。 2. 全0的和全1的网络位和主机位都要去掉…

计算机网络 网络层 私网地址和公网地址及子网划分

公网地址 公有地址分配和管理由Inter NIC&#xff08;Internet Network Information Center 因特网信息中心&#xff09;负责。各级ISP使用的公网地址都需要向Inter NIC提出申请&#xff0c;有Inter NIC统一发放&#xff0c;这样就能确保地址块不冲突。 私网地址&#xff08;不…

为什么百度查到的ip和ipconfig查到的不一样;详解公网Ip和私网ip;详解网络分类ABC;

IP可以分为Public IP 和 Private IP,出现这种规划的原因在于IPv4所能表示的IP太少而电脑太多以至于不够用&#xff0c;然而只有Public IP才能直接连接上网络&#xff0c;所以对于那些公司&#xff0c;学校&#xff0c;政府机构等场所&#xff0c;就可以集中使用私有的IP进行管理…

挑战华为社招:掌握数据库其实很容易

前言 我的一个朋友,开发四年了,没跳过槽,四年时间也不过是从最开始的10K涨到了15K,经常和我吐槽工资低。去年8月份左右开始了他“骑驴找马”的行动,从各种地方找学习资料、刷面试题。值得庆幸的是,他出去找工作时疫情还不严重,异常顺利的面进了蚂蚁,薪资更是翻了几倍。…

javaspringboot面试,挑战华为社招

前言 redis简单来说 就是一个数据库&#xff0c;不过与传统数据库不同的是 redis 的数据是存在内存中的&#xff0c;所以存写速度非常快&#xff0c;因此 redis 被广泛应用于缓存方向。另外&#xff0c;redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务…