Flink清洗日志服务SLS的数据并求ACUPCU

article/2025/8/29 11:51:22

上文说到为什么使用Flink实时消费阿里云日志服务SLS的数据,并把阿里云上Flink消费SLS的代码粘贴到本地,做了相关修改之后成功把整个流程跑通了。但仅仅这样是不够的,从控制台上面输出的数据来看是个比较难看的字符串,可以说没多大用处。因此本文主要是继续使用Flink来对从日志服务SLS过来的数据做一系列的清洗,然后再计算几个工作中的指标。

相关ETL代码如下,就是把需要使用到的各个字段提取出来,后期做迭代的时候再把下面代码重新封装一下

public static final class EtlFlatMapFunction implements FlatMapFunction<RawLogGroupList, Tuple3<String, String, Long>> {@Overridepublic void flatMap(RawLogGroupList rawLogGroupList, Collector<Tuple3<String, String, Long>> collector) throws Exception {Iterator rawLogGroupsIterator = rawLogGroupList.rawLogGroups.iterator();while (rawLogGroupsIterator.hasNext()) {RawLogGroup lg = (RawLogGroup) rawLogGroupsIterator.next();// String source = lg.source;// Map<String, String> tags = lg.tags;String topic = lg.topic;Iterator<RawLog> rowLogs = lg.logs.iterator();while (rowLogs.hasNext()) {RawLog rowLog = rowLogs.next();Map<String, String> contents = rowLog.getContents();Long online = Long.parseLong(contents.getOrDefault("online", "0"));String logTime = contents.getOrDefault("time", "0");String changeLogTime = logTime.substring(0, 17) + "00";collector.collect(new Tuple3<>(topic, changeLogTime, online));}}}}

从上图可以看到,ETL成功了。

下面我们一起来算ACU和PCU两个指标

代码写好之后,检查了一遍又一遍,代码的逻辑没有问题,但就是如下图那样标红,当时也比较郁闷,百思不得其解,因为我在另外一个编辑器使用AggregateFunction是完全没有问题的。

 后来经过一番思考之后,恍然大悟,Flink的版本问题!日志服务SLS这个source支持的Flink版本是1.3.2,而我另外一个编辑器的Flink版本是1.7.2。想到之后,瞬间无语。

因为不同的Flink版本,AggregateFunction的返回值的类型是不一样的,如下图是Flink1.3.2中add的返回值类型

下图是Flink1.7.2中add的返回值类型

这时候,我想耍一个小心机,能不能直接把Flink的版本升级到1.7.2呢?答案是不行,如下图所示,少包了。

那没有办法,还是用回Flink1.3.2吧,然后把add的返回类型设置为void,并稍微修改add的逻辑

