A股全自动化交易——从零到实盘20(完结)

article/2025/10/1 5:47:52

在这里插入图片描述

本文是“从零到实盘”系列的最后一篇文章,将介绍实现全自动实盘交易的最后一个步骤,即实现定时更新股票数据任务。

schedule模块安装

我们使用schedule来实现定时任务,首先需要安装schedule模块,在终端中输入以下命令安装:

pip install schedule

主要代码分析

我们实现最终版本的data_center.py,全部内容见文末,主要新增2个函数:

新增按天更新任务函数

def task_update_daily():

该函数包含每日更新任务。

    start_time = time.time()

首先记录任务开始运行时间,用于后续计算任务花费时间。

    stock_codes = get_stock_codes(update=True)

获取待交易股票列表,这里将update置为True,每日都通过查询BaoStock获取最新的股票代码。

    update_data_mp(stock_codes)

更新股票数据,这里会更新前复权数据。对于已在数据库中的股票,会更新获取最新交易日的数据;对于未在数据库中的股票,以及新除权除息的股票,将重新创建所有历史数据。

    update_trade(stock_codes)

根据更新后的股票数据,更新交易数据,一方面根据PTrade的交易数据进一步更新数据库的数据,一方面输出待交易数据,供PTrade实盘交易使用。

    end_time = time.time()print('程序运行时间:{}s'.format(end_time - start_time))

记录任务结束运行时间,并计算任务运行时间。

新增定时任务执行函数

def main():

该函数用于设置定时任务,每交易日17:40启动任务,进行每日数据更新计算。

    schedule.clear()

清空任务列表。

    task_time = '17:40:00'

设置任务执行时间。

    schedule.every().monday.at(task_time).do(task_update_daily)schedule.every().tuesday.at(task_time).do(task_update_daily)schedule.every().wednesday.at(task_time).do(task_update_daily)schedule.every().thursday.at(task_time).do(task_update_daily)schedule.every().friday.at(task_time).do(task_update_daily)

设置交易日定时启动任务。

    while True:schedule.run_pending()print('.', end='')time.sleep(60)

循环等待任务执行。

总结

  • 我们从零开始,完成了一套以BaoStock为数据源、以MySQL为存储、以PTrade为交易接口的运行在Windows PC上的全自动化A票交易系统。只要电脑保持开机、data_center.py保持运行、PTrade保持打开,交易所需的全部步骤会自动完成,无需人工干预。
  • 项目git地址:https://gitee.com/sl/quant_from_scratch
  • 系列文章展示了从零到实盘的过程,是笔者自接触量化以来总结出的运行比较稳定的系统,供读者实现自己的交易系统做参考。
  • 距离躺着赚钱还有很长路要走。目前策略表现不稳定,最大盈利9%,最大回撤近20%,目前还在微亏状态。虽然还在回测结果范围内(2018年至2020年3年周期回测后,年化收益为41.7%,最大回撤为25.8%),但是并不令人满意。也请读者只参考系统框架,切勿盲目实盘
  • 笔者会基于当前系统框架继续挖掘策略,欢迎感兴趣的读者一起讨论研究。
  • 非常感觉广大群友的支持与建议,使笔者更有创作和分享动力。

data_center的全部代码如下:

