别院牧志知识库 别院牧志知识库
首页
  • 基础

    • 全栈之路
    • 😎Awesome资源
  • 进阶

    • Python 工匠系列
    • 高阶知识点
  • 指南教程

    • Socket 编程
    • 异步编程
    • PEP 系列
  • 面试

    • Python 面试题
    • 2022 面试记录
    • 2021 面试记录
    • 2020 面试记录
    • 2019 面试记录
    • 数据库索引原理
  • 基金

    • 基金知识
    • 基金经理
  • 细读经典

    • 德隆-三个知道
    • 孔曼子-摊大饼理论
    • 配置者说-躺赢之路
    • 资水-建立自己的投资体系
    • 反脆弱
  • Git 参考手册
  • 提问的智慧
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
首页
  • 基础

    • 全栈之路
    • 😎Awesome资源
  • 进阶

    • Python 工匠系列
    • 高阶知识点
  • 指南教程

    • Socket 编程
    • 异步编程
    • PEP 系列
  • 面试

    • Python 面试题
    • 2022 面试记录
    • 2021 面试记录
    • 2020 面试记录
    • 2019 面试记录
    • 数据库索引原理
  • 基金

    • 基金知识
    • 基金经理
  • 细读经典

    • 德隆-三个知道
    • 孔曼子-摊大饼理论
    • 配置者说-躺赢之路
    • 资水-建立自己的投资体系
    • 反脆弱
  • Git 参考手册
  • 提问的智慧
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 工作
  • 规范

  • Linux

  • 数据库

    • MySQL

      • 推荐几本学习 MySQL 的好书
      • MySQL 索引原理
      • MySQL 事务
      • 聚集索引与非聚集索引的总结
      • Linux 安装 MySQL(包含源码安装和 yum 安装)
      • 为什么数据库不应该使用外键
      • MySQL 为什么要使用 B+树索引
      • MySQL 锁机制
      • 常用 SQL 整理
      • MySQL 分表分库知识整理
      • MySQL 日志:undo log、redo log、binlog 有什么用?
      • MySQL 知识总结
      • MySQL 的 MVCC 机制
    • redis

    • 数据库操作记录
    • 数据库设计
    • SQLAlchemy 2.0 教程
      • 为什么要有 Core
      • 创建链接
      • Core 层 -- 直接使用 SQL
        • CRUD
        • 事务与 commit
      • ORM
        • 声明式 API
        • 外键
        • CRUD
        • 加载外键关联模型
        • 外键的写入
        • JSON 字段的特殊处理
        • 批量插入
      • 从 1.X API 迁移到 2.0 API
      • import
        • 反射——从数据库创建模型
      • 在 FastAPI 中使用
      • 多进程环境下的使用
        • 仅在设定值时增加 where 条件
      • 参考
  • Git

  • 👨‍💻Web

  • 英语

  • Docker

  • 编辑器

  • 网络

  • 前端

  • 存储

  • 备忘录

  • 如何开始你的单元测试
  • 以程序员的视角看中国——西安篇
  • 💻工作
  • 数据库
yifei
2022-12-23
目录

SQLAlchemy 2.0 教程

SQLAlchemy 是 Python 生态系统中最流行的 ORM。SQLAlchemy 设计非常优雅,分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分层设计, 比如 django 的 ORM,数据库链接和 ORM 本身完全混在一起。

sqla_arch

SQLAlchemy 当前版本是 1.4. 有两套 API,1.3 及之前的老 API 和 1.4 及之后的 2.0 兼容 API。本文中只介绍 2.0 API。

# 为什么要有 Core

Core 层主要实现了客户端连接池。我们知道,作为现代 Web 应用的核心,关系型数据库的并发连接能力往往并不是很强,一般不要搞好多短连接过去,大多数情况下需要一个连接池。连接池大体分为 两种,一种在服务端,也就是一个专门的连接池中间件,给短连接每次分配一个长链接复用。另一种在客户端,一般作为第三方库引入代码中。SQLAlchemy 的连接池显然是客户端的。在这个连接池中, SQLAlchemy 维护了一定数量的长连接,当调用 connect 时,实际是从池子中取出了一个链接,调用 close 时,实际是放回到了池子中一个链接。

# 创建链接

SQLAlchemy 中使用 create_engine 来创建连接(池)。create_engine 的参数是数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:password@localhost:3306/dbname",
    echo=True,  # echo 设为 True 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 SQLAlchemy 2.0 API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle=3600, # 设置时间以限制数据库自动断开
)

