文章目录
- 前言
- 一、peewee的安装和入门
- 1.字段类型表&Meta类型表&类型属性表
- 字段类型表
- Meta类型表
- 类型属性表
- 2.设计表结构
- 防止连接丢失
- 二、表的设计&操作
- 0.继承
- 1.添加
- 生成表
- 原生SQL
- 添加
- 2.查询
- 3.更新
- 4.删除
- 5.约束
- 6.复合主键
- 三、详细操作
- 1.like&排序&dict
- 2.去重&统计
- 四、避免n+1查询
前言
很多人对这个orm框架有很大的误区 以为会一个orm框架就不用去深入sql语句了 这会导致sql不会写 查询的时候实现困难 适应别的语言的orm框架还得再学一遍。
所以说sql一定一定要学好,这样查询实现的原理就会顺通,学习其他语言的orm框架就会快很多
ORM(Object Relational Mapping,对象关系映射),在Python下⾯有很多这样的类库,如
SQLObject、Storm、peewee和SQLAlchemy。
介绍⼀下Peewee的基本使⽤,因为它⾮常的轻量级,最主要是和Django的ORM 操作很像,如果你学过Django那么很容易上⼿。
一、peewee的安装和入门
peewee官方文档:http://docs.peewee-orm.com/en/latest/peewee/models.html
安装
pip install peewee
pip install pymysql
1.字段类型表&Meta类型表&类型属性表
字段类型表
可以看到对应关系 比如IntegerField 在mysql中是integer
Meta类型表
类型属性表
2.设计表结构
import datetimefrom peewee import *
import logging#查看peewee的日志信息
logger = logging.getLogger("peewee")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())db = MySQLDatabase('peewee', host='127.0.0.1', user='root', passwd='******')class User(Model):#如果没有设置主键,那么自动生成一个id的主键username = CharField(primary_key=True, max_length=20)age = CharField(default=18, max_length=20, verbose_name="年龄")#migrations,class Meta: #大写database = dbclass Tweet(Model):#backref反向查询 外键设置了一对多的关系 #比如一个用户可以有多个Tweet 我拿到用户怎么查所有的Tweet 这时候backref就起作用了#我直接User.tweets就查到了user = ForeignKeyField(User, backref='tweets')message = TextField()#当前时间 注意不要用now()不要实现它created_date = DateTimeField(default=datetime.datetime.now)is_published = BooleanField(default=True)class Meta:#指明databasedatabase = db
防止连接丢失
如果是在项目中,可能会因为一个语句报错而丢失连接 而导致项目崩溃
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase#实现这个类可以避免崩溃
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):passdb = ReconnectMySQLDatabase("mxshop_goods_srv", host="localhost", port=3306, user="root", password="123456")
二、表的设计&操作
0.继承
在设计表的时候可以设计一个BaseModel来继承它
class BaseModel(Model):add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")class Meta:database = db # 这里是数据库链接,为了方便建立多个表,可以把这个部分提炼出来形成一个新的类class Person(BaseModel):first = CharField()
1.添加
生成表
if __name__ == "__main__":#1. 生成表结构db.connect()#这个方法只能建表不能改表结构db.create_tables([User, Tweet])
原生SQL
query = User.raw('SELECT * FROM new_user WHERE username = %s', "bobby5")query = User.select().where(SQL('username = "%s"' % "bobby5"))for q in query:print(q.username, q.age)
添加
if __name__ == "__main__":# 添加huey = User.create(username="huey")charlie = User(username="charlie") #update set age=18 where username="charlie"rows = charlie.save() #1. save方法既可以完成新建,也可以完成更新的操作(你的对象中主键值是否有设置,你是一个更新的操作)if rows == 0:print("未更新数据")Person.insert({'first': 'li3','last': 'bobby3'}).execute()
2.查询
#3. 查询#1. get方法 - 1. 返回来的是直接的user对象 2. 这个方法如果查询不到会抛出异常try:# charlie = User.get(User.username=="charie")charlie = User.get_by_id("charie")print(charlie.username)#这个操作发起的sql请求是什么except User.DoesNotExist as e:print("查询不到")#2. 查询所有users = User.select() #1. 没有看到sql查询语句,用于组装sql 2. 对象是ModelSelect 我们对ModelSelect进行for循环和切片的时候才会发起请求# print(users.sql())print(type(users))# user = users[0] users[1:2]print(type(user))usernames = ["charlie", "huey", "mickey"]users = User.select().where(User.username.in_(usernames))for user in users:print(user.username)for user in User.select():print(user.username)
3.更新
charlie = User(username="charlie") #update set xx=xx where username="charlie"print(charlie.save())# 使用update更新print(User.update(age=20).where(User.username=="charlie").execute())
4.删除
user = User.get(User.username=="huey")user.delete_instance()#是不是也不会执行# query = User.delete().where(User.username=="charlie").execute()# print(query)#有少数的方法会直接执行sql语句 get get_by_id
5.约束
class Person(BaseModel):first = CharField()last = CharField()class Meta:primary_key = CompositeKey('first', 'last')class Pet(BaseModel):owner_first = CharField()owner_last = CharField()pet_name = CharField()class Meta:constraints = [SQL('FOREIGN KEY(owner_first, owner_last) REFERENCES person(first, last)')]
6.复合主键
class Blog(BaseModel):passclass Tag(BaseModel):passclass BlogToTag(BaseModel):"""A simple "through" table for many-to-many relationship."""blog = ForeignKeyField(Blog)tag = ForeignKeyField(Tag)class Meta:primary_key = CompositeKey('blog', 'tag')
三、详细操作
1.like&排序&dict
#likequery = Person.select().where(Person.first.startswith('bo'))#排序users = User.select().order_by(-User.age) #兼容了djangofor user in users:print(user.username, user.age)person = Person.select().order_by(Person.id.desc())for row in person:print(row)# 以字典形式输出query = Person.select().dicts()for row in query:print(type(row))print(row)
2.去重&统计
#distinct去重, count方法统计数量query = User.select(User.username).distinct().count()print(query)for q in query:print(q.username)print(query)# select user.name from user where age=(select max(age) from user)"""何时使用one():如果您有一个应该返回 1 个结果的查询,否则会引发异常——即使它返回 0 个结果。换句话说,它不允许 empty results.何时使用 scalar():如果您有一个返回 1 个结果或没有结果的查询。否则抛出异常。换句话说,它确实允许 empty results."""max_age = User.select(fn.MAX(User.age)).scalar()users = User.select().where(User.age==max_age)for user in users:print(user.username)sub_query = User.select(fn.MAX(User.age))query = User.select(User.username).where(User.age==sub_query)for q in query:print(q.username)query = User.raw('SELECT * FROM new_user WHERE username = %s', "bobby5")query = User.select().where(SQL('username = "%s"' % "bobby5"))for q in query:print(q.username, q.age)
四、避免n+1查询
# 使用表连接 能减少网络传输# query = Tweet.select(Tweet, User.username).join(User).where(User.username=="mickey")# for q in query:# print(q.user.username, q.content)# query = Tweet.select(Tweet, User.username).join(User, on=(Tweet.user==User.id)).where(User.username=="mickey")# for q in query:# print(q.user.username, q.content)#反向# user = User.get(User.username=="mickey")# tweets = Tweet.select().where(Tweet.user==user)# for tweet in tweets:# print(user.username, tweet.content)#反向2# tweets = User.get(User.username == "mickey").tweets# # tweets = Tweet.select().where(Tweet.user == user)# for tweet in tweets:# print(tweet.content)#想得到5个数据, 会发起5次请求 + 1 -> n+1查询# for tweet in Tweet.select():# print(tweet.content, tweet.user.username)#什么时候sql会发起请求,以及会发起多少次请求, 开发人员来说 很重要 很多sql高手不会喜欢用ormquery = Tweet.select(Tweet, User.username).join(User).where(User.username == "mickey")for q in query:print(q.user.username, q.content)