Flink1.14相关-3.数据流量计算

article/2025/10/19 1:08:13

1. 前言

使用环境
Flink1.14.6+Centos7.9+Java8
Flink1.14.6安装部署测试参考:参考链接

2 .数据采集

2.1 信号监听

需要监听文件夹下的新文件产生,并且数据库中的值未更新时才发送消息通知后续模块开始采集数据。【后面需要重新搭建监听部分和判断部分】

2.2 数据实时采集

做不到非常实时,一次采集平均时间在4s左右。

信号监听部分获取到新数据后会将新数据对应的shotNum发送到Kafka的Topic【newFilePath】中,数据采集模块【一个是用于持久化的数据采集,不监听这个topic;另一个是实时处理的数据采集,监听newFilePath】。

3. 数据处理

实时处理部分说明【需要进一步修改完善,目前只做测试】:

  1. 有两个采集文件,分别处理不同目录【需要后续补充】。在其接收到newFilePath中的数据,即shotNum后,拼接文件名【三种文件名】,扫描目录并拼接完整路径,文件存在的情况下采集文件元数据【文件当前size;文件修改时间;文件名;所属子树;自定义flag】。
  2. 数据采集完成,通过Topic【file_info】发送到处理程序。
  3. 处理程序接收文件元数据,通过keyby实现按shotnum分组处理。
  4. 处理完成后的数据取三个值【shotnum;datasize;ctime】,并在每一个新的flag中返回上一个flag对应的差值,由此实现数据流的计算。

4. 代码

采集模块通过python构建

4.1 系统依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>ExperimentInfo-Kafka</artifactId><groupId>com.scali</groupId><version>1</version></parent><modelVersion>4.0.0</modelVersion><artifactId>Storage-Flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.6</version></dependency>
<!--        欺诈检测的common依赖包,包括一些实体等内容--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-walkthrough-common_2.12</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.14.6</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.14.6</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.14.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.14.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.19</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency></dependencies>
</project>

4.2 监听模块

监听部分代码:

import os
import timefrom kafka import KafkaProducer
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
#监听新文件产生并传输完整路径
#
import readconfigfolder_path = '/home/text'  # 文件夹路径
file_info = {}  # 存储文件信息的字典
patterns = ["*.data"]  # 要监视的文件类型class NewFileHandler(PatternMatchingEventHandler):def on_created(self, event):if not event.is_directory:global file_infofile_path = event.src_pathfile_name = os.path.basename(file_path)list1 = file_name.split('.')[0]shot = list1.split('_')[1]print(shot)def on_deleted(self, event):if not event.is_directory:global file_infofile_path = event.src_path# 如果文件被删除,则从字典中删除文件信息if file_path in file_info:del file_info[file_path]print(f'File deleted: {file_path}')
def sendMsg(msg):kafkaConfig = readconfig.read_kafka()producer = KafkaProducer(bootstrap_servers=[kafkaConfig["host"]],)# 发送Kafka消息producer.send(kafkaConfig['topic2'], msg)print('Send message: {}'.format(msg))
if __name__ == "__main__":event_handler = NewFileHandler(patterns)observer = Observer()observer.schedule(event_handler, folder_path, recursive=True)observer.start()try:while True:time.sleep(1)  # 暂停1秒钟,然后再次检查文件夹中的文件except KeyboardInterrupt:observer.stop()observer.join()

4.3 数据采集模块

