【自动化运维新手村】Flask-权限校验

2024-02-12 15:20

本文主要是介绍【自动化运维新手村】Flask-权限校验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【摘要】

上一章节,我们主要对Web应用的用户认证做了详细的讲解,包括使用Flask实现用户注册,登录,并通过Session机制实现用户保持登录。那么在了解了用户认证之后,这一章节我们就着重介绍一下权限校验的原理以及实现方式。

【为什么需要鉴权】

用户在通过认证之后,已经可以正常访问我们的后端应用,但当后端应用越来越完善,功能越来越丰富,并且牵扯的资源以及用户的范围都足够广的时候,用户的权限校验就显得尤为重要。例如:

1.是否所有用户都可以通过接口获取CMDB的数据信息;

2.是否所有的用户都可以调用接口对设备进行操作;

3.是否拥有获取CMDB信息权限的用户也拥有对设备进行操作的权限;

4.是否拥有对设备操作权限的用户,就可以对全部的设备进行操作;

诸如上述的权限问题还有很多,下面就一起来看看如何在Flask应用中进行鉴权。

【Flask实现】

对用户鉴权相信大家都可以理解,其实就是判断用户是否有权限访问某个API,在实现上也相对比较简单。

上一章节中,我们通过装饰器实现了登录认证,伪代码如下:

def permission(func):@wraps(func)def inner():if not auth():return Failreturn func()return inner@app.route("/index")
@permission
def index():return "success"

登录认证的具体实现其实就是将该装饰器加在需要认证的路由函数上,这样就可以在调用该路由函数前进行一系列的认证过程。

基于用户鉴权

如果想要在登录认证的前提下再进行用户鉴权,则只需要修改permission装饰器,使其可以对用户进行判断,然后决定是否允许该用户访问。但是我们需要先明确具体的路由函数允许哪个用户访问,并且将这个限制传入装饰器中,这时候就需要用到【自动化运维新手村】装饰器-进阶中的带参数的装饰器,伪代码如下:

def permission(permit_users):def login_acquired(func):@wraps(func)def inner():if not auth() or current_user not in permit_users:return Failreturn func()return innerreturn login_acquired@app.route("/index")
@permission(["ethan", "john", "jack"])
def index():return "success"

经过改进的装饰器加在需要鉴权的路由函数上,并且传入该路由函数允许访问的用户列表,这样在登录认证时,通过session_id获取到当前用户,判断该用户是否在允许访问的用户列表中即可。

其实不难看出,通过用户去区分权限显然是不太现实的,用户数量增多的时候,可能会让权限控制变得十分难以维护。

那么最先想到的改进方法,应该就是将不同的用户赋予不同的角色,这样在权限控制的时候,鉴权粒度就由用户变成了角色。

基于角色鉴权

关于通过角色鉴权目前业内已经有一套成熟的规范 —— RBAC(Role-Based Access Control, 基于角色的访问控制),就是用户通过角色与权限进行关联。

RBCA本质上是对用户进行分组管理,赋予角色,对权限进行合理的划分,最终实现一个用户拥有若干角色,每一个角色拥有若干权限。并且RBCA具有十分完善的权限模型设计,对于大型系统的权限管理是非常重要的,但这一章节目的就是化繁为简,学会其权限管理的基本原理和底层实现。

定义角色

这里定义角色时引入了一个新的概念,叫做枚举类型,枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等,那么我们这里的角色显然也适合用枚举类型来定义。

from enum import Enumclass Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"

由于到目前为止,我们的后端应用还没有引入数据库的概念,所以权限信息可以和用户信息一起暂时保存在JSON文件中,这里用户的权限信息可以通过在用户信息中新增一个role字段来进行标识。模型如下:

[{"username": "","password": "","role": ""}
]
鉴权逻辑

上文中已经提到需要将装饰器修改为可传参的装饰器,修改后整体逻辑如下:

1.允许传入参数roles,可以是多个角色或单个角色,参数类型为列表,如果不传默认为None,表示不限制角色

2.判断用户是否登录的逻辑保持不变

3.根据session_id获取当前已登陆用户

4.判断该用户的角色是否包含在传入的参数roles

代码如下:

from http import HTTPStatus # 引入了http包中的状态码def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required
为用户授权

现在新增一个为用户授权的路由函数,但这个函数同样应该设置权限,只允许管理员角色调用它,所以一开始需要在用户信息中初始化一个管理员账户,如下:

[{"username": "yuefeiyu","password": "af058879880f293b3b9b4a7072e5d0bf","role": "admin"}
]

为用户授权的大致逻辑如下:

1.通过POST请求传入usernamerole表单参数

2.判断参数是否合法,role是否属于枚举类型中已定义的角色

3.获取已注册的用户信息

4.判断被授权用户是否已注册

5.修改该用户的角色信息并保存

6.如果授权用户已登陆则修改session中该用户的角色信息

代码如下:

@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = roleglobal SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}
接口演示如下:

1.登录管理员用户

在这里插入图片描述

2.为用户jack授予guest权限

在这里插入图片描述

3.jack登录后携带jack的session_id访问cmdb接口

在这里插入图片描述