import os
import baostock as bs
import datetime
import sys
import numpy as np
import pandas as pd
import multiprocessing
import time
import sqlalchemy
import matplotlib.pyplot as plt
from pandas.plotting import table
import math
import schedule# 可用日线数量约束
g_available_days_limit = 60# 历史数据开始时间
g_start_date = '1990-12-19'# BaoStock日线数据字段
g_baostock_data_fields = 'date,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ, psTTM,pcfNcfTTM,isST'# 定时上传到ptrade的文件路径
g_ptrade_upload_path = 'D:/quant_from_scratch/quant_from_scratch/a-share/data/file_update/trade_data.csv'# ptrade输出交易数据文件目录
g_ptrade_export_dir = 'D:/quant_from_scratch/quant_from_scratch/a-share/data/file_export/'# 止盈比例
g_take_profit_percent = 0.0618# 止损比例
g_stop_loss_percent = 0.1618def bs_func_decorator(func):"""BaoStock方法装饰器调用BaoStock方法,如果失败,尝试重新连接BaoStock避免重复建立BaoStock连接:param args::param func: BaoStock方法:return: 装饰器"""# 装饰def wrap_func(*args, **kwargs):# 最多尝试次数retry_n = 5# 循环尝试查询复权数据,最多尝试retry_n次for i in range(retry_n):# 调用BaoStock方法rs = func(*args, **kwargs)# 如果查询成功,则返回查询结果if rs.error_code == '0':return rs.get_data()# 未查询成功,则尝试重新连接BaoStockelse:print('重新连接BaoStock...')bs.login()# 无法连接,则程序退出print('无法连接BaoStock,程序退出...')exit(0)return wrap_func@bs_func_decorator
def wrap_query_all_stock(date):"""装饰股票代码查询函数:param date: 需要查询的交易日期:return: 包含股票代码的DataFrame"""return bs.query_all_stock(date)@bs_func_decorator
def wrap_query_history_k_data_plus(code, data_fields=g_baostock_data_fields,start_date=g_start_date, end_date=datetime.date.today().strftime('%Y-%m-%d'),frequency='d', adjustflag='2'):"""装饰获取历史股票K线数据函数:param code: 股票代码:param data_fields: 指标数据:param start_date: 开始日期:param end_date: 结束日期:param frequency: 数据类型,默认为d,日k线;d=日k线、w=周、m=月、5=5分钟、15=15分钟、30=30分钟、60=60分钟k线数据,不区分大小写;指数没有分钟线数据;周线每周最后一个交易日才可以获取,月线每月最后一个交易日才可以获取。:param adjustflag: 复权类型,默认前复权:2;1:后复权;3:不复权:return: 包含K线数据的DataFrame"""return bs.query_history_k_data_plus(code, data_fields, start_date, end_date, frequency, adjustflag)@bs_func_decorator
def wrap_query_dividend_data(code, year, yearType='report'):"""装饰除权除息信息函数:param code: 股票代码:param year: 查询年份:param yearType: 年份类别,默认为"report":预案公告年份,可选项"operate":除权除息年份。此参数不可为空:return: 包含除权除息信息的DataFrame"""return bs.query_dividend_data(code, year, yearType=yearType)def create_mysql_engine():"""创建数据库引擎对象:return: 新创建的数据库引擎对象"""# 引擎参数信息host = 'localhost'user = 'root'passwd = '111111'port = '3306'db = 'db_quant'# 创建数据库引擎对象mysql_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, passwd, host, port),poolclass=sqlalchemy.pool.NullPool)# 如果不存在数据库db_quant则创建mysql_engine.execute("CREATE DATABASE IF NOT EXISTS {0} ".format(db))# 创建连接数据库db_quant的引擎对象db_engine = sqlalchemy.create_engine('mysql+pymysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user, passwd, host, port, db),pool_size=get_process_num() * 2, max_overflow=get_process_num() * 2, pool_timeout=50)# 返回引擎对象return db_enginedef get_stock_codes(date=None, update=False):"""获取指定日期前(含当日)最近交易日的A股代码列表若参数update为False,表示从数据库中读取股票列表若数据库中不存在股票列表的表,或者update为True,则下载指定日期date前(含date)最近交易日的交易股票列表:param date: 日期,默认为None:param update: 是否更新股票列表,默认为False:return: A股代码的列表"""print('正在获取股票代码...')# 创建数据库引擎对象engine = create_mysql_engine()# 数据库中股票代码的表名table_name = 'stock_codes'# 数据库中不存在股票代码表,或者需要更新股票代码表if table_name not in sqlalchemy.inspect(engine).get_table_names() or update:# 获取最新交易日trading_date = get_trading_date(date)# 从BaoStock查询股票数据stock_df = wrap_query_all_stock(trading_date)# 如果无数据,则程序退出if 0 == len(stock_df):engine.dispose()print('数据异常,程序退出...')exit(0)# 筛选股票数据,上证和深证股票代码在sh.600000与sz.39900之间stock_df = stock_df[(stock_df['code'] >= 'sh.600000') & (stock_df['code'] < 'sz.399000')]# 将股票代码写入数据库stock_df.to_sql(name=table_name, con=engine, if_exists='replace', index=False, index_label=False)# 关闭数据库连接engine.dispose()# 返回股票列表return stock_df['code'].tolist()# 从数据库中读取股票代码列表else:# 待执行的sql语句sql_cmd = 'SELECT {} FROM {}'.format('code', table_name)# 读取sqlstock_codes = pd.read_sql(sql=sql_cmd, con=engine)['code'].tolist()# 关闭数据库连接engine.dispose()# 返回股票列表return stock_codesdef update_data(code, latest_trading_date, db_tables, query_days=60, adjustflag='2', recreate=False):"""更新日线数据,计算相关因子:param code: 待更新数据的股票代码:param latest_trading_date: 最新交易日期:param db_tables: 数据库中已存在股票的表名列表:param query_days: 在数据库中查询历史日线数据的天数,用于计算扩展因子,需要根据扩展因子设置,这里要计算60日均线,所以最小设置为60:param adjustflag: 复权选项 1:后复权  2:前复权  3:不复权  默认为前复权:param recreate: 是否重新创建数据:return: 包含所有待处理股票的最新日线数据及扩展因子的Series"""# 创建数据库引擎对象engine = create_mysql_engine()# 创建空Series存储最新一日的数据latest_series = pd.Series(dtype='float64')print('正在更新{}...'.format(code))# 股票数据在数据库中的表名table_name = '{}_{}'.format(code[3:], code[:2])# 数据库写入模式,默认为添加if_exists = 'append'# 数据id,默认为-1last_id = -1# 日线开始时间,默认为1990-12-19start_date = g_start_date# 用于读取数据库数据的DataFramedb_df = pd.DataFrame()# 重新创建或者数据库中无该股票的数据表,需要重新创建数据if recreate or (table_name not in db_tables):if_exists = 'replace'else:# 获取按时间排序的最后query_days行数据sql_cmd = 'SELECT * FROM {} ORDER BY date DESC LIMIT {};'.format(table_name, query_days)db_df = pd.read_sql(sql=sql_cmd, con=engine)# 数据按id(date)升序排序db_df = db_df.sort_values(by='id', ascending=True)# 获取数据库中最新数据日期及idlast_id = db_df['id'].iloc[-1]start_date = db_df['date'].iloc[-1]# 如果数据库中已包含最新一日数据if start_date >= latest_trading_date:# 将最新一日数据,添加code字段,并返回latest_series = db_df.iloc[-1].copy()latest_series['code'] = code[3:]print('{}当前已为最新数据'.format(code))# 关闭数据库连接engine.dispose()return latest_series# 计算待更新数据的开始日期start_date = (datetime.datetime.strptime(start_date, '%Y-%m-%d') + datetime.timedelta(days=1)).strftime('%Y-%m-%d')# 新除权除息,需重新创建数据if query_dividend(code, start_date):if_exists = 'replace'start_date = g_start_date# 下载日线数据out_df = wrap_query_history_k_data_plus(code, start_date=start_date, end_date=latest_trading_date, adjustflag=adjustflag)# 剔除停盘数据if out_df.shape[0]:out_df = out_df[(out_df['volume'] != '0') & (out_df['volume'] != '')]# 如果数据为空,则不更新if not out_df.shape[0]:# 关闭数据库连接engine.dispose()return latest_series# 将数值数据转为float型,便于后续处理convert_list = ['open', 'high', 'low', 'close', 'preclose', 'volume', 'amount', 'turn', 'pctChg']out_df[convert_list] = out_df[convert_list].astype(float)# 新添加行数new_rows = out_df.shape[0]# 如果是更新数据,将数据库中读取的数据拼接到新下载日线数据上if db_df.shape[0]:out_df = db_df[list(out_df)].append(out_df)# 如果数据少于query_days,则不更新if out_df.shape[0] < query_days:# 关闭数据库连接engine.dispose()return latest_series# 重置索引out_df.reset_index(drop=True, inplace=True)# 计算扩展因子out_df = extend_factor(out_df)# 取最后new_rows行out_df = out_df.iloc[-new_rows:]# 判读是否有字段缺失if np.any(out_df.isnull()):print('{}有缺失字段!!!'.format(code))# 更新idid_s = pd.Series(np.arange(last_id + 1, last_id + 1 + new_rows), index=out_df.index)# 如果是更新数据,则直接赋值id列if db_df.shape[0]:out_df['id'] = id_s# 如果是新建数据,则将id插入到第一列else:out_df.insert(0, 'id', id_s)# 将更新数据写入数据库out_df.to_sql(name=table_name, con=engine, if_exists=if_exists, index=False)# 关闭数据库连接engine.dispose()# 将更新的最后一行添加code字段,append到latest_df中latest_series = out_df.iloc[-1].copy()latest_series['code'] = code[3:]# 返回包含最新一日股票日线数据的Seriesreturn latest_seriesdef update_data_mp(stock_codes):"""使用多进程更新日线数据,计算扩展因子将最新一日各个股票数据存储在数据库表latest中,便于后续筛选候选股票使用:param stock_codes: 待更新数据的股票代码:return: 包含所有待处理股票的最新日线数据的DataFrame"""# 创建数据库引擎实例engine = create_mysql_engine()# 获取数据库内所有表的表名db_tables = sqlalchemy.inspect(engine).get_table_names()# 获取最新交易日trading_date = get_trading_date()# 多进程更新数据,获取最新日线数据的DataFramewith multiprocessing.Pool(processes=get_process_num()) as pool:rs = pool.starmap(update_data, [(code, trading_date, db_tables) for code in stock_codes])# 删除空值rs = [s for s in rs if 0 != len(s)]# 将各进程返回值组装为pd.DataFramelatest_df = pd.DataFrame(rs)# 将所有股票最新日线数据写入数据库表latestif latest_df.shape[0]:latest_df.to_sql(name='latest', con=create_mysql_engine(), if_exists='replace', index=False)# 关闭数据库连接engine.dispose()print('完成数据更新')return latest_dfdef get_trading_date(date=None):"""取得指定日期前市场最近交易日,这里的交易日特指有日线数据的交易日:param date: 日期字符串,默认为空,返回历史离当日最近的有日线的交易日(当日为交易日,若17:30前调用,也无日线数据):return: 最近交易日日期字符串,格式'%Y-%m-%d'"""# date为空时,将trade_date设置为当日if date is None:trading_date = datetime.date.today().strftime('%Y-%m-%d')# 否则将trade_date设置为指定dateelse:trading_date = date# 查询股票数据,query_all_stock与K线在17:30同时更新,向trading_date前寻找有K线数据的日期while 0 == wrap_query_all_stock(trading_date).shape[0]:# trading_date向历史移动1天trading_date = (datetime.datetime.strptime(trading_date, '%Y-%m-%d') - datetime.timedelta(days=1)).strftime('%Y-%m-%d')# 返回最近交易日日期return trading_datedef query_dividend(code, date):"""查询是否在指定日期后除权除息:param code: 股票代码:param date: 指定日期:return: 在指定日期后除权除息返回True,否则返回False"""# 查询除权除息数据rs_df = wrap_query_dividend_data(code, date[:4])# 在指定日期后除权除息返回True,否则返回Falseif rs_df.shape[0] and rs_df['dividOperateDate'].iloc[-1] >= date:return Trueelse:return Falsedef extend_factor(df):"""计算扩展因子:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""# 使用pipe依次计算涨停、双神及是否为候选股票df = df.pipe(zt).pipe(ss, delta_days=30).pipe(candidate)return dfdef zt(df):"""计算涨停因子若涨停,则因子为True,否则为False以当日收盘价较前一日收盘价上涨9.8%及以上作为涨停判断标准:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""df['zt'] = np.where((df['close'].values >= 1.098 * df['preclose'].values), True, False)return dfdef shift_i(df, factor_list, i, fill_value=0, suffix='a'):"""计算移动因子,用于获取前i日或者后i日的因子:param df: 待计算扩展因子的DataFrame:param factor_list: 待移动的因子列表:param i: 移动的步数:param fill_value: 用于填充NA的值,默认为0:param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益:return: 包含扩展因子的DataFrame"""# 选取需要shift的列构成新的DataFrame,进行shift操作shift_df = df[factor_list].shift(i, fill_value=fill_value)# 对新的DataFrame列进行重命名shift_df.rename(columns={x: '{}_{}{}'.format(x, i, suffix) for x in factor_list}, inplace=True)# 将重命名后的DataFrame合并到原始DataFrame中df = pd.concat([df, shift_df], axis=1)return dfdef shift_till_n(df, factor_list, n, fill_value=0, suffix='a'):"""计算范围移动因子用于获取前/后n日内的相关因子,内部调用了shift_i:param df: 待计算扩展因子的DataFrame:param factor_list: 待移动的因子列表:param n: 移动的步数范围:param fill_value: 用于填充NA的值,默认为0:param suffix: 值为a(ago)时表示移动获得历史数据,用于计算指标;值为l(later)时表示获得未来数据,用于计算收益:return: 包含扩展因子的DataFrame"""for i in range(n):df = shift_i(df, factor_list, i + 1, fill_value, suffix)return dfdef ss(df, delta_days=30):"""计算双神因子,即间隔的两个涨停若当日形成双神,则因子为True,否则为False:param df: 待计算扩展因子的DataFrame:param delta_days: 两根涨停间隔的时间不能超过该值,否则不判定为双神,默认值为30:return: 包含扩展因子的DataFrame"""# 移动涨停因子,求取近delta_days天内的涨停情况,保存在一个临时DataFrame中temp_df = shift_till_n(df, ['zt'], delta_days, fill_value=False)# 生成列表,用于后续检索第2天前至第delta_days天前是否有涨停出现col_list = ['zt_{}a'.format(x) for x in range(2, delta_days + 1)]# 计算双神,需同时满足3个条件:# 1、第2天前至第delta_days天前,至少有1个涨停# 2、1天前不是涨停(否则就是连续涨停,不是间隔的涨停)# 3、当天是涨停df['ss'] = temp_df[col_list].any(axis=1) & ~temp_df['zt_1a'] & temp_df['zt']return dfdef mas(df, ma_list, factor='close'):"""计算多条均线因子,内部调用ma计算单条均线:param df: 待计算扩展因子的DataFrame:param ma_list: 待计算均线的周期列表:param factor: 待计算均线的因子,默认为收盘价:return: 包含扩展因子的DataFrame"""for i in ma_list:df = ma(df, i, factor)return dfdef ma(df, n=5, factor='close'):"""计算均线因子:param df: 待计算扩展因子的DataFrame:param n: 待计算均线的周期,默认计算5日均线:param factor: 待计算均线的因子,默认为收盘价:return: 包含扩展因子的DataFrame"""# 均线名称,例如,收盘价的5日均线名称为ma_5,成交量的5日均线名称为volume_ma_5name = '{}ma_{}'.format('' if 'close' == factor else factor + '_', n)# 取待计算均线的因子列s = pd.Series(df[factor], name=name, index=df.index)# 利用rolling和mean计算均线数据s = s.rolling(center=False, window=n).mean()# 将均线数据添加到原始的DataFrame中df = df.join(s)# 均线数值保留两位小数df[name] = df[name].apply(lambda x: round(x + 0.001, 2))return dfdef cross_mas(df, ma_list):"""计算穿均线因子若当日最低价不高于均线价格且当日收盘价不低于均线价格则当日穿均线因子值为True,否则为False:param df: 待计算扩展因子的DataFrame:param ma_list: 均线的周期列表:return: 包含扩展因子的DataFrame"""for i in ma_list:df['cross_{}'.format(i)] = (df['low'] <= df['ma_{}'.format(i)]) & (df['ma_{}'.format(i)] <= df['close'])return dfdef candidate(df):"""计算是否为候选若当日日线同时穿过5、10、20、30日均线且30日均线在60日均线上方且当日形成双神则当日作为候选,该因子值为True,否则为False:param df: 待计算扩展因子的DataFrame:return: 包含扩展因子的DataFrame"""# 均线周期列表ma_list = [5, 10, 20, 30, 60]# 计算均线的因子,保存到临时的DataFrame中temp_df = mas(df, ma_list)# 计算穿多线的因子,保存到临时的DataFrame中temp_df = cross_mas(temp_df, ma_list)# 穿多线因子的列名列表column_list = ['cross_{}'.format(x) for x in ma_list[:-1]]# 计算是否为候选df['candidate'] = temp_df[column_list].all(axis=1) & (temp_df['ma_30'] >= temp_df['ma_60']) & df['ss']return dfdef get_process_num():"""获取建议进程数目对于I/O密集型任务,建议进程数目为CPU核数/(1-a),a去0.8~0.9:return: 进程数目"""return min(60, int(os.cpu_count() / (1 - 0.9)))def profit_loss_statistic_mp(stock_codes, hold_days=10):"""多进程盈亏分布统计,计算当日candidate为True,持仓hold_days天的收益分布输出数据分布表格及图片文件:param stock_codes: 待分析的股票代码:param hold_days: 持仓天数:return: None"""# 多进程计算获得候选股票数据with multiprocessing.Pool(processes=get_process_num()) as pool:rs = pool.starmap(profit_loss_statistic, [(code, hold_days) for code in stock_codes])# 拼接候选DataFramecandidate_df = pd.concat(rs, axis=0)# 将收益分布数据保存到excel文件candidate_df[['date', 'code', 'max_profit', 'max_loss']].to_excel('profit_loss_{}.xlsx'.format(hold_days), index=False, encoding='utf-8')# 重置索引,方便后面绘图candidate_df.reset_index(inplace=True)# 设置绘图格式并绘图fig, ax = plt.subplots(1, 1)table(ax, np.round(candidate_df[['max_profit', 'max_loss']].describe(), 4), loc='upper right',colWidths=[0.2, 0.2, 0.2])candidate_df[['max_profit', 'max_loss']].plot(ax=ax, legend=None)# 保存图表fig.savefig('profit_loss_{}.png'.format(hold_days))# 显示图表plt.show()def profit_loss_statistic(code, hold_days=10):"""盈亏分布统计,计算当日candidate为True,持仓hold_days天的收益分布输出数据分布表格及图片文件:param code: 待分析的股票代码:param hold_days: 持仓天数:return: 筛选出符合买入条件的股票DataFrame"""# 创建数据库引擎对象engine = create_mysql_engine()# 创建空的DataFramecandidate_df = pd.DataFrame()# 计算收益分布循环print('正在处理{}...'.format(code))# 股票数据在数据库中的表名table_name = '{}_{}'.format(code[3:], code[:2])# 如果数据库中没有该股票数据则跳过if table_name not in sqlalchemy.inspect(engine).get_table_names():# 关闭数据库连接engine.dispose()return candidate_df# 从数据库读取特定字段数据cols = 'date, open, high, low, close, candidate'sql_cmd = 'SELECT {} FROM {} ORDER BY date DESC'.format(cols, table_name)df = pd.read_sql(sql=sql_cmd, con=engine)print(df.dtypes)# 关闭数据库连接engine.dispose()# 移动第2日开盘价、最低价,以及hold_days的最高价、最低价数据df = shift_i(df, ['open'], 1, suffix='l')df = shift_till_n(df, ['high', 'low'], hold_days, suffix='l')# 丢弃最近hold_days日的数据df = df.iloc[hold_days: df.shape[0] - g_available_days_limit, :]# 选取出现买点的股票df = df[(df['candidate'] > 0) & (df['low_1l'] <= df['close'])]# 将数据添加到候选池中if df.shape[0]:df['code'] = codecandidate_df = candidate_df.append(df)if candidate_df.shape[0]:# 计算最大盈利# 买入当天无法卖出,因此计算最大收益时,从第2日开始cols = ['high_{}l'.format(x) for x in range(2, hold_days + 1)]candidate_df['max_high'] = candidate_df[cols].max(axis=1)candidate_df['max_profit'] = candidate_df['max_high'] / candidate_df[['open_1l', 'close']].min(axis=1) - 1# 计算最大亏损cols = ['low_{}l'.format(x) for x in range(2, hold_days + 1)]candidate_df['min_low'] = candidate_df[cols].min(axis=1)candidate_df['max_loss'] = candidate_df['min_low'] / candidate_df[['open_1l', 'close']].min(axis=1) - 1return candidate_dfdef update_latest_table(code):"""更新数据库中最新日线数据表从数据库中读取股票的最新一天数据并返回:param code: 待更新数据的股票代码:return: 包含所有待处理股票的最新日线数据的DataFrame"""# 创建数据库引擎对象engine = create_mysql_engine()# 创建空的DataFramelatest_df = pd.DataFrame()print('正在更新{}...'.format(code))# 股票数据在数据库中的表名table_name = '{}_{}'.format(code[3:], code[:2])# 判断是否存在该表,不存在则跳过if table_name not in sqlalchemy.inspect(engine).get_table_names():# 关闭数据库连接engine.dispose()return latest_df# 从数据库中读取股票的最新一天数据sql_cmd = 'SELECT * FROM {} ORDER BY date DESC LIMIT 1;'.format(table_name)df = pd.read_sql(sql=sql_cmd, con=engine)# 关闭数据库连接engine.dispose()# 有缺失字段就不参与候选if np.any(df.isnull()):print('{}有缺失字段!!!'.format(code))return latest_df# 添加code字段,并append到latest_df中df['code'] = code[3:]latest_df = latest_df.append(df)return latest_dfdef update_latest_table_mp(stock_codes):"""多进程更新数据库中最新日线数据表从各个进程收集每只股票的最新一天数据并返回:param stock_codes: 待更新数据的股票代码:return: 包含所有待处理股票的最新日线数据的DataFrame"""# 多进程更新最新日线数据表,获取该表数据的DataFramewith multiprocessing.Pool(processes=get_process_num()) as pool:rs = pool.map(update_latest_table, stock_codes)# 拼接最新日线数据latest_df = pd.concat(rs, axis=0)# 将所有股票最新日线数据写入数据库表latestif latest_df.shape[0]:latest_df.to_sql(name='latest', con=create_mysql_engine(), if_exists='replace', index=False)return latest_dfdef query_latest_table(stock_codes, by_latest_table=True):"""查询最新日线数据表:param stock_codes: 待获取最新日线数据的股票代码:param by_latest_table: 是否通过表latest查询,为True(默认)时,直接从表latest中查询,否则从各个股票数据表中分别读取:return: 包含所有待处理股票的最新日线数据的DataFrame"""# 创建数据库引擎对象engine = create_mysql_engine()# 如果设置直接从表latest查询,且表latest存在,则直接从数据库读取数据返回if by_latest_table and 'latest' in sqlalchemy.inspect(engine).get_table_names():sql_cmd = 'SELECT * FROM latest;'latest_df = pd.read_sql(sql=sql_cmd, con=engine)# 否则从各股票数据表中分别读取最新一日数据再返回(过程中会更新表latest)else:latest_df = update_latest_table_mp(stock_codes)# 关闭数据库连接engine.dispose()return latest_dfdef write_easymoney_candidates(df):"""输出候选股票txt,供东方财富读取:param df: 包含候选股票数据的DataFrame:return: None"""# 候选输出目录out_dir = '../data/candidates/'# 如果目录不存在,则创建if not os.path.exists(out_dir):os.makedirs(out_dir)out_file = open('{}{}.txt'.format(out_dir, datetime.datetime.today().date()), mode='w')for stock in df['code'].tolist():print(stock, file=out_file)out_file.close()def update_trade(stock_codes):"""更新交易数据,与ptrade进行数据交互:param stock_codes: 更新股票范围:return: None"""# 查询最新日线数据df = query_latest_table(stock_codes)# 筛选出候选股票if df.shape[0]:df = df[df['candidate'] > 0]if df.shape[0]:df = df.sort_values(by='turn')print(df['code'].tolist())else:print('当日无候选股票')# 输出候选股票txt,供东方财富读取write_easymoney_candidates(df)# 更新ptrade交易数据update_ptrade(df)def update_ptrade(df):"""更新ptrade交易数据:param df: 包含当日候选股票日线数据的DataFrame:return:"""# 将候选更新到数据库中update_ptrade_candidate(df)# 从ptrade获取成交信息,在数据库中进行更新update_ptrade_deal()# 盘后更新数据库中持仓股票的持有天数字段update_ptrade_hold_days()# 盘后更新数据库中股票是否继续日后交易update_ptrade_to_trade()# 输出ptrade交易所需的文件output_ptrade_file()def output_ptrade_file():"""输出ptrade交易所需的文件!!!向ptrade传递数据的唯一方式!!!ptrade提供定时上传文件功能,将交易所需的数据写入指定文件,供ptrade读取使用:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()# 获取数据库内所有表的表名db_tables = sqlalchemy.inspect(engine).get_table_names()# 判断是否存在表ptrade,不存在则无需更新table_name = 'ptrade'if table_name not in db_tables:return# 读取数据库中待交易表数据sql_cmd = 'SELECT * FROM {};'.format(table_name)db_data = pd.read_sql(sql=sql_cmd, con=create_mysql_engine())# 关闭数据库连接engine.dispose()# 如果不存在目录则创建upload_dir = g_ptrade_upload_path[:g_ptrade_upload_path.rfind('/')]if not os.path.exists(upload_dir):os.makedirs(upload_dir)# 将待交易数据写到指定文件,供ptrade读取db_data.to_csv(g_ptrade_upload_path, index=False, encoding='utf-8')def update_ptrade_deal():"""从ptrade获取成交信息,在数据库中进行更新!!!从ptrade获取交易数据的方式!!!ptrade会每个一段时间(该时间可设置)将交易、持仓等数据写入指定文件可以通过读取这些文件来获取去实盘交易和仓位等数据:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()# 获取数据库内所有表的表名db_tables = sqlalchemy.inspect(engine).get_table_names()# 判断是否存在表ptrade,不存在则无需更新table_name = 'ptrade'if table_name not in db_tables:return# 读取待交易数据数据sql_cmd = 'SELECT * FROM {};'.format(table_name)db_data = pd.read_sql(sql=sql_cmd, con=engine)# 目录是否存在,不存在则创建if not os.path.exists(g_ptrade_export_dir):os.makedirs(g_ptrade_export_dir)# 读取ptrade输出的交易文件trading_date = get_trading_date()deal_file = '{}Deal_{}.csv'.format(g_ptrade_export_dir, trading_date.replace('-', ''))deal_df = pd.DataFrame()# 判断交易文件是否存在if os.path.exists(deal_file):deal_df = pd.read_csv(deal_file, encoding='gbk', usecols=['证券代码', '买卖方向', '成交数量', '成交价格', '成交金额'],converters={'证券代码': str})else:print('无交易文件!')# 没有成交,则无需更新if not deal_df.shape[0]:return# 合并交易数据,按证券代码和买卖方向对数据分组,求取总成交金额、总成交数量和均价group = deal_df.groupby(['证券代码', '买卖方向'])deal_df['总金额'] = group['成交金额'].transform('sum')deal_df['总数量'] = group['成交数量'].transform('sum')deal_df['均价'] = deal_df['总金额'] / deal_df['总数量']deal_df = deal_df.drop_duplicates(['证券代码', '买卖方向'])# 合并后交易数据按行循环for row in deal_df.itertuples():# 处理卖出成交数据,填写数据库中该股票的卖出日期、卖出价格、总卖出金额字段,并设置股票为不再交易if '卖出' == getattr(row, '买卖方向'):idcs = db_data[db_data['code'] == getattr(row, '证券代码')].indexif len(idcs) < 1:continueidx = idcs[0]db_data.loc[idx, 'date_sell'] = trading_datedb_data.loc[idx, 'price_sell'] = getattr(row, '均价')db_data.loc[idx, 'amount_sell'] = getattr(row, '总金额')db_data.loc[idx, 'to_trade'] = 0# 处理买入成交数据,填写数据库中该股票的买入日期、买入价格、总买入数量、总买入金额、是否到达买点字段if '买入' == getattr(row, '买卖方向'):idcs = db_data[db_data['code'] == getattr(row, '证券代码')].indexif len(idcs) < 1:continueidx = idcs[0]db_data.loc[idx, 'date_buy'] = trading_datedb_data.loc[idx, 'price_buy'] = getattr(row, '均价')db_data.loc[idx, 'trade_volume'] = getattr(row, '总数量')db_data.loc[idx, 'amount_buy'] = getattr(row, '总金额')db_data.loc[idx, 'buy_available'] = 1# 将更新后的待交易数据写回数据库db_data.to_sql(name=table_name, con=engine, if_exists='replace', index=False)# 关闭数据库连接engine.dispose()print('完成ptrade订单信息更新!')def get_table_name(code):"""根据纯数字股票代码,获取对应在数据库中的表名:param code: 纯数字股票代码:return: 代码对应的数据库中的表名"""if '6' == code[:1]:market = 'sh'else:market = 'sz'return '{}_{}'.format(code, market)def update_ptrade_hold_days():"""盘后更新数据库中持仓股票的持有天数字段:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()# 获取数据库内所有表的表名db_tables = sqlalchemy.inspect(engine).get_table_names()# 判断是否存在表ptrade,不存在则无需更新table_name = 'ptrade'if table_name not in db_tables:return# 读取待交易数据数据sql_cmd = 'SELECT * FROM {};'.format(table_name)db_data = pd.read_sql(sql=sql_cmd, con=engine)# 待交易数据循环for row in db_data.itertuples():# 只更新待交易且已买入的股票if getattr(row, 'to_trade') and getattr(row, 'date_buy') is not None:# 查询数据库个股的日线数据,计算hold_dayssql_cmd = 'SELECT * FROM {} ORDER BY date DESC LIMIT {};'.format(get_table_name(getattr(row, 'code')), 10)read_df = pd.read_sql(sql=sql_cmd, con=engine)if read_df.shape[0] < 1:continue# 选出日期大于买入日期的行,行数+1即为持股的天数after_buy_df = read_df[read_df['date'] > getattr(row, 'date_buy')]db_data.loc[getattr(row, 'Index'), 'hold_days'] = after_buy_df.shape[0] + 1# 将更新后的待交易数据写回数据库db_data.to_sql(name='ptrade', con=engine, if_exists='replace', index=False)# 关闭数据库连接engine.dispose()print('完成持有天数更新!')def update_ptrade_to_trade():"""盘后更新数据库中股票是否继续日后交易:return: None"""# 创建数据库引擎对象engine = create_mysql_engine()# 获取数据库内所有表的表名db_tables = sqlalchemy.inspect(engine).get_table_names()# 判断是否存在表ptrade,不存在则无需更新table_name = 'ptrade'if table_name not in db_tables:return# 读取待交易数据数据sql_cmd = 'SELECT * FROM {};'.format(table_name)db_data = pd.read_sql(sql=sql_cmd, con=engine)# 读取最新一日所有股票的日线数据sql_cmd = 'SELECT * FROM latest;'latest_df = pd.read_sql(sql=sql_cmd, con=engine)# 是否达到买点的字段名称buy_available = 'buy_available'# 获取最新交易日期trading_date = get_trading_date()# 待交易数据循环for row in db_data.itertuples():# 当日更新选出的候选股票,不更新是否交易字段if getattr(row, 'date_candidate') >= trading_date:continue# 已达到买点的股票,不更新是否交易字段if getattr(row, buy_available) > 0:continue# 在最新日线数据表中,查询对应股票的位置idcs = latest_df[latest_df['code'] == getattr(row, 'code')].indexif len(idcs) < 1:continue# 当日最低价小于等于买点价格,表示股票当日达到了买点if latest_df.loc[idcs[0], 'low'] <= getattr(row, 'buy_point'):# 标记股票已达到买点db_data.loc[getattr(row, 'Index'), buy_available] = 1# 如果该股票尚未买入,则不再进行交易,标记是否交易字段为0if getattr(row, 'date_buy') is None:db_data.loc[getattr(row, 'Index'), 'to_trade'] = 0# 选出待交易数据及不再交易数据trade_data = db_data[db_data['to_trade'] == 1]history_data = db_data[db_data['to_trade'] == 0]# 更新待交易数据,并将不再交易的数据添加到历史数据表中trade_data.to_sql(name='ptrade', con=engine, if_exists='replace', index=False)history_data.to_sql(name='ptrade_history', con=engine, if_exists='append', index=False)# 关闭数据库连接engine.dispose()print('完成可交易股票更新!')def update_ptrade_candidate(df):"""根据当日日线数据,选出候选交易股票,待交易数据写入数据库:param df: 包含当日候选股票日线数据的DataFrame:return: None"""# df为空,则直接退出if not df.shape[0]:return# 删除科创板数据,无科创板权限out_df = df['68' != df['code'].str[:2]]# out_df为空,则直接退出if not out_df.shape[0]:return# ptrade表名table_name = 'ptrade'# 创建数据库引擎对象engine = create_mysql_engine()# 日期列名date_col = 'date_candidate'# 获取按时间降序的第1行数据if table_name in sqlalchemy.inspect(engine).get_table_names():sql_cmd = 'SELECT {0} FROM {1} ORDER BY {0} DESC LIMIT 1;'.format(date_col, table_name)db_df = pd.read_sql(sql=sql_cmd, con=engine)if db_df.shape[0]:# 获取数据库中的最新候选日期db_date = db_df[date_col].iloc[0]# 如果数据库中候选日期已为最新,则无需更新,避免重复添加if db_date >= out_df['date'].iloc[0]:return# 设置候选数据各字段的值out_df = out_df[['code', 'date', 'close']]buy_point = 'buy_point'out_df = out_df.rename(columns={'date': 'date_candidate', 'close': buy_point})out_df['price_take_profit'] = out_df[buy_point].apply(lambda x: math.floor(x * (1 + g_take_profit_percent) * 100) / 100)out_df['price_stop_loss'] = out_df[buy_point].apply(lambda x: math.ceil(x * (1 - g_stop_loss_percent) * 100) / 100)out_df['date_buy'] = Noneout_df['hold_days'] = 0out_df['date_sell'] = Noneout_df['price_buy'] = 0.0out_df['price_sell'] = 0.0out_df['trade_volume'] = 0out_df['amount_buy'] = 0.0out_df['amount_sell'] = 0.0out_df['buy_available'] = 0out_df['to_trade'] = 1# 将新选出的候选股票数据append到数据库if out_df.shape[0]:out_df.to_sql(name=table_name, con=engine, if_exists='append', index=False)# 关闭数据库连接engine.dispose()def task_update_daily():"""每日更新任务,可设置为17:40启动:return: None"""# 程序开始运行时间start_time = time.time()# 获取待交易股票列表stock_codes = get_stock_codes(update=True)# 更新股票数据update_data_mp(stock_codes)# 更新交易数据update_trade(stock_codes)# 程序结束运行时间end_time = time.time()print('程序运行时间:{}s'.format(end_time - start_time))def main():"""每日更新数据:return: None"""# 清空任务列表schedule.clear()# 任务执行时间task_time = '17:40:00'# 交易日定时启动任务schedule.every().monday.at(task_time).do(task_update_daily)schedule.every().tuesday.at(task_time).do(task_update_daily)schedule.every().wednesday.at(task_time).do(task_update_daily)schedule.every().thursday.at(task_time).do(task_update_daily)schedule.every().friday.at(task_time).do(task_update_daily)# 循环等待任务执行while True:schedule.run_pending()print('.', end='')time.sleep(60)if __name__ == '__main__':main()# 如果不按定时任务启动,可以直接运行下面的函数# task_update_daily()

