zdppy_api如何实现带参数的中间件

2024-06-04 22:36

本文主要是介绍zdppy_api如何实现带参数的中间件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

参考代码

import timefrom api.middleware import Middleware
from api.middleware.base import BaseHTTPMiddleware
from api.exceptions import AuthException
import jwthas_logger = False
try:from log import loggerhas_logger = True
except:passdef roleapi(has_auth_func, jwtkey=None):"""判断用户角色是否有某个接口的权限:param has_auth_func 考虑到可能使用异步的MySQL库,此函数必须是一个异步函数接受Token中解析出来的role和request中的method和path作为参数,用于判断用户对该路径是否有访问权限"""return Middleware(RoleAPIAuthMiddleware, has_auth_func=has_auth_func, jwtkey=jwtkey)class RoleAPIAuthMiddleware(BaseHTTPMiddleware):def __init__(self, has_auth_func, jwtkey, app):super().__init__(app)self.has_auth_func = has_auth_funcself.jwtkey = jwtkeyasync def dispatch(self, request, call_next):# 判断是否需要跳过校验if "/login" in str(request.url) or "/register" in str(request.url):if has_logger:logger.debug("识别到正在访问不需要校验的路径", method=request.method, url=request.url)return await call_next(request)if has_logger:msg = f"开始进行基于角色和接口的高级别权限校验:method={request.method} url={request.url}"logger.debug(msg)# 获取tokentoken = request.headers.get("Authorization")if token is None:if has_logger:logger.error("权限不足:没有携带Token", token=token)raise AuthException("权限不足:没有携带Token")# 解析tokendata = Nonetry:if isinstance(self.jwtkey, str):data = jwt.parse_token(token, key=self.jwtkey)else:data = jwt.parse_token(token)except Exception as e:if has_logger:logger.error("获取用户Token失败", headers=request.headers, token=token, error=e)raise AuthException("无效的Token")# token的结果必须是字典类型if not isinstance(data, dict):if has_logger:logger.error("Token的解析结果不是字典类型", data=data)raise AuthException("无效的Token")# 必须要有过期时间expired = data.get("expired")if expired is None:if has_logger:logger.error("该Token没有设置过期时间", data=data, expired=expired)raise AuthException("无效的Token")# 校验过期时间的类型if not (isinstance(expired, int) or isinstance(expired, float)):if has_logger:logger.error("该Token的过期时间不是数字类型", expired=expired)raise AuthException("无效的Token")# 校验是否过期now = time.time()if now > expired:if has_logger:logger.warning("Token已过期", expired=expired)raise AuthException("Token已过期")# 必须要有用户名和用户IDuserid = data.get("id")username = data.get("username")userrole = data.get("role")if not (userid or username or userrole):if has_logger:logger.error("Token中应该包含用户ID,用户名和用户角色",userid=userid,username=username,userrole=userrole,token=token,data=data,)raise AuthException("无效的Token")# 获取请求方法和请求路径base_url = request.base_urlurl = request.urlmethod = request.methodpath = str(url).replace(str(base_url), "")if has_logger:logger.debug("开始查询用户是否具备接口级别的权限",method=method,path=path,userid=userid,username=username,userrole=userrole,)has_auth = Falsetry:has_auth = await self.has_auth_func(userrole, method, path)except Exception as e:if has_logger:logger.error("查询用户接口级别权限失败",method=method,path=path,userid=userid,username=username,userrole=userrole,error=e,)raise AuthException("无效的Token")if has_logger:logger.debug("成功查询用户是否具备接口级别的权限",has_auth=has_auth,method=method,path=path,userid=userid,username=username,userrole=userrole,)if not has_auth:if has_logger:logger.warning("权限不足",has_auth=has_auth,method=method,path=path,userid=userid,username=username,userrole=userrole,)raise AuthException("权限不足")# 权限充足,发送请求,获取响应response = await call_next(request)return response

调用栈分析

使用中间件

