交通流预测爬坑记(三):使用pytorch实现LSTM预测交通流

article/2025/10/6 8:03:31

很长时间没有更新内容了,上一篇可以看做是刚接触深度学习写的,看法非常狭隘,内容非常粗糙。
在最近的学习中接触到了Pytorch,不得不承认,相对于TensorFlow来讲,灵活很多。
这次就使用pytroch来进行一下交通流预测,数据和上一篇文章数据一样。

百度网盘: https://pan.baidu.com/s/19vKN2eZZPbOg36YEWts4aQ
密码 4uh7

准备开始

目录

  • 加载数据
  • 构造数据集
    • 构造Dataset
    • 生成训练数据
  • 构造LSTM模型
    • 定义模型和损失函数
  • 模型预测
  • 模型评价
  • 完整代码

加载数据

导入数据的时候,需要把Network这一列删除
在这里插入图片描述

f = pd.read_csv('..\Desktop\AE86.csv')
# 从新设置列标
def set_columns():columns = []for i in f.loc[2]:columns.append(i.strip())return columns
f.columns = set_columns()
f.drop([0,1,2], inplace = True)
# 读取数据
data = f['Total Carriageway Flow'].astype(np.float64).values[:, np.newaxis]
data.shape # (2880, 1)

构造数据集

pytorch对于Batch输入,提供了规范的划分数据的模块, Dataset 和 DataLoader,这两个是很多刚使用pytorch的朋友头疼的地方,我简单说一下就不专门讲太多了可以去官网或者其他博客看一下

Dataset相当于装东西的大箱子,比如装苹果的箱子,一个大箱子里可以有很多小盒子,小盒子里装着苹果,不同的大箱子里小盒子装的苹果个数可以是不同的
Dataloader相当于使用什么方法在大箱子里拿盒子,比如,可以按顺序一次拿10个小盒子或者随机的拿5个小盒子

但这种使用Dataset和DataLoader创建数据集并不是唯一的方法,比如也可以使用上篇文章中使用的方法直接形成数据

当然,既然是使用pytorch那就使用它的特色吧,下面基于以上Dataset的方法,我们就可以构造自己的数据集

定义自己的Dataset比较麻烦, Dataset必须包括两部分:len 和 getitem

len 用来统计生成Dataset的长度
getitem 是可以index来获取相应的数据

同时为了方便,这Dataset中,增加了数据标准化以及反标准化的过程

构造Dataset

class LoadData(Dataset):def __init__(self, data, time_step, divide_days, train_mode):self.train_mode = train_modeself.time_step = time_stepself.train_days = divide_days[0]self.test_days = divide_days[1]self.one_day_length = int(24 * 4)# flow_norm (max_data. min_data)self.flow_norm, self.flow_data = LoadData.pre_process_data(data)# 不进行标准化
#         self.flow_data = datadef __len__(self, ):if self.train_mode == "train":return self.train_days * self.one_day_length - self.time_stepelif self.train_mode == "test":return self.test_days * self.one_day_lengthelse:raise ValueError(" train mode error")def __getitem__(self, index):if self.train_mode == "train":index = indexelif self.train_mode == "test":index += self.train_days * self.one_day_lengthelse:raise ValueError(' train mode error')data_x, data_y = LoadData.slice_data(self.flow_data, self.time_step, index,self.train_mode)data_x = LoadData.to_tensor(data_x)data_y = LoadData.to_tensor(data_y)return {"flow_x": data_x, "flow_y": data_y}# 这一步就是划分数据的重点部分@staticmethoddef slice_data(data, time_step, index, train_mode):if train_mode == "train":start_index = indexend_index = index + time_stepelif train_mode == "test":start_index = index - time_stepend_index = indexelse:raise ValueError("train mode error")data_x = data[start_index: end_index, :]data_y = data[end_index]return data_x, data_y# 数据与处理@staticmethoddef pre_process_data(data, ):# data N T Dnorm_base = LoadData.normalized_base(data)normalized_data = LoadData.normalized_data(data, norm_base[0], norm_base[1])return norm_base, normalized_data# 生成原始数据中最大值与最小值@staticmethoddef normalized_base(data):max_data = np.max(data, keepdims=True) #keepdims保持维度不变min_data = np.min(data, keepdims=True)# max_data.shape  --->(1, 1)return max_data, min_data# 对数据进行标准化@staticmethoddef normalized_data(data, max_data, min_data):data_base = max_data - min_datanormalized_data = (data - min_data) / data_basereturn normalized_data@staticmethod# 反标准化  在评价指标误差以及画图的使用使用def recoverd_data(data, max_data, min_data):data_base = max_data - min_datarecoverd_data = data * data_base - min_datareturn recoverd_data@staticmethoddef to_tensor(data):return torch.tensor(data, dtype=torch.float)

