python语言-orm

摘要

本文部分内容来源于网络,个人收集整理,请勿传播

ORM英文全称object relational mapping,就是对象映射关系程序,简单来说我们类似python这种面向对象的程序来说一切皆对象,但是我们使用的数据库却都是关系型的,为了保证一致的使用习惯,通过orm将编程语言的对象模型和数据库的关系模型建立映射关系,这样我们在使用编程语言对数据库进行操作的时候可以直接使用编程语言的对象模型进行操作就可以了,而不用直接使用sql语言。

ORM的优点

  • 隐藏了数据访问细节,“封闭”的通用数据库交互,ORM的核心。他使得我们的通用数据库交互变得简单易行,并且完全不用考虑该死的SQL语句。快速开发,由此而来。
  • ORM使我们构造固化数据结构变得简单易行。

缺点

无可避免的,自动化意味着映射和关联管理,代价是牺牲性能(早期,这是所有不喜欢ORM人的共同点)。现在的各种ORM框架都在尝试使用各种方法来减轻这块(LazyLoad,Cache),效果还是很显著的。

sqlalchemy

在Python中,最有名的ORM框架是SQLAlchemy。

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

1
2
3
#由于mysqldb依然不支持py3,所以这里我们用pymysql与sqlalchemy交互
pip install pymysql
pip3 install sqlalchemys

组成部分

  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • Dialect,选择连接数据库的DB API种类
    • 用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言

SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

sqlalchemy基本使用

执行原生sql(不常用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


def task1(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from t1"
)
result = cursor.fetchall()
cursor.close()
conn.close()

def task2(arg):
conn = engine.contextual_connect()
with conn:
cur = conn.execute(
"select * from t1"
)
result = cur.fetchall()
print(result)

def task3(arg):
cur = engine.execute("select * from t1")
result = cur.fetchall()
cur.close()
print(result)

for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()

创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

Base = declarative_base()


class Users(Base):
__tablename__ = 'users'
__table_args__ = {
'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'
}

id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
# email = Column(String(32), unique=True)
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)

__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_id_name', 'name', 'email'),
)


def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.create_all(engine)


def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.drop_all(engine)


if __name__ == '__main__':
drop_db()
init_db()

多个表

指定关联列:hobby = relationship(“Hobby”, backref=’pers’,foreign_keys=”Person.hobby_id”)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

Base = declarative_base()


# ##################### 单表示例 #########################
class Users(Base):
__tablename__ = 'users'

id = Column(Integer, primary_key=True)
name = Column(String(32), index=True)
age = Column(Integer, default=18)
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)

__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_id_name', 'name', 'extra'),
)


class Hosts(Base):
__tablename__ = 'hosts'

id = Column(Integer, primary_key=True)
name = Column(String(32), index=True)
ctime = Column(DateTime, default=datetime.datetime.now)


# ##################### 一对多示例 #########################
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')


class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
hobby_id = Column(Integer, ForeignKey("hobby.id"))

# 与生成表结构无关,仅用于查询方便,在连表查询的时候自动关联
hobby = relationship("Hobby", backref='pers')


# ##################### 多对多示例 #########################

class Server2Group(Base):
__tablename__ = 'server2group'
id = Column(Integer, primary_key=True, autoincrement=True)
server_id = Column(Integer, ForeignKey('server.id'))
group_id = Column(Integer, ForeignKey('group.id'))


class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)

# 与生成表结构无关,仅用于查询方便
servers = relationship('Server', secondary='server2group', backref='groups')


class Server(Base):
__tablename__ = 'server'

id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)


def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.create_all(engine)


def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.drop_all(engine)


if __name__ == '__main__':
drop_db()
init_db()

创建多个表并包含Fk、M2M关系

另一种创建方式(不常用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from sqlalchemy import Table, MetaData, Column, Integer, String, ForeignKey
from sqlalchemy.orm import mapper

metadata = MetaData()

user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('fullname', String(50)),
Column('password', String(12))
)

class User(object):
def __init__(self, name, fullname, password):
self.name = name
self.fullname = fullname
self.password = password

事实上,我们用第一种方式创建的表就是基于第2种方式的再封装。

操作数据库表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)

