Flask框架

摘要

Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架,对于Werkzeug本质是Socket服务端,其用于接收http请求并对请求进行预处理,然后触发Flask框架,开发人员基于Flask框架提供的功能对请求进行相应的处理,并返回给用户,如果要返回给用户复杂的内容时,需要借助jinja2模板来实现对模板的处理,即:将模板和数据进行渲染,将渲染后的字符串返回给用户浏览器。

“微”(micro)并不表示你需要把整个Web应用塞进单个Python文件(虽然确实可以),也不意味着 Flask 在功能上有所欠缺。微框架中的”微”意味着Flask旨在保持核心简单而易于扩展。Flask不会替你做出太多决策——比如使用何种数据库。而那些Flask所选择的——比如使用何种模板引擎——则很容易替换。除此之外的一切都由可由你掌握。如此,Flask可以与您珠联璧合。

默认情况下,Flask不包含数据库抽象层、表单验证,或是其它任何已有多种库可以胜任的功能。然而,Flask 支持用扩展来给应用添加这些功能,如同是 Flask 本身实现的一样。众多的扩展提供了数据库集成、表单验证、上传处理、各种各样的开放认证技术等功能。Flask 也许是“微小”的,但它已准备好在需求繁杂的生产环境中投入使用。

  • 配置文件
  • 路由系统
  • 模板语言
  • 请求&响应
  • session&cookie
  • 闪现
  • 蓝图
  • 请求扩展(django中间件)
  • 中间件

常见的web框架

  • Django:有很多组件
    • ORM
    • Form
    • ModelForm
    • 缓存
    • Session
    • 中间件
    • 信号
  • Flask:短小精悍,内部没有太多组件,第三方组件丰富
  • Tornado:异步非阻塞(node.js)

Web框架本质

众所周知,对于所有的Web应用,本质上其实就是一个socket服务端,用户的浏览器其实就是一个socket客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#!/usr/bin/env python
#coding:utf-8
import socket
def handle_request(client):
buf = client.recv(1024)
client.send("HTTP/1.1 200 OK\r\n\r\n")
client.send("Hello, Seven")
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('localhost',8000))
sock.listen(5)
while True:
connection, address = sock.accept()
handle_request(connection)
connection.close()
if __name__ == '__main__':
main()

上述通过socket来实现了其本质,而对于真实开发中的python web程序来说,一般会分为两部分:服务器程序和应用程序。服务器程序负责对socket服务器进行封装,并在请求到来时,对请求的各种数据进行整理。应用程序则负责具体的逻辑处理。为了方便应用程序的开发,就出现了众多的Web框架,例如:Django、Flask、web.py 等。不同的框架有不同的开发方式,但是无论如何,开发出的应用程序都要和服务器程序配合,才能为用户提供服务。这样,服务器程序就需要为不同的框架提供不同的支持。这样混乱的局面无论对于服务器还是框架,都是不好的。对服务器来说,需要支持各种不同框架,对框架来说,只有支持它的服务器才能被开发出的应用使用。这时候,标准化就变得尤为重要。我们可以设立一个标准,只要服务器程序支持这个标准,框架也支持这个标准,那么他们就可以配合使用。一旦标准确定,双方各自实现。这样,服务器可以支持更多支持标准的框架,框架也可以使用更多支持标准的服务器。

WSGI(Web Server Gateway Interface)是一种规范,它定义了使用python编写的web app与web server之间接口格式,实现web app与web server间的解耦。

python标准库提供的独立WSGI服务器称为wsgiref。