        @Overridepublic void add(Tuple3<String, String, Long> value, Tuple3<Long, Long, Long> accumulator) {
//            Long pcu = Math.max(value.f2, accumulator.f2);accumulator.f0 += value.f2;accumulator.f1 += 1L;accumulator.f2 = Math.max(value.f2, accumulator.f2);
//            return new Tuple3<>(accumulator.f0 + value.f2, accumulator.f1 + 1L, pcu);}

好,运行吧,成功!! 

下面是整个程序的主体代码

package com.flink.onlinenumber;import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.RawLogGroupList;
import com.aliyun.openservices.log.flink.data.RawLogGroupListDeserializer;
import com.aliyun.openservices.log.flink.util.Consts;
import com.flink.functions.OnlineNumberFunctions;
import com.flink.utils.SlsProps;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;import java.util.*;public class RealTimeOnlineNumApp {public static void main(String[] args) throws Exception {final Properties configProps = SlsProps.getSlsConfigProps("mafia-online_number", Consts.LOG_BEGIN_CURSOR);final OutputTag<Tuple3<String, String, Long>> lateOutputTag = new OutputTag<Tuple3<String, String, Long>>("acu-puc-late-data"){};// 设置日志服务的消息反序列化方法RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启Flink exactly once语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 每5s保存一次checkpointenv.enableCheckpointing(5000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<RawLogGroupList> logStream = env.addSource(new FlinkLogConsumer<>(deserializer, configProps));DataStream<Tuple3<String, String, Long>> etlLogStream = logStream.flatMap(new OnlineNumberFunctions.EtlFlatMapFunction());// 入库MySQLetlLogStream.print();// 求acu pcuDataStream<Tuple3<String, String, Long>> etlLogStreamWithTsWm = etlLogStream.assignTimestampsAndWatermarks(new OnlineNumberFunctions.AcuPcuWatermarksPeriodicGenerator());etlLogStreamWithTsWm.keyBy(0).timeWindow(Time.minutes(30)).allowedLateness(Time.minutes(5))
//                .sideOutputLateData(lateOutputTag)
//                .aggregate(new OnlineNumberFunctions.AcuPcuAggregateFunc())
//                .reduce(new EtlLogReduceFunc())
//                .process(new OnlineNumberFunctions.EtlLogProcessWindowFunction()).aggregate(new OnlineNumberFunctions.AcuPcuAggregateFunc(),new OnlineNumberFunctions.EtlLogReduceProcessWindowFunction()).print();.getSideOutput(lateOutputTag)env.execute("RealTimeOnlineNumApp");}}

可以看到,上面的代码我已经做了一些优化,比如说,我一开始是使用的ProcessWindowFunction来做聚合的,但是ProcessWindowFunction是等到窗口触发的时候把窗口内所有的数据做聚合,显然,这样的性能是不高的,当然我可以直接使用reduce来直接做增量聚合,这样的话每来一条数据就处理一条,但是问题来了,这个需求还是需要返回窗口开始和结束的时间,而这是reduce并没有提供的,因此,最后选择了aggregate,既可以做增量聚合,又可以获取到窗口的相关元信息,只需要把AggregateFunction和ProcessWindowFunction的对象作为参数传进aggregate方法就可以了,注意,如果aggregate只有一个参数,它也仅仅是增量聚合而已。

下一篇,我会对之前写的代码进行迭代和优化,敬请期待!!

 

 

 

 

 


http://chatgpt.dhexx.cn/article/1BXmuG5u.shtml

相关文章

pcu tps_Mac版Microsoft Office 2011重新定义您的TPS报告体验

pcu tps Office 2011 for Mac is going to be released in a couple of days, and we got our hands on the latest version already. Here’s a quick tour of some of the new features in the latest version of Office. Mac版Office 2011将在几天内发布&#xff0c;我们已…

网站服务器主要是pcu还是内存,PCU占用一直跑的很高的真正元凶

windows中有一个很神秘的进程SVCHOST&#xff0c;大多的用户对于他都很不了解。其实他是windows的服务器宿主&#xff0c;很多系统自带的服务器都要通过它来运行。用一个比喻的话&#xff0c;它相当于系统服务的马甲。但是SVCHOST有时候CPU占用会非常高&#xff0c;如何看透它的…

分形之谢尔宾斯基(Sierpinski)三角形

谢尔宾斯基三角形&#xff08;英语&#xff1a;Sierpinski triangle&#xff09;是一种分形&#xff0c;由波兰数学家谢尔宾斯基在1915年提出,它是一种典型的自相似集。也有的资料将其称之为谢尔宾斯基坟垛. 其生成过程为: 取一个实心的三角形。&#xff08;多数使用等边三角形…

AutoJs学习-实现谢尔宾斯基三角

往期文章分享 点击跳转>《导航贴》- Unity手册&#xff0c;系统实战学习点击跳转>《导航贴》- Android手册&#xff0c;重温移动开发 &#x1f449;关于作者 众所周知&#xff0c;人生是一个漫长的流程&#xff0c;不断克服困难&#xff0c;不断反思前进的过程。在这个过…

Python数据结构15:turtle模块制图,画直线,正方形,星星,递归可视化:分形树,谢尔宾斯基三角形

1. Python中的turtle模块制图 前面已经讲了递归的原理&#xff0c;这里用递归作图来直观的理解递归。 首先了解以下Python中用于作图的内置海龟作图系统turtle module。 Python内置&#xff0c;随时可用&#xff0c;以LOGO语言的创意为基础。 其意象为模拟海龟在沙滩上爬行而留…

关于谢尔宾斯基地毯的讲解

和谢尔宾斯基三角形一样&#xff0c;谢尔宾斯基地毯也是数学家谢尔宾斯基提出的一个分形图形&#xff0c;谢尔宾斯基地毯和谢尔宾斯基三角形基本类似&#xff0c;不同之处在于谢尔宾斯基地毯采用的是正方形进行分形构造&#xff0c;而谢尔宾斯基三角形采用的等边三角形进行分形…

基于马尔可夫链的谢尔宾斯基三角形(sierpinski)产生仿真

目录 1.算法仿真效果 2.MATLAB源码 3.算法概述 4.部分参考文献 1.算法仿真效果 matlab2022a仿真结果如下: 2.MATLAB源码 %***********

turtle库使用——谢尔宾斯基三角形

谢尔宾斯基三角形本质上是分形。所谓分形是一个几何图形&#xff0c;它可以分为许多部分&#xff0c;每个部分皆是整体的缩小版。这个三角形的建立概念如下&#xff1a; 1.建立一个等边三角形&#xff0c;这个三角形称0阶&#xff08;order0&#xff09;谢尔宾斯基三角形。 2…

分形之谢尔宾斯基(Sierpinski)地毯

前面讲了谢尔宾斯基三角形,和这一节的将把三角形变为正方形,即谢尔宾斯基地毯,它是由瓦茨瓦夫谢尔宾斯基于1916年提出的一种分形&#xff0c;是自相似集的一种。 谢尔宾斯基地毯的构造与谢尔宾斯基三角形相似&#xff0c;区别仅在于谢尔宾斯基地毯是以正方形而非等边三角形为基…

turtle递归作图绘制谢尔宾斯基地毯【详解】

了解谢尔宾斯基地毯 我们先从谢尔宾斯基三角形讲起&#xff1a; 谢尔宾斯基三角形&#xff08;英语&#xff1a;Sierpinski triangle&#xff09;是一种分形&#xff0c;由波兰数学家谢尔宾斯基在1915年提出。它是自相似集的例子。 以下是0到3阶的谢尔宾斯基三角形&#xff1a…

Python实验舱谢尔宾斯基地毯绘制教程

谢尔宾斯基地毯&#xff0c;和谢尔宾斯基三角形相似&#xff0c;下图就是谢尔宾斯基地毯 一&#xff0c;起始代码 p.width(3) p.speed(10) import turtle nint(input()) baseint(200/(3 ** n)) lengthbase * (3 ** n) p.pensize(0.1) turtle.tracer(False) p.pencolor(blue) p…

Matlab 谢尔宾斯基三角形

本文主要介绍了一种谢尔宾斯基三角形生产方法。谢尔宾斯基三角形是混沌与分形学里面的一个经典案例&#xff0c;能生成谢尔宾斯基三角形的方法很多&#xff0c;我采用的方法是取中点的方式&#xff0c;具体流程如下&#xff1a; 1.在任意三角形△ABC内或外任取一点P&#xff1…

关于谢尔宾斯基三角(Sierpinski)的讲解

谢尔宾斯基三角&#xff08;Sierpinski&#xff09;是一种分形&#xff0c;由波兰数学家谢尔宾斯基在1915年提出。它是自相似集的例子。它的豪斯多夫维是log(3)/log(2) ≈ 1.585。 * 图一&#xff1a;完成后的 谢尔宾斯基三角

Python递归绘制谢尔宾斯基三角形

首先&#xff0c;回顾递归的概念&#xff1a; 一个函数不停地调用它本身&#xff0c;我们就认为这个函数使用了递归。 先来看一个谢尔宾斯基三角形的示例&#xff1a; 所以首先&#xff0c;我们需要一个方法&#xff0c;它能帮我们画三角形&#xff0c;在这里画一个三角形&am…

谢尔宾斯基地毯的讲解

谢尔宾斯基地毯是数学家谢尔宾斯基提出的一个分形图形&#xff0c;谢尔宾斯基地毯和谢尔宾斯基三角形基本类似&#xff0c;不同之处在于谢尔宾斯基地毯采用的是正方形进行分形构造&#xff0c;而谢尔宾斯基三角形采用的等边三角形进行分形构造。谢尔宾斯基地毯和它本身的一部分…

Java面向对象编程:利用递归思想绘制“谢尔宾斯基地毯”和“谢尔宾斯基三角形”

1、递归&#xff1a;在方法中调用本方法。 2、递归调用会无限循环下去&#xff0c;因此方法体中必须有结束方法的条件。返回值为void时通常写为&#xff1a; if (条件) {return; } 下面使用递归绘制“谢尔宾斯基地毯”和“谢尔宾斯基三角形”。 谢尔宾斯基地毯。 1&#xff…

谢尔宾斯基三角形:Python+turtle

本来觉得谢尔宾斯基三角形挺难的。分析以后&#xff0c;其实还是挺简单的。 挺好理解&#xff0c;供大家一起学习 """功能&#xff1a;绘制谢尔宾斯基三角形环境&#xff1a;python3.7日期&#xff1a;2019/1/14 21:49作者&#xff1a;指尖魔法师版本&#xf…

用PYTHON画谢尔宾斯基三角形(代码可复制)

今天我们来画一个谢尔宾斯基三角形。 好我们先看原图: 这是一个等边三角形&#xff0c;把每一条边平均分成两份&#xff0c;再把这三个焦点连起来&#xff0c;形成四个等边三角形&#xff0c;中间的一个不看&#xff0c;紧接着再继续按着刚刚的步骤走&#xff0c;就可以画出来…

Python实验舱谢尔宾斯基三角形绘制教程

三角形&#xff0c;大家应该再熟悉不过了。 上图是一个由四个小三角形拼成的大三角形&#xff0c;也叫谢尔宾斯基三角形。 谢尔宾斯基三角形还能继续画下去&#xff1a; 2阶&#xff08;上图是1阶&#xff09;&#xff1a; 3阶&#xff1a; 5阶&#xff1a; 8阶&#xff1a; …

谢尔宾斯基三角新

谢尔宾斯基三角形&#xff08;英语&#xff1a;Sierpinski triangle&#xff09;是一种分形&#xff0c;由波兰数学家谢尔宾斯基在1915年提出。它是自相似集的例子。它的豪斯多夫维 1.取一个实心的三角形。&#xff08;多数使用等边三角形&#xff09; 2.沿三边中点的连线&am…