完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from enum import Enum
from http import HTTPStatus
from flask import Flask, requestapp = Flask(__name__)ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")SESSION_IDS = {}LOGIN_TIMEOUT = 60 * 60 * 24class Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信心return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required@app.route("/register", methods=["POST"])
def register():"""注册用户信息"""username = request.form.get("username")password = request.form.get("password")if not username or not password:  # 判断用户输入的参数return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "must have username and password"}if not os.path.exists(ACCOUNTS_FILE):  # 判断是否存在指定文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)for account in accounts:if account["username"] == username:  # 判断是否用户已存在return {"data": None, "status_code": HTTPStatus.CONFLICT, "message": "username is already exists"}accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})with open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": username, "status_code": HTTPStatus.OK, "message": "register username successfully"}@app.route("/login", methods=["POST"])
def login():"""用户登录"""username = request.form.get("username")password = request.form.get("password")if not username or not password:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "invalid parameters"}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)current_user = Nonefor account in accounts:if account["username"] == username:current_user = accountbreakif current_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}if md5(password.encode()).hexdigest() != current_user["password"]:  # 是否用户名密码正确return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "password is not correct"}global SESSION_IDSfor session_id, session_info in SESSION_IDS.items():  # 判断用户是否已经登陆if session_info["user_info"].get("username") == username: # 如果已经登录则更新时间戳并返回已登陆的sessionIDsession_info["timestamp"] = time.time()return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}session_id = md5((password + str(time.time())).encode()).hexdigest()  # 生成会话IDSESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}  # 记录会话信息return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)global SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolereturn {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}@app.route("/cmdb", methods=["POST"])
@permission(roles=[Role.CMDB])
def cmdb():passreturn "success"if __name__ == "__main__":app.run(host="127.0.0.1", port=5000, debug=True)

【总结】

当用户范围足够广的时候,角色的定义就会变得复杂,可能会涉及到角色的继承,或者先将多个用户分到同一个用户组,然后再给这个用户组赋予一个角色,等等。除此之外除了对接口进行鉴权,有时候还需要对访问的资源进行鉴权,比如访问的数据,或者操作的设备等。

鉴权如果复杂可以做得很复杂,想要简单同样也可以很简单,我们这一章节其实就是简化鉴权逻辑,让大家能够了解到鉴权的底层原理和实现方式,希望大家可以仔细阅读体会。


欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容

这篇关于【自动化运维新手村】Flask-权限校验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/702877

相关文章

MySQL分表自动化创建的实现方案

《MySQL分表自动化创建的实现方案》在数据库应用场景中,随着数据量的不断增长,单表存储数据可能会面临性能瓶颈,例如查询、插入、更新等操作的效率会逐渐降低,分表是一种有效的优化策略,它将数据分散存储在... 目录一、项目目的二、实现过程(一)mysql 事件调度器结合存储过程方式1. 开启事件调度器2. 创

Java访问修饰符public、private、protected及默认访问权限详解

《Java访问修饰符public、private、protected及默认访问权限详解》:本文主要介绍Java访问修饰符public、private、protected及默认访问权限的相关资料,每... 目录前言1. public 访问修饰符特点:示例:适用场景:2. private 访问修饰符特点:示例:

Spring常见错误之Web嵌套对象校验失效解决办法

《Spring常见错误之Web嵌套对象校验失效解决办法》:本文主要介绍Spring常见错误之Web嵌套对象校验失效解决的相关资料,通过在Phone对象上添加@Valid注解,问题得以解决,需要的朋... 目录问题复现案例解析问题修正总结  问题复现当开发一个学籍管理系统时,我们会提供了一个 API 接口去

Python Invoke自动化任务库的使用

《PythonInvoke自动化任务库的使用》Invoke是一个强大的Python库,用于编写自动化脚本,本文就来介绍一下PythonInvoke自动化任务库的使用,具有一定的参考价值,感兴趣的可以... 目录什么是 Invoke?如何安装 Invoke?Invoke 基础1. 运行测试2. 构建文档3.

Windows自动化Python pyautogui RPA操作实现

《Windows自动化PythonpyautoguiRPA操作实现》本文详细介绍了使用Python的pyautogui库进行Windows自动化操作的实现方法,文中通过示例代码介绍的非常详细,对大... 目录依赖包睡眠:鼠标事件:杀死进程:获取所有窗口的名称:显示窗口:根据图片找元素:输入文字:打开应用:依

Linux中chmod权限设置方式

《Linux中chmod权限设置方式》本文介绍了Linux系统中文件和目录权限的设置方法,包括chmod、chown和chgrp命令的使用,以及权限模式和符号模式的详细说明,通过这些命令,用户可以灵活... 目录设置基本权限命令:chmod1、权限介绍2、chmod命令常见用法和示例3、文件权限详解4、ch

Jenkins中自动化部署Spring Boot项目的全过程

《Jenkins中自动化部署SpringBoot项目的全过程》:本文主要介绍如何使用Jenkins从Git仓库拉取SpringBoot项目并进行自动化部署,通过配置Jenkins任务,实现项目的... 目录准备工作启动 Jenkins配置 Jenkins创建及配置任务源码管理构建触发器构建构建后操作构建任务

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规

spring 参数校验Validation示例详解

《spring参数校验Validation示例详解》Spring提供了Validation工具类来实现对客户端传来的请求参数的有效校验,本文给大家介绍spring参数校验Validation示例详... 目录前言一、Validation常见的校验注解二、Validation的简单应用三、分组校验四、自定义校

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去