1
2
3
4
5
6
7
8
9
10
11
12
from wsgiref.simple_server import make_server
def RunServer(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html')])
return [bytes('<h1>Hello, web!</h1>', encoding='utf-8'), ]
if __name__ == '__main__':
httpd = make_server('', 8000, RunServer)
print("Serving HTTP on port 8000...")
httpd.serve_forever()

werkzeug

1
2
3
4
5
6
7
8
9
from werkzeug.wrappers import Request, Response
@Request.application
def hello(request):
return Response('Hello World!')
if __name__ == '__main__':
from werkzeug.serving import run_simple
run_simple('localhost', 4000, hello)

Flask

基本使用

1
2
3
4
5
6
7
8
9
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello_world():
return 'Hello World!'
if __name__ == '__main__':
app.run()

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
{
'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
'TESTING': False, 是否开启测试模式
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),
'USE_X_SENDFILE': False,
'LOGGER_NAME': None,
'LOGGER_HANDLER_POLICY': 'always',
'SERVER_NAME': None,
'APPLICATION_ROOT': None,
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
'TRAP_BAD_REQUEST_ERRORS': False,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': True,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
}
方式一:
app.config['DEBUG'] = True
PS: 由于Config对象本质上是字典,所以还可以使用app.config.update(...)
方式二:
app.config.from_pyfile("python文件名称")
如:
settings.py
DEBUG = True
app.config.from_pyfile("settings.py")
app.config.from_envvar("环境变量名称")
环境变量的值为python文件名称名称,内部调用from_pyfile方法
app.config.from_json("json文件名称")
JSON文件名称,必须是json格式,因为内部会执行json.loads
app.config.from_mapping({'DEBUG':True})
字典格式
app.config.from_object("python类或类的路径")
app.config.from_object('pro_flask.settings.TestingConfig')
settings.py
class Config(object):
DEBUG = False
TESTING = False
DATABASE_URI = 'sqlite://:memory:'
class ProductionConfig(Config):
DATABASE_URI = 'mysql://user@localhost/foo'
class DevelopmentConfig(Config):
DEBUG = True
class TestingConfig(Config):
TESTING = True
PS: 从sys.path中已经存在路径开始写
PS: settings.py文件默认路径要放在程序root_path目录,如果instance_relative_config为True,则就是instance_path目录

路由系统

  • @app.route(‘/user/‘)
  • @app.route(‘/post/‘)
  • @app.route(‘/post/‘)
  • @app.route(‘/post/‘)
  • @app.route(‘/login’, methods=[‘GET’, ‘POST’])
  • @app.route(‘/index/‘, methods=[‘GET’, ‘POST’],endpoint=’index’)
  • url_for反向解析url

常用路由系统有以上五种,所有的路由系统都是基于一下对应关系来处理:

1
2
3
4
5
6
7
8
9
DEFAULT_CONVERTERS = {
'default': UnicodeConverter,
'string': UnicodeConverter,
'any': AnyConverter,
'path': PathConverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,
}

注册路由原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result
return inner
@app.route('/index.html',methods=['GET','POST'],endpoint='index')
@auth
def index():
return 'Index'
def index():
return "Index"
self.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
or
app.add_url_rule(rule='/index.html', endpoint="index", view_func=index, methods=["GET","POST"])
app.view_functions['index'] = index
def auth(func):
def inner(*args, **kwargs):
print('before')
result = func(*args, **kwargs)
print('after')
return result
return inner
class IndexView(views.View):
methods = ['GET']
decorators = [auth, ]
def dispatch_request(self):
print('Index')
return 'Index!'
app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint
class IndexView(views.MethodView):
methods = ['GET']
decorators = [auth, ]
def get(self):
return 'Index.GET'
def post(self):
return 'Index.POST'
app.add_url_rule('/index', view_func=IndexView.as_view(name='index')) # name=endpoint

@app.route和app.add_url_rule参数:

  • rule, URL规则
  • view_func, 视图函数名称
  • defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={‘k’:’v’}为函数提供参数
  • endpoint=None, 名称,用于反向生成URL,即: url_for(‘名称’)
  • methods=None, 允许的请求方式,如:[“GET”,”POST”]
  • strict_slashes=None, 对URL最后的 / 符号是否严格要求,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@app.route('/index',strict_slashes=False),