#创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
Session = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个session
session = Session()

# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
#此时还没创建对象呢
print(obj1.name,obj1.id)
#把要创建的数据对象添加到这个session里, 一会统一创建
session.add(obj1)

# 提交事务,创建数据
session.commit()
# 关闭session
session.close()

基于scoped_session实现线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的以下方法:

public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
'bulk_update_mappings',
'merge', 'query', 'refresh', 'rollback',
'scalar'
)
"""
session = scoped_session(Session)

# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)

# 提交事务
session.commit()
# 关闭session
session.close()

多线程执行示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from db import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)


def task(arg):
session = Session()

obj1 = Users(name="alex1")
session.add(obj1)

session.commit()

for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()

基于relationship操作ForeignKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
Hobby(caption='乒乓球'),
Hobby(caption='羽毛球'),
Person(name='张三', hobby_id=3),
Person(name='李四', hobby_id=4),
])

person = Person(name='张九', hobby=Hobby(caption='姑娘'))
session.add(person)

# 连表添加数据
hb = Hobby(caption='人妖')
hb.pers = [Person(name='文飞'), Person(name='博雅')]
session.add(hb)

session.commit()
"""

# 第一种方式连表
p = session.query(Person.name, Hobby.caption).join(Hobby,isouter=True).all()

# 与生成表结构无关,仅用于查询方便,在连表查询的时候自动关联
hobby = relationship("Hobby", backref='pers')

# 使用relationship正向查询
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""

# 使用relationship反向查询
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""

session.close()

基于relationship操作m2m

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
Server(hostname='c1.com'),
Server(hostname='c2.com'),
Group(name='A组'),
Group(name='B组'),
])
session.commit()

s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()

# 与生成表结构无关,仅用于查询方便
# servers = relationship('Server', secondary='server2group', backref='groups')

# 会在两个表和关联表都添加数据
gp = Group(name='C组')
gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
session.add(gp)
session.commit()

# 会在两个表和关联表都添加数据
ser = Server(hostname='c6.com')
ser.groups = [Group(name='F组'),Group(name='G组')]
session.add(ser)
session.commit()
"""


# 使用relationship正向查询
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""

# 使用relationship反向查询
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""
session.close()

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

# 关联子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
FROM server
WHERE server.id = `group`.id) AS anon_1
FROM `group`
"""

