[toc]

User Authentication

example

from werkzeug.security import generate_password_hash, check_password_hash

class User(db.Model):
    """User model excerpt: keeps only a hash of the password, never the plain text."""
    # ...
    password_hash = db.Column(db.String(128))

    @property
    def password(self):
        """Write-only attribute: any attempt to read it raises AttributeError."""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        return check_password_hash(self.password_hash, password)
(venv) $ flask shell
>>> u = User()
>>> u.password = 'cat'
>>> u.password
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/flask/flasky/app/models.py", line 24, in password
raise AttributeError('password is not a readable attribute')
AttributeError: password is not a readable attribute
>>> u.password_hash
'pbkdf2:sha256:50000$moHwFH1B$ef1574909f9c549285e8547cad181c5e0213cfa44a4aba4349fa830aa1fd227f'
>>> u.verify_password('cat')
True
>>> u.verify_password('dog')
False
>>> u2 = User()
>>> u2.password = 'cat'
>>> u2.password_hash
'pbkdf2:sha256:50000$Pfz0m0KU$27be930b7f0e0119d38e8d8a62f7f5e75c0a7db61ae16709bcaa6cfd60c44b74'

tests/test_user_model.py: Password hashing tests

import unittest
from app import create_app, db
from app.models import User


class UserModelTestCase(unittest.TestCase):
    """Unit tests for the password handling of the User model."""

    def setUp(self):
        """Create a 'testing' application, push its context and create the tables."""
        self.app = create_app('testing')
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()

    def tearDown(self):
        """Undo setUp: release the session, drop the tables, pop the context."""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_password_setter(self):
        """Assigning a password must populate ``password_hash``."""
        u = User(password='cat')
        # assertIsNotNone reports the offending value on failure,
        # unlike assertTrue(x is not None).
        self.assertIsNotNone(u.password_hash)

    def test_no_password_getter(self):
        """Reading ``password`` must raise AttributeError."""
        u = User(password='cat')
        with self.assertRaises(AttributeError):
            u.password

    def test_password_verification(self):
        """The right password verifies; a wrong one does not."""
        u = User(password='cat')
        self.assertTrue(u.verify_password('cat'))
        self.assertFalse(u.verify_password('dog'))

    def test_password_salts_are_random(self):
        """Two users with the same password must get different hashes."""
        u = User(password='cat')
        u2 = User(password='cat')
        self.assertNotEqual(u.password_hash, u2.password_hash)
(venv) $ flask test
test_app_exists (test_basics.BasicsTestCase) ... ok
test_app_is_testing (test_basics.BasicsTestCase) ... ok
test_no_password_getter (test_user_model.UserModelTestCase) ... ok
test_password_salts_are_random (test_user_model.UserModelTestCase) ... ok
test_password_setter (test_user_model.UserModelTestCase) ... ok
test_password_verification (test_user_model.UserModelTestCase) ... ok

----------------------------------------------------------------------
Ran 6 tests in 0.393s

OK

app/auth/__init__.py: Blueprint creation 1

from flask import Blueprint

# Blueprint object for the authentication package, created under the name 'auth'.
auth = Blueprint('auth', __name__)

# Deliberately imported after `auth` exists: the views module does
# `from . import auth`, so importing it any earlier would presumably
# fail with a circular import.
from . import views

app/auth/views.py: Blueprint routes and view functions 2

from flask import render_template
from . import auth

@auth.route('/login')
def login():
    """Serve the login page by rendering the ``auth/login.html`` template."""
    return render_template('auth/login.html')

app/__init__.py: Blueprint attachment 3

def create_app(config_name):
    """Application factory excerpt: attach the auth blueprint to the app.

    The parts of the factory that build ``app`` are elided (``# ...``).
    """

    # ...
    # Imported inside the factory rather than at module level.
    from .auth import auth as auth_blueprint
    # Every route defined on the blueprint is served under the /auth prefix.
    app.register_blueprint(auth_blueprint, url_prefix='/auth')

    return app

User Authentication with Flask-Login

(venv) $ pip install flask-login

app/models.py: Updates to the User model to support user logins 1

from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import current_app
from flask_login import UserMixin, AnonymousUserMixin
from . import db, login_manager