生成训练数据

# 使用前25天作为训练集,后5天作为预测集
divide_days = [25, 5]
time_step = 5 #时间步长
batch_size = 48 
train_data = LoadData(data, time_step, divide_days, "train")
test_data = LoadData(data,  time_step, divide_days, "test")
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, )
test_loader = DataLoader(test_data,batch_size=batch_size, shuffle=False, )

在test数据中,不使用shuffle随机化,可以使用train_data[0]看一下,返回出来第一个时间步的数据,以及时间步+1的值,但这是已经归一化后的数据,在init函数中可以不使用pre_process_data函数,这样来检测一下数据是否正确

在这里插入图片描述
不使用归一化第一个时间步的数据
在这里插入图片描述

需要说一下,DataLoader是一个item的数据类型,需要循环取出

在这里插入图片描述

构造LSTM模型

pytorch比较灵活,可以清楚的看到模型内部的运算过程,使用pytorch构建模型,一般包括两部分 : init 和 forward

init 定义需要使用的模型结构
forward进行计算

# LSTM构建网络
class LSTM(nn.Module):def __init__(self, input_num, hid_num, layers_num, out_num, batch_first=True):super().__init__()self.l1 = nn.LSTM(input_size = input_num,hidden_size = hid_num,num_layers = layers_num,batch_first = batch_first)self.out = nn.Linear(hid_num, out_num)def forward(self, data):flow_x = data['flow_x']   # B * T * Dl_out, (h_n, c_n) = self.l1(flow_x, None)  # None表示第一次 hidden_state是0print(l_out[:, -1, :].shape)out = self.out(l_out[:, -1, :])return out

pytorch中的LSTM与TensorFlow不同的是,pytorch中的LSTM可以一次定义多个层,不需要一直叠加LSTM层,而且每次LSTM返回三个部分的值:所有层的输出(l_out)、隐藏状态(l_h)和细胞状态(c_n)。

l_out是集合了每个变量的l_h输出, 所以return的时候,可以在l_c中切片取最后一个值,当然也可以直接使用l_h

pytorch也可以使用Sequential,如果要使用Seqential就需要修改上面的Dataset,因为Dataset定义的使用返回是一个字典{“flow_x”, “flow_y”},如果使用Sequential传入模型的只能是"flow_x"。或者像tensorflow一样,不使用Dataset构建数据,直接通过一系列的方法进行划分(和上一篇文章一样)。

定义模型和损失函数

input_num = 1 # 输入的特征维度
hid_num = 50  # 隐藏层个数
layers_num = 3 # LSTM层个数
# out_num = 1
lstm = LSTM(input_num, hid_num, layers_num, out_num)
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(lstm.parameters())

模型预测

pytorch中不像tensorflow那么容易描述损失值,所以输出都需要自己来进行表示,当然也可以使用Tensorboard来查看损失值变化,但对新手而言比较麻烦

