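Since version 0.9.0, Storm has included a metrics framework for collecting application-specific metrics and reporting them to external systems.
This article uses LoggingMetricsConsumer as the metrics consumer, so metric values are written to the metrics.log log file.
You can also write your own consumer by implementing the IMetricsConsumer interface. Consumers can be registered in code (registerMetricsConsumer) or in the storm.yaml configuration file: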
topology.metrics.consumer.register:
  - class: "backtype.storm.metric.LoggingMetricsConsumer"
    parallelism.hint: 1
  - class: "org.mycompany.MyMetricsConsumer"
    parallelism.hint: 1
    argument:
      - endpoint: "metrics-collector.mycompany.org"
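The example below makes a few small changes to a topology from the storm-starter package in order to collect three metrics:
1. The number of calls to the execute method, reported every 5 seconds
2. The number of occurrences of each word, reported every minute
3. The average length of all words seen, reported every minute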
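import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        // Metric objects; transient because they are created in prepare(), not serialized with the bolt
        transient CountMetric _countMetric;
        transient MultiCountMetric _wordCountMetric;
        transient ReducedMetric _wordLengthMeanMetric;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            initMetrics(context);
        }

        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
            updateMetrics(tuple.getString(0));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        // Create the metrics and register each one with its reporting interval in seconds
        void initMetrics(TopologyContext context) {
            _countMetric = new CountMetric();
            _wordCountMetric = new MultiCountMetric();
            _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());

            context.registerMetric("execute_count", _countMetric, 5);
            context.registerMetric("word_count", _wordCountMetric, 60);
            context.registerMetric("word_length", _wordLengthMeanMetric, 60);
        }

        // Update the metrics for one incoming word
        void updateMetrics(String word) {
            _countMetric.incr();
            _wordCountMetric.scope(word).incr();
            _wordLengthMeanMetric.update(word.length());
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);

        // Write metric values to the log file
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            // The local run lasts only 10 s, which is shorter than the 60 s interval
            // of word_count and word_length, so those two are unlikely to be reported here
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}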
How metric values are read and reset (from the Storm source code):
*********************************************************** CountMetric ***********************************************************
public Object getValueAndReset() {
    long ret = _value;
    _value = 0;
    return ret;
}

*********************************************************** MultiCountMetric ***********************************************************
public Object getValueAndReset() {
    Map ret = new HashMap();
    for(Map.Entry<String, CountMetric> e : _value.entrySet()) {
        ret.put(e.getKey(), e.getValue().getValueAndReset());
    }
    return ret;
}

*********************************************************** ReducedMetric ***********************************************************
public Object getValueAndReset() {
    Object ret = _reducer.extractResult(_accumulator);
    _accumulator = _reducer.init();
    return ret;
}
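To make the read-and-reset behaviour concrete, here is a minimal, self-contained sketch. The classes SketchCountMetric, SketchMeanMetric and GetAndResetDemo are hypothetical stand-ins written for this illustration, not Storm's own classes, and returning null for an empty interval is a choice made for the sketch. The point it shows is that each call reports only what was accumulated since the previous call.

```java
// Simplified stand-ins for illustration only; these are NOT Storm's classes.
final class SketchCountMetric {
    private long value = 0;

    void incr() {
        value++;
    }

    // Return the count accumulated since the last call, then start over at zero.
    long getValueAndReset() {
        long ret = value;
        value = 0;
        return ret;
    }
}

final class SketchMeanMetric {
    private long count = 0;
    private double sum = 0.0;

    void update(double sample) {
        count++;
        sum += sample;
    }

    // Return the mean of the samples seen since the last call, then reset.
    // An interval without samples yields null (an assumption of this sketch).
    Double getValueAndReset() {
        Double ret = (count > 0) ? Double.valueOf(sum / count) : null;
        count = 0;
        sum = 0.0;
        return ret;
    }
}

final class GetAndResetDemo {
    public static void main(String[] args) {
        SketchCountMetric executeCount = new SketchCountMetric();
        executeCount.incr();
        executeCount.incr();
        executeCount.incr();
        System.out.println(executeCount.getValueAndReset()); // prints 3
        executeCount.incr();
        System.out.println(executeCount.getValueAndReset()); // prints 1, not 4

        SketchMeanMetric wordLength = new SketchMeanMetric();
        wordLength.update(4);
        wordLength.update(6);
        System.out.println(wordLength.getValueAndReset()); // prints 5.0
        System.out.println(wordLength.getValueAndReset()); // prints null
    }
}
```

Because of the reset, every value a consumer receives covers exactly one reporting interval, so totals over a longer period have to be summed on the consumer side.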
[Screenshot: Storm UI]
Excerpts from the metrics log:
Log from one machine:
2014-04-01 00:57:38,855 64645 1396339058 ubuntu:6703 8:exclaim1 execute_count 776
2014-04-01 00:58:03,855 89645 1396339083 ubuntu:6703 8:exclaim1 execute_count 66
2014-04-01 01:01:38,878 304668 1396339298 ubuntu:6703 8:exclaim1 word_count {bertels=172, jackson=152, nathan=155, mike=155, golda=155}
2014-04-01 01:01:38,878 304668 1396339298 ubuntu:6703 8:exclaim1 word_length 5.821292775665399
2014-04-01 01:03:38,878 424668 1396339418 ubuntu:6703 8:exclaim1 word_count {bertels=161, jackson=156, nathan=151, mike=158, golda=163}
2014-04-01 01:03:38,878 424668 1396339418 ubuntu:6703 8:exclaim1 word_length 5.79467680608365
2014-04-01 01:03:48,937 434727 1396339428 ubuntu:6703 8:exclaim1 execute_count 65
2014-04-01 01:03:58,935 444725 1396339438 ubuntu:6703 8:exclaim1 execute_count 65
2014-04-01 01:05:38,879 544669 1396339538 ubuntu:6703 8:exclaim1 word_count {bertels=165, jackson=167, nathan=144, mike=146, golda=165}
2014-04-01 01:05:38,879 544669 1396339538 ubuntu:6703 8:exclaim1 word_length 5.841168996188056
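The word_count and word_length values logged by the same task at the same timestamp should describe the same 60-second window, so one can be cross-checked against the other. The sketch below recomputes the mean word length from a logged word_count map, weighting each word by its count. WordLengthCheck and its methods are hypothetical helpers written for this check, and the parsing assumes the `{word=count, ...}` layout seen in the excerpt.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for cross-checking the log excerpt; not part of Storm.
final class WordLengthCheck {

    // Parse a logged map such as "{bertels=172, jackson=152}" into word -> count.
    static Map<String, Long> parseCounts(String logged) {
        Map<String, Long> counts = new LinkedHashMap<String, Long>();
        String body = logged.trim();
        body = body.substring(1, body.length() - 1); // drop the surrounding braces
        if (body.isEmpty()) {
            return counts;
        }
        for (String entry : body.split(",\\s*")) {
            int eq = entry.lastIndexOf('=');
            String word = entry.substring(0, eq);
            long count = Long.parseLong(entry.substring(eq + 1).trim());
            counts.put(word, count);
        }
        return counts;
    }

    // Mean word length, weighting each word by how often it was seen.
    static double meanLength(Map<String, Long> counts) {
        long totalChars = 0;
        long totalWords = 0;
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            totalChars += e.getKey().length() * e.getValue();
            totalWords += e.getValue();
        }
        return (double) totalChars / totalWords;
    }

    public static void main(String[] args) {
        // word_count value logged by task 8 (exclaim1) at 01:01:38
        String logged = "{bertels=172, jackson=152, nathan=155, mike=155, golda=155}";
        // 4593 characters over 789 words, to be compared with the logged word_length
        System.out.println(meanLength(parseCounts(logged)));
    }
}
```

For the 01:01:38 entry this gives 4593 / 789 ≈ 5.8213, which agrees with the logged word_length of 5.821292775665399. The 01:03:38 entry works out the same way (4572 / 789 ≈ 5.7947).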
Log from another machine:
2014-04-01 00:57:00,934 12756 1396339020 ubuntu:6703 7:exclaim1 execute_count 95
2014-04-01 00:57:00,958 12780 1396339020 ubuntu:6702 9:exclaim2 execute_count 95
2014-04-01 00:57:00,959 12781 1396339020 ubuntu:6703 10:exclaim2 execute_count 94
2014-04-01 00:57:05,927 17749 1396339025 ubuntu:6702 6:exclaim1 execute_count 97
2014-04-01 00:57:10,948 22770 1396339030 ubuntu:6702 9:exclaim2 execute_count 99
2014-04-01 00:57:15,931 27753 1396339035 ubuntu:6703 7:exclaim1 execute_count 100
2014-04-01 00:58:55,949 127771 1396339135 ubuntu:6702 9:exclaim2 word_length 8.829391891891891
2014-04-01 00:58:55,963 127785 1396339135 ubuntu:6703 10:exclaim2 word_count {bertels!!!=250, golda!!!=216, jackson!!!=237, nathan!!!=231, mike!!!=250}
2014-04-01 00:58:55,963 127785 1396339135 ubuntu:6703 10:exclaim2 word_length 8.806587837837839
2014-04-01 00:59:55,918 187740 1396339195 ubuntu:6702 6:exclaim1 word_count {bertels=256, jackson=248, nathan=234, mike=224, golda=219}
2014-04-01 00:59:55,995 187817 1396339195 ubuntu:6703 10:exclaim2 execute_count 98
2014-04-01 01:00:00,994 192816 1396339200 ubuntu:6702 9:exclaim2 execute_count 98
2014-04-01 01:00:05,966 197788 1396339205 ubuntu:6703 7:exclaim1 execute_count 99
2014-04-01 01:00:55,949 247771 1396339255 ubuntu:6702 9:exclaim2 word_count {bertels!!!=248, golda!!!=233, jackson!!!=220, nathan!!!=241, mike!!!=240}
2014-04-01 01:00:55,965 247787 1396339255 ubuntu:6703 10:exclaim2 word_count {bertels!!!=258, golda!!!=240, jackson!!!=214, nathan!!!=258, mike!!!=215}
2014-04-01 01:00:55,965 247787 1396339255 ubuntu:6703 10:exclaim2 word_length 8.832911392405064
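Since every task reports separately and counters reset after each report, per-component totals have to be assembled from the individual log lines. The following sketch parses lines in the layout shown in the excerpts and sums a numeric metric per component. MetricLogLine is a hypothetical helper, not part of Storm, and it assumes whitespace-separated columns in the order seen here: date, time, a numeric column that is skipped, epoch seconds, host:port, taskId:component, metric name, value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for this article's log excerpts; not part of Storm.
final class MetricLogLine {
    final String timestamp;   // date and time, e.g. "2014-04-01 00:57:00,934"
    final long epochSeconds;  // fourth column
    final String worker;      // host:port of the reporting worker
    final int taskId;
    final String component;
    final String metric;
    final String value;       // raw text; a map value contains spaces

    private MetricLogLine(String timestamp, long epochSeconds, String worker,
                          int taskId, String component, String metric, String value) {
        this.timestamp = timestamp;
        this.epochSeconds = epochSeconds;
        this.worker = worker;
        this.taskId = taskId;
        this.component = component;
        this.metric = metric;
        this.value = value;
    }

    static MetricLogLine parse(String line) {
        // Limit the split to 8 fields so a value such as "{a=1, b=2}" stays in one piece.
        String[] f = line.trim().split("\\s+", 8);
        if (f.length < 8) {
            throw new IllegalArgumentException("unexpected line: " + line);
        }
        String[] task = f[5].split(":", 2); // "7:exclaim1" -> task id and component
        return new MetricLogLine(f[0] + " " + f[1], Long.parseLong(f[3]), f[4],
                Integer.parseInt(task[0]), task[1], f[6], f[7]);
    }

    // Sum a numeric metric (such as execute_count) per component across all tasks.
    static Map<String, Long> sumByComponent(List<String> lines, String metricName) {
        Map<String, Long> totals = new TreeMap<String, Long>();
        for (String line : lines) {
            MetricLogLine m = parse(line);
            if (!m.metric.equals(metricName)) {
                continue;
            }
            Long prev = totals.get(m.component);
            totals.put(m.component, (prev == null ? 0L : prev) + Long.parseLong(m.value));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "2014-04-01 00:57:00,934 12756 1396339020 ubuntu:6703 7:exclaim1 execute_count 95",
            "2014-04-01 00:57:00,958 12780 1396339020 ubuntu:6702 9:exclaim2 execute_count 95",
            "2014-04-01 00:57:00,959 12781 1396339020 ubuntu:6703 10:exclaim2 execute_count 94");
        System.out.println(sumByComponent(lines, "execute_count")); // prints {exclaim1=95, exclaim2=189}
    }
}
```

Dividing an execute_count value by its 5-second interval gives a rough per-task throughput, for example 95 / 5 = 19 calls per second for task 7.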