app1 = api.Api(routes=[api.resp.get("/", index),api.resp.get("/1", index),api.resp.post("/login", login),],middleware=[# 默认是:zhangdapeng zhangdapng520# 可以传入账号和密码进行覆盖apimidauth.roleapi(has_auth_func)]
)

apimidauth.roleapi(has_auth_func) 实例化了一个中间件对象。

has_auth_func 是一个函数

这个函数,会在中间件里面,被调用到,完整代码如下。

async def has_auth_func(role, method, path):"""校验角色对method和path是否有访问权限"""print(role, method, path)if not str(path).startswith("/"):path = f"/{path}"# GET:/1auth = str(method).upper() + ":" + path# 判断是否有权限role_auth_dict = auth_dict.get(role)if not isinstance(role_auth_dict, dict):return Falseif not role_auth_dict.get(auth):return Falsereturn True

roleapi 方法

完整代码如下:

def roleapi(has_auth_func, jwtkey=None):"""判断用户角色是否有某个接口的权限:param has_auth_func 考虑到可能使用异步的MySQL库,此函数必须是一个异步函数接受Token中解析出来的role和request中的method和path作为参数,用于判断用户对该路径是否有访问权限"""return Middleware(RoleAPIAuthMiddleware, has_auth_func=has_auth_func, jwtkey=jwtkey)

这个其实就是一个普通的函数。
不过比较特殊的地方在于,这个函数的返回值是一个Middleware类的实例。这个类接收如下参数:

  • RoleAPIAuthMiddleware:自定义的中间件
  • has_auth_func:实际上是自定义中间件需要的一个参数
  • jwtkey:实际上也是自定义中间件需要的一个参数

注意:has_auth_func 和 jwtkey 这两个参数,不是 Middleware 类必须的,而是我们自定义的 RoleAPIAuthMiddleware 需要的。

RoleAPIAuthMiddleware 的代码分析

完整代码:

class RoleAPIAuthMiddleware(BaseHTTPMiddleware):def __init__(self, has_auth_func, jwtkey, app):super().__init__(app)self.has_auth_func = has_auth_funcself.jwtkey = jwtkeyasync def dispatch(self, request, call_next):# 判断是否需要跳过校验if "/login" in str(request.url) or "/register" in str(request.url):if has_logger:logger.debug("识别到正在访问不需要校验的路径", method=request.method, url=request.url)return await call_next(request)if has_logger:msg = f"开始进行基于角色和接口的高级别权限校验:method={request.method} url={request.url}"logger.debug(msg)# 获取tokentoken = request.headers.get("Authorization")if token is None:if has_logger:logger.error("权限不足:没有携带Token", token=token)raise AuthException("权限不足:没有携带Token")# 解析tokendata = Nonetry:if isinstance(self.jwtkey, str):data = jwt.parse_token(token, key=self.jwtkey)else:data = jwt.parse_token(token)except Exception as e:if has_logger:logger.error("获取用户Token失败", headers=request.headers, token=token, error=e)raise AuthException("无效的Token")# token的结果必须是字典类型if not isinstance(data, dict):if has_logger:logger.error("Token的解析结果不是字典类型", data=data)raise AuthException("无效的Token")# 必须要有过期时间expired = data.get("expired")if expired is None:if has_logger:logger.error("该Token没有设置过期时间", data=data, expired=expired)raise AuthException("无效的Token")# 校验过期时间的类型if not (isinstance(expired, int) or isinstance(expired, float)):if has_logger:logger.error("该Token的过期时间不是数字类型", expired=expired)raise AuthException("无效的Token")# 校验是否过期now = time.time()if now > expired:if has_logger:logger.warning("Token已过期", expired=expired)raise AuthException("Token已过期")# 必须要有用户名和用户IDuserid = data.get("id")username = data.get("username")userrole = data.get("role")if not (userid or username or userrole):if has_logger:logger.error("Token中应该包含用户ID,用户名和用户角色",userid=userid,username=username,userrole=userrole,token=token,data=data,)raise AuthException("无效的Token")# 获取请求方法和请求路径base_url = request.base_urlurl = request.urlmethod = request.methodpath = str(url).replace(str(base_url), "")if has_logger:logger.debug("开始查询用户是否具备接口级别的权限",method=method,path=path,userid=userid,username=username,userrole=userrole,)has_auth = Falsetry:has_auth = await self.has_auth_func(userrole, method, path)except Exception as e:if has_logger:logger.error("查询用户接口级别权限失败",method=method,path=path,userid=userid,username=username,userrole=userrole,error=e,)raise AuthException("无效的Token")if has_logger:logger.debug("成功查询用户是否具备接口级别的权限",has_auth=has_auth,method=method,path=path,userid=userid,username=username,userrole=userrole,)if not has_auth:if has_logger:logger.warning("权限不足",has_auth=has_auth,method=method,path=path,userid=userid,username=username,userrole=userrole,)raise AuthException("权限不足")# 权限充足,发送请求,获取响应response = await call_next(request)return response

基本结构分析

class RoleAPIAuthMiddleware(BaseHTTPMiddleware):def __init__(self, has_auth_func, jwtkey, app):super().__init__(app)self.has_auth_func = has_auth_funcself.jwtkey = jwtkey

自定义的中间件类,需要继承:BaseHTTPMiddleware,这个类来自于 from api.middleware.base import BaseHTTPMiddleware 。

初始化方法:

def __init__(self, has_auth_func, jwtkey, app):super().__init__(app)self.has_auth_func = has_auth_funcself.jwtkey = jwtkey

在这个初始化方法中,我们定义了此中间件需要的参数。

我们在这里定义的是类的参数,但是实际上最后传递参数的方式是:

return Middleware(RoleAPIAuthMiddleware, has_auth_func=has_auth_func, jwtkey=jwtkey)

这里的 app 是没有传参的。

核心是 dispatch 方法

async def dispatch(self, request, call_next):# 判断是否需要跳过校验if "/login" in str(request.url) or "/register" in str(request.url):if has_logger:logger.debug("识别到正在访问不需要校验的路径", method=request.method, url=request.url)return await call_next(request)if has_logger:msg = f"开始进行基于角色和接口的高级别权限校验:method={request.method} url={request.url}"logger.debug(msg)# 获取tokentoken = request.headers.get("Authorization")if token is None:if has_logger:logger.error("权限不足:没有携带Token", token=token)raise AuthException("权限不足:没有携带Token")# 解析tokendata = Nonetry:if isinstance(self.jwtkey, str):data = jwt.parse_token(token, key=self.jwtkey)else:data = jwt.parse_token(token)except Exception as e:if has_logger:logger.error("获取用户Token失败", headers=request.headers, token=token, error=e)raise AuthException("无效的Token")# token的结果必须是字典类型if not isinstance(data, dict):if has_logger:logger.error("Token的解析结果不是字典类型", data=data)raise AuthException("无效的Token")# 必须要有过期时间expired = data.get("expired")if expired is None:if has_logger:logger.error("该Token没有设置过期时间", data=data, expired=expired)raise AuthException("无效的Token")# 校验过期时间的类型if not (isinstance(expired, int) or isinstance(expired, float)):if has_logger:logger.error("该Token的过期时间不是数字类型", expired=expired)raise AuthException("无效的Token")# 校验是否过期now = time.time()if now > expired:if has_logger:logger.warning("Token已过期", expired=expired)raise AuthException("Token已过期")# 必须要有用户名和用户IDuserid = data.get("id")username = data.get("username")userrole = data.get("role")if not (userid or username or userrole):if has_logger:logger.error("Token中应该包含用户ID,用户名和用户角色",userid=userid,username=username,userrole=userrole,token=token,data=data,)raise AuthException("无效的Token")# 获取请求方法和请求路径base_url = request.base_urlurl = request.urlmethod = request.methodpath = str(url).replace(str(base_url), "")if has_logger:logger.debug("开始查询用户是否具备接口级别的权限",method=method,path=path,userid=userid,username=username,userrole=userrole,)has_auth = Falsetry:has_auth = await self.has_auth_func(userrole, method, path)except Exception as e:if has_logger:logger.error("查询用户接口级别权限失败",method=method,path=path,userid=userid,username=username,userrole=userrole,error=e,)raise AuthException("无效的Token")if has_logger:logger.debug("成功查询用户是否具备接口级别的权限",has_auth=has_auth,method=method,path=path,userid=userid,username=username,userrole=userrole,)if not has_auth:if has_logger:logger.warning("权限不足",has_auth=has_auth,method=method,path=path,userid=userid,username=username,userrole=userrole,)raise AuthException("权限不足")# 权限充足,发送请求,获取响应response = await call_next(request)return response

中间件核心方法分析

参数是什么

async def dispatch(self, request, call_next):

首先,这个方法是一个异步方法。
第一个参数是请求对象,存储了客户端的所有请求信息。
第二个参数是调用下一个中间件的对象,如果成功了,则调用此方法得到一个response对象,返回response对象即可。

如果成功了返回什么?

如果成功了,则调用此 call_next 方法得到一个response对象,返回response对象即可。

response = await call_next(request)
return response

如果失败了,该返回什么?

示例代码如下:

if has_logger:logger.error("Token的解析结果不是字典类型", data=data)
raise AuthException("无效的Token")

首先,我们是记录错误日志。
然后,抛出一个异常。
因为 zdppy_api 已经内部封装了全局异常处理,所以这个异常,最终会被全局异常错误处理器自动捕获,并返回给客户端一个比较通用且友好的错误信息。

支持哪些异常类

首先是 AuthException,这个来源于:

from api.exceptions import AuthException

通过查看源码,我们可以知道, zdppy_api 框架,目前内置了如下异常处理器:

default_exception_handlers = {404: not_found,500: server_error,HTTPException: handle_http_exception,AuthException: handle_auth_exception,Exception: handle_exception,
}

最终总结

如果,我们要实现 db 请求上下文中间件:

  • 请求开始时,自动建立连接
  • 请求结束是,自动断开连接

那么,我们的实现思路如下:

  • 1、实现一个 OrmRequestMiddleware 中间件类。这个类继承 api.middleware.base.BaseHTTPMiddleware,接收一个 db 作为参数。
  • 2、实现一个 apimidorm.request(db) 方法,这个方法的返回值是 Middleware(OrmRequestMiddleware, db=db)
  • 3、在 OrmRequestMiddleware 自定义中间件类中,重写 async def dispatch(self, request, call_next) 方法
  • 4、方法体中实现具体的逻辑。请求开始之前,调用 db.connect(),调用 response = call_next(),之后调用 db.colse(),最后返回 response。

以上是一个具体的实现思路,仅供参考。

这篇关于zdppy_api如何实现带参数的中间件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1031317

相关文章

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

如何在页面调用utility bar并传递参数至lwc组件

1.在app的utility item中添加lwc组件: 2.调用utility bar api的方式有两种: 方法一,通过lwc调用: import {LightningElement,api ,wire } from 'lwc';import { publish, MessageContext } from 'lightning/messageService';import Ca

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

4B参数秒杀GPT-3.5:MiniCPM 3.0惊艳登场!

​ 面壁智能 在 AI 的世界里,总有那么几个时刻让人惊叹不已。面壁智能推出的 MiniCPM 3.0,这个仅有4B参数的"小钢炮",正在以惊人的实力挑战着 GPT-3.5 这个曾经的AI巨人。 MiniCPM 3.0 MiniCPM 3.0 MiniCPM 3.0 目前的主要功能有: 长上下文功能:原生支持 32k 上下文长度,性能完美。我们引入了