# 创建一个 SQLite 的内存数据库,必须加上 check_same_thread=False,否则无法在多线程中使用
engine = create_engine("sqlite:///:memory:", echo=True, future=True,
    connect_args={"check_same_thread": False})

# pip install mysqlclient
engine = create_engine('mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Core 层 -- 直接使用 SQL

# CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
    text("SELECT x, y FROM some_table WHERE y > :y"),
    {"y": 2}
)
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入时,可以直接插入多条
conn.execute(
    text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
    [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 事务与 commit

SQLAlchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
with engine.connect() as conn:
    conn.execute(text("CREATE TABLE some_table (x int, y int)"))
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
    )
    conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
with engine.begin() as conn:
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
    )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# ORM

Session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始时获取一个 session, 所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
new_session = sessionmaker(engine)

with new_session() as session:
    ...
1
2
3
4
5
6
7
8
9
10
11
12
13

# 声明式 API

  • 使用 __tablename__ 指定数据库表名
  • 使用 Column 声明每个字段
  • 使用 Integer/String... 指定字段类型
  • 使用 index 参数指定索引
  • 使用 unique 参数指定唯一索引
  • 使用 __table_args__ 指定其他属性,比如联合索引
from datetime import datetime
from sqlalchemy import Integer, Column, String, func, UniqueConstraint
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    __table_args__ = (UniqueConstraint("name", "time_created"),)
    id = Column(Integer, primary_key=True)
    name = Column(String(30), index=True)
    fullname = Column(String, unique=True)
    # 对于特别大的字段,还可以使用 deferred,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是必须在表创建的时候就设置好,会成为表的 schema 的一部分
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型

Base.metadata.create_all(engine)

# 如果只需要创建一个模型

User.__table__.create(engine)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 外键

使用 relationship 来制定模型之间的关联关系。

一对多关系的双向映射

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, Session

Base = declarative_base()

class Group(Base):
    __tablename__ = 'groups'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # 对应的多个 users,这里使用模型名作为参数
    members = relationship('User')

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # group_id 是数据库中真实存在的外键名,第二个字段 ForeignKey 用来指定对应的 ID
    group_id = Column(Integer, ForeignKey('groups.id'))
    # 模型中对应的 group 字段,需要声明和对应模型中的哪个字段是重合的
    group = relationship('Group', overlaps="members")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

多对多映射,需要使用一个关联表。

# 关联表
class UserPermissions(Base):
    __tablename__ = 'user_permissions'
    id = Column(Integer, primary_key=True)
    # 同样用 foreign key 指定外键
    user_id = Column(Integer, ForeignKey('users.id'))
    permission_id = Column(String, ForeignKey('permissions.id'))

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # 使用 secondary 指定关联表,同样用 overlaps 指定对应模型中的字段
    permissions = relationship('Permission', secondary="user_permissions", overlaps="users")

class Permission(Base):
    __tablename__ = 'permissions'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    # 同上
    users = relationship('User', secondary="user_permissions", overlaps="permissions")


user1 = User(name='user1', group_id=1)
user2 = User(name='user2')
group1 = Group(name='group1')
group2 = Group(name='group2', members=[user2])
permission1 = Permission(name="open_file")
permission2 = Permission(name="save_file")
user1.permissions.append(permission1)

db.add_all([user1, user2, group1, group2, permission1, permission2])

db.commit()

print(user1.permissions[0].id)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

其他的教程中大多使用 backref 来生成对应模型的属性,我并不喜欢这样,更倾向于在对应的模型中 都显式声明可以访问的属性。

# CRUD

和 1.x API 不同,2.0 API 中不再使用 query,而是使用 select 来查询数据。网上的诸多教程还在 讲解 query API,最好不要参考这些教程了。

from sqlalchemy import select

# where 的参数是 `==` 构成的表达式,好处是写代码的时候,拼写错误就会被检查到
stmt = select(User).where(User.name == "john").order_by(User.id)
# filter_by 使用 **kwargs 作为参数
stmt = select(User).filter_by(name="some_user")
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

# 查询模型单个属性时,不需要使用 scalars
result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 按照 id 查询还有一个快捷方式:
user = session.get(User, pk=1)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接对属性赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(竞态条件)的地方
# 错误!如果两个进程同时更新这个值,可能导致只更新了一个值。
# 两者都赋值为自身认为的正确值 2,实际正确值为 1 + 1 + 1 = 3
# 对应 SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count += 1
# 正确做法:注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 服务端 +1
# 对应 SQL: Update users set visit_count = visit_count + 1 where user.id = 1
user.visit_count = User.visit_count + 1

