大数据实战:用户流量分析系统

article/2025/9/21 3:53:44

---------------------------------------------------------------------------------------------------------------

[版权申明:本文系作者原创,转载请注明出处]

文章出处:http://blog.csdn.net/sdksdk0/article/details/51628874

作者:朱培

---------------------------------------------------------------------------------------------------------------


本文是结合hadoop中的mapreduce来对用户数据进行分析,统计用户的手机号码、上行流量、下行流量、总流量的信息,同时可以按照总流量大小对用户进行分组排序等。是一个非常简洁易用的hadoop项目,主要用户进一步加强对MapReduce的理解及实际应用。文末提供源数据采集文件和系统源码。

本案例非常适合hadoop初级人员学习以及想入门大数据、云计算、数据分析等领域的朋友进行学习。

一、待分析的数据源

以下是一个待分析的文本文件,里面有非常多的用户浏览信息,保扩用户手机号码,上网时间,机器序列号,访问的IP,访问的网站,上行流量,下行流量,总流量等信息。这里只截取一小段,具体文件在文末提供下载链接。


二、基本功能实现

想要统计出用户的上行流量、下行流量、总流量信息,我们需要建立一个bean类来对数据进行封装。于是新建应该Java工程,导包,或者直接建立一个MapReduce工程。在这里面建立一个FlowBean.java文件。

        private long upFlow;private long dFlow;private long sumFlow;
然后就是各种右键生成get,set方法,还要toString(),以及生成构造函数,(千万记得要生成一个空的构造函数,不然后面进行分析的时候会报错)。
完整代码如下:
package cn.tf.flow;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;public class FlowBean  implements WritableComparable<FlowBean>{private long upFlow;private long dFlow;private long sumFlow;public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getdFlow() {return dFlow;}public void setdFlow(long dFlow) {this.dFlow = dFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public FlowBean(long upFlow, long dFlow) {super();this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow+dFlow;}@Overridepublic void readFields(DataInput in) throws IOException {upFlow=in.readLong();dFlow=in.readLong();sumFlow=in.readLong();}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dFlow);out.writeLong(sumFlow);}public FlowBean() {super();}@Overridepublic String toString() {return  upFlow + "\t" + dFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {return this.sumFlow>o.getSumFlow() ? -1:1;}}

然后就是这个统计的代码了,新建一个FlowCount.java.在这个类里面,我直接把Mapper和Reduce写在同一个类里面了,如果按规范的要求应该是要分开写的。
在mapper中,获取后面三段数据的值,所以我的这里length-2,length-3.
       public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到这行的内容转成stringString line = value.toString();String[] fields = StringUtils.split(line, "\t");try {if (fields.length > 3) {// 获得手机号及上下行流量字段值String phone = fields[1];long upFlow = Long.parseLong(fields[fields.length - 3]);long dFlow = Long.parseLong(fields[fields.length - 2]);// 输出这一行的处理结果,key为手机号,value为流量信息beancontext.write(new Text(phone), new FlowBean(upFlow, dFlow));} else {return;}} catch (Exception e) {}}}

 
在reduce中队数据进行整理,统计
public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long upSum = 0;long dSum = 0;for (FlowBean bean : values) {upSum += bean.getUpFlow();dSum += bean.getdFlow();}FlowBean resultBean = new FlowBean(upSum, dSum);context.write(key, resultBean);}}


最后在main方法中调用执行。
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(FlowCount.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}
当然啦,还需要先在你的hdfs根目录中建立/flow/data数据,然后我那个用户的数据源上传上去。
 bin/hadoop fs -mkdir -p /flow/databin/hadoop fs -put HTTP_20130313143750.dat /flow/databin/hadoop jar  ../lx/flow.jar

把上面这个MapReduce工程打包成一个jar文件,然后用hadoop来执行这个jar文件。例如我放在~/hadoop/lx/flow.jar,然后再hadoop安装目录中执行
bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount  /flow/data  /flow/output

最后执行结果如下:



在这整过过程中,我们是有yarnchild的进程在执行的,如下图所示:当整个过程执行完毕之后yarnchild也会自动退出。

三、按总流量从大到小排序

如果你上面这个基本操作以及完成了的话,按总流量排序就非常简单了。我们新建一个FlowCountSort.java.

全部代码如下:

package cn.tf.flow;import java.io.IOException;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCountSort {public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line=value.toString();String[] fields=StringUtils.split(line,"\t");String phone=fields[0];long upSum=Long.parseLong(fields[1]);long dSum=Long.parseLong(fields[2]);FlowBean sumBean=new FlowBean(upSum,dSum);context.write(sumBean, new Text(phone));}	
}public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{//进来的“一组”数据就是一个手机的流量bean和手机号@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(), key);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(FlowCountSort.class);job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));boolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}

这个主要就是使用了FlowBean.java中的代码来实现的,主要是继承了WritableComparable<FlowBean>接口来实现,然后重写了compareTo()方法。

@Overridepublic int compareTo(FlowBean o) {return this.sumFlow>o.getSumFlow() ? -1:1;}
按照同样的方法对这个文件打成jar包,然后使用hadoop的相关语句进行执行就可以了。

bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort  /flow/output  /flow/sortoutput
结果图:




四、按用户号码区域进行分类

流量汇总之后的结果需要按照省份输出到不同的结果文件中,需要解决两个问题:

 1、如何让mr的最终结果产生多个文件: 原理:MR中的结果文件数量由reduce
  task的数量绝对,是一一对应的 做法:在代码中指定reduce task的数量
 
 
  2、如何让手机号进入正确的文件 原理:让不同手机号数据发给正确的reduce task,就进入了正确的结果文件
  要自定义MR中的分区partition的机制(默认的机制是按照kv中k的hashcode%reducetask数)
  做法:自定义一个类来干预MR的分区策略——Partitioner的自定义实现类

主要代码与前面的排序是非常类似的,只要在main方法中添加如下两行代码就可以了。

          //指定自定义的partitionerjob.setPartitionerClass(ProvincePartioner.class);job.setNumReduceTasks(5);

这里我们需要新建一个ProvincePartioner.java来处理号码分类的逻辑。

public class ProvincePartioner extends Partitioner<Text, FlowBean>{private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();static {provinceMap.put("135", 0);provinceMap.put("136", 1);provinceMap.put("137", 2);provinceMap.put("138", 3);		}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0, 3);Integer partNum = provinceMap.get(prefix);if(partNum == null) partNum=4;return partNum;}}

执行方法和前面也是一样的。从执行的流程中我们可以看到这里启动了5个reduce task,因为我这里数据量比较小,所以只启动了一个map task。



到这里,整个用户流量分析系统就全部结束了。关于大数据的更多内容,欢迎关注。点击左上角头像下方“点击关注".感谢您的支持!



数据源下载地址:http://download.csdn.net/detail/sdksdk0/9545935

源码项目地址:https://github.com/sdksdk0/HDFS_MapReduce





http://chatgpt.dhexx.cn/article/vE6ZNQQk.shtml

相关文章

大数据离线---网站日志流量分析系统(1)---简介及框架

本次介绍网站日志流量分析系统&#xff0c;首先是简介和架构。后面会对架构中需要的每个模块的进行逐个介绍。本篇主要分为两个部分 网站日志流量分析系统简介整体技术流程和架构 1. 网站日志流量分析系统简介 1.1点击流数据模型 点击流的概念 点击流&#xff08; Click St…

大数据流量分析系统

大数据流量分析系统 前几天去了移动营业厅办了个卡&#xff0c;就不说某动的内幕了&#xff0c;说说上行流量和下行流量统计手机号码的总流量。这次我们利用大数据来分析&#xff0c;大数据也有三大框架&#xff0c;Hadoop是离线计算框架&#xff0c;Storm是实时计算框架&…

使用流量分析系统进行资产梳理

很多网络管理人员都接触资产梳理&#xff0c;也有很多软件和系统附带资产梳理工具&#xff0c;但是根据明辰智航统计&#xff0c;很多企事业单位资产梳理工作是以前进行的&#xff0c;没有及时更新。网络管理人员甚至不了解企业网络中有多少个MAC地址、IP地址、端口、协议、应用…

网络异常流量分析系统设计

为什么要监测分析异常流量&#xff1f; 网络管理人员都希望在网络性能突然下降的时候找到“真凶”所在&#xff0c;并迅速解决问题。利用网络异常流量监测的方法&#xff0c;可以非常直观地看到网络流量是否有突然增长或者突然下降的现象&#xff0c;并进一步分析是那些用户产…

国内首个开源网络流量可视化分析平台 -- 流影

流影:基于流量的网络行为高级分析平台 流影是一款基于全流量的高级网络行为分析平台,该系统是由深海鱼(北京)科技有限公司流影项目组研发设计,首发开源是1.0版本。 项目简介 深海鱼(北京)科技有限公司专注于为客户提供优质的数据分析相关服务,近年来立足于客户的数字安…

大数据综合项目--网站流量日志数据分析系统(详细步骤和代码)

文章目录 前言&#xff1a;基本概述Sqoop概述什么是Sqoop Flume概述什么是Flume为什么需要flume HIve概述什么是Hive 系统背景&#xff1a;模块开发数据采集使用Flume搭建日志采集系统 数据预处理实现数据预处理 数据仓库开发数据导出日志分析系统报表展示 前言&#xff1a; 提…

实战演习(四)——网络流量系统分析简介

