时间序列相似性度量-DTW

article/2025/9/12 11:15:45

1. 背景

最近项目中遇到求解时间序列相似性问题,这里序列也可以看成向量。在传统算法中,可以用余弦相似度和pearson相关系数来描述两个序列的相似度。但是时间序列比较特殊,可能存在两个问题:

  1. 两段时间序列长度不同。如何求相似度?
  2. 一个序列是另一个序列平移之后得到的。如何求相似距离?

第一个问题,导致了根本不能用余弦相似度和pearson相关系数来求解相似。第二个问题,导致了也不能基于欧式距离这样的算法,来求解相似距离。比如下面两个长度不同的序列:

s1 = [1, 2, 0, 1, 1, 2]
s2 = [1, 0, 1]

本文记录一种算法,一方面:如果两个序列中的其中一个序列是另一个序列的平移序列,则可以比较合理的求出两个序列的相似距离。另一方面:也可以求解长度不同序列的相似距离。主要参考:dtaidistance 这个项目的源码和其中的Dynamic Time Warping (DTW)思想。

同时基于这个算法,先计算相似距离,再把相似距离通过 1 1 + d i s t \frac{1}{1+dist} 1+dist1 转化为相似度。这样就可以得到长度不同向量的相似度。

2. 核心思想

Dynamic Time Warping (DTW) 本质上和通过动态规划来计算这个序列的相似距离。其实这和求解字符串的最长公共子串、子序列这类问题本质比较类似,参考:

  • 两个字符串的最长子串
  • 两个字符串的最长子序列

