JWT在Flask中的应用

目录
  1. 1. 安装
  2. 2. 用例
  3. 3. 项目封装
  4. 4. 登录方案

python中实现jwt的有两个常用的库。

  • itsdangerous

    • JSONWebSignatureSerializer

    • TimedJSONWebSignatureSerializer (可设置有效期)

  • pyjwt

    pyjwt.readthedocs.io/en/latest/

    下面介绍pyjwt的使用。

安装

1
$ pip install pyjwt

用例

1
2
3
4
5
6
7
8
>>> import jwt

>>> encoded_jwt = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256')
>>> encoded_jwt
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'

>>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256'])
{'some': 'payload'}

项目封装

由于项目中多个业务场景都需要用到jwt,下面先封装到项目的utils包中,下面介绍在Flask项目中的一种形式,先封装jwt的生成utils/jwt_util.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import jwt
from flask import current_app

def generate_jwt(payload, expiry, secret=None):
"""

:param payload: dict 载荷
:param expiry: datetime 有效期
:param secret: 密钥
:return: 生成jwt
"""
_payload = {'exp': expiry}
_payload.update(payload)

if not secret:
secret = current_app.config['JWT_SECRET'] # 需要在配置文件配置JWT_SECRET

token = jwt.encode(_payload, secret, algorithm='HS256')
return token.decode()

下面是封装jwt的校验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def verify_jwt(token, secret=None):
"""
校验jwt
:param token: jwt
:param secret: 密钥
:return: dict: payload
"""
if not secret:
secret = current_app.config['JWT_SECRET']

try:
payload = jwt.decode(token, secret, algorithm=['HS256'])
except jwt.PyJWTError:
payload = None

return payload

其中algorithm='HS256'表示使用sha256计算签名的时候,使用一个secret秘钥字符串参与运算,使用sha256计算验签的时候,使用相同的secret 秘钥字符串参与运算。

登录方案

为了考虑安全性,用户的初始token 的有效期不宜过长,可以初始设置一个短期的有效期,然后在过期的时候,在用户无感知情况下,让客户端重新发一个请求去获取一个refresh_token,这个能够很好的防止抓包,提升了安全性。

下面是一种使用场景。短期token和长期token可根据需求设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from datetime import datetime, timedelta
from flask_restful import Resource
from utils.jwt_util import generate_jwt
from flask import current_app, g

class AuthorizationResource(Resource):
"""
登录认证
"""
def _generate_tokens(self, user_id, with_refresh_token=True):
"""
生成token 和refresh_token
:param user_id: 用户id
:return: token, refresh_token
"""
# 颁发JWT
now = datetime.utcnow()
expiry = now + timedelta(hours=current_app.config['JWT_EXPIRY_HOURS'])# 短期token
token = generate_jwt({'user_id': user_id, 'refresh': False}, expiry)
refresh_token = None
if with_refresh_token:
refresh_expiry = now + timedelta(days=current_app.config['JWT_REFRESH_DAYS']) # 长期token
refresh_token = generate_jwt({'user_id': user_id, 'refresh': True}, refresh_expiry)
return token, refresh_token

def post(self):
"""
用户登录创建token
"""
# ...根据业务不同,此处省略参数校验和数据库保存业务场景

token, refresh_token = self._generate_tokens(user.id)

return {'token': token, 'refresh_token': refresh_token}, 201

为了使用jwt对用户进行授权验证,可以在请求钩子before_request中验证用户身份。下面封装一个中间件utils/middlewares.py,然后在创建应用中调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from flask import request, g
from .jwt_util import verify_jwt

def jwt_authentication():
"""
根据jwt验证用户身份
"""
g.user_id = None
g.is_refresh_token = False
authorization = request.headers.get('Authorization')
if authorization and authorization.startswith('Bearer '): # 让前端请求头携带Authorization,值以'Bearer '开头
token = authorization.strip()[7:]
payload = verify_jwt(token)
if payload:
g.user_id = payload.get('user_id')
g.is_refresh_token = payload.get('refresh')

项目中添加请求钩子,保证每次请求之前都能够访问。

1
2
3
# 添加请求钩子
from common.utils.middlewares import jwt_authentication
app.before_request(jwt_authentication)

下面提供更新token的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class AuthorizationResource(Resource):
"""
认证
"""
......

# 补充put方式 更新token接口
def put(self):
"""
刷新token
"""
user_id = g.user_id
if user_id and g.is_refresh_token:
token, refresh_token = self._generate_tokens(user_id, with_refresh_token=False)
return {'token': token}, 201
else:
return {'message': 'Wrong refresh token.'}, 403

有些业务场景必须要求用户进行登录才能访问,下面封装/utils/decorators.py.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def login_required(func):
"""
用户必须登录装饰器
使用方法:放在method_decorators中
"""
@wraps(func)
def wrapper(*args, **kwargs):
if not g.user_id:
return {'message': 'User must be authorized.'}, 401
elif g.is_refresh_token:
return {'message': 'Do not use refresh token.'}, 403
else:
return func(*args, **kwargs)

return wrapper