大数据培训 | 电商用户行为分析之订单支付实时监控

article/2025/10/23 2:15:38

在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。

模块创建和数据准备

同样地,在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 OrderTimeoutDetect。在这个子模块中,我们同样将会用到 flink 的 CEP 库来实现事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相关依赖:

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

同样,在 src/main/目录下,将默认源文件目录 java 改名为 scala。

更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com) 

代码实现

在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。

使用 CEP 实现

我们首先还是利用 CEP 库来实现这个功能。我们先将事件流按照订单号 orderId分流,然后定义这样的一个事件模式:在 15 分钟内,事件“create”与“pay”非严格紧邻:

val orderPayPattern = Pattern.begin[OrderEvent]("begin")

.where(_.eventType == "create")

.followedBy("follow")

.where(_.eventType == "pay")

.within(Time.seconds(5))

这样调用.select 方法时,就可以同时获取到匹配出的事件和超时未匹配的事件了。

在 src/main/scala 下继续创建 OrderTimeout.scala 文件,新建一个单例对象。定义样例类 OrderEvent,这是输入的订单事件流;另外还有 OrderResult,这是输出显示 的 订 单 状 态 结 果 。 订 单 数 据 也 本 应 该 从 UserBehavior 日 志 里 提 取 , 由 于UserBehavior.csv 中没有做相关埋点,我们从另一个文件 OrderLog.csv 中读取登录数据_大数据培训。

完整代码如下:

OrderTimeoutDetect/src/main/scala/OrderTimeout.scala

case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)

case class OrderResult(orderId: Long, eventType: String)

object OrderTimeout {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")

.map( data => {

val dataArray = data.split(",")

OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)

})

.assignAscendingTimestamps(_.eventTime * 1000)

// 定义一个带匹配时间窗口的模式

val orderPayPattern = Pattern.begin[OrderEvent]("begin")

.where(_.eventType == "create")

.followedBy("follow")

.where(_.eventType == "pay")

.within(Time.minutes(15))

// 定义一个输出标签

val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")

// 订单事件流根据 orderId 分流,然后在每一条流中匹配出定义好的模式

val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)

val completedResult = patternStream.select(orderTimeoutOutput) {

// 对于已超时的部分模式匹配的事件序列,会调用这个函数

(pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {

val createOrder = pattern.get("begin")

OrderResult(createOrder.get.iterator.next().orderId, "timeout")

}

} {

// 检测到定义好的模式序列时,就会调用这个函数

pattern: Map[String, Iterable[OrderEvent]] => {

val payOrder = pattern.get("follow")

OrderResult(payOrder.get.iterator.next().orderId, "success")

}

}

// 拿到同一输出标签中的 timeout 匹配结果(流)

val timeoutResult = completedResult.getSideOutput(orderTimeoutOutput)

completedResult.print()

timeoutResult.print()

env.execute("Order Timeout Detect Job")

}

}

使用 Process Function 实现

我们同样可以利用 Process Function,自定义实现检测订单超时的功能。为了简化问题,我们只考虑超时报警的情形,在 pay 事件超时未发生的情况下,输出超时报警信息。

一个简单的思路是,可以在订单的 create 事件到来后注册定时器,15 分钟后触发;然后再用一个布尔类型的 Value 状态来作为标识位,表明 pay 事件是否发生过。如果 pay 事件已经发生,状态被置为 true,那么就不再需要做什么操作;而如果 pay事件一直没来,状态一直为 false,到定时器触发时,就应该输出超时报警信息_大数据视频。

具体代码实现如下:

OrderTimeoutDetect/src/main/scala/OrderTimeoutWithoutCep.scala

object OrderTimeoutWithoutCep {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")

.map( data => {

val dataArray = data.split(",")

OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)

})

更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com) 

.assignAscendingTimestamps(_.eventTime * 1000)

.keyBy(_.orderId)

// 自定义一个 process function,进行 order 的超时检测,输出超时报警信息

val timeoutWarningStream = orderEventStream

.process(new OrderTimeoutAlert)

timeoutWarningStream.print()

env.execute()

}

class OrderTimeoutAlert extends KeyedProcessFunction[Long, OrderEvent, OrderResult]

{

lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new

ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))

override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long,

OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {

val isPayed = isPayedState.value()

if (value.eventType == "create" && !isPayed) {

ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 *

1000L)

} else if (value.eventType == "pay") {

isPayedState.update(true)

}

}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent,

OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {

val isPayed = isPayedState.value()

if (!isPayed) {

out.collect(OrderResult(ctx.getCurrentKey, "order timeout"))

}

isPayedState.clear()

}

}

}

来自两条流的订单交易匹配

对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做 合 并 处 理 。 这 里 我 们 利 用 connect 将 两 条 流 进 行 连 接 , 然 后 用 自 定 义 的CoProcessFunction 进行处理。

具体代码如下:

TxMatchDetect/src/main/scala/TxMatch

case class OrderEvent( orderId: Long, eventType: String, txId: String, eventTime: Long )

