storm trident的多数据流,多spout

article/2025/9/20 21:17:52

storm trident的多数据流,多spout

@(STORM)[storm, kafka]

storm可以使用接收多个spout作为数据源,core storm与trident均可以,本文主要介绍trident的用法。

在trident中设置多个spout的基本思路是先建立多个spout,然后分别创建对应的Stream,并将这些stream merge在一起。

我们这里介绍2个例子,第一个是使用多spout的方式来从kafka读取多个topic,第二个例子是一个spout从kafka读数据,另一个spout定时产生一些数据,拓扑根据这个数据进行定时操作。

(一)使用多个spout同时从kafka中读取多个topic

注意使用storm-kafka从多个topic中读取数据的更优方法请见:https://issues.apache.org/jira/browse/STORM-817?jql=project%20%3D%20STORM%20AND%20status%20in%20%28Open%2C%20%22In%20Progress%22%2C%20Reopened%29%20AND%20component%20%3D%20storm-kafka
这里只是用于介绍如何使用多个spout。

TODO:看看是如何实现的

1、创建第一个spout

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHost, topic, "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

2、创建第二个spout

    TridentKafkaConfig kafkaConfig2 = new TridentKafkaConfig(zkHost, "streaming_g17_sdc", "storm2");kafkaConfig2.scheme = new SchemeAsMultiScheme(new StringScheme());TransactionalTridentKafkaSpout kafkaSpout2 = new TransactionalTridentKafkaSpout(kafkaConfig2);

3、分别基于这2个spout创建一个Stream

    TridentTopology topology = new TridentTopology();Stream firstStream = topology.newStream("kafka", kafkaSpout);Stream secondStream = topology.newStream("kafka2", kafkaSpout2);

4、merge这2个stream

    topology.merge(firstStream,secondStream).shuffle().each(new Fields("str"), new PrepaidFunction(), new Fields("word"));

(二)定时spout

自定义了一个TimerSpout,用于每10秒发送一条消息。我们先看一下如何使用这个spout,最后才看这个spout是如何定义的。

1、创建spout

    ITridentSpout mySpout = new TimerSpout();

2、使用spout

    topology.newStream("myspout", mySpout).broadcast().each(new Fields("timer"), new PrepaidFunction(),new Fields("word"));

这里使用了broadcast()以保证每个bolt都收到这个消息。
注意一个拓扑可以形成多个流组合,如除了上面的newStream,还有以下代码:

Stream firstStream = topology.newStream("kafka", kafkaSpout);Stream secondStream = topology.newStream("kafka2", kafkaSpout2);
topology.merge(firstStream, secondStream).each(new Fields("str"), new PrepaidFunction(), new Fields("word"));

3、在bolt中使用这个消息