这里基于动态规划构建序列a和序列b的距离矩阵dp[i][j],其中dp[i][j]表示序列a[0:i]和b[0:j]之间的相似距离的平方。并且有:
d p [ i ] [ j ] = { ( a [ 0 ] − b [ 0 ] ) 2 i = 0, j = 0 ( a [ 0 ] − b [ j ] ) 2 + d p [ 0 ] [ j − 1 ] i = 0 ( a [ i ] − b [ 0 ] ) 2 + d p [ i − 1 ] [ 0 ] j = 0 ( a [ i ] − b [ j ] ) 2 + m i n ( d p [ i − 1 ] [ j ] , d p [ j − 1 ] [ i ] , d p [ i − 1 ] [ j − 1 ] ) i , j > 0 dp[i][j] = \begin{cases} (a[0] - b[0])^2 \quad\quad &\text{i = 0, j = 0} \\ (a[0] - b[j])^2 + dp[0][j-1] \quad\quad &\text{i = 0} \\ (a[i] - b[0])^2 + dp[i-1][0] \quad\quad &\text{j = 0} \\ (a[i] - b[j])^2 + min(dp[i-1][j],dp[j-1][i],dp[i-1][j-1]) \quad\quad &i,j > 0 \end{cases} dp[i][j]=(a[0]b[0])2(a[0]b[j])2+dp[0][j1](a[i]b[0])2+dp[i1][0](a[i]b[j])2+min(dp[i1][j],dp[j1][i],dp[i1][j1])i = 0, j = 0i = 0j = 0i,j>0

所以dp[len(a)-1][len(b)-1] 就是相似距离的平方,最后开方就是两个时间序列的相似距离,这也就是DTW算法核心实现。实际上,上文的那个github核心代码其实也就是这么写的,只不过在此基础上加了其他参数、画图、层次聚类的代码实现而已。

3. 实现

在实际使用中解读了dtaidistance这个项目的源码,其中除去所有参数后,DTW最简单的实现如下:

import numpy as npfloat_formatter = lambda x: "%.2f" % x
np.set_printoptions(formatter={'float_kind': float_formatter})def TimeSeriesSimilarity(s1, s2):l1 = len(s1)l2 = len(s2)paths = np.full((l1 + 1, l2 + 1), np.inf)  # 全部赋予无穷大paths[0, 0] = 0for i in range(l1):for j in range(l2):d = s1[i] - s2[j]cost = d ** 2paths[i + 1, j + 1] = cost + min(paths[i, j + 1], paths[i + 1, j], paths[i, j])paths = np.sqrt(paths)s = paths[l1, l2]return s, paths.Tif __name__ == '__main__':s1 = [1, 2, 0, 1, 1, 2]s2 = [1, 0, 1]distance, paths = TimeSeriesSimilarity(s1, s2)print(paths)print("=" * 40)print(distance)

输出

[[0.00 inf inf inf inf inf inf]
[inf 0.00 1.00 1.41 1.41 1.41 1.73]
[inf 1.00 2.00 1.00 1.41 1.73 2.45]
[inf 1.00 1.41 1.41 1.00 1.00 1.41]]
========================================
1.4142135623730951

这里的矩阵
[ 0.00 1.00 1.41 1.41 1.41 1.73 1.00 2.00 1.00 1.41 1.73 2.45 1.00 1.41 1.41 1.00 1.00 1.41 ] \begin{bmatrix} 0.00 & 1.00 & 1.41 & 1.41 & 1.41 & 1.73 \\ 1.00 & 2.00 & 1.00 & 1.41 & 1.73 & 2.45 \\ 1.00 & 1.41 & 1.41 & 1.00 & 1.00 & 1.41 \end{bmatrix} 0.001.001.001.002.001.411.411.001.411.411.411.001.411.731.001.732.451.41

就是dp[i][j]。这里代码用另一种写法实现了求解dp[i][j]。并且矩阵右下角元素1.41,就是我们求解的相似距离。

4. 改进

其实在实际使用中,我们发现该算法对周期序列的距离计算不是很好。尤其两个序列是相同周期,但是是平移后的序列。如下:

s1 = np.array([1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1])
s2 = np.array([0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2])
s3 = np.array([0.8, 1.5, 0, 1.2, 0, 0, 0.6, 1, 1.2, 0, 0, 1, 0.2, 2.4, 0.5, 0.4])

用图表展示:

很明显从图中可以看到 s 1 s_1 s1 s 2 s_2 s2是相同的时间序列,但是 s 1 s_1 s1 s 2 s_2 s2平移后得到的, s 3 s_3 s3是我随意构造的一个序列。但是,现在用上面的算法求解,得:
d i s t ( s 1 , s 2 ) = 2.0 d i s t ( s 1 , s 3 ) = 1.794435844492636 \begin{aligned} dist(s_1,s_2) &= 2.0 \\ dist(s_1,s_3) &= 1.794435844492636 \end{aligned} dist(s1,s2)dist(s1,s3)=2.0=1.794435844492636

我们发现 s 1 s_1 s1 s 3 s_3 s3反而比较像,这是我们不能接受的。因此,这里需要对算法有个改进。以下有两个改进策略。

4.1 改进策略1

目的是想求得一个惩罚系数 α \alpha α,这个 α \alpha α和原算法的distance相乘,得到更新后的distance。首先基于原算法包dtaidistance,可以求出dp[i][j]从左上角,到右下角的最优路径。

其实这样的最优路径,可以用图示表示,比如 s 1 s_1 s1 s 2 s_2 s2的dp[i][j]的最优路径:

再比如 s 1 s_1 s1 s 3 s_3 s3的dp[i][j]的最优路径:

其实可以很明显看到, s 1 s_1 s1 s 2 s_2 s2的最优路径拐点比较少,对角直线很长。而 s 1 s_1 s1 s 3 s_3 s3的拐点比较多,对角直线很短。因此我基于这个考虑进行优化,公式如下:
α = 1 − ∑ i = 1 n c o m L e n i 2 s e q L e n 2 \alpha = 1- \sqrt{\sum_{i=1}^n \frac{comLen_i^2}{seqLen^2}} α=1i=1nseqLen2comLeni2

其中seqLen 是这个图中最优路径节点个数, c o m L e n i comLen_i comLeni表示每段对角直线的长度。求和后开发表示一个长度系数,这个长度系数越大,表示对角直线越长。最后1减去这个长度系数得到,我们要的衰减系数 α \alpha α。以下是代码实现:

说明:这里best_path()是直接摘自dtaidistance,TimeSeriesSimilarity()方法是修改自这个项目。

import numpy as np
import mathdef get_common_seq(best_path, threshold=1):com_ls = []pre = best_path[0]length = 1for i, element in enumerate(best_path):if i == 0:continuecur = best_path[i]if cur[0] == pre[0] + 1 and cur[1] == pre[1] + 1:length = length + 1else:com_ls.append(length)length = 1pre = curcom_ls.append(length)return list(filter(lambda num: True if threshold < num else False, com_ls))def calculate_attenuate_weight(seqLen, com_ls):weight = 0for comlen in com_ls:weight = weight + (comlen * comlen) / (seqLen * seqLen)return 1 - math.sqrt(weight)def best_path(paths):"""Compute the optimal path from the nxm warping paths matrix."""i, j = int(paths.shape[0] - 1), int(paths.shape[1] - 1)p = []if paths[i, j] != -1:p.append((i - 1, j - 1))while i > 0 and j > 0:c = np.argmin([paths[i - 1, j - 1], paths[i - 1, j], paths[i, j - 1]])if c == 0:i, j = i - 1, j - 1elif c == 1:i = i - 1elif c == 2:j = j - 1if paths[i, j] != -1:p.append((i - 1, j - 1))p.pop()p.reverse()return pdef TimeSeriesSimilarity(s1, s2):l1 = len(s1)l2 = len(s2)paths = np.full((l1 + 1, l2 + 1), np.inf)  # 全部赋予无穷大paths[0, 0] = 0for i in range(l1):for j in range(l2):d = s1[i] - s2[j]cost = d ** 2paths[i + 1, j + 1] = cost + min(paths[i, j + 1], paths[i + 1, j], paths[i, j])paths = np.sqrt(paths)s = paths[l1, l2]return s, paths.Tif __name__ == '__main__':# 测试数据s1 = np.array([1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1])s2 = np.array([0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2])s3 = np.array([0.8, 1.5, 0, 1.2, 0, 0, 0.6, 1, 1.2, 0, 0, 1, 0.2, 2.4, 0.5, 0.4])# 原始算法distance12, paths12 = TimeSeriesSimilarity(s1, s2)distance13, paths13 = TimeSeriesSimilarity(s1, s3)print("更新前s1和s2距离:" + str(distance12))print("更新前s1和s3距离:" + str(distance13))best_path12 = best_path(paths12)best_path13 = best_path(paths13)# 衰减系数com_ls1 = get_common_seq(best_path12)com_ls2 = get_common_seq(best_path13)# print(len(best_path12), com_ls1)# print(len(best_path13), com_ls2)weight12 = calculate_attenuate_weight(len(best_path12), com_ls1)weight13 = calculate_attenuate_weight(len(best_path13), com_ls2)# 更新距离print("更新后s1和s2距离:" + str(distance12 * weight12))print("更新后s1和s3距离:" + str(distance13 * weight13))

输出:

更新前s1和s2距离:2.0
更新前s1和s3距离:1.794435844492636
更新后s1和s2距离:0.6256314581274465
更新后s1和s3距离:0.897217922246318

结论:
用新的算法更新后,我们会发现s1和s2距离比s1和s3的距离更加接近了,这就是我们要的结果。

4.2 改进策略2

也想求得一个惩罚系数 α \alpha α。方案如下:

  1. 先求解两个序列seq1和seq2的最长公共子串,长度记为a。
  2. 因为seq1和seq2是数值序列,在求最长公共子串时,设置了一个最大标准差的偏移容忍。也就是说,两个数值在这个标准差内,认为也是公共子串中的一部分。
  3. 衰减系数: α = 1 − a × a l e n ( s e q 1 ) × l e n ( s e q 2 ) \alpha = 1 - \frac{a \times a}{len(seq1) \times len(seq2)} α=1len(seq1)×len(seq2)a×a

也就是说,两个数值序列的最长公共子串越长,则衰减系数越小。这里把:s2 = np.array([0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2])改成s2 = np.array([0, 1, 1, 2, 0, 1, 1.7, 2, 0, 1, 1, 2, 0, 1, 1, 2]),来验证最大标准差的偏移容忍。

实际代码实现和效果如下:

import numpy as npfloat_formatter = lambda x: "%.2f" % x
np.set_printoptions(formatter={'float_kind': float_formatter})def TimeSeriesSimilarityImprove(s1, s2):# 取较大的标准差sdt = np.std(s1, ddof=1) if np.std(s1, ddof=1) > np.std(s2, ddof=1) else np.std(s2, ddof=1)# print("两个序列最大标准差:" + str(sdt))l1 = len(s1)l2 = len(s2)paths = np.full((l1 + 1, l2 + 1), np.inf)  # 全部赋予无穷大sub_matrix = np.full((l1, l2), 0)  # 全部赋予0max_sub_len = 0paths[0, 0] = 0for i in range(l1):for j in range(l2):d = s1[i] - s2[j]cost = d ** 2paths[i + 1, j + 1] = cost + min(paths[i, j + 1], paths[i + 1, j], paths[i, j])if np.abs(s1[i] - s2[j]) < sdt:if i == 0 or j == 0:sub_matrix[i][j] = 1else:sub_matrix[i][j] = sub_matrix[i - 1][j - 1] + 1max_sub_len = sub_matrix[i][j] if sub_matrix[i][j] > max_sub_len else max_sub_lenpaths = np.sqrt(paths)s = paths[l1, l2]return s, paths.T, [max_sub_len]def calculate_attenuate_weight(seqLen1, seqLen2, com_ls):weight = 0for comlen in com_ls:weight = weight + comlen / seqLen1 * comlen / seqLen2return 1 - weightif __name__ == '__main__':# 测试数据s1 = np.array([1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1])s2 = np.array([0, 1, 1, 2, 0, 1, 1.7, 2, 0, 1, 1, 2, 0, 1, 1, 2])s3 = np.array([0.8, 1.5, 0, 1.2, 0, 0, 0.6, 1, 1.2, 0, 0, 1, 0.2, 2.4, 0.5, 0.4])# 原始算法distance12, paths12, max_sub12 = TimeSeriesSimilarityImprove(s1, s2)distance13, paths13, max_sub13 = TimeSeriesSimilarityImprove(s1, s3)print("更新前s1和s2距离:" + str(distance12))print("更新前s1和s3距离:" + str(distance13))# 衰减系数weight12 = calculate_attenuate_weight(len(s1), len(s2), max_sub12)weight13 = calculate_attenuate_weight(len(s1), len(s3), max_sub13)# 更新距离print("更新后s1和s2距离:" + str(distance12 * weight12))print("更新后s1和s3距离:" + str(distance13 * weight13))

输出:

更新前s1和s2距离:2.0223748416156684
更新前s1和s3距离:1.794435844492636
更新后s1和s2距离:0.47399410350367227
更新后s1和s3距离:1.6822836042118463

结论:
用新的算法更新后,我们会发现s1和s2距离比s1和s3的距离更加接近了,这就是我们要的结果。

5. 参考

  1. https://github.com/wannesm/dtaidistance
  2. 两个字符串的最长子串
  3. 两个字符串的最长子序列

http://chatgpt.dhexx.cn/article/H0VTARhu.shtml

相关文章

算法笔记-DTW动态时间规整

算法笔记-DTW动态时间规整 简介简单的例子定义讨论 约束条件步模式标准化点与点的距离函数 具体应用场景 分类点到点匹配 算法笔记-DTW动态时间规整 动态时间规整/规划(Dynamic Time Warping, DTW)是一个比较老的算法&#xff0c;大概在1970年左右被提出来&#xff0c;最早用…

DTW学习笔记1.0

目录 DTW算法的目的&#xff1a; DTW算法实现&#xff1a; 以下图两段序列为例&#xff1a; DTW代码实现&#xff1a; Dynamic programming function 示例&#xff1a; 单变量示例&#xff1a; 比较序列相似性示例&#xff1a; 缺点&#xff1a; DTW算法的目的&#xf…

动态时间规整—DTW算法

简述 Dynamic Time Warping&#xff08;DTW&#xff09;诞生有一定的历史了&#xff08;日本学者Itakura提出&#xff09;&#xff0c;它出现的目的也比较单纯&#xff0c;是一种衡量两个长度不同的时间序列的相似度的方法。应用也比较广&#xff0c;主要是在模板匹配中&#…

语音信号处理之(一)动态时间规整(DTW)

语音信号处理之&#xff08;一&#xff09;动态时间规整&#xff08;DTW&#xff09; zouxy09qq.com http://blog.csdn.net/zouxy09 这学期有《语音信号处理》这门课&#xff0c;快考试了&#xff0c;所以也要了解了解相关的知识点。呵呵&#xff0c;平时没怎么听课&#xff…

【机器学习】【DTW】

转自&#xff1a;https://blog.csdn.net/zouxy09/article/details/9140207 一、概述 在大部分的学科中&#xff0c;时间序列是数据的一种常见表示形式。对于时间序列处理来说&#xff0c;一个普遍的任务就是比较两个序列的相似性。 在时间序列中&#xff0c;我们通常需要比较…

动态时间规整算法: 从DTW到FastDTW

目录 动态时间规整算法: 从DTW到FastDTW总结&#xff1a;简介[^1]DTW[^1]FastDTW&#xff1a;使用多级粗化的方法[^1]结果 动态时间规整算法: 从DTW到FastDTW 总结&#xff1a; FastDTW作者对DTW的改进点很巧妙&#xff01;先通过举例说明在一些情况下目前现有的方法对DTW改进…

机器学习算法(二十三):DTW(Dynamic Time Warping,动态时间调整)

目录 1 DTW&#xff08;动态时间调整&#xff09; 2 算法的实现 3 例子 4 python实现 ​​​​​​​5 DTW的加速算法FastDTW 5.1 标准DTW算法 5.2 DTW常用加速手段 5.3 FastDTW​​​​​​​ 1 DTW&#xff08;动态时间调整&#xff09; 动态时间调整算法是大多用于检…

初识DTW(动态时间规整)算法及Python实现例

目录 1. 概要 2. 时序列相似度度量 3. DTW基本算法 4. Python实现 5. Next Action 1. 概要 DTW&#xff08; Dynamic Time Warping&#xff0c;动态时间规整&#xff09;是基于动态规划&#xff08;Dynamic Programming&#xff09;策略对两个时序列通过非线性地进行时域对…

DTW基本原理

设时间归正函数为&#xff1a;&#xff0c;式中&#xff0c;N为路径长度&#xff0c;c(n)(i(n),j(n))表示第n个匹配点对是由参考模板的第i个特征矢量与待测模板的第j个特征矢量构成的匹配点对。两者之间的距离称为局部匹配距离。DTW算法就是通过局部优化的方法实现加权距离总和…

DTW算法——Matlab实现

概述 DTW &#xff08;Dynamic time warping&#xff09;算法是可以度量两个独立时间序列的相似度的一种方法。曾被广泛应用在单词音频的匹配上。该方法主要用来解决在两段序列的时长不同的情况下&#xff0c;进行相似度的判断。 上图中&#xff0c;左侧时长相等&#xff0c;…

DTW简介

dtw算法主要针对序列匹配提出的&#xff0c;尤其是当序列出现一定的飘移&#xff0c;欧氏距离度量就会失效。dtw常用在语音匹配当中&#xff0c;在图像处理里面也有一定的应用。 现在有两个序列X,Y. X[2,3,4,7,9,2,1,2,1],Y[1,1,1,1,2,3,3,4,7,8,9,1,1,1,1] 绘制在坐标轴上如…

DTW算法

dtw算法主要针对序列匹配提出的&#xff0c;尤其是当序列出现一定的飘移&#xff0c;欧氏距离度量就会失效。dtw常用在语音匹配当中&#xff0c;在图像处理里面也有一定的应用。 现在有两个序列X,Y. X[2,3,4,7,9,2,1,2,1],Y[1,1,1,1,2,3,3,4,7,8,9,1,1,1,1] 绘制在坐标轴上如…

HMM学习笔记_1(从一个实例中学习DTW算法)

DTW为(Dynamic Time Warping,动态时间归准)的简称。应用很广&#xff0c;主要是在模板匹配中&#xff0c;比如说用在孤立词语音识别&#xff0c;计算机视觉中的行为识别&#xff0c;信息检索等中。可能大家学过这些类似的课程都看到过这个算法&#xff0c;公式也有几个&#xf…

DTW(Dynamic Time Warping)动态时间规整——简单易懂

DTW可以用来干什么呢&#xff1f; DWT可以计算两个时间序列的相似度&#xff0c;尤其适用于不同长度、不同节奏的时间序列&#xff08;比如不同的人读同一个词的音频序列&#xff09;。距离越近&#xff0c;相似度越高。 DTW在语音中的运用&#xff1a; 在实际应用中&#xff…

DTW(动态时间归整)算法的前世今生

今天和大家分享一下我刚刚学习到的DTW算法。 主要从以下几个方面进行介绍&#xff1a; 1. DTW算法的提出和应用场景。 2. DTW算法的基本原理和计算过程。 3. DTW算法的具体代码实现。 一、DTW算法的提出和应用场景 Dynamic Time Warping&#xff08;简称&#xff1a;DTW&…

时间序列匹配之dtw的python实现(二)

简介 在上一篇文章里我们介绍了dtw库的使用&#xff0c;但其限制太多&#xff0c;不够灵活&#xff0c;且作图不够方便&#xff0c;因此我们来介绍一个更加复杂的库----dtw-python。它是R语言中dtw实现的python版本&#xff0c;基本的API是对应的&#xff0c;它的优势在于能够…

DTW算法详解

DTW算法详解 1.DTW 1.1 时序相似度 在时间序列数据中&#xff0c;一个常见的任务是比较两个序列的相似度&#xff0c;作为分类或聚类任务的基础。那么&#xff0c;时间序列的相似度应该如何计算呢&#xff1f; “ 经典的时间序列相似性度量方法总体被分为两 类: 锁步度量(lo…

动态时间规整算法(DTW)原理及代码实现

Dynamic Time Warping&#xff08;DTW&#xff09;动态时间规整算法 Dynamic Time Warping&#xff08;DTW&#xff09;是一种衡量两个时间序列之间的相似度的方法&#xff0c;主要应用在语音识别领域来识别两段语音是否表示同一个单词。 1. DTW方法原理 在时间序列中&#…

LOIC网站压力测试工具

官网下载&#xff1a;https://sourceforge.net/projects/loic/ 百度云&#xff1a;https://pan.baidu.com/s/1VVUjLqtq1mMAD-TJIAnhnQ 1.软件解压后运行&#xff0c;界面如图 2.然后可以在url处输入想要测试的网站网址&#xff0c;也可以输入ip地址&#xff0c;输入完之后要…

十大抢手的网站压力测试工具

两天&#xff0c;jnj在本站发布了《如何在低速率网络中测试 Web 应用》&#xff0c;那是测试网络不好的情况。而下面是十个免费的可以用来进行Web的负载/压力测试的工具&#xff0c;这样&#xff0c;你就可以知道你的服务器以及你的WEB应用能够顶得住多少的并发量&#xff0c;以…