【自动化运维新手村】Flask-权限校验

2024-02-12 15:20

本文主要是介绍【自动化运维新手村】Flask-权限校验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【摘要】

上一章节,我们主要对Web应用的用户认证做了详细的讲解,包括使用Flask实现用户注册,登录,并通过Session机制实现用户保持登录。那么在了解了用户认证之后,这一章节我们就着重介绍一下权限校验的原理以及实现方式。

【为什么需要鉴权】

用户在通过认证之后,已经可以正常访问我们的后端应用,但当后端应用越来越完善,功能越来越丰富,并且牵扯的资源以及用户的范围都足够广的时候,用户的权限校验就显得尤为重要。例如:

1.是否所有用户都可以通过接口获取CMDB的数据信息;

2.是否所有的用户都可以调用接口对设备进行操作;

3.是否拥有获取CMDB信息权限的用户也拥有对设备进行操作的权限;

4.是否拥有对设备操作权限的用户,就可以对全部的设备进行操作;

诸如上述的权限问题还有很多,下面就一起来看看如何在Flask应用中进行鉴权。

【Flask实现】

对用户鉴权相信大家都可以理解,其实就是判断用户是否有权限访问某个API,在实现上也相对比较简单。

上一章节中,我们通过装饰器实现了登录认证,伪代码如下:

def permission(func):@wraps(func)def inner():if not auth():return Failreturn func()return inner@app.route("/index")
@permission
def index():return "success"

登录认证的具体实现其实就是将该装饰器加在需要认证的路由函数上,这样就可以在调用该路由函数前进行一系列的认证过程。

基于用户鉴权

如果想要在登录认证的前提下再进行用户鉴权,则只需要修改permission装饰器,使其可以对用户进行判断,然后决定是否允许该用户访问。但是我们需要先明确具体的路由函数允许哪个用户访问,并且将这个限制传入装饰器中,这时候就需要用到【自动化运维新手村】装饰器-进阶中的带参数的装饰器,伪代码如下:

def permission(permit_users):def login_acquired(func):@wraps(func)def inner():if not auth() or current_user not in permit_users:return Failreturn func()return innerreturn login_acquired@app.route("/index")
@permission(["ethan", "john", "jack"])
def index():return "success"

经过改进的装饰器加在需要鉴权的路由函数上,并且传入该路由函数允许访问的用户列表,这样在登录认证时,通过session_id获取到当前用户,判断该用户是否在允许访问的用户列表中即可。

其实不难看出,通过用户去区分权限显然是不太现实的,用户数量增多的时候,可能会让权限控制变得十分难以维护。

那么最先想到的改进方法,应该就是将不同的用户赋予不同的角色,这样在权限控制的时候,鉴权粒度就由用户变成了角色。

基于角色鉴权

关于通过角色鉴权目前业内已经有一套成熟的规范 —— RBAC(Role-Based Access Control, 基于角色的访问控制),就是用户通过角色与权限进行关联。

RBCA本质上是对用户进行分组管理,赋予角色,对权限进行合理的划分,最终实现一个用户拥有若干角色,每一个角色拥有若干权限。并且RBCA具有十分完善的权限模型设计,对于大型系统的权限管理是非常重要的,但这一章节目的就是化繁为简,学会其权限管理的基本原理和底层实现。

定义角色

这里定义角色时引入了一个新的概念,叫做枚举类型,枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等,那么我们这里的角色显然也适合用枚举类型来定义。

from enum import Enumclass Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"

由于到目前为止,我们的后端应用还没有引入数据库的概念,所以权限信息可以和用户信息一起暂时保存在JSON文件中,这里用户的权限信息可以通过在用户信息中新增一个role字段来进行标识。模型如下:

[{"username": "","password": "","role": ""}
]
鉴权逻辑

上文中已经提到需要将装饰器修改为可传参的装饰器,修改后整体逻辑如下:

1.允许传入参数roles,可以是多个角色或单个角色,参数类型为列表,如果不传默认为None,表示不限制角色

2.判断用户是否登录的逻辑保持不变

3.根据session_id获取当前已登陆用户

4.判断该用户的角色是否包含在传入的参数roles

代码如下:

from http import HTTPStatus # 引入了http包中的状态码def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required
为用户授权

现在新增一个为用户授权的路由函数,但这个函数同样应该设置权限,只允许管理员角色调用它,所以一开始需要在用户信息中初始化一个管理员账户,如下:

[{"username": "yuefeiyu","password": "af058879880f293b3b9b4a7072e5d0bf","role": "admin"}
]

为用户授权的大致逻辑如下:

1.通过POST请求传入usernamerole表单参数

2.判断参数是否合法,role是否属于枚举类型中已定义的角色

3.获取已注册的用户信息

4.判断被授权用户是否已注册

5.修改该用户的角色信息并保存

6.如果授权用户已登陆则修改session中该用户的角色信息

代码如下:

@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = roleglobal SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}
接口演示如下:

1.登录管理员用户

在这里插入图片描述

2.为用户jack授予guest权限

在这里插入图片描述

3.jack登录后携带jack的session_id访问cmdb接口

在这里插入图片描述

