Flask框架扩展

摘要

Flask-Session

flask中session处理机制

  • 请求进来之后先看用户有没有随机字符串
    • 有的话去获取原来的个人数据
    • 没有的话在内存中生成一个空容器(内存对象,随机字符串{放置数据的容器})
  • 视图:操作内存中的对象
  • 相应:内存对象
    • 将数据保存到数据库
    • 将随机字符串写在用户cookie中
  • obj = SecureCookieSessionInterface()
  • obj = open_session(self, request) = SecureCookieSession()
  • self.session = SecureCookieSession()
  • open_session()
    • 去用户请求的cookie中获取原来给你的随机字符串,去cookie中获取key为session的值
    • 如果没有,创建一个特殊的空字典
  • 此时Local的ctx中有了session,是一个特殊的空字典

第三方session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#pip3 install redis
#pip3 install flask-session
from flask import Flask, session, redirect
from flask.ext.session import Session

app = Flask(__name__)
app.debug = True
app.secret_key = 'asdfasdfasd'

app.config['SESSION_TYPE'] = 'redis'
from redis import Redis
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)

@app.route('/login')
def login():
session['username'] = 'alex'
return redirect('/index')

@app.route('/index')
def index():
name = session['username']
return name

if __name__ == '__main__':
app.run()
1
2
3
4
5
6
from flask_session import RedisSessionInterface
from redis import Redis

app = Flask(__name__)
conn = Redis(host='192.168.0.94',port='6379')
app.session_interface = RedisSessionInterface(conn,key_prefix='xx')

MetaClass

用来执行当前类由谁来创建,默认是type

  • class Foo(metaclass=type) 由type创建类
  • class Foo(type) 继承type

使用

1
2
3
4
5
6
7
8
class Foo(metaclass=type):
pass

class Foo1(object):
__metaclass__ = type

class Bar(Foo):
pass

示例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MyType(type):
def __init__(self,*args,**kwargs):
print("init")
super(MyType,self).__init__(*args,**kwargs)

def __call__(self, *args, **kwargs):
print("call")
# call本质:调用类的__new__放大,再调用__init__方法
return super(MyType,self).__call__(*args,**kwargs)

class Foo(metaclass=MyType):
pass

class Bar(Foo):
pass

obj = Bar()

示例2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyType(type):
def __init__(self, *args, **kwargs):
super(MyType, self).__init__(*args, **kwargs)

def __call__(cls, *args, **kwargs):
v = dir(cls)
obj = super(MyType, cls).__call__(*args, **kwargs)
return obj


class Foo(MyType('Base', (object,), {})):
user = 'xxxxxx'
age = 18


obj = Foo()

示例3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class MyType(type):
def __init__(self, *args, **kwargs):
super(MyType, self).__init__(*args, **kwargs)

def __call__(cls, *args, **kwargs):
v = dir(cls)
obj = super(MyType, cls).__call__(*args, **kwargs)
return obj


def with_metaclass(MyType,base):
return MyType('Base', (base,), {})


class Foo(with_metaclass(MyType,object)):
user = 'xxxxxx'
age = 18


obj = Foo()

实例化流程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# 源码流程
1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中
2. 执行构造方法

a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,然后将返回值添加到 self._fields[name] 中。
即:
_fields = {
name: wtforms.fields.core.StringField(),
}

PS:由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,才变成执行 wtforms.fields.core.StringField()

b. 循环_fields,为对象设置属性
for name, field in iteritems(self._fields):
# Set all the fields to attributes so that they obscure the class
# attributes with the same names.
setattr(self, name, field)
c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs)
优先级:obj,data,formdata;

再循环执行每个字段的process方法,为每个字段设置值:
for name, field, in iteritems(self._fields):
if obj is not None and hasattr(obj, name):
field.process(formdata, getattr(obj, name))
elif name in kwargs:
field.process(formdata, kwargs[name])
else:
field.process(formdata)

执行每个字段的process方法,为字段的data和字段的raw_data赋值
def process(self, formdata, data=unset_value):
self.process_errors = []
if data is unset_value:
try:
data = self.default()
except TypeError:
data = self.default

self.object_data = data

try:
self.process_data(data)
except ValueError as e:
self.process_errors.append(e.args[0])

if formdata:
try:
if self.name in formdata:
self.raw_data = formdata.getlist(self.name)
else:
self.raw_data = []
self.process_formdata(self.raw_data)
except ValueError as e:
self.process_errors.append(e.args[0])

try:
for filter in self.filters:
self.data = filter(self.data)
except ValueError as e:
self.process_errors.append(e.args[0])

d. 页面上执行print(form.name) 时,打印标签

因为执行了:
字段的 __str__ 方法
字符的 __call__ 方法
self.meta.render_field(self, kwargs)
def render_field(self, field, render_kw):
other_kw = getattr(field, 'render_kw', None)
if other_kw is not None:
render_kw = dict(other_kw, **render_kw)
return field.widget(field, **render_kw)
执行字段的插件对象的 __call__ 方法,返回标签字符串

