python中实现jwt
的有两个常用的库。
安装
用例
1 2 3 4 5 6 7 8
| >>> import jwt
>>> encoded_jwt = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256') >>> encoded_jwt 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg'
>>> jwt.decode(encoded_jwt, 'secret', algorithms=['HS256']) {'some': 'payload'}
|
项目封装
由于项目中多个业务场景都需要用到jwt
,下面先封装到项目的utils
包中,下面介绍在Flask项目中的一种形式,先封装jwt
的生成utils/jwt_util.py
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import jwt from flask import current_app
def generate_jwt(payload, expiry, secret=None): """
:param payload: dict 载荷 :param expiry: datetime 有效期 :param secret: 密钥 :return: 生成jwt """ _payload = {'exp': expiry} _payload.update(payload)
if not secret: secret = current_app.config['JWT_SECRET']
token = jwt.encode(_payload, secret, algorithm='HS256') return token.decode()
|
下面是封装jwt
的校验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| def verify_jwt(token, secret=None): """ 校验jwt :param token: jwt :param secret: 密钥 :return: dict: payload """ if not secret: secret = current_app.config['JWT_SECRET']
try: payload = jwt.decode(token, secret, algorithm=['HS256']) except jwt.PyJWTError: payload = None
return payload
|
其中algorithm='HS256'
表示使用sha256计算签名的时候,使用一个secret秘钥字符串参与运算,使用sha256计算验签的时候,使用相同的secret 秘钥字符串参与运算。
登录方案
为了考虑安全性,用户的初始token 的有效期不宜过长,可以初始设置一个短期的有效期,然后在过期的时候,在用户无感知情况下,让客户端重新发一个请求去获取一个refresh_token,这个能够很好的防止抓包,提升了安全性。
下面是一种使用场景。短期token和长期token可根据需求设置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| from datetime import datetime, timedelta from flask_restful import Resource from utils.jwt_util import generate_jwt from flask import current_app, g
class AuthorizationResource(Resource): """ 登录认证 """ def _generate_tokens(self, user_id, with_refresh_token=True): """ 生成token 和refresh_token :param user_id: 用户id :return: token, refresh_token """ now = datetime.utcnow() expiry = now + timedelta(hours=current_app.config['JWT_EXPIRY_HOURS']) token = generate_jwt({'user_id': user_id, 'refresh': False}, expiry) refresh_token = None if with_refresh_token: refresh_expiry = now + timedelta(days=current_app.config['JWT_REFRESH_DAYS']) refresh_token = generate_jwt({'user_id': user_id, 'refresh': True}, refresh_expiry) return token, refresh_token
def post(self): """ 用户登录创建token """
token, refresh_token = self._generate_tokens(user.id)
return {'token': token, 'refresh_token': refresh_token}, 201
|
为了使用jwt
对用户进行授权验证,可以在请求钩子before_request中验证用户身份。下面封装一个中间件utils/middlewares.py
,然后在创建应用中调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from flask import request, g from .jwt_util import verify_jwt
def jwt_authentication(): """ 根据jwt验证用户身份 """ g.user_id = None g.is_refresh_token = False authorization = request.headers.get('Authorization') if authorization and authorization.startswith('Bearer '): token = authorization.strip()[7:] payload = verify_jwt(token) if payload: g.user_id = payload.get('user_id') g.is_refresh_token = payload.get('refresh')
|
项目中添加请求钩子,保证每次请求之前都能够访问。
1 2 3
| from common.utils.middlewares import jwt_authentication app.before_request(jwt_authentication)
|
下面提供更新token的接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class AuthorizationResource(Resource): """ 认证 """ ......
def put(self): """ 刷新token """ user_id = g.user_id if user_id and g.is_refresh_token: token, refresh_token = self._generate_tokens(user_id, with_refresh_token=False) return {'token': token}, 201 else: return {'message': 'Wrong refresh token.'}, 403
|
有些业务场景必须要求用户进行登录才能访问,下面封装/utils/decorators.py
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def login_required(func): """ 用户必须登录装饰器 使用方法:放在method_decorators中 """ @wraps(func) def wrapper(*args, **kwargs): if not g.user_id: return {'message': 'User must be authorized.'}, 401 elif g.is_refresh_token: return {'message': 'Do not use refresh token.'}, 403 else: return func(*args, **kwargs)
return wrapper
|