访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
@app.route('/index',strict_slashes=True)
仅访问 http://www.xx.com/index
```
* redirect_to=None, 重定向到指定地址
``` python
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
def func(adapter, nid):
return "/home/888"
@app.route('/index/<int:nid>', redirect_to=func)
  • subdomain=None, 子域名访问
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask, views, url_for
app = Flask(import_name=__name__)
app.config['SERVER_NAME'] = 'wupeiqi.com:5000'
@app.route("/", subdomain="admin")
def static_index():
"""Flask supports static subdomains
This is available at static.your-domain.tld"""
return "static.your-domain.tld"
@app.route("/dynamic", subdomain="<username>")
def username_index(username):
"""Dynamic subdomains are also supported
Try going to user1.your-domain.tld/dynamic"""
return username + ".your-domain.tld"
if __name__ == '__main__':
app.run()

自定制正则路由匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter
app = Flask(import_name=__name__)
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param value:
:return:
"""
return int(value)
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val
# 添加到flask中
app.url_map.converters['regex'] = RegexConverter
@app.route('/index/<regex("\d+"):nid>')
def index(nid):
print(url_for('index', nid='888'))
return 'Index'
if __name__ == '__main__':
app.run()

模板

模板的使用

Flask使用的是Jinja2模板,所以其语法和Django无差别

自定义模板方法

Flask中自定义模板方法的方式和Bottle相似,创建一个函数并通过参数的形式传入render_template,如

html

1
2
3
4
5
6
7
8
9
10
11
12
<!DOCTYPE html>
<html>
<head lang="en">
<meta charset="UTF-8">
<title></title>
</head>
<body>
<h1>自定义函数</h1>
{{ww()|safe}}
</body>
</html>

run.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask,render_template
app = Flask(__name__)
def wupeiqi():
return '<h1>Wupeiqi</h1>'
@app.route('/login', methods=['GET', 'POST'])
def login():
return render_template('login.html', ww=wupeiqi)
app.run()

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
{% macro input(name, type='text', value='') %}
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
{% endmacro %}
{{ input('n1') }}
{% include 'tp.html' %}
<h1>asdf{{ v.k1}}</h1>
</body>
</html>

注意:Markup等价django的mark_safe

请求和响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
@app.route('/login.html', methods=['GET', "POST"])
def login():
# 请求相关信息
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
# obj = request.files['the_file_name']
# obj.save('/var/www/uploads/' + secure_filename(f.filename))
# 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html')
# response = make_response(render_template('index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key')
# response.set_cookie('key', 'value')
# response.headers['X-Something'] = 'A value'
# return response
return "内容"
if __name__ == '__main__':
app.run()

Session

除请求对象之外,还有一个session对象.它允许你在不同请求间存储特定用户的信息,它是在Cookies的基础上实现的,并且对Cookies进行密钥签名要使用会话,你需要设置一个密钥.

  • 设置 session[‘username’] = ‘xxx’
  • 删除 session.pop(‘username’, None)

  • 设置cookie时,如何设定关闭浏览器则cookie失效

    • response.set_cookie(‘k’,’v’,exipre=None)

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from flask import Flask, session, redirect, url_for, escape, request
app = Flask(__name__)
@app.route('/')
def index():
if 'username' in session:
return 'Logged in as %s' % escape(session['username'])
return 'You are not logged in'
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
session['username'] = request.form['username']
return redirect(url_for('index'))
return '''
<form action="" method="post">
<p><input type=text name=username>
<p><input type=submit value=Login>
</form>
'''
@app.route('/logout')
def logout():
# remove the username from the session if it's there
session.pop('username', None)
return redirect(url_for('index'))
# set the secret key. keep this really secret:
app.secret_key = 'dasji9dasd9qwdq8dh9qwdu'

自定义Session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
pip3 install Flask-Session
run.py
from flask import Flask
from flask import session
from pro_flask.utils.session import MySessionInterface
app = Flask(__name__)
app.secret_key = 'MIJ1d2i9f1fndunNIJN'
app.session_interface = MySessionInterface()
@app.route('/login.html', methods=['GET', "POST"])
def login():
print(session)
session['user1'] = 'alex'
session['user2'] = 'alex'
del session['user2']
return "内容"
if __name__ == '__main__':
app.run()
session.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import uuid
import json
from flask.sessions import SessionInterface
from flask.sessions import SessionMixin
from itsdangerous import Signer, BadSignature, want_bytes
class MySession(dict, SessionMixin):
def __init__(self, initial=None, sid=None):
self.sid = sid
self.initial = initial
super(MySession, self).__init__(initial or ())
def __setitem__(self, key, value):
super(MySession, self).__setitem__(key, value)
def __getitem__(self, item):
return super(MySession, self).__getitem__(item)
def __delitem__(self, key):
super(MySession, self).__delitem__(key)
class MySessionInterface(SessionInterface):
session_class = MySession
container = {}
def __init__(self):
import redis
self.redis = redis.Redis()
def _generate_sid(self):
return str(uuid.uuid4())
def _get_signer(self, app):
if not app.secret_key:
return None
return Signer(app.secret_key, salt='flask-session',
key_derivation='hmac')
def open_session(self, app, request):
"""
程序刚启动时执行,需要返回一个session对象
"""
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid()
return self.session_class(sid=sid)
signer = self._get_signer(app)
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid)
# session保存在redis中
# val = self.redis.get(sid)
# session保存在内存中
val = self.container.get(sid)
if val is not None:
try:
data = json.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid)
return self.session_class(sid=sid)
def save_session(self, app, session, response):
"""
程序结束前执行,可以保存session中所有的值
如:
保存到resit
写入到用户cookie
"""
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = json.dumps(dict(session))
# session保存在redis中
# self.redis.setex(name=session.sid, value=val, time=app.permanent_session_lifetime)
# session保存在内存中
self.container.setdefault(session.sid, val)
session_id = self._get_signer(app).sign(want_bytes(session.sid))
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)

第三方session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#pip3 install redis
#pip3 install flask-session
from flask import Flask, session, redirect
from flask.ext.session import Session
app = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasd'
app.config['SESSION_TYPE'] = 'redis'
from redis import Redis
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)
@app.route('/login')
def login():
session['username'] = 'alex'
return redirect('/index')
@app.route('/index')
def index():
name = session['username']
return name
if __name__ == '__main__':
app.run()

蓝图

蓝图用于为应用提供目录划分:

  • 构造程序目录
    • 可以自己造,来回导入
    • 蓝图
      • 批量url前缀
      • 自定模板路径、静态文件路径
      • 请求扩展
        • 针对app,所有蓝图生效
        • 针对某个蓝图做:用户验证
  • 小型应用程序:示例
  • 大型应用程序:示例
  • 蓝图对象名称和函数名称不要重复

其他:

  • 蓝图URL前缀:xxx = Blueprint(‘account’, name,url_prefix=’/xxx’)
  • 蓝图模板目录优先路径:xxx = Blueprint(‘account’, name,template_folder=’tpl’)
  • 蓝图子域名:xxx = Blueprint(‘account’, name,subdomain=’admin’)
    • 前提需要给配置SERVER_NAME: app.config[‘SERVER_NAME’] = ‘wupeiqi.com:5000’
    • 访问时:admin.wupeiqi.com:5000/login.html

message

message是一个基于Session实现的用于保存数据的集合,其特点是:使用一次就删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask, flash, redirect, render_template, request, get_flashed_messages
app = Flask(__name__)
app.secret_key = 'some_secret'
@app.route('/')
def index1():
messages = get_flashed_messages()
print(messages)
return "Index1"
@app.route('/set')
def index2():
v = request.args.get('p')
flash(v)
return 'ok'
if __name__ == "__main__":
app.run()

应用:对临时数据的操作,比如错误信息

1
2
3
from flask import flash
flash("msgxxxx",category="m1")
data = get_flashed_messages(category_filter=['m1'])

中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from flask import Flask, flash, redirect, render_template, request
app = Flask(__name__)
app.secret_key = 'some_secret'
@app.route('/')
def index1():
return render_template('index.html')
@app.route('/set')
def index2():
v = request.args.get('p')
flash(v)
return 'ok'
class MiddleWare:
def __init__(self,wsgi_app):
self.wsgi_app = wsgi_app
def __call__(self, *args, **kwargs):
print('before')
return self.wsgi_app(*args, **kwargs)
print('after')
if __name__ == "__main__":
app.wsgi_app = MiddleWare(app.wsgi_app)
app.run(port=9999)

请求扩展

  • @app.after_request 类似django的process_request 常用
    • 可以用来写登录验证
    • 用request.path设置白名单
  • @app.after_request 类似django的process_response 常用
    • after_request请求拦截后after_request还会执行
  • @app.errorhandler(code) 定制错误信息
  • @app.template_global() @app.template_filter() 定制模板中用的方法
  • @app.before_first_request 第一次请求执行的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, Request, render_template
app = Flask(__name__, template_folder='templates')
app.debug = True
@app.before_first_request
def before_first_request1():
print('before_first_request1')
@app.before_first_request
def before_first_request2():
print('before_first_request2')
@app.before_request
def before_request1(*args,**kwargs):
Request.nnn = 123
print('before_request1')
@app.before_request
def before_request2(*args,**kwargs):
print('before_request2')
@app.after_request
def after_request1(response):
print('before_request1', response)
return response
@app.after_request
def after_request2(response):
print('before_request2', response)
return response
@app.errorhandler(404)
def page_not_found(error):
return 'This page does not exist', 404
@app.template_global()
def sb(a1, a2):
return a1 + a2
# {{sb(1,2)}}
@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
# {{ 1|db(2,3)}}
@app.route('/')
def hello_world():
return render_template('hello.html')
if __name__ == '__main__':
app.run()

上下文管理

  • ThreadLocal
  • 源码(request)
    • 单进程单线程,基于全局变量
    • 单进程多线程,threading.local对象
    • 单进程单线程(多个协程),threading.local对象做不到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import threading
local_values = threading.local()
def func(num):
local_values.name = num
import time
time.sleep(1)
print(local_values.name,threading.current_thread().name)
for i in range(20):
th = threading.Thread(target=func, args=(i,),name='线程%s' %i)
th.start()
##########
import threading
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class Local(object):
def __init__(self):
self.storage = {}
self.get_ident = get_ident
def set(self, k, v):
ident = self.get_ident()
origin = self.storage.get(ident)
if not origin:
origin = {k:v}
else:
origin[k] = v
self.storage[ident] = origin
def get(self, k):
ident = get_ident()
origin = self.storage.get(ident)
if not origin:
return None
return origin.get(k, None)
local_values = Local()
def func(num):
local_values.set('name', num)
import time
time.sleep(1)
print(local_values.get('name'),threading.current_thread().name)
for i in range(20):
th = threading.Thread(target=func, args=(i,),name='线程%s' %i)
th.start()

others

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import threading
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class Local(object):
def __init__(self):
object.__setattr__(self, '__storage__',{})
object.__setattr__(self, '__ident_func__',get_ident)
def __setattr__(self, key, value):
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][key] = value
except KeyError:
storage[ident] = {key:value}
def __getattr__(self, item):
try:
return self.__storage__[self.__indet_func__()][item]
except KeyError:
raise AttributeError(item)
def __delattr__(self, item):
try:
del self.__storage__[self.__ident_func__()][item]
except KeyError:
raise AttributeError(item)
local_values = Local()
def func(num):
local_values.name = num
import time
time.sleep(1)
print(local_values.name,threading.current_thread().name)
for i in range(20):
th = threading.Thread(target=func, args=(i,),name='线程%s' %i)
th.start()

上下文

  • threading.Local对每个线程保存自己的值,而flask定义了一个Local对象支持协程、线程将数据存储到单独的空间
  • 首先将请求相关的数据environ封装到了RequestContext对象(Request、Session)
  • ctx再通过LocalStack对象将RequestContext对象放到Local中(每个线程、协程独立空间存储)
  • 视图函数就可以调用request、session
    • 调用这些数据(request、session)的时候会执行Localproxy中对应的方法
    • localproxy又调用了一个函数(_lookup_req_object)
    • 再通过LocalStack去Local里面把RequestContext拿出来(通过top拿列表钟最后一个)
    • 然后再去RequestContext中获取request或者session
  • 最后通过调用LocalStack的pop方法将数据在Local中移除

请求上下文

  • 执行app.call方法
  • 执行app.wsgi_app方法
  • ctx = self.request_context(environ)
    • 将请求相关的environ封装到了RequestContext对象中
    • 再将对象封装到Local中(每个线程、协程独立空间存储)
    • ctx.app 当前app名称
    • ctx.request Request对象(封装请求相关的数据)
    • ctx.session 空
  • ctx.push()
    • 将ctx通过LocalStack添加到Local中
    • _request_ctx_stack.local = {‘唯一id’:{‘stack’:[ctx,]}
    • ctx.session 有值了
  • 执行视图函数(如打印request)
    • request是LocalProxy对象
    • 执行LocalProxy相应的对象(str
      • LocalProxy中的partial(_lookup_req_object,’request’)自动传递request参数
      • 目标是去local中获取ctx,然后在ctx里面获取request
    • str(LocalProxy._get_current_object)
    • 调用偏函数
    • ctx.request
  • 移除请求数据

应用上下文

  • from flask import request,session,g,current_app
  • ctx.push中
    • app_ctx 创建了一个AppContext(self)对象
    • app_ctx.app = 当前app对象
    • app_ctx.g = 对象(用于保存一个周期中要存储的值)
      • 每个请求周期都会创建一个用于在请求周期中传递值的一个容器
    • app_ctx.push到一个LocalStack()中
    • _app_ctx_stack.local = {‘唯一id’:{‘stack’:[app_ctx,]}
  • 多线程是如何体现的
  • flask的local保存数据时,使用列表创建出来的栈,为什么用栈
    • 如果是web运行环境,栈中永远只保存一条数据
    • 写脚本获取app信息的时候可能存在app上下文嵌套关系
  • web访问多app应用时,上下文管理是如何实现的
    • 使用离线脚本执行flask的时候就可能出现多app堆栈(单元测试)

多app应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask
app1 = Flask('app01')
app2 = Flask('app02')
@app1.route('/index1')
def index1():
return 'index1'
@app2.route('/index2')
def index2():
return 'index2'
app = DispatcherMiddleware(app1,{
'/sec' : app2,
})
if __name__ == '__main__':
run_simple('localhost', 5000, app,)

离线脚本

1
2
3
4
5
6
7
8
9
10
11
from flask import Flask,current_app,_app_ctx_stack
app = Flask('app01')
app2 = Flask('app02')
with app.app_context():
print(current_app.name)
print(_app_ctx_stack._local.__storage__)
with app2.app_context():
print(current_app.name)
print(_app_ctx_stack._local.__storage__)

信号

  • before_first_request
  • 触发request_started信号
  • before_request
  • 如果有模板渲染
    • 触发before_render_template信号
    • render
    • 触发template_rendered信号
  • after_request
  • session.save_session
  • 触发request_finished信号
  • 如果出现异常
    • 触发got_request_exception信号
  • 触发request_tearing_down信号

Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为。

1
pip3 install blinker

内置信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
request_started = _signals.signal('request-started')
# 请求到来前执行
request_finished = _signals.signal('request-finished')
# 请求结束后执行
before_render_template = _signals.signal('before-render-template')
# 模板渲染前执行
template_rendered = _signals.signal('template-rendered')
# 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception')
# 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down')
# 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')
# 请求上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed')
# 请求上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')
# 请求上下文pop时执行
message_flashed = _signals.signal('message-flashed')
# 调用flash在其中添加数据时,自动触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from flask import Flask,signals
app = Flask(__name__)
def func(*args, **kwargs):
print('触发信号',args,kwargs)
signals.request_started.connect(func)
@app.route('/')
def index():
return 'index'
if __name__ == '__main__':
app.run()

自定义信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from flask import Flask, current_app, flash, render_template
from flask.signals import _signals
app = Flask(import_name=__name__)
# 自定义信号
xxxxx = _signals.signal('xxxxx')
def func(sender, *args, **kwargs):
print(sender)
# 自定义信号中注册函数
xxxxx.connect(func)
@app.route("/x")
def index():
# 触发信号
xxxxx.send('123123', k1='v1')
return 'Index'
if __name__ == '__main__':
app.run()