class Permission:
    """Permission flags.

    Each value is a distinct power of two, so several flags can be
    combined into (and tested against) a single integer bitmask.
    """
    FOLLOW = 1
    COMMENT = 2
    WRITE = 4
    MODERATE = 8
    ADMIN = 16


class Role(db.Model):
    """Named role whose ``permissions`` integer is a bitmask of Permission flags."""
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    default = db.Column(db.Boolean, default=False, index=True)
    permissions = db.Column(db.Integer)
    users = db.relationship('User', backref='role', lazy='dynamic')

    def __init__(self, **kwargs):
        """Build the role, starting from an empty mask when none was supplied."""
        super(Role, self).__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    @staticmethod
    def insert_roles():
        """Create or update the 'User', 'Moderator' and 'Administrator' roles.

        Existing rows are looked up by name and reused, their permissions are
        rebuilt from scratch, and 'User' is flagged as the default role.
        Everything is committed in one transaction at the end.
        """
        roles = {
            'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE],
            'Moderator': [Permission.FOLLOW, Permission.COMMENT,
                          Permission.WRITE, Permission.MODERATE],
            'Administrator': [Permission.FOLLOW, Permission.COMMENT,
                              Permission.WRITE, Permission.MODERATE,
                              Permission.ADMIN],
        }
        default_role = 'User'
        for name, perms in roles.items():
            role = Role.query.filter_by(name=name).first()
            if role is None:
                role = Role(name=name)
            role.reset_permissions()
            for perm in perms:
                role.add_permission(perm)
            role.default = (role.name == default_role)
            db.session.add(role)
        db.session.commit()

    def add_permission(self, perm):
        """Turn on the bit(s) of ``perm`` in this role's mask."""
        if not self.has_permission(perm):
            # Bitwise OR instead of addition: a multi-bit ``perm`` that is
            # already partly present cannot carry into unrelated bits.
            self.permissions |= perm

    def remove_permission(self, perm):
        """Turn off the bit(s) of ``perm``; no-op unless all of them are set."""
        if self.has_permission(perm):
            self.permissions &= ~perm

    def reset_permissions(self):
        """Clear every permission bit."""
        self.permissions = 0

    def has_permission(self, perm):
        """Return True when every bit of ``perm`` is set in the mask."""
        return self.permissions & perm == perm

    def __repr__(self):
        return '<Role %r>' % self.name


class User(UserMixin, db.Model):
    """Account record with password hashing, signed tokens and role checks."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(64), unique=True, index=True)
    username = db.Column(db.String(64), unique=True, index=True)
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))
    password_hash = db.Column(db.String(128))
    confirmed = db.Column(db.Boolean, default=False)

    def __init__(self, **kwargs):
        """Build the user and give it a role when the caller supplied none.

        The address configured as ``FLASKY_ADMIN`` gets the 'Administrator'
        role; everyone else (or the admin, if that role row is missing)
        gets the role flagged as default.
        """
        super(User, self).__init__(**kwargs)
        if self.role is None:
            if self.email == current_app.config['FLASKY_ADMIN']:
                self.role = Role.query.filter_by(name='Administrator').first()
            if self.role is None:
                self.role = Role.query.filter_by(default=True).first()

    @property
    def password(self):
        """Write-only attribute: any attempt to read it raises AttributeError."""
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        return check_password_hash(self.password_hash, password)

    def generate_confirmation_token(self, expiration=3600):
        """Return a signed token (str) carrying this user's id under 'confirm'.

        ``expiration`` is handed to the serializer as the token lifetime.
        """
        s = Serializer(current_app.config['SECRET_KEY'], expiration)
        return s.dumps({'confirm': self.id}).decode('utf-8')

    def confirm(self, token):
        """Mark the account confirmed if ``token`` is valid for this user.

        Returns False when the token cannot be loaded or names another
        user; otherwise sets ``confirmed``, adds the user to the session
        (without committing) and returns True.
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except Exception:
            # Any failure to load the token counts as "invalid". Unlike a bare
            # ``except:``, KeyboardInterrupt and SystemExit still propagate.
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        db.session.add(self)
        return True

    def generate_reset_token(self, expiration=3600):
        """Return a signed token (str) carrying this user's id under 'reset'."""
        s = Serializer(current_app.config['SECRET_KEY'], expiration)
        return s.dumps({'reset': self.id}).decode('utf-8')

    @staticmethod
    def reset_password(token, new_password):
        """Set ``new_password`` on the user named by ``token``.

        Returns False when the token cannot be loaded or no such user
        exists; otherwise updates the password, adds the user to the
        session (without committing) and returns True.
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except Exception:
            # See confirm(): invalid token -> False, but do not swallow
            # KeyboardInterrupt / SystemExit.
            return False
        user = User.query.get(data.get('reset'))
        if user is None:
            return False
        user.password = new_password
        db.session.add(user)
        return True

    def generate_email_change_token(self, new_email, expiration=3600):
        """Return a signed token (str) carrying this user's id and ``new_email``."""
        s = Serializer(current_app.config['SECRET_KEY'], expiration)
        return s.dumps(
            {'change_email': self.id, 'new_email': new_email}).decode('utf-8')

    def change_email(self, token):
        """Apply the email change described by ``token``.

        Returns False when the token cannot be loaded, names another user,
        carries no new address, or the address already belongs to some
        user; otherwise updates ``email``, adds the user to the session
        (without committing) and returns True.
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token.encode('utf-8'))
        except Exception:
            # See confirm(): invalid token -> False, but do not swallow
            # KeyboardInterrupt / SystemExit.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        db.session.add(self)
        return True

    def can(self, perm):
        """Return True when the user has a role and that role grants ``perm``."""
        return self.role is not None and self.role.has_permission(perm)

    def is_administrator(self):
        """Return True when the user's role grants Permission.ADMIN."""
        return self.can(Permission.ADMIN)

    def __repr__(self):
        return '<User %r>' % self.username