完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from enum import Enum
from http import HTTPStatus
from flask import Flask, requestapp = Flask(__name__)ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")SESSION_IDS = {}LOGIN_TIMEOUT = 60 * 60 * 24class Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信心return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required@app.route("/register", methods=["POST"])
def register():"""注册用户信息"""username = request.form.get("username")password = request.form.get("password")if not username or not password:  # 判断用户输入的参数return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "must have username and password"}if not os.path.exists(ACCOUNTS_FILE):  # 判断是否存在指定文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)for account in accounts:if account["username"] == username:  # 判断是否用户已存在return {"data": None, "status_code": HTTPStatus.CONFLICT, "message": "username is already exists"}accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})with open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": username, "status_code": HTTPStatus.OK, "message": "register username successfully"}@app.route("/login", methods=["POST"])
def login():"""用户登录"""username = request.form.get("username")password = request.form.get("password")if not username or not password:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "invalid parameters"}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)current_user = Nonefor account in accounts:if account["username"] == username:current_user = accountbreakif current_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}if md5(password.encode()).hexdigest() != current_user["password"]:  # 是否用户名密码正确return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "password is not correct"}global SESSION_IDSfor session_id, session_info in SESSION_IDS.items():  # 判断用户是否已经登陆if session_info["user_info"].get("username") == username: # 如果已经登录则更新时间戳并返回已登陆的sessionIDsession_info["timestamp"] = time.time()return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}session_id = md5((password + str(time.time())).encode()).hexdigest()  # 生成会话IDSESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}  # 记录会话信息return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)global SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolereturn {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}@app.route("/cmdb", methods=["POST"])
@permission(roles=[Role.CMDB])
def cmdb():passreturn "success"if __name__ == "__main__":app.run(host="127.0.0.1", port=5000, debug=True)

【总结】

当用户范围足够广的时候,角色的定义就会变得复杂,可能会涉及到角色的继承,或者先将多个用户分到同一个用户组,然后再给这个用户组赋予一个角色,等等。除此之外除了对接口进行鉴权,有时候还需要对访问的资源进行鉴权,比如访问的数据,或者操作的设备等。

鉴权如果复杂可以做得很复杂,想要简单同样也可以很简单,我们这一章节其实就是简化鉴权逻辑,让大家能够了解到鉴权的底层原理和实现方式,希望大家可以仔细阅读体会。


欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容

这篇关于【自动化运维新手村】Flask-权限校验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/702877

相关文章

Python+FFmpeg实现视频自动化处理的完整指南

《Python+FFmpeg实现视频自动化处理的完整指南》本文总结了一套在Python中使用subprocess.run调用FFmpeg进行视频自动化处理的解决方案,涵盖了跨平台硬件加速、中间素材处理... 目录一、 跨平台硬件加速:统一接口设计1. 核心映射逻辑2. python 实现代码二、 中间素材处

python中的flask_sqlalchemy的使用及示例详解

《python中的flask_sqlalchemy的使用及示例详解》文章主要介绍了在使用SQLAlchemy创建模型实例时,通过元类动态创建实例的方式,并说明了如何在实例化时执行__init__方法,... 目录@orm.reconstructorSQLAlchemy的回滚关联其他模型数据库基本操作将数据添

Java使用Spire.Doc for Java实现Word自动化插入图片

《Java使用Spire.DocforJava实现Word自动化插入图片》在日常工作中,Word文档是不可或缺的工具,而图片作为信息传达的重要载体,其在文档中的插入与布局显得尤为关键,下面我们就来... 目录1. Spire.Doc for Java库介绍与安装2. 使用特定的环绕方式插入图片3. 在指定位

kingbase修改权限实现方式

《kingbase修改权限实现方式》该文章详细介绍了如何在数据库中创建用户并赋予其相应的权限,包括创建用户、回收默认权限、创建数据库、赋权数据库权限、创建只读用户以及回收权限等步骤... 目录前言使用步骤总结前言创建用户后对数据库对象的读写权限进行修改使用步骤1、创建用户create user cs

C#借助Spire.XLS for .NET实现Excel工作表自动化样式设置

《C#借助Spire.XLSfor.NET实现Excel工作表自动化样式设置》作为C#开发者,我们经常需要处理Excel文件,本文将深入探讨如何利用C#代码,借助强大的Spire.XLSfor.N... 目录为什么需要自动化工作表样式使用 Spire.XLS for .NET 实现工作表整体样式设置样式配置

Springboot中JWT登录校验及其拦截器实现方法

《Springboot中JWT登录校验及其拦截器实现方法》:本文主要介绍Springboot中JWT登录校验及其拦截器实现方法的相关资料,包括引入Maven坐标、获取Token、JWT拦截器的实现... 目录前言一、JWT是什么?二、实现步骤1.引入Maven坐标2.获取Token3.JWT拦截器的实现4.

C#自动化生成PowerPoint(PPT)演示文稿

《C#自动化生成PowerPoint(PPT)演示文稿》在当今快节奏的商业环境中,演示文稿是信息传递和沟通的关键工具,下面我们就深入探讨如何利用C#和Spire.Presentationfor.NET... 目录环境准备与Spire.Presentation安装核心操作:添加与编辑幻灯片元素添加幻灯片文本操

Python实现Word文档自动化的操作大全(批量生成、模板填充与内容修改)

《Python实现Word文档自动化的操作大全(批量生成、模板填充与内容修改)》在职场中,Word文档是公认的好伙伴,但你有没有被它折磨过?批量生成合同、制作报告以及发放证书/通知等等,这些重复、低效... 目录重复性文档制作,手动填充模板,效率低下还易错1.python-docx入门:Word文档的“瑞士

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

C#自动化实现检测并删除PDF文件中的空白页面

《C#自动化实现检测并删除PDF文件中的空白页面》PDF文档在日常工作和生活中扮演着重要的角色,本文将深入探讨如何使用C#编程语言,结合强大的PDF处理库,自动化地检测并删除PDF文件中的空白页面,感... 目录理解PDF空白页的定义与挑战引入Spire.PDF for .NET库核心实现:检测并删除空白页