# 原生SQL
"""
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""

session.close()

常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text

from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()
session.commit()
session.close()

增加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 添加一条数据
obj1 = Users(name="wupeiqi")
session.add(obj1)

# 批量增加,数据多的时候效率低
session.add_all([
Users(name="wupeiqi"),
Users(name="alex"),
Hosts(name="c1.com"),
])

# 这种方式增加大量数据更快
session.execute(
User.__table__.insert(),
[{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()
session.close()

删除数据

1
2
3
session.query(Users).filter(Users.id > 2).delete()
session.commit()
session.close()

修改

1
2
3
4
5
6
7
8
9
10
11
12
# update修改
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
# 字符串加synchronize_session=False
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
# 数字要加synchronize_session="evaluate"
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
# 对象修改
my_user = session.query(User).filter_by(name="alex").first()
my_user.name = "xxxx"
# 一定要提交才能生效
session.commit()
session.close()

查询

1
2
my_user = Session.query(User).filter_by(name="alex").first()
print(my_user)

此时你看到的输出是这样的应该

1
2
3
4
5
6
<__main__.User object at 0x105b4ba90>

print(my_user.id,my_user.name,my_user.password)

输出
1 alex alex3714

不过刚才上面的显示的内存对象对址你是没办法分清返回的是什么数据的,除非打印具体字段看一下,如果想让它变的可读,只需在定义表的类下面加上这样的代码

1
2
3
def __repr__(self):
return "<User(name='%s', password='%s')>" % (
self.name, self.password)

查询操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 获取所有数据
r1 = session.query(Users).all()
# 查询某个字段
r2 = session.query(Users.name.label('xx'), Users.age).all()
# 条件过滤
r3 = session.query(Users).filter(Users.name == "alex").all()
r4 = session.query(Users).filter_by(name='alex').all()
r5 = session.query(Users).filter_by(name='alex').first()
r8 = Session.query(User).filter(User.id>0).filter(User.id<7).all()
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
# 统计分组
Session.query(User).filter(User.name.like("Ra%")).count()
# 分组
from sqlalchemy import func
print(Session.query(func.count(User.name),User.name).group_by(User.name).all() )

其他常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# 条件
ret = session.query(User).filter(User.name == 'ed')
ret = session.query(User).filter(User.name != 'ed')
ret = session.query(User).filter_by(name='alex').all()
ret = session.query(User).filter(User.id > 1, Users.name == 'eric').all()
ret = session.query(User).filter(User.id.between(1, 3), Users.name == 'eric').all()
# None 判断
ret = session.query(User).filter(User.name == None)
ret = session.query(User).filter(User.name.is_(None))
ret = session.query(User).filter(User.name != None)
ret = session.query(User).filter(User.name.isnot(None))
# in && not in
ret = session.query(User).filter(User.id.in_([1,3,4])).all()
ret = session.query(User).filter(~User.id.in_([1,3,4])).all()
ret = session.query(User).filter(User.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
ret = session.query(User).filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%'))))

# and && or
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(User.name == 'ed', User.fullname == 'Ed Jones')
ret = session.query(Users).filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
ret = session.query(Users).filter(User.name.match('wendy'))
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('%e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表 需要有主外键
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
# inner jon
ret = session.query(Person).join(Favor).all()
# left join
ret = session.query(Person).join(Favor, isouter=True).all()
ret = session.query(Person).join(Favor, Person.id == Favor.pid, isouter=True).all()
ret = session.query(Person).join(Favor, and_(Users.id > 3, Users.name == 'eric'), isouter=True).all()

# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
my_user = Session.query(User).filter_by(id=1).first()
my_user.name = "Jack"


fake_user = User(name='Rain', password='12345')
Session.add(fake_user)

print(Session.query(User).filter(User.name.in_(['Jack','rain'])).all() ) #这时看session里有你刚添加和修改的数据

Session.rollback() #此时你rollback一下

print(Session.query(User).filter(User.name.in_(['Jack','rain'])).all() ) #再查就发现刚才添加的数据没有了。

# Session
# Session.commit()

事务操作

1
2
3
4
5
6
7
8
9
# 事务操作
with engine.begin() as conn:
conn.execute("insert into table (x, y, z) values (1, 2, 3)")
conn.execute("my_special_procedure(5)")

conn = engine.connect()
# 事务操作
with conn.begin():
conn.execute("some statement", {'x':5, 'y':10})

外键关联

我们创建一个addresses表,跟user表关联

1
2
3
4
5
6
7
8
9
10
11
12
13
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String(32), nullable=False)
user_id = Column(Integer, ForeignKey('user.id'))

user = relationship("User", backref="addresses") #这个nb,允许你在user表里通过backref字段反向查出所有它在addresses表里的关联项

def __repr__(self):
return "<Address(email_address='%s')>" % self.email_address

表创建好后,我们可以这样反查试试

1
2
3
4
5
6
7
obj = Session.query(User).first()
for i in obj.addresses: #通过user对象反查关联的addresses记录
print(i)

addr_obj = Session.query(Address).first()
print(addr_obj.user.name) #在addr_obj里直接查关联的user表
创建关联对象
1
2
3
4
5
6
7
obj = Session.query(User).filter(User.name=='rain').all()[0]
print(obj.addresses)

obj.addresses = [Address(email_address="r1@126.com"), #添加关联对象
Address(email_address="r2@126.com")]

Session.commit()

多外键关联

下表中,Customer表有2个字段都关联了Address表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()

class Customer(Base):
__tablename__ = 'customer'
id = Column(Integer, primary_key=True)
name = Column(String)

billing_address_id = Column(Integer, ForeignKey("address.id"))
shipping_address_id = Column(Integer, ForeignKey("address.id"))

billing_address = relationship("Address")
shipping_address = relationship("Address")

class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
street = Column(String)
city = Column(String)
state = Column(String)

创建表结构是没有问题的,但你Address表中插入数据时会报下面的错

1
2
3
4
5
6
sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables. Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.

解决办法如下

1
2
3
4
5
6
7
8
9
10
class Customer(Base):
__tablename__ = 'customer'
id = Column(Integer, primary_key=True)
name = Column(String)

billing_address_id = Column(Integer, ForeignKey("address.id"))
shipping_address_id = Column(Integer, ForeignKey("address.id"))

billing_address = relationship("Address", foreign_keys=[billing_address_id])
shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

这样sqlachemy就能分清哪个外键是对应哪个字段了

多对多关系

现在来设计一个能描述“图书”与“作者”的关系的表结构,需求是

  • 一本书可以有好几个作者一起出版
  • 一个作者可以写好几本书
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#一本书可以有多个作者,一个作者又可以出版多本书


from sqlalchemy import Table, Column, Integer,String,DATE, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


Base = declarative_base()

book_m2m_author = Table('book_m2m_author', Base.metadata,
Column('book_id',Integer,ForeignKey('books.id')),
Column('author_id',Integer,ForeignKey('authors.id')),
)

class Book(Base):
__tablename__ = 'books'
id = Column(Integer,primary_key=True)
name = Column(String(64))
pub_date = Column(DATE)
authors = relationship('Author',secondary=book_m2m_author,backref='books')

def __repr__(self):
return self.name

class Author(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String(32))

def __repr__(self):
return self.name

orm 多对多

接下来创建几本书和作者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Session_class = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
s = Session_class() #生成session实例

b1 = Book(name="跟Alex学Python")
b2 = Book(name="跟Alex学把妹")
b3 = Book(name="跟Alex学装逼")
b4 = Book(name="跟Alex学开车")

a1 = Author(name="Alex")
a2 = Author(name="Jack")
a3 = Author(name="Rain")

b1.authors = [a1,a2]
b2.authors = [a1,a2,a3]

s.add_all([b1,b2,b3,b4,a1,a2,a3])

s.commit()

此时,手动连上mysql,分别查看这3张表,你会发现,book_m2m_author中自动创建了多条纪录用来连接book和author表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
mysql> select * from books;
+----+------------------+----------+
| id | name | pub_date |
+----+------------------+----------+
| 1 | 跟Alex学Python | NULL |
| 2 | 跟Alex学把妹 | NULL |
| 3 | 跟Alex学装逼 | NULL |
| 4 | 跟Alex学开车 | NULL |
+----+------------------+----------+
4 rows in set (0.00 sec)

mysql> select * from authors;
+----+------+
| id | name |
+----+------+
| 10 | Alex |
| 11 | Jack |
| 12 | Rain |
+----+------+
3 rows in set (0.00 sec)

mysql> select * from book_m2m_author;
+---------+-----------+
| book_id | author_id |
+---------+-----------+
| 2 | 10 |
| 2 | 11 |
| 2 | 12 |
| 1 | 10 |
| 1 | 11 |
+---------+-----------+
5 rows in set (0.00 sec)

此时,我们去用orm查一下数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
print('--------通过书表查关联的作者---------')

book_obj = s.query(Book).filter_by(name="跟Alex学Python").first()
print(book_obj.name, book_obj.authors)

print('--------通过作者表查关联的书---------')
author_obj =s.query(Author).filter_by(name="Alex").first()
print(author_obj.name , author_obj.books)
s.commit()
输出如下

--------通过书表查关联的作者---------
跟Alex学Python [Alex, Jack]
--------通过作者表查关联的书---------
Alex [跟Alex学把妹, 跟Alex学Python]

多对多删除

删除数据时不用管boo_m2m_authors , sqlalchemy会自动帮你把对应的数据删除

通过书删除作者

1
2
3
4
5
6
author_obj =s.query(Author).filter_by(name="Jack").first()

book_obj = s.query(Book).filter_by(name="跟Alex学把妹").first()

book_obj.authors.remove(author_obj) #从一本书里删除一个作者
s.commit()

直接删除作者 

删除作者时,会把这个作者跟所有书的关联关系数据也自动删除

1
2
3
4
author_obj =s.query(Author).filter_by(name="Alex").first()
# print(author_obj.name , author_obj.books)
s.delete(author_obj)
s.commit()