博客内容只用于交流学习,不构成投资建议,盈亏自负!
个人博客:https://coderx.com.cn/(优先更新)
项目最新代码:https://gitee.com/sl/quant_from_scratch
欢迎大家转发、留言。已建微信群用于学习交流,群1已满,群2已创建,感兴趣的读者请扫码加微信!


http://chatgpt.dhexx.cn/article/BYTStZXA.shtml

相关文章

自动交易股票接口开发的关键是什么?

小编认为自动交易股票接口软件的开发的关键是要知道券商的股票交易接口&#xff0c;但是出于安全的考虑&#xff0c;券商对外是不公开股票交易接口。但是我们自己是无法开发股票自动交易软件的开发。不过&#xff0c;现在很多券商都提供了证券独立委托系统。如果我们利用券商都…

股票实盘交易接口怎样实现自动交易过程?

目前随着国内二级股市已处于相对较高的水平&#xff0c;但是对于高净值的用户来说&#xff0c;在量化投资市场上使用股票实盘交易接口具有低波动性和自动交易的定量对冲投资模型体系&#xff0c;包括了一些高性价比、大型基金和长期投资的资产品种&#xff0c;所以大家在选择股…

股票量化自动交易软件下单原则条件

股票量化自动交易软件下单原则条件是一系列的买卖方式&#xff0c;将常见的技术指标写入销售模式&#xff0c;为用户提供自动化的交易服务。如果技术指标已经研究&#xff0c;这些指标已经成为你交易中的一个或全部决策因素&#xff0c;但由于各种主观和客观因素&#xff0c;你…

