fastapi增加jwt验证功能

fastapi增加jwt验证功能

fastapi的使用实例参看之前的 文章 https://www.madbull.site/?p=565 ,下边的示例是对 t1.py 的改造,增加登录认证、token生成、token验证、权限验证功能。结合之前的文章,替换掉t1.py文件,就可以搭建起来整个项目。

# -*- coding: utf-8 -*-

from http import HTTPStatus
from fastapi import status, Body, APIRouter, Request
from fastapi.responses import JSONResponse, Response
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from functools import wraps
from passlib.context import CryptContext

from jose import jwt
from datetime import datetime, timedelta
from typing import Any, Union
import traceback


# 创建路由
test_router = APIRouter()

# 生成 密码校验工具
p_ctx = CryptContext(schemes=["bcrypt"], deprecated=["auto"])

# jwt 计算使用的密码
JWT_KEY = "jfieOhoUHFjeifjeif232jmoi8"
# 哈希算饭
ALGORITHM = "HS256"

# 测试数据,角色对应的权限
role_dict = {"admin": ["/tt/test/t1", "/tt/test/t2"]}
"""
 * 简单判断一下角色的权限
"""
def check_permission(req_path, role) :
    if req_path in role_dict[role] :
        return True
    else :
        return False

"""
 * 登录检查和权限检查
 * 0 登录成功,权限允许
 * 1 登录认证失败
 * 2 登录成功,但是权限不正确
 *
"""
def jwt_auth_check(authorization, req_path ) :
    # 验证 jwt token 是否正确或者超时。
    try:
        jwt_token = jwt.decode(authorization, JWT_KEY, ALGORITHM)
    except :
        # 超时和其他错误
        traceback.print_exc()
        return 1
    # 验证是否具有访问权限
    ret = check_permission(req_path, jwt_token["role"])
    if ret :
        return 0
    else :
        return 2


"""
 * 登录提交的数据
"""
class Login(BaseModel):
    username: str = "<not given>"
    passwd: str = "<not given>"

# 获取真实密码
# 此处需要从文件、数据库或者密码存储系统中,根据username获取密码的hash,和角色
# 此处仅测试用,只给出了 "12345678" 这个密码的 hash 值。
# hash的生成方法 p_ctx.hash("12345678")
def get_pwdhash_and_role_by_username( username ) :
    return """$2b$12$va7WZCbCXkie01cntgUafO.BreMuQ2uAHblgmC3sM6yI/5tPtnNqm""", "admin"

"""
 * 构造 jwt 的 token
 * 默认 token 2分钟超时,可以单独指定
"""
def create_jwt_token(username, role, mnts : int = 2 ) :
    expire = datetime.utcnow() + timedelta(minutes=mnts) 
    data = {"exp": expire, "sub": username, "role": role}
    jwt_token = jwt.encode(data, JWT_KEY, algorithm=ALGORITHM)
    return jwt_token


"""
 * 登录接口,账号密码验证成功后,生成 jwt 的 token 返回给客户端
"""
@test_router.post("/login", summary=u"第一个测试")
def login(lgn: Login= Body(...)) :
    pwd_hash, role = get_pwdhash_and_role_by_username(lgn.username)
    if pwd_hash is None :    # 用户不存在
        return shot_return(status.HTTP_400_BAD_REQUEST)
    auth_rslt = p_ctx.verify(lgn.passwd, pwd_hash)
    if auth_rslt is True :          # 登录成功
        # 创建token
        jwt_token = create_jwt_token(lgn.username, role) ;
        # 构造返回值
        json_data = jsonable_encoder(jwt_token)
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content = {
                "code": status.HTTP_200_OK,
                "message": "Success",
                "data": {
                    "accessToken": json_data,
                    "token_type": "bearer"
                }
            }
        )
    else :
        return shot_return(status.HTTP_400_BAD_REQUEST)


# 简短返回函数,只返回 http code 和说明
def shot_return(ret_st):
    return JSONResponse(
        status_code=ret_st,
        content = {
            "code": ret_st, 
            "message": HTTPStatus(ret_st).phrase
        })


"""
 * 构造一个装饰器,用来检查是否已经是登录状态
 * 注意,此装饰器修饰的函数,最后一个参数必须是 Request 对象
"""
def login_check(f) :
    @wraps(f)
    def is_logined(*args, **kwargs):
        request = kwargs.get("request")
        # 获取 header 里的 authorization 字段
        authorization=request.headers.get("authorization")
        if authorization is None :
            return shot_return(status.HTTP_401_UNAUTHORIZED)
        else :
            req_path = request.get('path')
            jwt_auth_rslt = jwt_auth_check(authorization, req_path) 
            if jwt_auth_rslt == 0 :                 # 0 登录成功,权限允许
                return f(*args, **kwargs)
            elif jwt_auth_rslt == 1 :               # 1 登录认证失败
                return shot_return(status.HTTP_401_UNAUTHORIZED)
            elif jwt_auth_rslt == 2 :               # 2 登录成功,但是权限不正确
                return shot_return(status.HTTP_405_METHOD_NOT_ALLOWED) 
                
    return is_logined


"""
 * get请求处理
 * 用login_check装饰器做了处理,先验证登录情况,和权限分配情况。
"""
@test_router.get("/t1", summary=u"第一个测试")
@login_check
def t1(a: str = "<not given>", b: str = "<not given>", request: Request=None ) -> Response :
    pstr = "param a is ["+a+"]; param b is ["+b+"]"
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content = {
            "code": "1001",
            "message": "hello , first fast api return",
            "params" : pstr,
            "request.headers" : dict(request.headers)
        })

启动

发起测试

服务端日志

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注