lstm.train()
epoch_loss_change = []
for epoch in range(30):epoch_loss = 0.0start_time = time.time()for data_ in train_loader:lstm.zero_grad()predict = lstm(data_)loss = loss_func(predict, data_['flow_y'])epoch_loss += loss.item()loss.backward()optimizer.step()epoch_loss_change.append(1000 * epoch_loss / len(train_data))end_time = time.time()print("Epoch: {:04d}, Loss: {:02.4f}, Time: {:02.2f} mins".format(epoch, 1000 * epoch_loss / len(train_data),(end_time-start_time)/60))
plt.plot(epoch_loss_change)

模型评价

lstm.eval()
with torch.no_grad(): # 关闭梯度total_loss = 0.0pre_flow = np.zeros([batch_size, 1]) # [B, D],T=1 # 目标数据的维度,用0填充real_flow = np.zeros_like(pre_flow)for data_ in test_loader:pre_value = lstm(data_)loss = loss_func(pre_value, data_['flow_y'])total_loss += loss.item()# 反归一化pre_value = LoadData.recoverd_data(pre_value.detach().numpy(),test_data.flow_norm[0].squeeze(1), # max_datatest_data.flow_norm[1].squeeze(1), # min_data)target_value = LoadData.recoverd_data(data_['flow_y'].detach().numpy(),test_data.flow_norm[0].squeeze(1), test_data.flow_norm[1].squeeze(1),)pre_flow = np.concatenate([pre_flow, pre_value])real_flow = np.concatenate([real_flow, target_value])pre_flow  = pre_flow[batch_size: ]real_flow  = real_flow[batch_size: ]
#     # 计算误差
mse = mean_squared_error(pre_flow, real_flow)
rmse = math.sqrt(mean_squared_error(pre_flow, real_flow))
mae = mean_absolute_error(pre_flow, real_flow)
print('均方误差---', mse)
print('均方根误差---', rmse)
print('平均绝对误差--', mae)# 画出预测结果图
font_set = FontProperties(fname=r"C:\Windows\Fonts\simsun.ttc", size=15)    # 中文字体使用宋体,15号
plt.figure(figsize=(15,10))
plt.plot(real_flow, label='Real_Flow', color='r', )
plt.plot(pre_flow, label='Pre_Flow')
plt.xlabel('测试序列', fontproperties=font_set)
plt.ylabel('交通流量/辆', fontproperties=font_set)
plt.legend() 
# 预测储存图片
# plt.savefig('...\Desktop\123.jpg')

完整代码