通达信自动交易软件

1、要善用spy 2、不同的控件主要靠GetDlgCtrlID去区分 3、要获得另一个进程的焦点窗口(GetFocus)需要调用AttachThreadInput 4、尽量少用keybd_event模拟键盘输入&#xff0c;主要是该函数不能保证按键消息一定能被特定进程接收到。取而代之的是SendMessage(hwnd, WM_IME_CH…

动手搭建深度强化学习的自动股票量化交易系统

基于深度强化学习的股票量化交易 ⭐ ⭐ ⭐ 欢迎点个小小的Star支持&#xff01;⭐ ⭐ ⭐ 开源不易&#xff0c;希望大家多多支持~ 更多实践案例(AI识虫&#xff0c;基于PaddleX实现森林火灾监测&#xff0c;眼疾识别&#xff0c;智能相册分类等)、深度学习资料&#xff0c;请…

postman 数据流请求

备注&#xff1a; Postman version &#xff1a; Version 9.21.3 Windows 版本 1.修改headers 添加或者修改 Content-Type 为application/octet-stream2.Body 部分 选择raw 格式数据 直接复制需要的流数据内容&#xff0c;可以任意3.最后执行请求

数据流图基础

一、结构化分析方法 结构化分析是指20世纪70年代末&#xff0c;由Demarco等人提出的&#xff0c;简称SA方法&#xff0c;是面向数据流进行需求分析的方法&#xff0c;旨在减少分析活动中的错误&#xff0c;建立满足用户需求的系统逻辑模型。 结构化分析的要点是&#xff1a;根…