case class ReceiptEvent( txId: String, payChannel: String, eventTime: Long )

object TxMatch {

val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")

val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")

.map( data => {

val dataArray = data.split(",")

OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2),

dataArray(3).toLong)

})

.filter(_.txId != "")

.assignAscendingTimestamps(_.eventTime * 1000L)

.keyBy(_.txId)

val receiptEventStream = env.readTextFile("YOUR_PATH\\resources\\ReceiptLog.csv")

.map( data => {

val dataArray = data.split(",")

ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)

})

.assignAscendingTimestamps(_.eventTime * 1000L)

.keyBy(_.txId)

val processedStream = orderEventStream

.connect(receiptEventStream)

.process(new TxMatchDetection)

processedStream.getSideOutput(unmatchedPays).print("unmatched pays")

processedStream.getSideOutput(unmatchedReceipts).print("unmatched receipts")

processedStream.print("processed")

env.execute()

}

class TxMatchDetection extends CoProcessFunction[OrderEvent, ReceiptEvent,

(OrderEvent, ReceiptEvent)]{

lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new

ValueStateDescriptor[OrderEvent]("pay-state",classOf[OrderEvent]) )

lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new

ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent]) )

override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent,

ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent,

ReceiptEvent)]): Unit = {

val receipt = receiptState.value()

if( receipt != null ){

receiptState.clear()

out.collect((pay, receipt))

} else{

payState.update(pay)

ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L)

}

}

override def processElement2(receipt: ReceiptEvent, ctx:

CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out:

Collector[(OrderEvent, ReceiptEvent)]): Unit = {

val payment = payState.value()

if( payment != null ){

payState.clear()

out.collect((payment, receipt))

} else{

receiptState.update(receipt)

ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L)

}

}

更多Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com) 

override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent,

ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent,

ReceiptEvent)]): Unit = {

if ( payState.value() != null ){

ctx.output(unmatchedPays, payState.value())

}

if ( receiptState.value() != null ){

ctx.output(unmatchedReceipts, receiptState.value())

}

payState.clear()

receiptState.clear()

}

}

}


http://chatgpt.dhexx.cn/article/dijMtW2B.shtml

相关文章

订单数据分析

订单背景 订单&#xff1a;对订单的预测不仅为了企业更好的制定物料采购计划、控制库存、提升生产效率、控制生产进度&#xff0c;还为了帮助企业更好的把控市场潜在需求&#xff0c;分析目前经营状态和未来发展趋势。 宽厚板材市场价格&#xff08;只能查询到近三个月的&…

关于订单功能的处理和分析

这两天看了一下RABC的权限管理处理&#xff0c;梳理了一下订单功能的表创建&#xff0c;界面&#xff0c;功能分析。 目录 RABC RBAC0模型 那么对于RABC模型我们怎么创建数据库表&#xff1f; 订单模块的梳理 RABC RABC说的是在用户和权限之间多一个角色&#xff0c;用户与…

订单数据分析-实战

1. 京东订单数据准备 1.1 京东订单数据介绍 2020年5月25日10%抽样数据大家电-家用电器-冰箱70K 1.2 数据清洗 缺失值处理 用户城市和省份信息有部分缺失&#xff0c;部分订单的订单中支付时间为空值数据逻辑错误格式内容一致性 import pandas as pd import numpy as np im…

话单数据完整流程

原始数据__解析_____>>>解析后的数据___入库____>>>汇总的数据 1.原始数据 上游中兴的原始数据&#xff0c;在远程桌面Winscp软件中查看。丢失了下游也没法补充采集。得等上游补充采集后下游才能解析。当原始数据存在&#xff0c;而话单数据显示红点&…

话单分析账单分析行踪分析三合一数据分析

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

Office Tool Plus(安装visio)

说明&#xff1a;需要提前卸载原先的Office&#xff08;Word、PPT、Excel等&#xff09; 一、Office Tool Plus官网 https://otp.landian.vip/zh-cn/download.html 二、下载Office Tool Plus 百度网盘链接&#xff08;Office Tool Plus安装包&#xff09; 链接&#xff1a;…

FFmpeg音频解码-音频可视化

最近在做一个音频可视化的业务&#xff0c;网上有Java层的实现方法&#xff0c;但是业务需要用C实现&#xff0c;从原理出发其实很简单&#xff0c;先对音频进行解码&#xff0c;再计算分贝。这比把大象放进冰箱还简单。本文从音频可视化的业务为依托&#xff0c;以FFmpeg为基础…

基于FFmpeg的视频播放器之七:音频解码

一.流程 音频解码的流程和视频解码几乎一样,最大的区别是解码后需要进行重采样。因为解码出的AVSampleFormat格式是AV_SAMPLE_FMT_FLTP(float, planar),该格式无法直接使用SDL进行播放,需要转换成SDL支持的AV_SAMPLE_FMT_S16(signed 16 bits)格式。关于重采样,详见下篇…