import pandas as pd
import numpy as np
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as Data
from torchsummary import summary
import math
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import time
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
from matplotlib.font_manager import FontProperties  # 画图时可以使用中文# 加载数据
f = pd.read_csv('..\Desktop\AE86.csv')
# 从新设置列标
def set_columns():columns = []for i in f.loc[2]:columns.append(i.strip())return columns
f.columns = set_columns()
f.drop([0,1,2], inplace = True)
# 读取数据
data = f['Total Carriageway Flow'].astype(np.float64).values[:, np.newaxis]class LoadData(Dataset):def __init__(self, data, time_step, divide_days, train_mode):self.train_mode = train_modeself.time_step = time_stepself.train_days = divide_days[0]self.test_days = divide_days[1]self.one_day_length = int(24 * 4)# flow_norm (max_data. min_data)self.flow_norm, self.flow_data = LoadData.pre_process_data(data)# 不进行标准化# self.flow_data = datadef __len__(self, ):if self.train_mode == "train":return self.train_days * self.one_day_length - self.time_stepelif self.train_mode == "test":return self.test_days * self.one_day_lengthelse:raise ValueError(" train mode error")def __getitem__(self, index):if self.train_mode == "train":index = indexelif self.train_mode == "test":index += self.train_days * self.one_day_lengthelse:raise ValueError(' train mode error')data_x, data_y = LoadData.slice_data(self.flow_data, self.time_step, index,self.train_mode)data_x = LoadData.to_tensor(data_x)data_y = LoadData.to_tensor(data_y)return {"flow_x": data_x, "flow_y": data_y}# 这一步就是划分数据@staticmethoddef slice_data(data, time_step, index, train_mode):if train_mode == "train":start_index = indexend_index = index + time_stepelif train_mode == "test":start_index = index - time_stepend_index = indexelse:raise ValueError("train mode error")data_x = data[start_index: end_index, :]data_y = data[end_index]return data_x, data_y# 数据与处理@staticmethoddef pre_process_data(data, ):norm_base = LoadData.normalized_base(data)normalized_data = LoadData.normalized_data(data, norm_base[0], norm_base[1])return norm_base, normalized_data# 生成原始数据中最大值与最小值@staticmethoddef normalized_base(data):max_data = np.max(data, keepdims=True) #keepdims保持维度不变min_data = np.min(data, keepdims=True)# max_data.shape  --->(1, 1)return max_data, min_data# 对数据进行标准化@staticmethoddef normalized_data(data, max_data, min_data):data_base = max_data - min_datanormalized_data = (data - min_data) / data_basereturn normalized_data@staticmethod# 反标准化  在评价指标误差以及画图的使用使用def recoverd_data(data, max_data, min_data):data_base = max_data - min_datarecoverd_data = data * data_base - min_datareturn recoverd_data@staticmethoddef to_tensor(data):return torch.tensor(data, dtype=torch.float)# 划分数据
divide_days = [25, 5]
time_step = 5
batch_size = 48
train_data = LoadData(data, time_step, divide_days, "train")
test_data = LoadData(data,  time_step, divide_days, "test")
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, )
test_loader = DataLoader(test_data,batch_size=batch_size, shuffle=False, )# LSTM构建网络
class LSTM(nn.Module):def __init__(self, input_num, hid_num, layers_num, out_num, batch_first=True):super().__init__()self.l1 = nn.LSTM(input_size = input_num,hidden_size = hid_num,num_layers = layers_num,batch_first = batch_first)self.out = nn.Linear(hid_num, out_num)def forward(self, data):flow_x = data['flow_x']   # B * T * Dl_out, (h_n, c_n) = self.l1(flow_x, None)  # None表示第一次 hidden_state是0
#         print(l_out[:, -1, :].shape)out = self.out(l_out[:, -1, :])return out# 定义模型参数
input_num = 1 # 输入的特征维度
hid_num = 50  # 隐藏层个数
layers_num = 3 # LSTM层个数
out_num = 1
lstm = LSTM(input_num, hid_num, layers_num, out_num)
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(lstm.parameters())# 训练模型
lstm.train()
epoch_loss_change = []
for epoch in range(30):epoch_loss = 0.0start_time = time.time()for data_ in train_loader:lstm.zero_grad()predict = lstm(data_)loss = loss_func(predict, data_['flow_y'])epoch_loss += loss.item()loss.backward()optimizer.step()epoch_loss_change.append(1000 * epoch_loss / len(train_data))end_time = time.time()print("Epoch: {:04d}, Loss: {:02.4f}, Time: {:02.2f} mins".format(epoch, 1000 * epoch_loss / len(train_data),(end_time-start_time)/60))
plt.plot(epoch_loss_change)#评价模型
lstm.eval()
with torch.no_grad(): # 关闭梯度total_loss = 0.0pre_flow = np.zeros([batch_size, 1]) # [B, D],T=1 # 目标数据的维度,用0填充real_flow = np.zeros_like(pre_flow)for data_ in test_loader:pre_value = lstm(data_)loss = loss_func(pre_value, data_['flow_y'])total_loss += loss.item()# 反归一化pre_value = LoadData.recoverd_data(pre_value.detach().numpy(),test_data.flow_norm[0].squeeze(1), # max_datatest_data.flow_norm[1].squeeze(1), # min_data)target_value = LoadData.recoverd_data(data_['flow_y'].detach().numpy(),test_data.flow_norm[0].squeeze(1), test_data.flow_norm[1].squeeze(1),)pre_flow = np.concatenate([pre_flow, pre_value])real_flow = np.concatenate([real_flow, target_value])pre_flow  = pre_flow[batch_size: ]real_flow  = real_flow[batch_size: ]
#     # 计算误差
mse = mean_squared_error(pre_flow, real_flow)
rmse = math.sqrt(mean_squared_error(pre_flow, real_flow))
mae = mean_absolute_error(pre_flow, real_flow)
print('均方误差---', mse)
print('均方根误差---', rmse)
print('平均绝对误差--', mae)# 画出预测结果图
font_set = FontProperties(fname=r"C:\Windows\Fonts\simsun.ttc", size=15)    # 中文字体使用宋体,15号
plt.figure(figsize=(15,10))
plt.plot(real_flow, label='Real_Flow', color='r', )
plt.plot(pre_flow, label='Pre_Flow')
plt.xlabel('测试序列', fontproperties=font_set)
plt.ylabel('交通流量/辆', fontproperties=font_set)
plt.legend() 
# 预测储存图片
# plt.savefig('...\Desktop\123.jpg')``

