python执行sql 语句
数据etl 过程中会涉及到调度,也就是每天要定时执行的任务,这些任务执行过程中其实是通过底层的脚本代码来进行数据的清洗转换等处理的。而脚本代码中肯定会涉及到调用sql 语句的情况,最近项目正好用到python 脚本调用sql 进行数据处理的需求。
直接上代码
import configparser
import pymysql# 读取配置文件
conf = configparser.RawConfigParser()
conf.read("D:\PycharmProjects\economic_relation\\venv\Include\control\conf.ini")
# 获取源数据库参数
sourceDBUrl = str(conf.get('soureDB', 'sourcedburl'))
sourceDBUser = str(conf.get('soureDB', 'sourcedbuser'))
sourceDBKey = str(conf.get('soureDB', 'sourcedbkey'))
sourceDataBse = str(conf.get('soureDB', 'sourcedatabse'))#链接源数据库
conn = pymysql.connect(host=sourceDBUrl, user=sourceDBUser, passwd=sourceDBKey, db=sourceDataBse, charset='utf8')
cur = conn.cursor(cursor=pymysql.cursors.DictCursor)#读取sql文件
# file=open("D:\PycharmProjects\economic_relation\\venv\Include\sql\\test_create",mode='r',encoding='utf-8')
sql_path="D:\PycharmProjects\economic_relation\\venv\Include\sql\\create_table"def execute_sql_file(sql_path):with open(sql_path,'r+',encoding='utf8') as f:# 将sql 用; 分割 并多行合并成一行sql_list = f.read().split(';')[:-1]sql_list = [x.replace('\n', ' ') if '\n' in x else x for x in sql_list]#执行每一条sqlfor sql_item in sql_list:print ("sql语句:",sql_item)try:effect_row = cur.execute(sql_item)conn.commit()print('effect rows is {}'.format(effect_row))except Exception as e:print("error:",e)
#执行
execute_sql_file(sql_path)
#关闭资源
cur.close()
conn.close()