fastapi的使用实例参看之前的 文章 https://www.madbull.site/?p=565 ,下边的示例是对 t1.py 的改造,增加登录认证、token生成、token验证、权限验证功能。结合之前的文章,替换掉t1.py文件,就可以搭建起来整个项目。
# -*- coding: utf-8 -*-
from http import HTTPStatus
from fastapi import status, Body, APIRouter, Request
from fastapi.responses import JSONResponse, Response
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from functools import wraps
from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta
from typing import Any, Union
import traceback
# 创建路由
test_router = APIRouter()
# 生成 密码校验工具
p_ctx = CryptContext(schemes=["bcrypt"], deprecated=["auto"])
# jwt 计算使用的密码
JWT_KEY = "jfieOhoUHFjeifjeif232jmoi8"
# 哈希算饭
ALGORITHM = "HS256"
# 测试数据,角色对应的权限
role_dict = {"admin": ["/tt/test/t1", "/tt/test/t2"]}
"""
* 简单判断一下角色的权限
"""
def check_permission(req_path, role) :
if req_path in role_dict[role] :
return True
else :
return False
"""
* 登录检查和权限检查
* 0 登录成功,权限允许
* 1 登录认证失败
* 2 登录成功,但是权限不正确
*
"""
def jwt_auth_check(authorization, req_path ) :
# 验证 jwt token 是否正确或者超时。
try:
jwt_token = jwt.decode(authorization, JWT_KEY, ALGORITHM)
except :
# 超时和其他错误
traceback.print_exc()
return 1
# 验证是否具有访问权限
ret = check_permission(req_path, jwt_token["role"])
if ret :
return 0
else :
return 2
"""
* 登录提交的数据
"""
class Login(BaseModel):
username: str = "<not given>"
passwd: str = "<not given>"
# 获取真实密码
# 此处需要从文件、数据库或者密码存储系统中,根据username获取密码的hash,和角色
# 此处仅测试用,只给出了 "12345678" 这个密码的 hash 值。
# hash的生成方法 p_ctx.hash("12345678")
def get_pwdhash_and_role_by_username( username ) :
return """$2b$12$va7WZCbCXkie01cntgUafO.BreMuQ2uAHblgmC3sM6yI/5tPtnNqm""", "admin"
"""
* 构造 jwt 的 token
* 默认 token 2分钟超时,可以单独指定
"""
def create_jwt_token(username, role, mnts : int = 2 ) :
expire = datetime.utcnow() + timedelta(minutes=mnts)
data = {"exp": expire, "sub": username, "role": role}
jwt_token = jwt.encode(data, JWT_KEY, algorithm=ALGORITHM)
return jwt_token
"""
* 登录接口,账号密码验证成功后,生成 jwt 的 token 返回给客户端
"""
@test_router.post("/login", summary=u"第一个测试")
def login(lgn: Login= Body(...)) :
pwd_hash, role = get_pwdhash_and_role_by_username(lgn.username)
if pwd_hash is None : # 用户不存在
return shot_return(status.HTTP_400_BAD_REQUEST)
auth_rslt = p_ctx.verify(lgn.passwd, pwd_hash)
if auth_rslt is True : # 登录成功
# 创建token
jwt_token = create_jwt_token(lgn.username, role) ;
# 构造返回值
json_data = jsonable_encoder(jwt_token)
return JSONResponse(
status_code=status.HTTP_200_OK,
content = {
"code": status.HTTP_200_OK,
"message": "Success",
"data": {
"accessToken": json_data,
"token_type": "bearer"
}
}
)
else :
return shot_return(status.HTTP_400_BAD_REQUEST)
# 简短返回函数,只返回 http code 和说明
def shot_return(ret_st):
return JSONResponse(
status_code=ret_st,
content = {
"code": ret_st,
"message": HTTPStatus(ret_st).phrase
})
"""
* 构造一个装饰器,用来检查是否已经是登录状态
* 注意,此装饰器修饰的函数,最后一个参数必须是 Request 对象
"""
def login_check(f) :
@wraps(f)
def is_logined(*args, **kwargs):
request = kwargs.get("request")
# 获取 header 里的 authorization 字段
authorization=request.headers.get("authorization")
if authorization is None :
return shot_return(status.HTTP_401_UNAUTHORIZED)
else :
req_path = request.get('path')
jwt_auth_rslt = jwt_auth_check(authorization, req_path)
if jwt_auth_rslt == 0 : # 0 登录成功,权限允许
return f(*args, **kwargs)
elif jwt_auth_rslt == 1 : # 1 登录认证失败
return shot_return(status.HTTP_401_UNAUTHORIZED)
elif jwt_auth_rslt == 2 : # 2 登录成功,但是权限不正确
return shot_return(status.HTTP_405_METHOD_NOT_ALLOWED)
return is_logined
"""
* get请求处理
* 用login_check装饰器做了处理,先验证登录情况,和权限分配情况。
"""
@test_router.get("/t1", summary=u"第一个测试")
@login_check
def t1(a: str = "<not given>", b: str = "<not given>", request: Request=None ) -> Response :
pstr = "param a is ["+a+"]; param b is ["+b+"]"
return JSONResponse(
status_code=status.HTTP_200_OK,
content = {
"code": "1001",
"message": "hello , first fast api return",
"params" : pstr,
"request.headers" : dict(request.headers)
})
启动

发起测试

服务端日志

发表回复