if(tuple.get(0).toString().contains("$$$TIMERMESSAGE$$$")){//DO something according to the timer.log.info("receive timer=============================at " + Calendar.getInstance().getTime());}

4、看一下这个Spout是如何定义的

(1)Spout

package com.lujinhong.commons.storm.multispout;import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.spout.ITridentSpout;import java.util.Map;public class TimerSpout implements ITridentSpout<Long> {private static final long serialVersionUID = 1L;BatchCoordinator<Long> coordinator = new TimerCoordinator();Emitter<Long> emitter = new TimerEmitter();@Overridepublic BatchCoordinator<Long> getCoordinator(String txStateId, Map conf, TopologyContext context) {return coordinator;}@Overridepublic Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {return emitter;}@Overridepublic Map getComponentConfiguration() {return null;}@Overridepublic Fields getOutputFields() {return new Fields("timer");}
}

(2)Coordinator

package com.lujinhong.commons.storm.multispout;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.ITridentSpout.BatchCoordinator;import java.io.Serializable;public class TimerCoordinator implements BatchCoordinator<Long>, Serializable {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(TimerCoordinator.class);@Overridepublic boolean isReady(long txid) {return true;}@Overridepublic void close() {}@Overridepublic Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {LOG.info("Initializing Transaction [" + txid + "]");return null;}@Overridepublic void success(long txid) {LOG.info("Successful Transaction [" + txid + "]");}
}

(3)Emitter

package com.lujinhong.commons.storm.multispout;import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout.Emitter;
import storm.trident.topology.TransactionAttempt;import java.io.Serializable;
import java.util.Arrays;
import java.util.Calendar;
import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import backtype.storm.tuple.Values;public class TimerEmitter implements Emitter<Long>, Serializable {private static final long serialVersionUID = 1L;private static Logger LOG = LoggerFactory.getLogger(TimerEmitter.class);AtomicInteger successfulTransactions = new AtomicInteger(0);//使用特殊字符,以避免其它消息中含有这个字符串。private static final String TIMER_MESSAGE = "$$$TIMERMESSAGE$$$";@Overridepublic void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {try {LOG.info("sleep 10 seconds");Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}Object v = new Values(TIMER_MESSAGE);LOG.info("Emit one timer tuple at :" +   Calendar.getInstance().getTime());collector.emit(Arrays.asList(v));}@Overridepublic void success(TransactionAttempt tx) {successfulTransactions.incrementAndGet();}@Overridepublic void close() {}}

(三)拓扑分析

使用上述代码构建的拓扑,包括2个stream,其中一个stream还包含2个kafkaSpout,即共有2个stream, 3个spout,我们看一下storm UI上的信息:

image

image

从上面的2个图可以看出,这个拓扑包含2个独立的处理流程,spouts也包含2个,即分别对应一个newStream()。


http://chatgpt.dhexx.cn/article/iWGWd3Yq.shtml

相关文章

storm学习笔记(二)——Storm组件详解之Tuple、Spout

目录 Tuple元组 结构 生命周期 Spout数据源 结构 开发spout组件 Storm的核心概念包括&#xff1a;Stream、Spout、Bolt、Tuple、Task、Worker、Stream Grouping、Topology Stream是被处理的数据&#xff0c;Spout是数据源&#xff0c;Bolt是处理数据的容器&#xff0c;T…

java 纳秒 格式化_Java日期时间API系列35-----Jdk8中java.time包中的新的日期时间API类应用,微秒和纳秒等更精确的时间格式化和解析。...

通过Java日期时间API系列1-----Jdk7及以前的日期时间类中得知,Java8以前除了java.sql.Timestamp扩充纳秒,其他类最大只精确到毫秒;Java8 time包所有相关类都支持纳秒。下面是示意图: 图中的nano 是 一秒钟包含的纳秒值,0到999999999。毫秒,微秒和纳秒都是通过这个值计算得…

c语言计时程序 纳秒,前端Tips#4 - 用 process.hrtime 获取纳秒级的计时精度

视频讲解 文字讲解 如果去测试代码运行的时长&#xff0c;你会选择哪个时间函数&#xff1f; 一般第一时间想到的函数是 Date.now 或 Date.getTime。 1、先讲结论 之所以这么选&#xff0c;是基于 精度 和 时钟同步 两方面考虑的。 2、知识讲解 首先看一下 Date.now 的缺点 返回…

纳秒时代

1978年在英特尔公司的历史中是很不平凡的一年。这一年它满10岁了&#xff0c;员工数首次超过1万人。这一年&#xff0c;它卖掉了竞争激烈的电子表&#xff08;digital watch&#xff09;业务。最重要的是&#xff0c;在这一年6月&#xff0c;它推出了具有跨时代意义的8086芯片。…

linux内核纳秒精度时间,Linux时钟精度:毫秒?微妙?纳秒?

最近被内核时钟精度弄的很是郁闷。具体情况如下&#xff1a; 扫盲&#xff1a;1秒1000毫秒1000000微妙1000000000纳秒 首先&#xff1a;linux有一个很重要的概念——节拍&#xff0c;它的单位是(次/秒)。2.6内核这个值是1000&#xff0c;系统中用一个HZ的宏表征这个值。同时有全…

java 日期 纳秒_java8 ZonedDateTime 日期精度到纳秒

1秒 10E3毫秒 10E6 微妙 10E9 纳秒 使用java8 Instant 内部实际System.currentTimeMillis() 在模型上 可输出纳秒数据 重点是模型 时间戳转日期 public static ZonedDateTime ofInstant(Instant instant, ZoneId zone) { Objects.requireNonNull(instant, "instant&qu…

第九章:NAT(网络地址转换协议)

文章目录 一、NAT1、NAT介绍①公有网络地址②私有网络地址 2、NAT工作原理3、NAT功能 二、NAT的实现方式1、静态转换&#xff08;static Translation&#xff09;实验对比 2、动态转换2.1 ACL&#xff08;访问控制列表&#xff09;2.2 配置动态NAT实验效果 3、端口多路复用3.1 …

【NAT网络地址转换(私网公网地址、静态NAT、动态NAT、NAPT、Easy IP、NAT Server)】-20211215、20211216

目录 一、NAT产生背景 1.产生背景 2.私网地址、公网地址​ 私网IP地址&#xff0c;既可以一定上缓解ip的不足&#xff0c;在私网里&#xff0c;ip地址可以随意使用。 公网地址&#xff0c;在需要访问公网时&#xff0c;运用网络地址转换NAT技术&#xff0c;可以实现。 二…

什么是私网,公网?

我们常说的内网和外网&#xff0c;通常是相对于防火墙而言的&#xff0c;在防火墙内部叫做内网&#xff0c;反之就是外网。 在一定程度上外网等同于公网&#xff0c;内网等同于私网。公网地址 公网地址是指在因特网上直接可达的地址&#xff0c;如果你有一个公网地址&#xff0…

NAT——公私网地址转换

NAT—网络地址转换 NAT NAT又称为网络地址转换&#xff0c;用于实现私有网络和公有网络之间的转换 私有和公有网络地址 公有网络地址是指互联网上全球唯一的IP地址 私有网络地址是指内部网络或者主机的IP地址 IANA&#xff08;互联网数字分配机制&#xff09;规定将下列的IP地…

公网地址和私网地址问题

服务器映射用于将内网服务器的私网地址映射为公网地址&#xff0c;供Internet用户访问。选择“静态映射”类型可以将每一台服务器映射成一个独立的公网IP地址。“服务器负载均衡”类型可以将多台服务器映射成同一个公网地址&#xff0c;Internet用户在访问这个公网地址时&#…

私网地址与Internet地址

一、A、B、C三类地址 可用地址范围备注A类1.0.0.1-126.255.255.254B类128.1.0.1-191.255.255.254C类192.0.1.1-223.255.255.254D类224.0.0.1-239.255.255.254D类为多播地址 说明&#xff1a; 1. 每一个地址都是用网络位主机位组成的。 2. 全0的和全1的网络位和主机位都要去掉…

计算机网络 网络层 私网地址和公网地址及子网划分

公网地址 公有地址分配和管理由Inter NIC&#xff08;Internet Network Information Center 因特网信息中心&#xff09;负责。各级ISP使用的公网地址都需要向Inter NIC提出申请&#xff0c;有Inter NIC统一发放&#xff0c;这样就能确保地址块不冲突。 私网地址&#xff08;不…

为什么百度查到的ip和ipconfig查到的不一样;详解公网Ip和私网ip;详解网络分类ABC;

IP可以分为Public IP 和 Private IP,出现这种规划的原因在于IPv4所能表示的IP太少而电脑太多以至于不够用&#xff0c;然而只有Public IP才能直接连接上网络&#xff0c;所以对于那些公司&#xff0c;学校&#xff0c;政府机构等场所&#xff0c;就可以集中使用私有的IP进行管理…

挑战华为社招:掌握数据库其实很容易

前言 我的一个朋友,开发四年了,没跳过槽,四年时间也不过是从最开始的10K涨到了15K,经常和我吐槽工资低。去年8月份左右开始了他“骑驴找马”的行动,从各种地方找学习资料、刷面试题。值得庆幸的是,他出去找工作时疫情还不严重,异常顺利的面进了蚂蚁,薪资更是翻了几倍。…

javaspringboot面试,挑战华为社招

前言 redis简单来说 就是一个数据库&#xff0c;不过与传统数据库不同的是 redis 的数据是存在内存中的&#xff0c;所以存写速度非常快&#xff0c;因此 redis 被广泛应用于缓存方向。另外&#xff0c;redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务…

挑战华为社招:不止面试题,笔记源码统统都有,最强技术实现

前言 说起来开始进行面试是11月倒数第二周&#xff0c;上午9点&#xff0c;我还在去公司的公交上&#xff0c;突然收到蚂蚁的面试电话&#xff0c;其实算不上真正的面试。面试官只是和我聊了下他们在做的事情&#xff08;主要是做双十一这里大促的稳定性保障&#xff0c;偏中间…

挑战华为社招:字节跳动上千道精选面试题还不刷起来

前言 成为优秀的架构师是大部分初中级工程师的阶段性目标。优秀的架构师往往具备七种核心能力&#xff1a;编程能力、调试能力、编译部署能力、性能优化能力、业务架构能力、在线运维能力、项目管理能力和规划能力。 这几种能力之间的关系大概如下图。编程能力、调试能力和编…

华为社招面试

工作第三年&#xff0c;在某招聘软件上填写简历后接到华为HR面试邀请&#xff0c;面试部门为运营商路由器&#xff0c;网上查看岗位相关要求之后发现与自己十分不匹配&#xff0c;不过机会难得&#xff0c;所以决定抱着学习的态度去参加面试。 2018年3月3号周六前往华为北研所Q…