2020手机音频解码芯片_2020杰理音频芯片全解析,14款音频产品代表作拆解汇总...

珠海市杰理科技股份有限公司,成立于2010年。杰理科技主要从事射频智能终端、多媒体智能终端等系统级芯片(SoC)的研究、开发和销售。 杰理科技的芯片产品主要应用于AI智能音箱、蓝牙音箱、蓝牙耳机、智能语音玩具等物联网智能终端产品,下游应用产品市场十分广泛和巨大。 杰理科…

音频编解码原理

实例说明 音频编解码常用的实现方案有三 种。 第一种就是采用专用的音频芯片对 语音信号进行采集和处理&#xff0c;音频编解码算法集成在硬件内部&#xff0c;如 MP3 编解码芯片、语音合成 分析芯片等。使用这种方案的优点就是处理速度块&#xff0c;设计周期短&#xff1b;缺…

基于STM32音频解码MP3——vs1053

基于正点原子教程 VS1053简介&#xff1a; 1.该模块采用VS1053B 作为主芯片 2.支持&#xff1a;MP3/WMA/OGG/WAV/FLAC/MIDI/AAC 等音频格式的解码 3.支持&#xff1a;OGG/WAV 音频格式的录音&#xff0c;支持高低音调节以及 EarSpeaker 空间效果设置 模块如图所示正点原子 AL…

ijkplayer音频解码播放架构分析

ijkplayer是一款跨平台播放器&#xff0c;支持Android与iOS播放&#xff0c;音频解码默认使用FFmpeg的avcodec软解。Android端播放音频可以用OpenSL ES和AudioTrack&#xff0c;而iOS端播放音频默认使用AudioQueue。 一、iOS音频解码播放 采用pipeline形式创建音频播放组件&a…

HIFI音频解码芯片ES9023

现在的HiFi播放器、解码耳放设备越来越多&#xff0c;推陈出新的速度也越来越快。各家厂商也都对产品进行了卖点细分&#xff0c;把新款旗舰级解码芯片拎出来宣传。美国ESS公司推出的ES9038Pro芯片大家都早已耳熟能详。 美国ESS系列芯片拥有行业高标准的信噪比 DNR&#xff08;…

DP7398:国产兼容替代CS4398立体声24位/192kHz音频解码芯片

目录 DP7398简介结构框图芯片特性 应用领域 DP7398简介 DP7398是立体声 24 位/192kHz 数模转换芯片。该D/A系统包括数字去加重、半分贝步长音量控制、ATAPI 通道混频、可选择的快速和慢速数字插补滤波器和过采样多位增量 Sigma-Delta 调制器&#xff1b;该调制器采用失调整形技…

ijkplayer 音频解码线程

在ijkplayer 读线程中提到&#xff0c;函数stream_component_open()中的decoder_start()会创建音频解码线程&#xff0c;来看解码线程audio_thread()的主要代码 static int audio_thread(void *arg) {...do {...if ((got_frame decoder_decode_frame(ffp, &is->auddec…

ES9023音频解码芯片原理及应用简介

一般来说&#xff0c;音频解码器分为两类&#xff0c;一类是用于Hi&#xff0d;Fi听音的纯音频解码器&#xff0c;即指把CD机等数字音源器材一分为二后&#xff0c;去掉转盘&#xff08;驱动光碟旋转读盘&#xff09;的部分。 纯音频解码器的主要作用是把读取的数字音频信息转…

iOS的音频解码详解

在iOS平台上,所有的音频框架底层都是基于AudioUnit实现的,较高层次的音频框架包括: Media Player、 AV Foundation、OpenAL和Audio Toolbox,这些框架都封装了AudioUnit,然后提供了更高层次的API(功能更少,职责更单一的接口)。 当开发者在开发音视频相关产品的时候,如果…

FFmpeg音频解码流程详解及简单demo参考

本文主要讲解FFmpeg的音频解码具体流程&#xff0c;API使用。最后再以一个非常简单的demo演示将一个mp3格式的音频文件解码为原始数据pcm文件。 本文主要基于FFmpeg音频解码新接口。 一、FFmpeg音频解码API调用流程图 API接口简单大体讲解如下&#xff1a; av_register…

ffmpeg 音频解码一

1. ffmpeg 视频解码一 2. ffmpeg 视频解码二 3. ffmpeg 音频解码一 4. ffmpeg 音频解码二 5. ffmpeg 音视频解码 6. ffmpeg 视频编码一 7. ffmpeg 视频编码一&#xff08;精简版&#xff09; 8. ffmpeg 视频编码二&#xff08;基于 libswscale 转换视频&#xff09; 9. ffmpeg …

MP3音频解码详细过程(二)

最近做了无人机的音频解码&#xff0c;二周内从无任何音频基础到输出PCM数据码流到无人机上可以实现播报功能&#xff0c;其中遇到了许多莫名的坑&#xff0c;谨以此篇记录心得。也算交个作业。 架构设计思路&#xff1a;由usart 实时传输mp3音频码流&#xff0c;STM32F446 将…