验证流程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
a. 执行form的validate方法,获取钩子方法
def validate(self):
extra = {}
for name in self._fields:
inline = getattr(self.__class__, 'validate_%s' % name, None)
if inline is not None:
extra[name] = [inline]

return super(Form, self).validate(extra)
b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)
def validate(self, extra_validators=None):
self._errors = None
success = True
for name, field in iteritems(self._fields):
if extra_validators is not None and name in extra_validators:
extra = extra_validators[name]
else:
extra = tuple()
if not field.validate(self, extra):
success = False
return success
c. 每个字段进行验证时候
字段的pre_validate 【预留的扩展】
字段的_run_validation_chain,对正则和字段的钩子函数进行校验
字段的post_validate【预留的扩展】

wtforms

WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。

1
pip3 install wtforms

用户登录注册示例

用户登录

当用户登录时候,需要对用户提交的用户名和密码进行多种格式校验。如:

  • 用户不能为空;用户长度必须大于6;
  • 密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则);

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True


class LoginForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'}

)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)



@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)

if __name__ == '__main__':
app.run()

login.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
<!--<input type="text" name="name">-->
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

<!--<input type="password" name="pwd">-->
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>

用户注册

注册页面需要让用户输入:用户名、密码、密码重复、性别、爱好等。

app.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)

pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)

pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)

email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)

gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)

hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)

favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=[1, 2]
)

def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))

def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值

if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致") # 不再继续后续验证


@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 1})
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)



if __name__ == '__main__':
app.run()

register.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate style="padding:0 50px">
{% for item in form %}
<p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
{% endfor %}
<input type="submit" value="提交">
</form>
</body>
</html>

示例

meta

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect, session
from wtforms import Form
from wtforms.csrf.core import CSRF
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
from hashlib import md5

app = Flask(__name__, template_folder='templates')
app.debug = True


class MyCSRF(CSRF):
"""
Generate a CSRF token based on the user's IP. I am probably not very
secure, so don't use me.
"""

def setup_form(self, form):
self.csrf_context = form.meta.csrf_context()
self.csrf_secret = form.meta.csrf_secret
return super(MyCSRF, self).setup_form(form)

def generate_csrf_token(self, csrf_token):
gid = self.csrf_secret + self.csrf_context
token = md5(gid.encode('utf-8')).hexdigest()
return token

def validate_csrf_token(self, form, field):
print(field.data, field.current_token)
if field.data != field.current_token:
raise ValueError('Invalid CSRF')


class TestForm(Form):
name = html5.EmailField(label='用户名')
pwd = simple.StringField(label='密码')

class Meta:
# -- CSRF
# 是否自动生成CSRF标签
csrf = True
# 生成CSRF标签name
csrf_field_name = 'csrf_token'

# 自动生成标签的值,加密用的csrf_secret
csrf_secret = 'xxxxxx'
# 自动生成标签的值,加密用的csrf_context
csrf_context = lambda x: request.url
# 生成和比较csrf标签
csrf_class = MyCSRF

# -- i18n
# 是否支持本地化
# locales = False
locales = ('zh', 'en')
# 是否对本地化进行缓存
cache_translations = True
# 保存本地化缓存信息的字段
translations_cache = {}


@app.route('/index/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
form = TestForm()
else:
form = TestForm(formdata=request.form)
if form.validate():
print(form)
return render_template('index.html', form=form)


if __name__ == '__main__':
app.run()

源码流程

  • 解释:metaclass
  • 实例 form = LoginForm()
  • 验证 form.validate()
1
2
3
4
5
6
7
8
9
10
11
class BaseForm:
pass

class NewBase(BaseForm,metaclass=FormMeta):
pass

class Form(NewBase):
pass

class LoginForm(Form):
pass

Flask-SQLAchemy

  • Flask和SQLAchemy的管理者
  • 文件和目录的管理
  • db = SQLAlchemy()
    • 包含配置
    • 包含ORM基类
    • 包含create_all
    • engine
    • 创建连接

代码待续

Flask-Script

用于实现django的python manage.py runserver

1
pip install flask-script

使用

1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask,signals,session
from flask_script import Manager

app = Flask(__name__)
manager = Manager(app)

@app.route('/')
def index():
return 'index'

if __name__ == '__main__':
manager.run()

运行

1
python manager.py runserver

自定义命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from flask import Flask,signals,session
from flask_script import Manager

app = Flask(__name__)
manager = Manager(app)

@manager.command
def custom(arg):
print(arg)

@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name,url):
print(name,url)

@app.route('/')
def index():
return 'index'

if __name__ == '__main__':
manager.run()

Flask-Migrate

用于数据库操作

1
2
3
4
5
6
7
8
from flask import Flask
from flask_script import Manager
from flask_migrate import Migrate,MigrateCommand

app = Flask(__name__)
manager = Manager(app)
migrate = Migrate(app, db)
manager.add_command('db',MigrateCommand)

使用

1
2
3
python manager.py db init
python manager.py db migrate
python manager.py db upgrade