class AnonymousUser(AnonymousUserMixin):
    """Stand-in for a visitor who is not logged in: holds no permissions.

    Offers the same ``can`` / ``is_administrator`` methods as User, so
    callers need not check whether anyone is logged in first.
    """

    def can(self, permissions):
        """Always False, whatever permission is asked for."""
        return False

    def is_administrator(self):
        """Always False."""
        return False

login_manager.anonymous_user = AnonymousUser


@login_manager.user_loader
def load_user(user_id):
    """Return the User whose primary key is ``user_id``, or None.

    A ``user_id`` that cannot be converted to an integer yields None
    rather than an exception, which is what Flask-Login expects from a
    user loader when the id is invalid.
    """
    try:
        pk = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(pk)

app/__init__.py: Flask-Login initialization 2

from flask_login import LoginManager

# Module-level login manager; it is bound to an application later,
# inside create_app(), via init_app().
login_manager = LoginManager()
# Endpoint name of the login page ('login' view of the 'auth' blueprint).
# NOTE(review): presumably where Flask-Login sends anonymous users who hit
# a protected view — confirm against the Flask-Login docs.
login_manager.login_view = 'auth.login'

def create_app(config_name):
    """Application factory excerpt: bind the module-level login manager to the app.

    The parts of the factory that build and return ``app`` are elided (``# ...``).
    """
    # ...
    login_manager.init_app(app)
    # ...

app/models.py: User loader callback function 3

from . import login_manager

@login_manager.user_loader
def load_user(user_id):
    """Return the User whose primary key is ``user_id``, or None.

    A ``user_id`` that cannot be converted to an integer yields None
    rather than an exception, which is what Flask-Login expects from a
    user loader when the id is invalid.
    """
    try:
        pk = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(pk)

login_required登录装饰器

from flask_login import login_required

@app.route('/secret')
@login_required
def secret():
    """Example view wrapped in ``login_required``; returns a fixed message."""
    return 'Only authenticated users are allowed!'

app/auth/forms.py: Login form 4 登录表单

from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Length, Email, Regexp, EqualTo
from wtforms import ValidationError
from ..models import User


class LoginForm(FlaskForm):
    """Sign-in form: email, password, a 'keep me logged in' box and a submit button."""
    # Required, 1-64 characters, and must pass the Email validator.
    email = StringField('Email', validators=[DataRequired(), Length(1, 64),
                                             Email()])
    password = PasswordField('Password', validators=[DataRequired()])
    remember_me = BooleanField('Keep me logged in')
    submit = SubmitField('Log In')