【软件工程】数据流图 ( 数据流图简介 | 数据流图概念 | 数据流 | 加工 | 数据存储 | 外部实体 | 数据流图分层 | 顶层数据流图 | 中层数据流图 | 底层数据流图 )

文章目录 一、数据流图 ( DFD ) 简介二、数据流图 ( DFD ) 概念符号1、数据流2、加工 ( 核心 )3、数据存储4、外部实体 三、数据流图 ( DFD ) 分层1、分层说明2、顶层数据流图3、中层数据流图4、底层数据流图 一、数据流图 ( DFD ) 简介 数据流图 ( Data Flow Diagram ) : 在 …

数据流图(DFD)的概念

数据流图&#xff08;DFD&#xff09;是描述数据流程的图形工具&#xff0c;数据流图从数据传递和加工的角度&#xff0c;以图形的方式刻画数据流从输入到输出的移动变换过程。数据流图是系统逻辑模型的图形表示&#xff0c;从数据的传递与加工角度&#xff0c;来刻画数据流从输…

软考下午——数据流图

基本图形符号 设计原则 我们重点研究数据流图的三大设计原则。这三大设计原则是解题的法宝。 &#xff08;1&#xff09;父图与子图的平衡原则 子图的输入输出数据流同父图对应加工的输入输出数据流必须一致&#xff0c;此即父图与子图的平衡。 &#xff08;图1&#xff0c;不符…

