【自动化运维新手村】Flask-权限校验

2024-02-12 15:20

本文主要是介绍【自动化运维新手村】Flask-权限校验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【摘要】

上一章节,我们主要对Web应用的用户认证做了详细的讲解,包括使用Flask实现用户注册,登录,并通过Session机制实现用户保持登录。那么在了解了用户认证之后,这一章节我们就着重介绍一下权限校验的原理以及实现方式。

【为什么需要鉴权】

用户在通过认证之后,已经可以正常访问我们的后端应用,但当后端应用越来越完善,功能越来越丰富,并且牵扯的资源以及用户的范围都足够广的时候,用户的权限校验就显得尤为重要。例如:

1.是否所有用户都可以通过接口获取CMDB的数据信息;

2.是否所有的用户都可以调用接口对设备进行操作;

3.是否拥有获取CMDB信息权限的用户也拥有对设备进行操作的权限;

4.是否拥有对设备操作权限的用户,就可以对全部的设备进行操作;

诸如上述的权限问题还有很多,下面就一起来看看如何在Flask应用中进行鉴权。

【Flask实现】

对用户鉴权相信大家都可以理解,其实就是判断用户是否有权限访问某个API,在实现上也相对比较简单。

上一章节中,我们通过装饰器实现了登录认证,伪代码如下:

def permission(func):@wraps(func)def inner():if not auth():return Failreturn func()return inner@app.route("/index")
@permission
def index():return "success"

登录认证的具体实现其实就是将该装饰器加在需要认证的路由函数上,这样就可以在调用该路由函数前进行一系列的认证过程。

基于用户鉴权

如果想要在登录认证的前提下再进行用户鉴权,则只需要修改permission装饰器,使其可以对用户进行判断,然后决定是否允许该用户访问。但是我们需要先明确具体的路由函数允许哪个用户访问,并且将这个限制传入装饰器中,这时候就需要用到【自动化运维新手村】装饰器-进阶中的带参数的装饰器,伪代码如下:

def permission(permit_users):def login_acquired(func):@wraps(func)def inner():if not auth() or current_user not in permit_users:return Failreturn func()return innerreturn login_acquired@app.route("/index")
@permission(["ethan", "john", "jack"])
def index():return "success"

经过改进的装饰器加在需要鉴权的路由函数上,并且传入该路由函数允许访问的用户列表,这样在登录认证时,通过session_id获取到当前用户,判断该用户是否在允许访问的用户列表中即可。

其实不难看出,通过用户去区分权限显然是不太现实的,用户数量增多的时候,可能会让权限控制变得十分难以维护。

那么最先想到的改进方法,应该就是将不同的用户赋予不同的角色,这样在权限控制的时候,鉴权粒度就由用户变成了角色。

基于角色鉴权

关于通过角色鉴权目前业内已经有一套成熟的规范 —— RBAC(Role-Based Access Control, 基于角色的访问控制),就是用户通过角色与权限进行关联。

RBCA本质上是对用户进行分组管理,赋予角色,对权限进行合理的划分,最终实现一个用户拥有若干角色,每一个角色拥有若干权限。并且RBCA具有十分完善的权限模型设计,对于大型系统的权限管理是非常重要的,但这一章节目的就是化繁为简,学会其权限管理的基本原理和底层实现。

定义角色

这里定义角色时引入了一个新的概念,叫做枚举类型,枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等,那么我们这里的角色显然也适合用枚举类型来定义。

from enum import Enumclass Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"

由于到目前为止,我们的后端应用还没有引入数据库的概念,所以权限信息可以和用户信息一起暂时保存在JSON文件中,这里用户的权限信息可以通过在用户信息中新增一个role字段来进行标识。模型如下:

[{"username": "","password": "","role": ""}
]
鉴权逻辑

上文中已经提到需要将装饰器修改为可传参的装饰器,修改后整体逻辑如下:

1.允许传入参数roles,可以是多个角色或单个角色,参数类型为列表,如果不传默认为None,表示不限制角色

2.判断用户是否登录的逻辑保持不变

3.根据session_id获取当前已登陆用户

4.判断该用户的角色是否包含在传入的参数roles

代码如下:

from http import HTTPStatus # 引入了http包中的状态码def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required
为用户授权

现在新增一个为用户授权的路由函数,但这个函数同样应该设置权限,只允许管理员角色调用它,所以一开始需要在用户信息中初始化一个管理员账户,如下:

[{"username": "yuefeiyu","password": "af058879880f293b3b9b4a7072e5d0bf","role": "admin"}
]

为用户授权的大致逻辑如下:

1.通过POST请求传入usernamerole表单参数

2.判断参数是否合法,role是否属于枚举类型中已定义的角色

3.获取已注册的用户信息

4.判断被授权用户是否已注册

5.修改该用户的角色信息并保存

6.如果授权用户已登陆则修改session中该用户的角色信息

代码如下:

@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = roleglobal SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}
接口演示如下:

1.登录管理员用户

在这里插入图片描述

2.为用户jack授予guest权限

在这里插入图片描述

3.jack登录后携带jack的session_id访问cmdb接口

在这里插入图片描述