# 添加对象直接使用 session.add 方法
session.add(user)
# 或者 add_all
session.add_all([user1, user2, group1])

# 如果要获取插入后的 ID,当然也可以 commit 之后再读
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

# 加载外键关联模型

如果我们在读取一个 N 个记录的列表之后,再去数据库中一一读取每个项目的具体值,就会产生 N+1 个 查询。这就是数据库中最常犯的错误:N+1 问题。

默认情况下,查询中不会加载外键关联的模型,可以使用 selectinload 选项来加载外键,从而避免 N+1 问题。select(Model).options(selectinload(Model.field))

- session.execute(select(User)).scalars().all()  # 没有加载 parent 外键
+ session.execute(select(User).options(selectinload(User.groups))).scalars().all()
1
2

Selectinload 的原理在于使用了 select in 子查询,这也是名字的又来。除了 selectinload 外, 还可以使用传统的 joinedload,它的原理就是最普通的 join table

# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()
1
2

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好, 而且不用使用 unique.

# 外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

user.permissions.append(open_permission)  # 添加
user.permissions.remove(save_permission)  # 删除
# 清空所有外键
user.permissions.clear()
user.permissions = []
1
2
3
4
5

# JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象 或者写入 json 对象。但是,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这是 不可靠的。一定要复制后读写,然后再赋值回去。

    article = session.get(Article, 1)
    tags = copy.copy(article.tags)
    tags.append("iOS")
    article.tags = tags
    session.commit()
1
2
3
4
5

# 批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多 时间,效率很低。MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这种 批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,
# 其他字段全部用于更新
session.bulk_update_mappings(User, users)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 从 1.X API 迁移到 2.0 API

- session.query(User).get(42)
+ session.get(User, 42)

- session.query(User).all()
+ session.execute(select(User)).scalars().all()

- session.query(User).filter_by(name="some_user").one()
+ session.execute(select(User).filter_by(name="some_user")).scalar_one()

- session.query(User).from_statememt(text("select * from users")).a..()
+ session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

- session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
+ session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# import

SQLAlchemy 的导出类型主要分布在以下三个部分:

# SQL 相关的直接从根目录导入,值得注意的是 Column 和 Integer 也在这里
from sqlalchemy import text, insert, select, create_engine, Column, Integer, String

# ORM 相关的从 orm 子包中导入
from sqlalchemy.orm import declarative_base, Session, sessionmaker

# 异常从 exc 中导入
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
1
2
3
4
5
6
7
8

# 反射——从数据库创建模型

TODO

# 在 FastAPI 中使用

# 多进程环境下的使用

由于 Python 中 GIL 的原因,要利用多核处理器需要使用多进程。而多进程中,资源并不能共享, 对应到 SQLAlchemy,也就是连接池不能共享。

我们需要手工解决这个问题。

一般情况下,最好不要尝试在多个进程中共享同一个 Session。最好在每个进程初始化时创建 Session。

# 仅在设定值时增加 where 条件

在 URL 中,经常需要根据用户指定了哪些选项来返回对应的结果。

query = select(User)
if username is not None:
    query = query.where(User.username == username)
if password is not None:
    query = query.where(User.password == password)
1
2
3
4
5

# 参考

  1. 原文 (opens new window)
  2. https://docs.sqlalchemy.org/en/14/tutorial/orm_related_objects.html (opens new window)
  3. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data (opens new window)
  4. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry (opens new window)
  5. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html (opens new window)
  6. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime (opens new window)
  7. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/ (opens new window)
  8. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy (opens new window)
  9. Count by many to many foreign key (opens new window)
  10. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907 (opens new window)
  11. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data (opens new window)
  12. https://stackoverflow.com/questions/41279157/connection-problems-with-sqlalchemy-and-multiple-processes (opens new window)
  13. https://docs.sqlalchemy.org/en/14/core/pooling.html (opens new window)
  14. https://stackoverflow.com/questions/10059345/sqlalchemy-unique-across-multiple-columns (opens new window)
  15. https://stackoverflow.com/questions/22876946/how-to-order-by-count-of-many-to-many-relationship-in-sqlalchemy (opens new window)
编辑 (opens new window)
#SQLAlchemy
上次更新: 2024-07-15, 08:03:22
数据库设计
Git 知识整理

← 数据库设计 Git 知识整理→

最近更新
01
提升沟通亲和力的实用策略
03-26
02
工作
07-15
03
如何选房子
06-25
更多文章>
Theme by Vdoing | Copyright © 2019-2025 IMOYAO | 别院牧志
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式