最近在学习python,发现这个语言挺有意思,写起来比较轻松。后面打算用它代替matlab来做数据分析。写了一个comtrade格式录波文件的读取类。目前仅支持读取comtrade的binary格式文件。仅读取模拟量,不解析开关量。上comtrade类代码。
#!/usr/bin/env python 3.8.0
# -*- coding: utf-8 -*-
#
''' ########################################################################'''
import matplotlib.pyplot as plt
import re
import struct''' ########################################################################## ## COMTRADE文件读取 ## ########################################################################''''''------------------------- 全局变量 -------------------------'''
ERROR = -1
OK = 0class Comtrade():"""comtrade文件类"""'''------------------------- 初始化变量 -------------------------'''def __init__(self, filename=''):#--文件名self.file_name = filenameself.cfg_file_name = filename+'.cfg' self.dat_file_name = filename+'.dat' print("## The cometrade file:"+self.file_name)# 列表,存储cfg文件每一行内容self.cfglines = [] # 模拟量通道数self.analog_chanel_num = 0# 开关量word数self.digit_chanel_num = 0# DAT数据格式 BYNARY 或 ASCIIself.file_format = ''# 采样点数self.wave_points = 0# 字典list# cfg模拟量通道信息 字典# An,ch-id,ph,ccbm,uu,a,b,skew,min,max,primary,secodary,ps<CR/LF>self.ana_infos = [] # 每一帧的数据长度 单位BYTE# 4, 4, 2, 2, 2, 2, 2, 2# n,timestamp,A1,A2,……Ak,D1,D2,……Dm<CR/LF>self.dataframe_lenth = 0# 二进制数据帧self.packed_datas = []# 大小端self.endian_mode = 'little-endian'# 二进制数据解析格式self.pack_format = '''''------------------------- 读取CFG文件 -------------------------'''def cfg_read(self):#--读取CFG文件内容,按行存储至列表try:with open(self.cfg_file_name) as cfg_file_obj:lines = cfg_file_obj.readlines() for line in lines:line = line.rstrip()line = line.split(',')self.cfglines.append(line) #print(self.cfglines[-1]) #--CFG文件不存在,打印提示信息,return except FileNotFoundError: msg = "## ERROR: The file " + self.cfg_file_name + " doesn't exist." print(msg)return ERROR#--模拟量通道数nums = re.findall("\d+", self.cfglines[1][1])self.analog_chanel_num = int(nums[0]) #--开关量word数nums = re.findall("\d+", self.cfglines[1][2])self.digit_chanel_num = int((int(nums[0])+15)/16) #--检查文件是否BINARY格式,no则returnself.file_format = self.cfglines[-2][0]if self.file_format != 'BINARY':msg = "## ERROR: The file format is "+self.file_format+\", the tool doesn't support." print(msg) return ERROR #--采样点数self.wave_points = int(self.cfglines[-5][-1])#--提取模拟量通道的相关信息i = 0while i < self.analog_chanel_num:self.ana_infos.append({})self.ana_infos[i]['An'] = int(self.cfglines[i+2][0])self.ana_infos[i]['ch-id'] = self.cfglines[i+2][1]self.ana_infos[i]['ph'] = self.cfglines[i+2][2]self.ana_infos[i]['ccbm'] = self.cfglines[i+2][3]self.ana_infos[i]['uu'] = self.cfglines[i+2][4]self.ana_infos[i]['a'] = float(self.cfglines[i+2][5])self.ana_infos[i]['b'] = float(self.cfglines[i+2][6])self.ana_infos[i]['skew'] = float(self.cfglines[i+2][7])self.ana_infos[i]['min'] = int(self.cfglines[i+2][8])self.ana_infos[i]['max'] = int(self.cfglines[i+2][9])self.ana_infos[i]['primary'] = float(self.cfglines[i+2][10])self.ana_infos[i]['secondary'] = float(self.cfglines[i+2][11])self.ana_infos[i]['ps'] = self.cfglines[i+2][12]self.ana_infos[i]['ana_data'] = []self.ana_infos[i]['pri_data'] = []i += 1#print(self.ana_infos)return OK'''------------------------- 读取DAT文件 -------------------------'''def dat_read(self):#--读取DAT文件# 4, 4, 2, 2, 2, 2, 2, 2# n,timestamp,A1,A2,……Ak,D1,D2,……Dm<CR/LF>self.dataframe_lenth = 4+4+2*self.analog_chanel_num+2*self.digit_chanel_numtry:with open(self.dat_file_name,'rb') as dat_file_obj: #按二进制格式读取i = 0while i < self.wave_points:recv_buf = dat_file_obj.read(self.dataframe_lenth)if len(recv_buf) != self.dataframe_lenth:msg = "## ERROR: The dat file isn't match the wave points."print(msg)return ERRORelse:self.packed_datas.append(recv_buf)i +=1except FileNotFoundError:msg = "## ERROR: The file " + self.dat_file_name + " does not exist."print(msg)return ERROR#--大小端if self.endian_mode == 'little-endian':self.pack_format += '<'else:self.pack_format += '>'#--序号n+时标timestampself.pack_format += 'I'self.pack_format += 'I'#-----模拟量Analogi = 0while i < self.analog_chanel_num:self.pack_format += 'h'i +=1#-----开关量Digiti = 0while i < self.digit_chanel_num:self.pack_format += 'H'i +=1#-----按格式读取数据i = 0while i < self.wave_points:n_stuct = struct.Struct(self.pack_format)n_unpacked_data = n_stuct.unpack(self.packed_datas[i])j=0while j < self.analog_chanel_num:self.ana_infos[j]['ana_data'].append(n_unpacked_data[2+j])j +=1i +=1#print(self.ana_infos[0])#-----转换到一次值 i = 0while i < self.wave_points:j=0while j < self.analog_chanel_num:self.ana_infos[j]['pri_data'].append( \(self.ana_infos[j]['ana_data'][i]+self.ana_infos[j]['b'])*self.ana_infos[j]['a'] )j +=1i +=1return OK''' --------------------------- file end --------------------------- '''
main函数测试。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
''' ########################################################################'''
from comtrade import Comtrade
import matplotlib.pyplot as plt
import re
import struct''' ########################################################################## ## main ## ########################################################################''''''------------------------- 全局变量 -------------------------'''
ERROR = -1
OK = 0'''------------------------- main -------------------------'''def main ():""" Function doc """cfile = Comtrade("twave")cfg_read_ok = cfile.cfg_read()dat_read_ok = cfile.dat_read()if cfg_read_ok==0 and dat_read_ok==0:plt.plot(cfile.ana_infos[1]['pri_data'][1000:], linewidth=1, )plt.show()return OK'''------------------------- run -------------------------'''
main()
'''------------------------- end -------------------------'''
运行结果。