笔者是一个痴迷于挖掘数据中的价值的学习人&#xff0c;希望在平日的工作学习中&#xff0c;挖掘数据的价值&#xff0c;找寻数据的秘密&#xff0c;笔者认为&#xff0c;数据的价值不仅仅只体现在企业中&#xff0c;个人也可以体会到数据的魅力&#xff0c;用技术力量探索行为…

网络流量在线分析系统的设计与实现

编译环境&#xff1a;visual studio2019&#xff0c;安装并配置winpcap和pthreads库函数 1 配置环境 1.1 安装vscode 参考微信公众号 软件安装管家。 1.2 安装MinGW-w64 下载地址&#xff1a;添加链接描述 安装参考博客&#xff1a;MinGW-w64安装配置全过程 第一步&#xf…

时间范围查询

开发工具与关键技术&#xff1a; VS,条件查询 作者&#xff1a;卢惠圳 撰写时间&#xff1a;2019年8月16日 在页面的表格数据中会有要根据时间来作为查询条件的数据&#xff0c;一般是根据时间范围来进行查询所以这里也是时间范围的查询&#xff0c;先将表格和进行时间范围查询…

oracle语句查询时间范围

oracle语句查询时间范围 参考网址 &#xff1a; https://blog.csdn.net/weixin_36436373/article/details/116541220?ops_request_misc%257B%2522request%255Fid%2522%253A%2522167930069316800188562500%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%25…

VUE时间范围选择的组件

最近手写了一个框选时间的组件,可以按半个小时为单位选择时间 代码部分 <template><div><div class"byted-schedule"><table :key"itemKey" class"byted-schedule-calendar-table"><thead><tr><th rows…

Mysql获取指定时间范围数据

MySQL获取某个时间范围内的数据 TO_DAYS(date)函数。 to_days()&#xff1a;返回从0000年至当前日期的总天数。 目录 1、今天(TO_DAYS()) 2、今天昨天(TO_DAYS()) 3.近7天(DATE_SUB()) 5.本月(DATE_FORMAT()) 6.上一月(PERIOD_DIFF()) 7.本季度 8.上季度 9.本年 ​1…

Mybatis-Plus时间范围查询

方式一 通过apply方法&#xff0c;来实现时间范围查询&#xff0c;该方法可用于数据库函数&#xff0c;动态入参的params对应前面applySql内部的{index}部分&#xff0c;这样是不会有sql注入风险的&#xff0c;反之会有! apply(String applySql, Object... params) apply(boo…

elementui DateTimePicker组件 限制时间范围(包含时分秒)

1、基础范围控制&#xff08;只控制日期&#xff0c;不含时分秒&#xff09; <template><el-date-pickertype"datetime"v-model"startDate"value-format"yyyy-MM-dd HH:mm:ss"placeholder"选择开始时间"size"mini"…

MongoDB查询某个时间范围

MongoDB 时间范围查询目前有两种方式&#xff1a; DateISODate MongoDB条件对应关系 (>) 大于 - $gt(<) 小于 - $lt(>) 大于等于 - $gte(< ) 小于等于 - $lte Date方式 例如查询时间段为2023.01.03<日期<2023.01.05可翻译为&#xff1a; "日期字段名&…

移动端时间范围选择

div模块 <div className"choseDate"><div className"range"><input classNamestartTime onClick{()>{this.setState({startVisible:true})}} placeholder"请选择开始日期" value{this.state.startTime} /><DatePickert…

sql查询时间范围数据

mapper.xml >查询一定时间范围 条件:年月日 时分秒 <if test"creatAt ! null and creatAt ! "><![CDATA[and DATE_FORMAT(ride_order.created_at, %Y-%m-%d %h-%m-%s)> DATE_FORMAT(#{creatAt}, %Y-%m-%d %h-%m-%s) ]]></if><if test&qu…

JAVA判断当前时间在时间范围内

我们在日常开发的时候肯定有围绕时间选择的一些功能, 今天给大家分享一个java如何判断当前时间是否在所选择时间范围内的一个小demo public static void main(String[] args) throws ParseException {SimpleDateFormat ft new SimpleDateFormat ("yyyy-MM-dd hh:mm:ss&q…

JavaScript 时间范围

当前时间往前的时间范围&#xff08;六个月之前&#xff09; 效果图 js文件代码片 /*查询日期区间&#xff08;当前时间往前&#xff09; Add By Vivian 2020/12/04 */ //rangeVal:两个日期的间隔符 num&#xff1a;隔多少 timeType&#xff1a;相隔时间类型 function funGet…

JavaScript如何判定一个给定的时间区间在哪些时间段范围内?

作者 | 十方 来源 | https://segmentfault.com/a/1190000041958661 有这样的一个场景&#xff1a;给定一个时间区间&#xff0c;需要判定这个时间区间在哪些时间段范围内. 比如时间段范围如下&#xff1a; [["00:00","01:00"],["01:00","02…