<ul class="nav navbar-nav navbar-right">
{% if current_user.is_authenticated %}
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Account <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="{{ url_for('auth.change_password') }}">Change Password</a></li>
<li><a href="{{ url_for('auth.change_email_request') }}">Change Email</a></li>
<li><a href="{{ url_for('auth.logout') }}">Log Out</a></li>
</ul>
</li>
{% else %}
<li><a href="{{ url_for('auth.login') }}">Log In</a></li>
{% endif %}
</ul>

app/auth/views.py: Sign In route 用户登录view

from flask import render_template, redirect, request, url_for, flash
from flask_login import login_user, logout_user, login_required, \
current_user
from . import auth
from .. import db
from ..models import User
from ..email import send_email
from .forms import LoginForm, RegistrationForm, ChangePasswordForm,\
PasswordResetRequestForm, PasswordResetForm, ChangeEmailForm




@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and sign the user in on a valid submission.

    On success the user is redirected to the ``next`` query argument when
    it is a same-site path, otherwise to ``main.index``. On bad
    credentials an error is flashed and the form is rendered again.
    """
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user is not None and user.verify_password(form.password.data):
            login_user(user, form.remember_me.data)
            # ``next`` comes from the query string and is therefore untrusted.
            next_url = request.args.get('next')
            # Accept only site-relative paths. '//host' and '/\host' begin
            # with a slash too, but browsers resolve them to another host,
            # so they are rejected to avoid an open redirect.
            if (next_url is None or not next_url.startswith('/')
                    or next_url.startswith(('//', '/\\'))):
                next_url = url_for('main.index')
            return redirect(next_url)
        flash('Invalid username or password.')
    return render_template('auth/login.html', form=form)

app/templates/auth/login.html: Render login form 登录表单 4

{% extends "base.html" %}
{% import "bootstrap/wtf.html" as wtf %}

{% block title %}Flasky - Login{% endblock %}

{% block page_content %}
<div class="page-header">
<h1>Login</h1>
</div>
<div class="col-md-4">
{{ wtf.quick_form(form) }}
<br>
<p>Forgot your password? <a href="{{ url_for('auth.password_reset_request') }}">Click here to reset it</a>.</p>
<p>New user? <a href="{{ url_for('auth.register') }}">Click here to register</a>.</p>
</div>
{% endblock %}

app/auth/views.py: Sign Out route 用户登出 1

from flask_login import logout_user, login_required

# NOTE(review): the surrounding section is titled "Sign Out route", but this
# snippet is the login view again; the logout view itself is not shown here.
@auth.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and sign the user in on a valid submission.

    On success the user is redirected to the ``next`` query argument when
    it is a same-site path, otherwise to ``main.index``. On bad
    credentials an error is flashed and the form is rendered again.
    """
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user is not None and user.verify_password(form.password.data):
            login_user(user, form.remember_me.data)
            # ``next`` comes from the query string and is therefore untrusted.
            next_url = request.args.get('next')
            # Accept only site-relative paths. '//host' and '/\host' begin
            # with a slash too, but browsers resolve them to another host,
            # so they are rejected to avoid an open redirect.
            if (next_url is None or not next_url.startswith('/')
                    or next_url.startswith(('//', '/\\'))):
                next_url = url_for('main.index')
            return redirect(next_url)
        flash('Invalid username or password.')
    return render_template('auth/login.html', form=form)

app/templates/index.html: Greet the logged-in user 2

(venv) $ flask shell
>>> u = User(email='john@example.com', username='john', password='cat')
>>> db.session.add(u)
>>> db.session.commit()

app/auth/forms.py: User registration form 新用户注册

from flask import render_template, redirect, request, url_for, flash
from flask_login import login_user, logout_user, login_required, \
current_user
from . import auth
from .. import db
from ..models import User
from ..email import send_email
from .forms import LoginForm, RegistrationForm, ChangePasswordForm,\
PasswordResetRequestForm, PasswordResetForm, ChangeEmailForm

app/auth/forms.py: User registration form 1

from flask import render_template, redirect, request, url_for, flash
from flask_login import login_user, logout_user, login_required, \
current_user
from . import auth
from .. import db
from ..models import User
from ..email import send_email
from .forms import LoginForm, RegistrationForm, ChangePasswordForm,\
PasswordResetRequestForm, PasswordResetForm, ChangeEmailForm