import datetime
import os
from json import dumpsimport pymysql
from kafka import KafkaProducer, KafkaConsumerimport readconfigflag = 0
curshot = None
def callMetaFilesize(ospath,shotnum):global flagnumdir = str(shotnum // 1000)# ['east', 't1', 't2', 't3', 't4', 't5', 't6', 't7', 't8']dir_list = os.listdir(ospath)# 将传入的文件夹继续遍历到列表中for dir in dir_list:curdir = ospath + '/' + dir + '/' + numdirfilename1 = dir+"_"+str(shotnum)+".characteristics"isFile = os.path.exists(curdir+"/"+filename1)if isFile == False:continuefiledir = []filedir.append(filename1)filedir.append(dir+"_"+str(shotnum)+".tree")filedir.append(dir+"_"+str(shotnum)+".datafile")for file in filedir:fsize = os.path.getsize(curdir + "/" + file)# 文件最後修改時間# ftime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime((os.path.getmtime(curdir + "/" + file))))try:msg = {"id":flag,"shotnum": shotnum,"subtree": dir,"filename": file,"fsize": fsize,# "ftime": ftime,# "ts_1": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}sendMsg(msg)except:print("kafka error")# 发完一组flag自动加1,这样flink consumer能够按照flag进行分组flag += 1
def sendMsg(msg):producer = KafkaProducer(bootstrap_servers=["202.127.205.60:9002"],value_serializer=lambda x: dumps(x).encode('utf-8'))producer.send("file_info",value=msg)print("send msg :",msg)
#     这部分应该要改
def getMdsMaxShot():mdspath = readconfig.read_mdsdata()db = pymysql.connect(host=mdspath['host'], port=mdspath['port'], user=mdspath['user'],password=mdspath['password'], database=mdspath['database'], charset=mdspath['charset'])sql = """ SELECT treeshot FROM `new_shot`"""cursor = db.cursor()cursor.execute(sql)res = cursor.fetchone()[0]cursor.close()db.close()return res
def recivemsg():global curshotglobal flagconsumer = KafkaConsumer("newFilePath",bootstrap_servers=["202.127.205.60:9002"])# 循环读取消息并进行处理while True:# 从Kafka中读取消息msg = consumer.poll(timeout_ms=1000)# maxMDSShot = getMdsMaxShot()# 如果有新的消息到达if msg:# 从消息中提取数据for tp, msgs in msg.items():for msg in msgs:data = msg.value.decode('utf-8')curshot = dataflag = 0#发一个新的触发事件,将计算的size和置为0,发来新的炮数据了print("Received message: {}".format(data))else:# 添加结束判断if curshot != None:# 如果没有新的消息到达# east和east_1下的数据都要读出来,计算时间差值callMetaFilesize("/home/text", int(curshot))# 度每个目录大概三秒,所以此处可能要开两个流程,east处理east下流程,east_1处理east_1,然后相加求和# callMetaFilesize("/var/ftp/pub/eastdata/arch/east_1", int(curshot))
if __name__ == '__main__':recivemsg()

4.4 数据处理模块

实现流程【主函数DataStreamAchieve】

package com.scali.experiment.datastream;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.scali.experiment.entity.Message;
import com.scali.experiment.flink.FraudDetector;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import java.util.Properties;public class DataStreamAchieve {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/** 1 . 配置kafka数据接收环境* */Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", "202.127.205.60:9002");kafkaProps.setProperty("group.id", "flink-kafka-json-processing");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("file_info",new SimpleStringSchema(),kafkaProps);
//        kafkaConsumer.setStartFromEarliest();/** 2. datastream转换* */DataStream<String> kafkaStream = env.addSource(kafkaConsumer);DataStream<Message> jsonStream = kafkaStream.map(new MessageMapper());DataStream<Message> resStream = jsonStream.keyBy(Message::getShotnum).process(new MessageProcessFunction());resStream.print();resStream.addSink(new MysqlSink());env.execute("Flink Kafka JSON Processing");}public static class  DataStreamDiff extends KeyedProcessFunction<Long, Message, MysqlSink> {private transient ValueState<Double> sumState;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("sumState",Double.class,0.0);sumState = getRuntimeContext().getState(descriptor);}@Overridepublic void processElement(Message value, KeyedProcessFunction<Long, Message, MysqlSink>.Context ctx, Collector<MysqlSink> out) throws Exception {}}public static class MessageMapper implements MapFunction<String, Message> {@Overridepublic Message map(String json) {// Deserialize JSON to JsonMessage object using fastjson libraryreturn JSON.parseObject(json, Message.class);}}public static class DifferenceMapper implements MapFunction<Message, Message> {private Message lastMessage;@Overridepublic Message map(Message currentMessage) {if (lastMessage != null) {currentMessage.setDifference(currentMessage.getFsizeConverted() - lastMessage.getFsizeConverted());} else {currentMessage.setDifference(currentMessage.getFsizeConverted());}lastMessage = currentMessage;return currentMessage;}}
}

数据处理【MessageProcessFunction】

package com.scali.experiment.datastream;import com.scali.experiment.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Map;/*** @Author:HE* @Date:Created in 10:38 2023/7/4* @Description:*/
@Slf4j
public class MessageProcessFunction extends KeyedProcessFunction<Long, Message, Message> {// 用于记录每个 shotnum 中每个 id 对应的 fsize 总和
//    private Map<Long, Map<Long, Double>> idToSumMap = new HashMap<>();private Long lastId = -1L;private Long lastShotnum = -1L;private Integer countSum = 0;private Map<Long,Map<Long,Double>> shotNumMap = new HashMap<>();@Overridepublic void processElement(Message message, KeyedProcessFunction<Long, Message, Message>.Context ctx, Collector<Message> out) throws Exception {long id = message.getId();long shotnum = message.getShotnum();double fsize = message.getFsize() * 0.0000009537;// 如果当前 shotnum 中还没有记录过当前 id,则初始化为 0shotNumMap.computeIfAbsent(shotnum, k -> new HashMap<>()).putIfAbsent(id, 0.0);
//        idToSumMap.computeIfAbsent(shotnum, k -> new HashMap<>()).putIfAbsent(id, 0.0);// 更新当前 id 的 fsize 总和double sum = shotNumMap.get(shotnum).get(id);shotNumMap.get(shotnum).put(id, sum + fsize);// 如果当前 id =1 输出的是id=0的那组数据if(id != lastId){countSum = 0;}if (1 == id){if(0 == countSum){out.collect(new Message(lastId, lastShotnum,shotNumMap.get(lastShotnum).getOrDefault(lastId, 0.0) ));}}else {if(0 == countSum && -1L != lastId && -1L != lastShotnum){out.collect(new Message(lastId, lastShotnum,shotNumMap.get(lastShotnum).getOrDefault(lastId, 0.0)-shotNumMap.get(lastShotnum).getOrDefault(lastId - 1, 0.0) ));}}countSum++;lastId = id;lastShotnum = shotnum;}
}

mysql持久化部分【MysqlSink】

package com.scali.experiment.datastream;/*** @Author:HE* @Date:Created in 12:32 2023/7/3* @Description:*/import com.scali.experiment.entity.Message;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;public class MysqlSink extends RichSinkFunction<Message> {private Connection connection;private PreparedStatement preparedStatement;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Class.forName("com.mysql.cj.jdbc.Driver");connection = DriverManager.getConnection("jdbc:mysql://ip:3306/flink-datastream", "username", "password");preparedStatement = connection.prepareStatement("INSERT INTO datastream(shotnum, fsize, ctime) VALUES (?, ?, ?)");}@Overridepublic void invoke(Message value, Context context) throws Exception {
//        preparedStatement.setLong(1, value.getId());preparedStatement.setLong(1, value.getShotnum());preparedStatement.setDouble(2, value.getDifference());preparedStatement.setTimestamp(3, new Timestamp(context.timestamp()));preparedStatement.executeUpdate();}@Overridepublic void close() throws Exception {super.close();if (preparedStatement != null) {preparedStatement.close();}if (connection != null) {connection.close();}}
}

5 测试效果

目前展示一个输出和数据库截图。代码存在一定问题,后续还需要修改。
在这里插入图片描述
在这里插入图片描述


http://chatgpt.dhexx.cn/article/mrv3ZHcv.shtml

相关文章

适应多场景的客流量统计-人流量统计算法

在商场、展厅、景区等受人流量影响较大的场所&#xff0c;流量统计算法可以快速获取流量数据和动态趋势&#xff0c;辅助评估店铺和部分活动的效果&#xff0c;帮助商业决策。另外&#xff0c;在地铁站、火车站、机场等公共场所。实时检测人数可以及时预警高密度人群&#xff0…

网站的服务器带宽计算,服务器带宽和流量计算方式

服务器带宽和流量计算方式。网站流量和带宽该怎么计算&#xff0c;给一些参照数据信息&#xff0c;由于不清楚流量和带宽是怎么计算的&#xff0c;因此不清楚用多宽的带宽更有效、更节省资产! 网站服务器上最好是安装流量统计手机软件(强烈推荐应用DUMeter)&#xff0c;假如流量…

博途1200/1500PLC累计流量计算FB(SCL算法详解+优化)

在编写这篇博客之前其实已经写过一篇SMART S7-200PLC的流量累计的应用文章,由于很多朋友咨询博途PLC下的流量累计实现算法。今天我们以博途PLC的开发环境为例详细讲解算法的实现原理和注意事项同时给出算法的优化方法。受水平和能力所限,文中难免出现错误和不足之处,诚恳的欢…

阿里云轻量应用服务器流量计算方法

阿里云轻量应用服务器套餐有峰值带宽限制每月流量的&#xff0c;还有固定带宽的&#xff0c;阿里云轻量应用服务器流量是怎么计算的&#xff1f;阿里云轻量应用服务器来说说不同套餐下轻量服务器流量计算方法&#xff1a; 轻量应用服务器带宽套餐和流量套餐 阿里云轻量应用服…

物联网GPRS模块流量计算

物联网GPRS模块流量计算 MQTT(消息队列遥测传输) 是ISO 标准下一个基于TCP/IP的消息发布/订阅传输协议。 一、TCP消耗流量计算 以太网数据包结构&#xff1a; 以太网首部 IP首部 TCP首部 APPL首部 用户数据 以太网尾部 以太网首部为14个字节 IP首部为20个字节 TCP首部…

恒容容器放气的瞬时流量的计算

有时候&#xff0c;你会遇到一个问题&#xff0c;该问题的描述如下&#xff1a; 你有一个已知体积的容器&#xff0c;设容器体积为&#xff0c;里面装有一定压力(初始压力)的气体&#xff0c;如空气或氢气等&#xff0c;设初始压力为&#xff0c;容器出口连接着一个阀门开关&am…

阿里云服务器公网带宽流量是怎么计算的?

阿里云服务器流量如何计算&#xff1f;云服务器出流量和入流量都要计算吗&#xff1f;不是&#xff0c;只计算云服务器公网出方向流量&#xff0c;阿里云服务器内网流量和公网出方向流量都是免费的&#xff0c;护云盾来详细说下阿里云服务器流量计算及流量收费说明&#xff1a;…

计算机网络-流量强度

若R链路带宽&#xff08;链路宽度&#xff09;&#xff0c;L分组长度&#xff08;一个分组的大小&#xff09;&#xff0c;a分组到达队列的平均速率&#xff08;分组数量&#xff09;&#xff0c;流量强度公式 &#xff1a;I La/R 举例理解&#xff1a; 当汽车排队从关卡上桥…

腾讯云服务器带宽计费模式按流量内网、外网出入流量计算说明

腾讯云服务器公网带宽计费模式按使用流量计费&#xff0c;云服务器对内或对外产生的流量如何计算&#xff1f;云服务器出方向&#xff08;下行流量&#xff09;和入方向&#xff08;上行流量&#xff09;怎么计算&#xff1f;腾讯云百科来详细说下腾讯云服务器按使用流量计算说…

计算视频流量

码率也可以叫比特率&#xff0c;就是一种音乐每秒播放的数据量&#xff0c;单位用bit表示&#xff0c;也就是二进制位。 bps就是比特率。b就是比特&#xff08;bit&#xff09;&#xff0c;s就是秒&#xff08;second&#xff09;&#xff0c;p就是每&#xff08;per&#xff0…

电缆载流量计算对照表

10.6/1KV聚氯乙烯绝缘电力电缆载流量 常用型号VV22、VLV22聚氯乙烯绝缘钢带铠装聚氯乙烯护套电力电缆载流量 注&#xff1a;以上电缆载流量计算条件 1. 线芯长期工作温度&#xff1a;70℃&#xff1b; 2. 环境温度&#xff1a;25℃ &#xff1b; 3. 埋地深度&#xff1a;10…

明渠如何快速估算水流量(明渠流量计算)

明渠流量估算一般采用速度面积法估算&#xff0c;如果你有流速仪可以测量渠道的流速&#xff0c;如果没有&#xff0c;可以通过漂浮物与秒表来估算&#xff0c;漂浮物容易受到风速影响&#xff0c;风大了是不行的&#xff0c;比如通过一个漂浮物&#xff0c;在时间t内漂浮的距离…

IPCam网络摄像头

文章目录 软件安装及编译环境搭建及代码获取1、于VirtualBoxVM安装Ubuntu2、Ubuntu开机设定3、MobaXterm安装及开发板连接4、套件安装以及SDK编译5、 如何获取代码6、如何更新代码 IPCam网络摄像头7、如何编译IPcam8、配置板端资源以及环境9、参数配置及运行效果9.1参数配置文件…

如何低成本化实时网络摄像头监控直播?

大众直播时代&#xff0c;处处有直播&#xff0c;直播已经在方方面面改变着人们的生活和工作。随着网络直播应用生态的越发完善&#xff0c;你会发现&#xff0c;很多传统监控升级为互联网直播的应用越来越多。那么&#xff0c;如何将常规监控摄像头转为互联网直播&#xff1f;…

【解决方案】5G时代浪潮来袭,EasyNVR助力5G厂区视频监控安防采集可视化展示

智慧工厂被认为是5G技术的重要应用场景之一&#xff0c;利用5G网络将生产设备无缝连接&#xff0c;并进一步打通设计、采购、仓储、物流等环节&#xff0c;满足工业环境下设备互联和远程交互应用需求。TSINGSEE青犀视频面向工厂智能化升级需求&#xff0c;推出5G智慧工厂方案&a…

多摄像机网络智能视频监控系统设计与实现

1、智能视频监控系统项目背景介绍 2、系统的需求分析 在多摄像机视频监控系统中,通过前端结构化信息提取设备获取单个摄像机的结构化信息以后,需要将数据传输到云端服务器当中进行统一的存储和管理,从而为再识别等服务提供可靠的数据来源,满足更加丰富的用户需求。 本章将…

基于EasyCVR视频技术的“互联网+监管”非现场监管视频监控系统设计方案

一、方案背景 1、行业痛点 1&#xff09;智能化水平弱&#xff0c;管理效率低&#xff1a;传统监管方式比较落后&#xff0c;智能化水平弱&#xff0c;监管工作完全依赖人工&#xff0c;导致人力成本过高、监管盲点多、效率低、服务质量差&#xff1b; 2&#xff09;缺乏感知…

智能视频监控平台

智能视频监控平台 智能视频监控平台是融合网络高清、智能分析、多级管控为一体的综合视频监控平台。主要用于平安城市、社区安防等领域。提供视频直播、视频存储、录像回放、视频分析等功能&#xff0c;支持本地化接入和云平台接入前端摄像头。无需安装客户端&#xff0c;支持…

几种常见的网络摄像头_DVR方案_整理

几种常见的网络摄像头_DVR方案_整理 http://blog.csdn.net/ex_net/article/details/7833334 作者&#xff1a;张建波 邮箱&#xff1a; 281451020qq.com 电话&#xff1a;13577062679 欢迎来电交流&#xff01; 一、华为海思的方案 市面上的4路8路DVR基本都是Hi3515的方案&…

网络摄像机(IPC)介绍

目录 一、什么是网络摄像机(IPC) 二、IPC功能 三、分辨率和存储 四、编码标准 五、AHD和IPC的区别 一、什么是网络摄像机(IPC) 网络摄像机是一种结合传统摄像机与网络技术所产生的新一代摄像机&#xff0c;它可以将视频影像通过网络传至地球另一端&#xff0c;且远端的浏览…