完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from enum import Enum
from http import HTTPStatus
from flask import Flask, requestapp = Flask(__name__)ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")SESSION_IDS = {}LOGIN_TIMEOUT = 60 * 60 * 24class Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信心return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required@app.route("/register", methods=["POST"])
def register():"""注册用户信息"""username = request.form.get("username")password = request.form.get("password")if not username or not password:  # 判断用户输入的参数return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "must have username and password"}if not os.path.exists(ACCOUNTS_FILE):  # 判断是否存在指定文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)for account in accounts:if account["username"] == username:  # 判断是否用户已存在return {"data": None, "status_code": HTTPStatus.CONFLICT, "message": "username is already exists"}accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})with open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": username, "status_code": HTTPStatus.OK, "message": "register username successfully"}@app.route("/login", methods=["POST"])
def login():"""用户登录"""username = request.form.get("username")password = request.form.get("password")if not username or not password:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "invalid parameters"}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)current_user = Nonefor account in accounts:if account["username"] == username:current_user = accountbreakif current_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}if md5(password.encode()).hexdigest() != current_user["password"]:  # 是否用户名密码正确return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "password is not correct"}global SESSION_IDSfor session_id, session_info in SESSION_IDS.items():  # 判断用户是否已经登陆if session_info["user_info"].get("username") == username: # 如果已经登录则更新时间戳并返回已登陆的sessionIDsession_info["timestamp"] = time.time()return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}session_id = md5((password + str(time.time())).encode()).hexdigest()  # 生成会话IDSESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}  # 记录会话信息return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)global SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolereturn {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}@app.route("/cmdb", methods=["POST"])
@permission(roles=[Role.CMDB])
def cmdb():passreturn "success"if __name__ == "__main__":app.run(host="127.0.0.1", port=5000, debug=True)

【总结】

当用户范围足够广的时候,角色的定义就会变得复杂,可能会涉及到角色的继承,或者先将多个用户分到同一个用户组,然后再给这个用户组赋予一个角色,等等。除此之外除了对接口进行鉴权,有时候还需要对访问的资源进行鉴权,比如访问的数据,或者操作的设备等。

鉴权如果复杂可以做得很复杂,想要简单同样也可以很简单,我们这一章节其实就是简化鉴权逻辑,让大家能够了解到鉴权的底层原理和实现方式,希望大家可以仔细阅读体会。


欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容

这篇关于【自动化运维新手村】Flask-权限校验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/702877

相关文章

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

Python自动化处理PDF文档的操作完整指南

《Python自动化处理PDF文档的操作完整指南》在办公自动化中,PDF文档处理是一项常见需求,本文将介绍如何使用Python实现PDF文档的自动化处理,感兴趣的小伙伴可以跟随小编一起学习一下... 目录使用pymupdf读写PDF文件基本概念安装pymupdf提取文本内容提取图像添加水印使用pdfplum

基于Python实现自动化邮件发送系统的完整指南

《基于Python实现自动化邮件发送系统的完整指南》在现代软件开发和自动化流程中,邮件通知是一个常见且实用的功能,无论是用于发送报告、告警信息还是用户提醒,通过Python实现自动化的邮件发送功能都能... 目录一、前言:二、项目概述三、配置文件 `.env` 解析四、代码结构解析1. 导入模块2. 加载环

Python实战之SEO优化自动化工具开发指南

《Python实战之SEO优化自动化工具开发指南》在数字化营销时代,搜索引擎优化(SEO)已成为网站获取流量的重要手段,本文将带您使用Python开发一套完整的SEO自动化工具,需要的可以了解下... 目录前言项目概述技术栈选择核心模块实现1. 关键词研究模块2. 网站技术seo检测模块3. 内容优化分析模

Python Flask实现定时任务的不同方法详解

《PythonFlask实现定时任务的不同方法详解》在Flask中实现定时任务,最常用的方法是使用APScheduler库,本文将提供一个完整的解决方案,有需要的小伙伴可以跟随小编一起学习一下... 目录完js整实现方案代码解释1. 依赖安装2. 核心组件3. 任务类型4. 任务管理5. 持久化存储生产环境

Python使用python-pptx自动化操作和生成PPT

《Python使用python-pptx自动化操作和生成PPT》这篇文章主要为大家详细介绍了如何使用python-pptx库实现PPT自动化,并提供实用的代码示例和应用场景,感兴趣的小伙伴可以跟随小编... 目录使用python-pptx操作PPT文档安装python-pptx基础概念创建新的PPT文档查看

基于Python编写自动化邮件发送程序(进阶版)

《基于Python编写自动化邮件发送程序(进阶版)》在数字化时代,自动化邮件发送功能已成为企业和个人提升工作效率的重要工具,本文将使用Python编写一个简单的自动化邮件发送程序,希望对大家有所帮助... 目录理解SMTP协议基础配置开发环境构建邮件发送函数核心逻辑实现完整发送流程添加附件支持功能实现htm

Python用Flask封装API及调用详解

《Python用Flask封装API及调用详解》本文介绍Flask的优势(轻量、灵活、易扩展),对比GET/POST表单/JSON请求方式,涵盖错误处理、开发建议及生产环境部署注意事项... 目录一、Flask的优势一、基础设置二、GET请求方式服务端代码客户端调用三、POST表单方式服务端代码客户端调用四

Springboot项目登录校验功能实现

《Springboot项目登录校验功能实现》本文介绍了Web登录校验的重要性,对比了Cookie、Session和JWT三种会话技术,分析其优缺点,并讲解了过滤器与拦截器的统一拦截方案,推荐使用JWT... 目录引言一、登录校验的基本概念二、HTTP协议的无状态性三、会话跟android踪技术1. Cook

Linux权限管理与ACL访问控制详解

《Linux权限管理与ACL访问控制详解》Linux权限管理涵盖基本rwx权限(通过chmod设置)、特殊权限(SUID/SGID/StickyBit)及ACL精细授权,由umask决定默认权限,需合... 目录一、基本权限概述1. 基本权限与数字对应关系二、权限管理命令(chmod)1. 字符模式语法2.