class RegistrationForm(FlaskForm):
    """Sign-up form: email, username and a password entered twice."""
    email = StringField('Email', validators=[DataRequired(), Length(1, 64),
                                             Email()])
    # Must start with a letter; then letters, digits, dots or underscores.
    username = StringField('Username', validators=[
        DataRequired(), Length(1, 64),
        Regexp('^[A-Za-z][A-Za-z0-9_.]*$', 0,
               'Usernames must have only letters, numbers, dots or '
               'underscores')])
    # Must match the 'password2' confirmation field.
    password = PasswordField('Password', validators=[
        DataRequired(), EqualTo('password2', message='Passwords must match.')])
    password2 = PasswordField('Confirm password', validators=[DataRequired()])
    submit = SubmitField('Register')

    def validate_email(self, field):
        """Reject an email address that already belongs to a user."""
        if User.query.filter_by(email=field.data).first():
            raise ValidationError('Email already registered.')

    def validate_username(self, field):
        """Reject a username that already belongs to a user."""
        if User.query.filter_by(username=field.data).first():
            raise ValidationError('Username already in use.')

app/auth/views.py: User registration route 注册新用户


@auth.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form; on a valid submission create the account.

    The new user is committed, a confirmation token is emailed to the
    given address, and the browser is redirected to the login page.
    """
    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(email=form.email.data,
                    username=form.username.data,
                    password=form.password.data)
        db.session.add(user)
        # Commit before creating the token: the token embeds user.id, which
        # is presumably only assigned once the row has been written.
        db.session.commit()
        token = user.generate_confirmation_token()
        send_email(user.email, 'Confirm Your Account',
                   'auth/email/confirm', user=user, token=token)
        flash('A confirmation email has been sent to you by email.')
        return redirect(url_for('auth.login'))
    return render_template('auth/register.html', form=form)

用户权限

app/models.py: Role permissions 1

class Role(db.Model):
    """Named role whose ``permissions`` integer is a bitmask of Permission flags."""
    __tablename__ = 'roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    default = db.Column(db.Boolean, default=False, index=True)
    permissions = db.Column(db.Integer)
    users = db.relationship('User', backref='role', lazy='dynamic')

    def __init__(self, **kwargs):
        """Build the role, starting from an empty mask when none was supplied."""
        super(Role, self).__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    @staticmethod
    def insert_roles():
        """Create or update the 'User', 'Moderator' and 'Administrator' roles.

        Existing rows are looked up by name and reused, their permissions are
        rebuilt from scratch, and 'User' is flagged as the default role.
        Everything is committed in one transaction at the end.
        """
        roles = {
            'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE],
            'Moderator': [Permission.FOLLOW, Permission.COMMENT,
                          Permission.WRITE, Permission.MODERATE],
            'Administrator': [Permission.FOLLOW, Permission.COMMENT,
                              Permission.WRITE, Permission.MODERATE,
                              Permission.ADMIN],
        }
        default_role = 'User'
        for name, perms in roles.items():
            role = Role.query.filter_by(name=name).first()
            if role is None:
                role = Role(name=name)
            role.reset_permissions()
            for perm in perms:
                role.add_permission(perm)
            role.default = (role.name == default_role)
            db.session.add(role)
        db.session.commit()

    def add_permission(self, perm):
        """Turn on the bit(s) of ``perm`` in this role's mask."""
        if not self.has_permission(perm):
            # Bitwise OR instead of addition: a multi-bit ``perm`` that is
            # already partly present cannot carry into unrelated bits.
            self.permissions |= perm

    def remove_permission(self, perm):
        """Turn off the bit(s) of ``perm``; no-op unless all of them are set."""
        if self.has_permission(perm):
            self.permissions &= ~perm

    def reset_permissions(self):
        """Clear every permission bit."""
        self.permissions = 0

    def has_permission(self, perm):
        """Return True when every bit of ``perm`` is set in the mask."""
        return self.permissions & perm == perm

    def __repr__(self):
        return '<Role %r>' % self.name

app/models.py: Permission constants 2


class Permission:
    """Permission flags.

    Each value is a distinct power of two, so several flags can be
    combined into (and tested against) a single integer bitmask.
    """
    FOLLOW = 1
    COMMENT = 2
    WRITE = 4
    MODERATE = 8
    ADMIN = 16