http://chatgpt.dhexx.cn/article/JY1wRl4e.shtml

相关文章

Python交通流仿真【含源码】

虽然交通并不总是畅通无阻,但汽车无缝穿越交叉路口,在交通信号灯处转弯和停车看起来相当壮观。这种沉思让我思考交通流对人类文明的重要性。 在此之后,内心的书呆子特质让我忍不住思考一种模拟交通流的方法。我在一个涉及交通流量的本科项目…

基于深度学习的短时交通流预测与优化

TOC 第二章 数据预处理与短时交通流量特性分析 2.1 数据来源 数据记录了明尼苏达州双子城19条高速环城公路一整年的交通流量,交通流量数据采样间隔为30秒(采用2018年6月1日至8月31日期间,采集间隔为5分钟,选取公路上的5个车辆检测站点的交…

基于Spatial-Temporal Transformer的城市交通流预测

文章信息 本周阅读的论文是题目为《Spatial-Temporal Transformer Networks for Traffic Flow Forecasting》的一篇2021年发布在arXiv网站上的使用时空Transformer网络(STTNs)预测交通流的文章。 摘要 交通预测已成为智能交通系统的核心组成部分。然而&a…

基于推特数据挖掘交通事件的城市交通流深度学习预测模型

文章信息 本周阅读的论文是题目为《A deep-learning model for urban traffic flow prediction with traffic events mined from twitter》的一篇2021年发表在《World Wide Web》涉及交通事故下的城市交通客流预测的文章。 摘要 短期交通预测是现代城市交通管理和控制系统的关键…

交通流优化:一种强化学习方法

1. 文章信息 《Traffic flow optimization: A reinforcement learning approach》是2016年发表在Engineering Applications of Artificial Intelligence的一篇文章。 2. 摘要 交通拥堵会导致诸如延误、燃油消耗增加和额外污染等重要问题。本文提出了一种新的基于强化学习的交通…

SUMO交通流仿真实战

理解、预测并最终减少城市路网中的交通拥堵是一个复杂的问题。即使了解最简单的单车道情况下出现的交通拥堵, 也是具有挑战性的。SUMO是一个开源平台,可模拟复杂环境中的交通流。在这个教程里,我们将学习如何从零创建复杂的交通流模拟&#x…

python交通流预测算法_一种高速公路交通流预测方法与流程

本发明涉及智能交通领域,更具体地,涉及一种高速公路交通流预测方法。 背景技术: 随着社会经济的不断增长,国内汽车的拥有量越来越多,高速公路车流量急剧上升,从而导致高速公路上车辆拥堵愈发严重。现有方法采用径向基函数神经网络训练网络参数的算法,在粗略搜索过程中容…

交通流特征工程小技巧与思考

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、交通流是什么?二、特征工程是什么?三、处理数据时的一些小技巧四、一些常用的机器学习python库总结 前言 小编最近参与了一些工程方…

初等模型---交通流和道路通行能力

交通流的基本参数及其特性 为明确和简单起见,这里的交通流均指由标准长度的小型汽车在单方向的。 道路上行驶而形成的车流,没有外界因素如岔路、信号灯等的影响。 借用物理学的概念,将交通流近似看作一辆辆汽车组成的连续的流体,可以 用流量、…

交通流理论 第一章 绪论

第一章 绪论 1.1 交通流理论研究的内容和意义 交通流理论是运用物理学和数学的定律描述交通特性的交通工程学基础理论之一;道路设施可以分为两类:连续流和间断流设施。连续流设施为机动车流提供了相对连续的运行环境,几乎没有强制性阻断干扰…

数学小游戏:原创字谜几则

昨天晚上躺在床上发呆,想了几则数学字谜。下面每个式子都对应一个英文单词,例如的意思就是tank。 你能猜出多少个来呢? 1. 2. 3. 4. 5. 6. 7. 8. 9. 10.

纵横字谜的答案

1.问题描述 输入一个r行c列(1<r,c<10)的网格&#xff0c;黑格用”*”表示&#xff0c;每个白格都填有一个字母&#xff0c;如果一个白格的左边相邻的位置或者边上相邻的位置没有白格&#xff08;可能是黑格&#xff0c;也可能除了网格边界&#xff09;&#xff0c; 则称…

猜字谜 C++

解析&#xff1a; 1.由于五位数*一位数等于六位数 而且万位等于第二位数各位所以A>3 2.D为1-9 3.整式变形为 DDDDDD/AABCAB 我们需判断一个每位数都一样的六位数除以一个3-9中的某个数A 结果需满足 万位等于十位等于A 千位等于个位 且没有余数 answer: #include<iostr…

猜字小游戏

文章目录 猜字小游戏猜字游戏升级版 猜字小游戏 编写程序 运行程序 猜字游戏升级版 编写程序 运行程序

纵横字谜的答案 (UVa232)

纵横字谜的答案 Time Limit:3000MS Memory Limit:0KB 64bit IO Format:%lld & %llu Submit Status Description Crossword Answers A crossword puzzle consists of a rectangular grid of black and white squares and two lists of definitions (or descriptio…

Scratch 教程《元宵猜灯谜》

程序演示 元宵节到了&#xff0c;用Scratch给小朋友做一个猜灯谜游戏&#xff01; 课程引入 青玉案元夕 【宋】辛弃疾 东风夜放花千树&#xff0c;更吹落、星如雨。宝马雕车香满路。凤箫声动&#xff0c;玉壶光转&#xff0c;一夜鱼龙舞。 蛾儿雪柳黄金缕&#xff0c;笑语盈盈暗…

Python 猜字谜游戏

import random WORDS ("python","import","hello","difficult","easy") print("欢迎来到猜单词游戏&#xff0c;请将乱序后的单词组成正确的单词") iscontinue "y" while iscontinue"y" or…

c语言猜字谜(详解)(后附完整源码)

c语言猜字谜 一.游戏前置二.游戏实现1.让电脑生成随机数2.让玩家重复输入3.输赢判断 一.游戏前置 向其他游戏一样&#xff0c;在游戏开始前&#xff0c;我们需要一个菜单让玩家进行选择 所以我们需要以下功能 1.一个能让玩家进行选择的函数&#xff08;switch&#xff09; 2.玩…

猜字谜小游戏

猜字谜小游戏 思路 : 先写一个菜单函数,打印一个菜单,获取用户输入 从菜单函数中调用Game函数 写Game函数里面的内容 从主函数中调用菜单函数 #include<stdio.h> #include<stdlib.h>//里面包含rand函数,system函数 #include<time.h> void Game(void);//声明…