软件工程数据流图

数据流图 数据流图是一种图形化技术&#xff0c;它描绘信息流和数据从输入移动到输出的过程中所经受的变换。在数据流图中没有任何具体的物理部件&#xff0c;它只是描绘数据在软件中流动和被处理的逻辑过程&#xff0c;是系统逻辑功能的图形表示。设计数据流图时只需考虑系统…

Vue单向数据流

Vue 的特性单向数据流&#xff0c;指数据一般从父组件传到子组件,子组件没有权利直接修改。 父组件传来的数据,即子组件从props中直接获取的数据&#xff0c;只能请求父组件修改数据再传给子组件&#xff0c;父组件属性值的更新会下行流动到子组件中。 1&#xff09;如果传很…

数据流分析

基本原理 数据流分析是一种用来获取相关数据沿着程序执行路径流动的信息分析技术。分析对象是程序执行路径上的数据流动或可能的取值 优点&#xff1a;具有更强的分析能力&#xff0c;适合需要考虑控制流信息且变量属性之操作十分简单的静态分析问题 缺点&#xff1a;分析效率…

数据流图DFD

数据流图和数据字典是结构化分析方法中常用的两种工具。本文中基础资料收集于网络&#xff0c;顶层数据流图部分加入里自己的理解。 数据流图 数据流图&#xff0c;简称DFD&#xff0c;是SA方法中用于表示系统逻辑模型的一种工具&#xff0c;它以图形的方式描绘数据在系统中流…