app/models.py: Permission management in the Role model 3

class Role(db.Model):
    """Role model excerpt: helpers that edit and query the permission bitmask."""
    # ...

    def add_permission(self, perm):
        """Turn on the bit(s) of ``perm`` in this role's mask."""
        if not self.has_permission(perm):
            # Bitwise OR instead of addition: a multi-bit ``perm`` that is
            # already partly present cannot carry into unrelated bits.
            self.permissions |= perm

    def remove_permission(self, perm):
        """Turn off the bit(s) of ``perm``; no-op unless all of them are set."""
        if self.has_permission(perm):
            self.permissions &= ~perm

    def reset_permissions(self):
        """Clear every permission bit."""
        self.permissions = 0

    def has_permission(self, perm):
        """Return True when every bit of ``perm`` is set in the mask."""
        return self.permissions & perm == perm
(venv) $ flask shell
>>> r = Role(name='User')
>>> r.add_permission(Permission.FOLLOW)
>>> r.add_permission(Permission.WRITE)
>>> r.has_permission(Permission.FOLLOW)
True
>>> r.has_permission(Permission.ADMIN)
False
>>> r.reset_permissions()
>>> r.has_permission(Permission.FOLLOW)
False

app/models.py: Create roles in the database 3


class Role(db.Model):
    """Role model excerpt: seeding the roles table."""

    # ....

    @staticmethod
    def insert_roles():
        """Create or update the 'User', 'Moderator' and 'Administrator' roles.

        Existing rows are looked up by name and reused, their permissions are
        rebuilt from scratch, and 'User' is flagged as the default role.
        Everything is committed in one transaction at the end.
        """
        roles = {
            'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE],
            'Moderator': [Permission.FOLLOW, Permission.COMMENT,
                          Permission.WRITE, Permission.MODERATE],
            'Administrator': [Permission.FOLLOW, Permission.COMMENT,
                              Permission.WRITE, Permission.MODERATE,
                              Permission.ADMIN],
        }
        default_role = 'User'
        for name, perms in roles.items():
            role = Role.query.filter_by(name=name).first()
            if role is None:
                role = Role(name=name)
            role.reset_permissions()
            for perm in perms:
                role.add_permission(perm)
            role.default = (role.name == default_role)
            db.session.add(role)
        db.session.commit()

app/models.py: Define a default role for users 权限分配

class User(UserMixin, db.Model):
    """User model excerpt: role assignment at construction time."""

    def __init__(self, **kwargs):
        """Build the user and give it a role when the caller supplied none.

        The address configured as ``FLASKY_ADMIN`` gets the 'Administrator'
        role; everyone else (or the admin, if that role row is missing)
        gets the role flagged as default.
        """
        super(User, self).__init__(**kwargs)
        if self.role is None:
            if self.email == current_app.config['FLASKY_ADMIN']:
                self.role = Role.query.filter_by(name='Administrator').first()
            if self.role is None:
                self.role = Role.query.filter_by(default=True).first()

app/models.py: Evaluate whether a user has a given permission 权限检查

class User(UserMixin, db.Model):
    """User model excerpt: permission checks."""
    # ...

    def can(self, perm):
        """Return True when the user has a role and that role grants ``perm``."""
        return self.role is not None and self.role.has_permission(perm)

    def is_administrator(self):
        """Return True when the user's role grants Permission.ADMIN."""
        return self.can(Permission.ADMIN)

    def __repr__(self):
        return '<User %r>' % self.username


class AnonymousUser(AnonymousUserMixin):
    """Stand-in for a visitor who is not logged in: holds no permissions.

    Offers the same ``can`` / ``is_administrator`` methods as User, so
    callers need not check whether anyone is logged in first.
    """

    def can(self, permissions):
        """Always False, whatever permission is asked for."""
        return False

    def is_administrator(self):
        """Always False."""
        return False


login_manager.anonymous_user = AnonymousUser

权限检查装饰器

from functools import wraps
from flask import abort
from flask_login import current_user
from .models import Permission


