Storm Trident: multiple data streams and multiple spouts
@(STORM)[storm, kafka]
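Storm can take several spouts as data sources. Both core Storm and Trident support this; this article covers the Trident usage.
The basic approach in Trident is to create the spouts first, create one Stream per spout, and then merge those streams together.
Two examples follow. The first uses multiple spouts to read multiple topics from Kafka. The second has one spout reading from Kafka while another spout produces data on a timer, and the topology performs periodic work based on that data.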
(I) Reading multiple Kafka topics with multiple spouts
Note: for a better way to read from multiple topics with storm-kafka, see: https://issues.apache.org/jira/browse/STORM-817?jql=project%20%3D%20STORM%20AND%20status%20in%20%28Open%2C%20%22In%20Progress%22%2C%20Reopened%29%20AND%20component%20%3D%20storm-kafka
This section is only meant to show how to use multiple spouts.
TODO: look at how it is implemented
1. Create the first spout

    // zkHost and topic are defined elsewhere; the third argument is the client id.
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(zkHost, topic, "storm");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

2. Create the second spout

    // Same setup, but with a different topic and a different client id.
    TridentKafkaConfig kafkaConfig2 = new TridentKafkaConfig(zkHost, "streaming_g17_sdc", "storm2");
    kafkaConfig2.scheme = new SchemeAsMultiScheme(new StringScheme());
    TransactionalTridentKafkaSpout kafkaSpout2 = new TransactionalTridentKafkaSpout(kafkaConfig2);

3. Create one Stream for each of the 2 spouts

    TridentTopology topology = new TridentTopology();
    Stream firstStream = topology.newStream("kafka", kafkaSpout);
    Stream secondStream = topology.newStream("kafka2", kafkaSpout2);

4. Merge the 2 streams

    topology.merge(firstStream, secondStream)
            .shuffle()
            .each(new Fields("str"), new PrepaidFunction(), new Fields("word"));
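(II) Timer spout
A custom TimerSpout sends one message every 10 seconds. We first look at how the spout is used, and only afterwards at how it is defined.
1. Create the spout

    ITridentSpout<Long> mySpout = new TimerSpout();

2. Use the spout

    topology.newStream("myspout", mySpout)
            .broadcast()
            .each(new Fields("timer"), new PrepaidFunction(), new Fields("word"));

broadcast() is used here so that every instance of the bolt receives the message.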
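To make the difference between the two groupings concrete, here is a minimal plain-Java sketch with no Storm dependency. It models the downstream bolt tasks as lists: a shuffle-style grouping hands each tuple to exactly one task, while a broadcast-style grouping copies it to all of them. `GroupingSketch` and its methods are hypothetical names used only for illustration, and the uniform random pick is a simplification of how Trident's shuffle() actually balances tuples.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Toy model of two repartitioning strategies; this is not Storm code. */
final class GroupingSketch {

    private GroupingSketch() {}

    /** Creates n empty "task inboxes", one per downstream bolt task. */
    static List<List<String>> newTasks(int n) {
        List<List<String>> tasks = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            tasks.add(new ArrayList<>());
        }
        return tasks;
    }

    /** Shuffle-style: the tuple goes to exactly one randomly chosen task. */
    static void shuffle(String tuple, List<List<String>> tasks, Random random) {
        tasks.get(random.nextInt(tasks.size())).add(tuple);
    }

    /** Broadcast-style: every task receives its own copy of the tuple. */
    static void broadcast(String tuple, List<List<String>> tasks) {
        for (List<String> task : tasks) {
            task.add(tuple);
        }
    }

    public static void main(String[] args) {
        List<List<String>> shuffled = newTasks(3);
        List<List<String>> broadcasted = newTasks(3);
        shuffle("$$$TIMERMESSAGE$$$", shuffled, new Random());
        broadcast("$$$TIMERMESSAGE$$$", broadcasted);
        System.out.println("shuffle:   " + shuffled);
        System.out.println("broadcast: " + broadcasted);
    }
}
```

With a shuffle, only one of the three tasks would see the timer message, so only that task could act on it. That is why the timer stream is broadcast while the Kafka streams can be shuffled.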
Note that a single topology can contain several stream combinations. For example, besides the newStream call above, the same topology also contains the following code:

    Stream firstStream = topology.newStream("kafka", kafkaSpout);
    Stream secondStream = topology.newStream("kafka2", kafkaSpout2);
    topology.merge(firstStream, secondStream)
            .each(new Fields("str"), new PrepaidFunction(), new Fields("word"));

3. Use the message in the bolt

    if (tuple.get(0).toString().contains("$$$TIMERMESSAGE$$$")) {
        // Do something according to the timer.
        log.info("receive timer=============================at " + Calendar.getInstance().getTime());
    }
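The check above can be pulled into a small helper so that it can be tested without a running topology. The sketch below is plain Java with no Storm dependency. `TimerMessages` and `isTimerMessage` are hypothetical names, and the `Object` argument stands in for whatever `tuple.get(0)` returns. It keeps the `contains` semantics of the original check.

```java
import java.util.Arrays;

/** Hypothetical helper for recognising the timer marker; not part of Storm. */
final class TimerMessages {

    /** The same marker string that TimerEmitter emits. */
    static final String TIMER_MESSAGE = "$$$TIMERMESSAGE$$$";

    private TimerMessages() {}

    /** Returns true if the first field of a tuple carries the timer marker. */
    static boolean isTimerMessage(Object firstField) {
        return firstField != null && firstField.toString().contains(TIMER_MESSAGE);
    }

    public static void main(String[] args) {
        // A bare marker string.
        System.out.println(isTimerMessage("$$$TIMERMESSAGE$$$"));                // true
        // A one-element list whose toString() is "[$$$TIMERMESSAGE$$$]".
        System.out.println(isTimerMessage(Arrays.asList("$$$TIMERMESSAGE$$$"))); // true
        // An ordinary record and a missing value.
        System.out.println(isTimerMessage("an ordinary Kafka record"));          // false
        System.out.println(isTimerMessage(null));                                // false
    }
}
```

The list case matters here: judging from the emitter code in this article, the marker is wrapped in a list before it is emitted, so the field's string form is the marker surrounded by brackets. A `contains` check matches that form, whereas an `equals` check against the bare marker would not.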
4. How the spout is defined
(1) Spout

    package com.lujinhong.commons.storm.multispout;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.tuple.Fields;
    import storm.trident.spout.ITridentSpout;

    import java.util.Map;

    public class TimerSpout implements ITridentSpout<Long> {
        private static final long serialVersionUID = 1L;
        BatchCoordinator<Long> coordinator = new TimerCoordinator();
        Emitter<Long> emitter = new TimerEmitter();

        @Override
        public BatchCoordinator<Long> getCoordinator(String txStateId, Map conf, TopologyContext context) {
            return coordinator;
        }

        @Override
        public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
            return emitter;
        }

        @Override
        public Map getComponentConfiguration() {
            return null;
        }

        // The stream produced by this spout has a single field named "timer".
        @Override
        public Fields getOutputFields() {
            return new Fields("timer");
        }
    }

(2) Coordinator

    package com.lujinhong.commons.storm.multispout;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import storm.trident.spout.ITridentSpout.BatchCoordinator;

    import java.io.Serializable;

    public class TimerCoordinator implements BatchCoordinator<Long>, Serializable {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(TimerCoordinator.class);

        // Always ready: the pacing is done by the emitter, not the coordinator.
        @Override
        public boolean isReady(long txid) {
            return true;
        }

        @Override
        public void close() {
        }

        // No per-transaction metadata is needed, so null is returned.
        @Override
        public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
            LOG.info("Initializing Transaction [" + txid + "]");
            return null;
        }

        @Override
        public void success(long txid) {
            LOG.info("Successful Transaction [" + txid + "]");
        }
    }

(3) Emitter

    package com.lujinhong.commons.storm.multispout;

    import storm.trident.operation.TridentCollector;
    import storm.trident.spout.ITridentSpout.Emitter;
    import storm.trident.topology.TransactionAttempt;

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Calendar;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import backtype.storm.tuple.Values;

    public class TimerEmitter implements Emitter<Long>, Serializable {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(TimerEmitter.class);
        AtomicInteger successfulTransactions = new AtomicInteger(0);
        // Use special characters so that other messages are unlikely to contain this string.
        private static final String TIMER_MESSAGE = "$$$TIMERMESSAGE$$$";

        @Override
        public void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {
            try {
                LOG.info("sleep 10 seconds");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag so the calling thread can still see it.
                Thread.currentThread().interrupt();
            }
            // v is itself a list, so the single "timer" field holds a one-element list.
            Object v = new Values(TIMER_MESSAGE);
            LOG.info("Emit one timer tuple at :" + Calendar.getInstance().getTime());
            collector.emit(Arrays.asList(v));
        }

        @Override
        public void success(TransactionAttempt tx) {
            successfulTransactions.incrementAndGet();
        }

        @Override
        public void close() {
        }
    }

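The 10-second cadence comes entirely from blocking inside emitBatch: each batch sleeps first and then emits a single marker. The following plain-Java sketch isolates that sleep-then-emit pattern so it can be run without Storm. `PacedEmitterSketch` is a hypothetical name, the `List<String>` sink stands in for the TridentCollector, and making the interval a constructor parameter is an assumption added here so the pattern can be exercised with a short delay.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for the sleep-then-emit pattern; this is not Storm code. */
final class PacedEmitterSketch {

    static final String TIMER_MESSAGE = "$$$TIMERMESSAGE$$$";

    private final long intervalMillis;

    PacedEmitterSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Blocks for the configured interval, then appends one marker to the sink. */
    void emitBatch(List<String> sink) {
        try {
            Thread.sleep(intervalMillis);
        } catch (InterruptedException e) {
            // Restore the flag; like the emitter above, still emit the marker.
            Thread.currentThread().interrupt();
        }
        sink.add(TIMER_MESSAGE);
    }

    public static void main(String[] args) {
        PacedEmitterSketch emitter = new PacedEmitterSketch(100);
        List<String> sink = new ArrayList<>();
        long start = System.nanoTime();
        emitter.emitBatch(sink);
        emitter.emitBatch(sink);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(sink.size() + " markers in about " + elapsedMillis + " ms");
    }
}
```

Because the delay lives in the emitter, the interval between markers is the sleep time plus whatever time the batch itself takes, so the cadence is approximate rather than exact.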
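(III) Topology analysis
The topology built from the code above contains 2 streams, one of which contains 2 Kafka spouts, so there are 2 streams and 3 spouts in total. Let's look at what the Storm UI shows:
The two figures above show that this topology contains 2 independent processing flows. The UI also lists 2 spouts, each corresponding to a newStream().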