
Routing

@app.route('/')
def index():
    return '<h1>Hello World!</h1>'


# Equivalent registration without the decorator
def index():
    return '<h1>Hello World!</h1>'

app.add_url_rule('/', 'index', index)

# Dynamic routes
from flask import Flask

app = Flask(__name__)


@app.route('/')
def index():
    return '<h1>Hello World!</h1>'


@app.route('/user/<name>')
def user(name):
    return '<h1>Hello, {}!</h1>'.format(name)

# Print all registered routes
(venv) $ python
>>> from hello import app
>>> app.url_map
Map([<Rule '/' (HEAD, OPTIONS, GET) -> index>,
 <Rule '/static/<filename>' (HEAD, OPTIONS, GET) -> static>,
 <Rule '/user/<name>' (HEAD, OPTIONS, GET) -> user>])

# Redirects
from flask import redirect

@app.route('/')
def index():
    return redirect('http://www.example.com')


# Aborting with an error page
from flask import abort

@app.route('/user/<id>')
def get_user(id):
    user = load_user(id)
    if not user:
        abort(404)
    return '<h1>Hello, {}</h1>'.format(user.name)


# url_for()
url_for('.index')
"""
In its simplest usage, this function takes the view function name (or endpoint name for routes defined with app.add_url_rule()) as its single argument and returns its URL. For example, in the current version of hello.py the call url_for('index') would return /, the root URL of the application. Calling url_for('index', _external=True) would instead return an absolute URL, which in this example is http://localhost:5000/.
"""

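The url_for() behaviour described above can be tried without starting a server. The following is a minimal sketch, assuming a throwaway app with the same index and user views as hello.py; test_request_context() supplies the context that url_for() needs, so the absolute URL uses the test host rather than localhost:5000.

```python
from flask import Flask, url_for

app = Flask(__name__)


@app.route('/')
def index():
    return '<h1>Hello World!</h1>'


@app.route('/user/<name>')
def user(name):
    return '<h1>Hello, {}!</h1>'.format(name)


# url_for() needs an active context; test_request_context() provides one
# without running the development server.
with app.test_request_context():
    root_url = url_for('index')                      # relative URL of the view
    user_url = url_for('user', name='john')          # fills the dynamic segment
    paged_url = url_for('index', page=2)             # extra kwargs become query args
    absolute_url = url_for('index', _external=True)  # includes scheme and host
```

Keyword arguments that do not match a dynamic segment of the route are appended to the query string, which is why page=2 ends up after the question mark.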
Static Files

{% block head %}
{{ super() }}
<link rel="shortcut icon" href="{{ url_for('static', filename='favicon.ico') }}" type="image/x-icon">
<link rel="icon" href="{{ url_for('static', filename='favicon.ico') }}" type="image/x-icon">
{% endblock %}

Development Server

(venv) $ export FLASK_APP=hello.py
(venv) $ flask run
* Serving Flask app "hello"
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

# Windows
(venv) $ set FLASK_APP=hello.py
(venv) $ flask run
* Serving Flask app "hello"
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

# Debug mode

(venv) $ export FLASK_APP=hello.py
(venv) $ export FLASK_DEBUG=1
(venv) $ flask run
* Serving Flask app "hello"
* Forcing debug mode on
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 273-181-528

# Command-line options

(venv) $ flask run --host 0.0.0.0
* Serving Flask app "hello"
* Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)

The Request-Response Cycle

from flask import request

@app.route('/')
def index():
    user_agent = request.headers.get('User-Agent')
    return '<p>Your browser is {}</p>'.format(user_agent)

  • POST
from flask import request, render_template

@app.route('/login', methods=['POST', 'GET'])
def login():
    error = None
    if request.method == 'POST':
        if valid_login(request.form['username'],
                       request.form['password']):
            return log_the_user_in(request.form['username'])
        else:
            error = 'Invalid username/password'
    # the code below is executed if the request method
    # was GET or the credentials were invalid
    return render_template('login.html', error=error)

# inside a view function
print(request.form.get('myid'))          # single input field
print(request.form.getlist('mychecks'))  # checkbox group

Context Globals

  • current_app
  • g
  • request
  • session
>>> from hello import app
>>> from flask import current_app
>>> current_app.name
Traceback (most recent call last):
...
RuntimeError: working outside of application context
>>> app_ctx = app.app_context()
>>> app_ctx.push()
>>> current_app.name
'hello'
>>> app_ctx.pop()

The Request Object

http://flask.pocoo.org/docs/0.12/api/#incoming-request-data

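  • form: dict-like object with the submitted form fields
  • args: dict-like object with the query-string arguments
  • values: dict-like object combining args and form
  • cookies: dict with the cookies sent with the request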
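A small sketch of how these four attributes behave, assuming a throwaway app and a simulated POST request built with test_request_context(); the path, field names, and cookie value are made up for illustration.

```python
from flask import Flask, request

app = Flask(__name__)

# Simulate POST /search?q=flask&page=2 with two form fields and one cookie.
with app.test_request_context(
        '/search?q=flask&page=2',
        method='POST',
        data={'myid': '7', 'mychecks': ['a', 'b']},
        headers={'Cookie': 'answer=42'}):
    query = request.args.get('q')                # from the query string
    page = request.args.get('page', type=int)    # converted to int on access
    myid = request.form.get('myid')              # single form field
    checks = request.form.getlist('mychecks')    # all values of a repeated field
    combined = (request.values.get('q'),         # values merges args and form
                request.values.get('myid'))
    cookie = request.cookies.get('answer')
```

get() returns None for a missing key instead of raising, which is usually the safer choice for optional fields; getlist() is the counterpart for checkbox groups and other repeated fields.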

Request Hooks

  • before_first_request
  • before_request
@auth.before_app_request
def before_request():
    if current_user.is_authenticated:
        current_user.ping()
        if not current_user.confirmed \
                and request.endpoint \
                and request.blueprint != 'auth' \
                and request.endpoint != 'static':
            return redirect(url_for('auth.unconfirmed'))
  • after_request

  • teardown_request
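The order in which the hooks run can be observed with a short sketch. It assumes a throwaway app; the calls list and the X-Example header are hypothetical and exist only to make the order visible.

```python
from flask import Flask, g

app = Flask(__name__)
calls = []  # hypothetical log, used only to record the order of the hooks


@app.before_request
def load_state():
    # Runs before the view; g is a common place to stash per-request data.
    calls.append('before_request')
    g.greeting = 'Hello'


@app.route('/')
def index():
    calls.append('view')
    return g.greeting


@app.after_request
def add_header(response):
    # Runs after the view if no unhandled exception occurred;
    # it must return the response object.
    calls.append('after_request')
    response.headers['X-Example'] = 'demo'
    return response


@app.teardown_request
def cleanup(exc):
    # Runs at the end of the request, even if an exception was raised.
    calls.append('teardown_request')


response = app.test_client().get('/')
```

After the request, calls holds the four names in the order before_request, view, after_request, teardown_request.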

Responses

http://flask.pocoo.org/docs/0.12/api/#response-objects

from flask import make_response

@app.route('/')
def index():
    response = make_response('<h1>This document carries a cookie!</h1>')
    response.set_cookie('answer', '42')
    return response
  • headers

    A Headers object representing the response headers.

  • status

    A string with a response status.

  • status_code

    The response status as integer.

  • data

    A descriptor that calls get_data() and set_data(). This should not be used and will eventually get deprecated.

  • mimetype

    The mimetype (content type without charset etc.)
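The attributes listed above can be inspected on a response returned by the test client. This is a minimal sketch, assuming a throwaway app; the 201 status and the X-Example header are arbitrary choices for the demonstration.

```python
from flask import Flask, make_response

app = Flask(__name__)


@app.route('/')
def index():
    # make_response() accepts the body and an optional status code.
    response = make_response('<h1>Created</h1>', 201)
    response.headers['X-Example'] = 'demo'
    response.set_cookie('answer', '42')
    return response


response = app.test_client().get('/')
status = response.status                        # status line as a string
status_code = response.status_code              # status as an integer
mimetype = response.mimetype                    # content type without charset
body = response.get_data(as_text=True)          # preferred over the data descriptor
cookie_header = response.headers['Set-Cookie']  # header written by set_cookie()
```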

Templates

# Rendering templates
from flask import Flask, render_template
# ...
@app.route('/')
def index():
    return render_template('index.html')


@app.route('/user/<name>')
def user(name):
    return render_template('user.html', name=name)

Jinja2 built-in filters

Hello,

  • safe
  • capitalize
  • lower
  • upper
  • title
  • trim
  • striptags
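A quick way to see what each filter does is to render a small inline template. The sketch below assumes a throwaway app and uses render_template_string() so that no template file is needed; the variable names and sample values are illustrative only.

```python
from flask import Flask, render_template_string

app = Flask(__name__)

# One filter per line so the output can be split and inspected.
TEMPLATE = '''\
{{ name|capitalize }}
{{ name|upper }}
{{ name|lower }}
{{ name|title }}
[{{ padded|trim }}]
{{ html|striptags }}
{{ html }}
{{ html|safe }}'''

with app.app_context():
    rendered = render_template_string(
        TEMPLATE, name='john SMITH', padded='  hi  ', html='<b>bold</b>')

lines = rendered.splitlines()
```

The last two lines show the difference that safe makes: without it the markup is escaped, with it the markup is emitted as-is, so safe should only be applied to values that are trusted.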

Template syntax

{% macro render_comment(comment) %}
<li>{{ comment }}</li>
{% endmacro %}

<ul>
{% for comment in comments %}
{{ render_comment(comment) }}
{% endfor %}
</ul>

<!-- example 2: partial rendering with imported macros -->
{% import 'macros.html' as macros %}
<ul>
{% for comment in comments %}
{{ macros.render_comment(comment) }}
{% endfor %}
</ul>




<!-- example: block syntax -->
<html>
<head>
{% block head %}
<title>{% block title %}{% endblock %} - My Application</title>
{% endblock %}
</head>
<body>
{% block body %}{% endblock %}
</body>
</html>





<!-- example: use super() to keep the parent block's content -->
{% extends "base.html" %}
{% block title %}Index{% endblock %}
{% block head %} {{ super() }} <style> </style>{% endblock %}
{% block body %}<h1>Hello, World!</h1>{% endblock %}

Integrating Bootstrap

(venv) $ pip install flask-bootstrap

from flask_bootstrap import Bootstrap
# ...
bootstrap = Bootstrap(app)

Navigation menu example

{% extends "bootstrap/base.html" %}
{% block title %}Page title{% endblock %}
{% block navbar %}
<div class="navbar navbar-inverse" role="navigation">
<div class="container">
<div class="navbar-header">
<button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<a class="navbar-brand" href="/">Flasky</a>
</div>
<div class="navbar-collapse collapse">
<ul class="nav navbar-nav">
<li><a href="/">Home</a></li>
</ul>
</div>
</div>
</div>
{% endblock %}

{% block content %}
<div class="container">
<div class="page-header">
<h1>Hello, {{ name }}!</h1>
</div>
</div>
{% endblock %}

Error pages

@app.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404

@app.errorhandler(500)
def internal_server_error(e):
    return render_template('500.html'), 500

Forms

(venv) $ pip install flask-wtf

from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired

class NameForm(FlaskForm):
    name = StringField('What is your name?', validators=[DataRequired()])
    submit = SubmitField('Submit')


@app.route('/', methods=['GET', 'POST'])
def index():
    name = None
    form = NameForm()
    if form.validate_on_submit():
        name = form.name.data
        form.name.data = ''
    return render_template('index.html', form=form, name=name)

session

from flask import Flask, render_template, session, redirect, url_for

@app.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        session['name'] = form.name.data
        return redirect(url_for('index'))
    return render_template('index.html', form=form, name=session.get('name'))

database

(venv) $ pip install flask-sqlalchemy

http://flask-sqlalchemy.pocoo.org/2.3/config/

Table relationships

class Role(db.Model):
    # ...
    users = db.relationship('User', backref='role')

class User(db.Model):
    # ...
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

(venv) $ flask shell
>>> from hello import db
>>> db.create_all()

Many-to-many relationships


registrations = db.Table('registrations',
    db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
    db.Column('class_id', db.Integer, db.ForeignKey('classes.id')))


class Student(db.Model):
    __tablename__ = 'students'  # must match the ForeignKey target above
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String)
    classes = db.relationship('Class', secondary=registrations,
                              backref=db.backref('students', lazy='dynamic'),
                              lazy='dynamic')


class Class(db.Model):
    __tablename__ = 'classes'  # must match the ForeignKey target above
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String)

Operations

>>> s.classes.append(c)
>>> db.session.add(s)

>>> s.classes.all()
>>> c.students.all()

>>> s.classes.remove(c)

example

view

@app.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first()
        if user is None:
            user = User(username=form.name.data)
            db.session.add(user)
            db.session.commit()
            session['known'] = False
        else:
            session['known'] = True
        session['name'] = form.name.data
        form.name.data = ''
        return redirect(url_for('index'))
    return render_template('index.html',
                           form=form, name=session.get('name'),
                           known=session.get('known', False))

html

{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}
{% block title %}Flasky{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>Hello, {% if name %}{{ name }}{% else %}Stranger{% endif %}!</h1>
{% if not known %}
<p>Pleased to meet you!</p>
{% else %}
<p>Happy to see you again!</p>
{% endif %}
</div>
{{ wtf.quick_form(form) }}
{% endblock %}

## Integrating with the flask shell

@app.shell_context_processor
def make_shell_context():
    return dict(db=db, User=User, Role=Role)

$ flask shell
>>> app
<Flask 'hello'>
>>> db
<SQLAlchemy engine='sqlite:////home/flask/flasky/data.sqlite'>
>>> User
<class 'hello.User'>

Migrations

(venv) $ pip install flask-migrate

from flask_migrate import Migrate

# ...

migrate = Migrate(app, db)

(venv) $ flask db init
Creating directory /home/flask/flasky/migrations...done
Creating directory /home/flask/flasky/migrations/versions...done

Migration scripts

# Add a message so the migration is easy to identify when rolling back
(venv) $ flask db migrate -m "initial migration"
INFO [alembic.migration] Context impl SQLiteImpl.
Generating /home/flask/flasky/migrations/versions/1bc594146bb5_initial_migration.py...done
(venv) $ flask db upgrade
INFO [alembic.migration] Context impl SQLiteImpl.
INFO [alembic.migration] Will assume non-transactional DDL.
INFO [alembic.migration] Running upgrade None -> 1bc594146bb5, initial migration

Email

(venv) $ pip install flask-mail

import os
from flask_mail import Mail
# ...
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')

mail = Mail(app)

(venv) $ export MAIL_USERNAME=<Gmail username>
(venv) $ export MAIL_PASSWORD=<Gmail password>

# Windows

(venv) $ set MAIL_USERNAME=<Gmail username>
(venv) $ set MAIL_PASSWORD=<Gmail password>

Sending email from the shell

(venv) $ flask shell
>>> from flask_mail import Message
>>> from hello import mail
>>> msg = Message('test email', sender='you@example.com',
...     recipients=['you@example.com'])
>>> msg.body = 'This is the plain text body'
>>> msg.html = 'This is the <b>HTML</b> body'
>>> with app.app_context():
...     mail.send(msg)
...

Integrating Emails with the Application

from flask import render_template
from flask_mail import Message

app.config['FLASKY_MAIL_SUBJECT_PREFIX'] = '[Flasky]'
app.config['FLASKY_MAIL_SENDER'] = 'Flasky Admin <flasky@example.com>'

def send_email(to, subject, template, **kwargs):
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    mail.send(msg)

example


# ...
app.config['FLASKY_ADMIN'] = os.environ.get('FLASKY_ADMIN')
# ...
@app.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first()
        if user is None:
            user = User(username=form.name.data)
            db.session.add(user)
            session['known'] = False
            if app.config['FLASKY_ADMIN']:
                send_email(app.config['FLASKY_ADMIN'], 'New User',
                           'mail/new_user', user=user)
        else:
            session['known'] = True
        session['name'] = form.name.data
        form.name.data = ''
        return redirect(url_for('index'))
    return render_template('index.html',
                           form=form, name=session.get('name'),
                           known=session.get('known', False))

(venv) $ export FLASKY_ADMIN=<your-email-address>

# Windows
(venv) $ set FLASKY_ADMIN=<your-email-address>

Sending Asynchronous Email

from threading import Thread


def send_async_email(app, msg):
    # The worker thread has no application context of its own, so push one.
    with app.app_context():
        mail.send(msg)


def send_email(to, subject, template, **kwargs):
    msg = Message(
        app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + subject,
        sender=app.config['FLASKY_MAIL_SENDER'],
        recipients=[to]
    )
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)
    thr = Thread(target=send_async_email, args=[app, msg])
    thr.start()
    return thr

Project structure for large applications

flasky
├── LICENSE
├── README.md
├── __pycache__
│   └── hello.cpython-35.pyc
├── app
│   ├── __init__.py
│   ├── email.py
│   ├── main
│   │   ├── __init__.py
│   │   ├── errors.py
│   │   ├── forms.py
│   │   └── views.py
│   ├── models.py
│   ├── static
│   │   └── favicon.ico
│   └── templates
│       ├── 404.html
│       ├── 500.html
│       ├── base.html
│       ├── index.html
│       └── mail
│           ├── new_user.html
│           └── new_user.txt
├── config.py
├── flasky.py
├── migrations
│   ├── README
│   ├── alembic.ini
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 38c4e85512a9_initial_migration.py
├── requirements.txt
└── tests
    ├── __init__.py
    └── test_basics.py

9 directories, 27 files

config.py

import os
basedir = os.path.abspath(os.path.dirname(__file__))


class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'hard to guess string'
    MAIL_SERVER = os.environ.get('MAIL_SERVER', 'smtp.googlemail.com')
    MAIL_PORT = int(os.environ.get('MAIL_PORT', '587'))
    MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in \
        ['true', 'on', '1']
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
    FLASKY_MAIL_SUBJECT_PREFIX = '[Flasky]'
    FLASKY_MAIL_SENDER = 'Flasky Admin <flasky@example.com>'
    FLASKY_ADMIN = os.environ.get('FLASKY_ADMIN')
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')


class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
        'sqlite://'


class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(basedir, 'data.sqlite')


config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,

    'default': DevelopmentConfig
}
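How the config dictionary is meant to be consumed can be sketched with the standard library alone. The classes below are cut-down stand-ins for the ones in config.py, and select_config() is a hypothetical helper introduced only for this illustration; the FLASK_CONFIG variable is the one read by flasky.py.

```python
import os


class Config:
    # Shared settings live on the base class.
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'hard to guess string'
    SQLALCHEMY_TRACK_MODIFICATIONS = False


class DevelopmentConfig(Config):
    DEBUG = True


class TestingConfig(Config):
    TESTING = True


config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'default': DevelopmentConfig,
}


def select_config(name=None):
    """Hypothetical helper: explicit name first, then FLASK_CONFIG, then 'default'."""
    return config[name or os.getenv('FLASK_CONFIG') or 'default']


testing = select_config('testing')
development = select_config('development')
```

Because the subclasses inherit from Config, a setting such as SQLALCHEMY_TRACK_MODIFICATIONS is defined once and visible in every environment, while each subclass only overrides what differs.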

app/main/__init__.py: Blueprint creation

from flask import Blueprint

main = Blueprint('main', __name__)

from . import views, errors

app/__init__.py

from flask import Flask
from flask_bootstrap import Bootstrap
from flask_mail import Mail
from flask_moment import Moment
from flask_sqlalchemy import SQLAlchemy
from config import config

bootstrap = Bootstrap()
mail = Mail()
moment = Moment()
db = SQLAlchemy()


def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)

    bootstrap.init_app(app)
    mail.init_app(app)
    moment.init_app(app)
    db.init_app(app)

    # register the main blueprint
    from .main import main as main_blueprint
    app.register_blueprint(main_blueprint)

    return app

app/main/errors.py: Blueprint with error handlers

from flask import render_template
from . import main


@main.app_errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404


@main.app_errorhandler(500)
def internal_server_error(e):
    return render_template('500.html'), 500

app/main/views.py: Blueprint with application routes

from flask import render_template, session, redirect, url_for, current_app
from .. import db
from ..models import User
from ..email import send_email
from . import main
from .forms import NameForm


@main.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.name.data).first()
        if user is None:
            user = User(username=form.name.data)
            db.session.add(user)
            session['known'] = False
            if current_app.config['FLASKY_ADMIN']:
                send_email(current_app.config['FLASKY_ADMIN'], 'New User',
                           'mail/new_user', user=user)
        else:
            session['known'] = True
        session['name'] = form.name.data
        return redirect(url_for('.index'))
    return render_template('index.html',
                           form=form, name=session.get('name'),
                           known=session.get('known', False))

Application Script

flasky.py: Main script

import os
from flask_migrate import Migrate
from app import create_app, db
from app.models import User, Role

app = create_app(os.getenv('FLASK_CONFIG') or 'default')
migrate = Migrate(app, db)


@app.shell_context_processor
def make_shell_context():
    return dict(db=db, User=User, Role=Role)


@app.cli.command()
def test():
    """Run the unit tests."""
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)

Requirements File

(venv) $ pip freeze >requirements.txt 

(venv) $ pip install -r requirements.txt # install the pinned dependencies

Unit Tests

tests/test_basics.py: Unit tests

import unittest
from flask import current_app
from app import create_app, db


class BasicsTestCase(unittest.TestCase):
    def setUp(self):
        self.app = create_app('testing')
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()

    def tearDown(self):
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_app_exists(self):
        self.assertFalse(current_app is None)

    def test_app_is_testing(self):
        self.assertTrue(current_app.config['TESTING'])

flasky.py: Unit test launcher command

@app.cli.command()
def test():
    """Run the unit tests."""
    import unittest
    tests = unittest.TestLoader().discover('tests')
    unittest.TextTestRunner(verbosity=2).run(tests)

Running the unit tests

(venv) $ flask test
test_app_exists (test_basics.BasicsTestCase) ... ok
test_app_is_testing (test_basics.BasicsTestCase) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK

Modifying the models

(venv) $ flask db upgrade
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> 38c4e85512a9, initial migration

Running the Application

(venv) $ export FLASK_APP=flasky.py
(venv) $ export FLASK_DEBUG=1
(venv) $ flask run

utils

Getting the current time
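No code accompanies this entry in the notes. A minimal sketch using only the standard library might look like the following; utc_now() and format_timestamp() are hypothetical helper names, and working in UTC is an assumption rather than something the notes prescribe.

```python
from datetime import datetime, timezone


def utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def format_timestamp(dt):
    """Format a datetime as 'YYYY-MM-DD HH:MM:SS TZ'."""
    return dt.strftime('%Y-%m-%d %H:%M:%S %Z')


now = utc_now()
fixed = format_timestamp(datetime(2020, 1, 2, 3, 4, 5, tzinfo=timezone.utc))
```

Storing timestamps in UTC and converting to local time only for display avoids most daylight-saving and server-location surprises.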

Message Flashing

from flask import Flask, render_template, session, redirect, url_for, flash

@app.route('/', methods=['GET', 'POST'])
def index():
    form = NameForm()
    if form.validate_on_submit():
        old_name = session.get('name')
        if old_name is not None and old_name != form.name.data:
            flash('Looks like you have changed your name!')
        session['name'] = form.name.data
        return redirect(url_for('index'))
    return render_template('index.html', form=form, name=session.get('name'))

{% block content %}
<div class="container">
{% for message in get_flashed_messages() %}
<div class="alert alert-warning">
<button type="button" class="close" data-dismiss="alert">&times;</button>
{{ message }}
</div>
{% endfor %}
{% block page_content %}{% endblock %}
</div>
{% endblock %}