def permission_required(permission):
    """Build a view decorator that aborts with 403 unless the user has ``permission``.

    The check is ``current_user.can(permission)`` and runs on every call
    of the decorated function, before the function itself.
    """
    def decorator(f):
        @wraps(f)  # keep the wrapped function's name and docstring
        def decorated_function(*args, **kwargs):
            if not current_user.can(permission):
                abort(403)
            return f(*args, **kwargs)
        return decorated_function
    return decorator


def admin_required(f):
    """Decorator shortcut: wrap ``f`` so that it requires Permission.ADMIN."""
    return permission_required(Permission.ADMIN)(f)

使用权限装饰器

from .decorators import admin_required, permission_required

@main.route('/admin')
@login_required
@admin_required
def for_admins_only():
    """Example view wrapped in ``login_required`` and ``admin_required``."""
    return "For administrators!"

# Served from its own URL: the original reused '/admin', which is already
# taken by the administrators-only view, so this view could never be reached.
@main.route('/moderate')
@login_required
@permission_required(Permission.MODERATE)
def for_moderators_only():
    """Example view wrapped in ``login_required`` and requiring Permission.MODERATE."""
    return "For comment moderators!"

将权限检查添加到 全局上下文中

# app/main/__init__.py: Adding the Permission class to the template context
@main.app_context_processor
def inject_permissions():
    """Return ``{'Permission': Permission}`` for the template context.

    Registered as an app-wide context processor, so templates can refer
    to the Permission class by name.
    """
    return dict(Permission=Permission)

测试

tests/test_user_model.py: Unit tests for roles and permissions

import unittest
import time
from app import create_app, db
from app.models import User, AnonymousUser, Role, Permission


class UserModelTestCase(unittest.TestCase):
    """Unit tests for the permissions granted by each role."""

    def setUp(self):
        """Create a 'testing' app, push its context, create tables and seed the roles."""
        self.app = create_app('testing')
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()
        Role.insert_roles()

    def tearDown(self):
        """Undo setUp so the pushed context and the tables do not outlive the test."""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_user_role(self):
        """A user created without a role can follow, comment and write only."""
        u = User(email='john@example.com', password='cat')
        self.assertTrue(u.can(Permission.FOLLOW))
        self.assertTrue(u.can(Permission.COMMENT))
        self.assertTrue(u.can(Permission.WRITE))
        self.assertFalse(u.can(Permission.MODERATE))
        self.assertFalse(u.can(Permission.ADMIN))

    def test_moderator_role(self):
        """A moderator has everything except ADMIN."""
        r = Role.query.filter_by(name='Moderator').first()
        u = User(email='john@example.com', password='cat', role=r)
        self.assertTrue(u.can(Permission.FOLLOW))
        self.assertTrue(u.can(Permission.COMMENT))
        self.assertTrue(u.can(Permission.WRITE))
        self.assertTrue(u.can(Permission.MODERATE))
        self.assertFalse(u.can(Permission.ADMIN))

    def test_administrator_role(self):
        """An administrator has every permission."""
        r = Role.query.filter_by(name='Administrator').first()
        u = User(email='john@example.com', password='cat', role=r)
        self.assertTrue(u.can(Permission.FOLLOW))
        self.assertTrue(u.can(Permission.COMMENT))
        self.assertTrue(u.can(Permission.WRITE))
        self.assertTrue(u.can(Permission.MODERATE))
        self.assertTrue(u.can(Permission.ADMIN))

    def test_anonymous_user(self):
        """An anonymous user has no permission at all."""
        u = AnonymousUser()
        self.assertFalse(u.can(Permission.FOLLOW))
        self.assertFalse(u.can(Permission.COMMENT))
        self.assertFalse(u.can(Permission.WRITE))
        self.assertFalse(u.can(Permission.MODERATE))
        self.assertFalse(u.can(Permission.ADMIN))
(venv) $ flask shell
>>> Role.insert_roles()
>>> Role.query.all()
[<Role 'Administrator'>, <Role 'User'>, <Role 'Moderator'>]


(venv) $ flask shell
>>> admin_role = Role.query.filter_by(name='Administrator').first()
>>> default_role = Role.query.filter_by(default=True).first()
>>> for u in User.query.all():
... if u.role is None:
... if u.email == app.config['FLASKY_ADMIN']:
... u.role = admin_role
... else:
... u.role = default_role
...
>>> db.session.commit()