软考--数据流图(DFD)

数据流图的基本元素及其作用 数据流图通过外部代理(实体)描述系统与外界之间的数据交互关系&#xff0c;内部的活动通过处理(加工)表示&#xff0c;用数据流描述系统中不同活动之间的数据传输内容和方向&#xff0c;需要持久化存储的数据用数据存储表示&#xff0c;一般用文件…

数据流

数据流 引子 编译器后端会对前端生成的中间代码做很多优化&#xff0c;也就是在保证程序语义不变的前提下&#xff0c;提高程序执行的效率或减少代码size等优化目目标。优化需要依靠代码分析给出的"指导信息"来相应地改进代码&#xff0c;而代码分析中最重要的就是数…

数据流分析简介

文章目录 0. 前言1. 数据流分析简介1.1 数据流分析基本概念1.2 数据流分析结构简述 2. 数据流分析应用2.1 定义可达性分析(Reaching Definitions Analysis)2.1.1 定义可达描述2.1.2 定义可达算法2.1.3 定义可达算法示例 2.2 活变量分析(Live Variables Analysis)2.2.1 活变量描…

Swift抖动动画

一、直接实现某个视图的持续抖动、只需要给视图的layer添加动画就行。 /// 直接实现/// - Parameters:/// - repeatCount: 重复次数/// - duration: 持续时间/// - values: //抖动幅度数组&#xff1a;不需要太大&#xff1a;从-15度 到 15度、再回到原位置、为一个抖动…

os “抖动”与工作集

由于请求分页式虚拟存储器系统的性能优越&#xff0c;在正常运行情况下&#xff0c;它能有效地减少内存碎片&#xff0c;提高处理机的利用率和吞吐量&#xff0c;故是目前最常用的一种系统。但如果在系统中运行的进程太多&#xff0c;进程在运行中会频繁地发生缺页情况&#xf…

ADC 采样数据抖动

MSP430或STM32&#xff0c;在使用内部ADC出现的采样数据异常抖动问题 采样设计&#xff1a; 用于检测供电线路电流及电压。 产品运行在两种模式下&#xff0c;1、低功耗静态模式&#xff08;仓储态&#xff09;&#xff0c;2、全功能全速运行模式&#xff08;工作态&#xff09…