【自动化运维新手村】Flask-权限校验

2024-02-12 15:20

本文主要是介绍【自动化运维新手村】Flask-权限校验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【摘要】

上一章节,我们主要对Web应用的用户认证做了详细的讲解,包括使用Flask实现用户注册,登录,并通过Session机制实现用户保持登录。那么在了解了用户认证之后,这一章节我们就着重介绍一下权限校验的原理以及实现方式。

【为什么需要鉴权】

用户在通过认证之后,已经可以正常访问我们的后端应用,但当后端应用越来越完善,功能越来越丰富,并且牵扯的资源以及用户的范围都足够广的时候,用户的权限校验就显得尤为重要。例如:

1.是否所有用户都可以通过接口获取CMDB的数据信息;

2.是否所有的用户都可以调用接口对设备进行操作;

3.是否拥有获取CMDB信息权限的用户也拥有对设备进行操作的权限;

4.是否拥有对设备操作权限的用户,就可以对全部的设备进行操作;

诸如上述的权限问题还有很多,下面就一起来看看如何在Flask应用中进行鉴权。

【Flask实现】

对用户鉴权相信大家都可以理解,其实就是判断用户是否有权限访问某个API,在实现上也相对比较简单。

上一章节中,我们通过装饰器实现了登录认证,伪代码如下:

def permission(func):@wraps(func)def inner():if not auth():return Failreturn func()return inner@app.route("/index")
@permission
def index():return "success"

登录认证的具体实现其实就是将该装饰器加在需要认证的路由函数上,这样就可以在调用该路由函数前进行一系列的认证过程。

基于用户鉴权

如果想要在登录认证的前提下再进行用户鉴权,则只需要修改permission装饰器,使其可以对用户进行判断,然后决定是否允许该用户访问。但是我们需要先明确具体的路由函数允许哪个用户访问,并且将这个限制传入装饰器中,这时候就需要用到【自动化运维新手村】装饰器-进阶中的带参数的装饰器,伪代码如下:

def permission(permit_users):def login_acquired(func):@wraps(func)def inner():if not auth() or current_user not in permit_users:return Failreturn func()return innerreturn login_acquired@app.route("/index")
@permission(["ethan", "john", "jack"])
def index():return "success"

经过改进的装饰器加在需要鉴权的路由函数上,并且传入该路由函数允许访问的用户列表,这样在登录认证时,通过session_id获取到当前用户,判断该用户是否在允许访问的用户列表中即可。

其实不难看出,通过用户去区分权限显然是不太现实的,用户数量增多的时候,可能会让权限控制变得十分难以维护。

那么最先想到的改进方法,应该就是将不同的用户赋予不同的角色,这样在权限控制的时候,鉴权粒度就由用户变成了角色。

基于角色鉴权

关于通过角色鉴权目前业内已经有一套成熟的规范 —— RBAC(Role-Based Access Control, 基于角色的访问控制),就是用户通过角色与权限进行关联。

RBCA本质上是对用户进行分组管理,赋予角色,对权限进行合理的划分,最终实现一个用户拥有若干角色,每一个角色拥有若干权限。并且RBCA具有十分完善的权限模型设计,对于大型系统的权限管理是非常重要的,但这一章节目的就是化繁为简,学会其权限管理的基本原理和底层实现。

定义角色

这里定义角色时引入了一个新的概念,叫做枚举类型,枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等,那么我们这里的角色显然也适合用枚举类型来定义。

from enum import Enumclass Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"

由于到目前为止,我们的后端应用还没有引入数据库的概念,所以权限信息可以和用户信息一起暂时保存在JSON文件中,这里用户的权限信息可以通过在用户信息中新增一个role字段来进行标识。模型如下:

[{"username": "","password": "","role": ""}
]
鉴权逻辑

上文中已经提到需要将装饰器修改为可传参的装饰器,修改后整体逻辑如下:

1.允许传入参数roles,可以是多个角色或单个角色,参数类型为列表,如果不传默认为None,表示不限制角色

2.判断用户是否登录的逻辑保持不变

3.根据session_id获取当前已登陆用户

4.判断该用户的角色是否包含在传入的参数roles

代码如下:

from http import HTTPStatus # 引入了http包中的状态码def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required
为用户授权

现在新增一个为用户授权的路由函数,但这个函数同样应该设置权限,只允许管理员角色调用它,所以一开始需要在用户信息中初始化一个管理员账户,如下:

[{"username": "yuefeiyu","password": "af058879880f293b3b9b4a7072e5d0bf","role": "admin"}
]

为用户授权的大致逻辑如下:

1.通过POST请求传入usernamerole表单参数

2.判断参数是否合法,role是否属于枚举类型中已定义的角色

3.获取已注册的用户信息

4.判断被授权用户是否已注册

5.修改该用户的角色信息并保存

6.如果授权用户已登陆则修改session中该用户的角色信息

代码如下:

@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = roleglobal SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}
接口演示如下:

1.登录管理员用户

在这里插入图片描述

2.为用户jack授予guest权限

在这里插入图片描述

3.jack登录后携带jack的session_id访问cmdb接口

在这里插入图片描述

完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from enum import Enum
from http import HTTPStatus
from flask import Flask, requestapp = Flask(__name__)ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")SESSION_IDS = {}LOGIN_TIMEOUT = 60 * 60 * 24class Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS:  # 是否存在会话信心return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time()  # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required@app.route("/register", methods=["POST"])
def register():"""注册用户信息"""username = request.form.get("username")password = request.form.get("password")if not username or not password:  # 判断用户输入的参数return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "must have username and password"}if not os.path.exists(ACCOUNTS_FILE):  # 判断是否存在指定文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)for account in accounts:if account["username"] == username:  # 判断是否用户已存在return {"data": None, "status_code": HTTPStatus.CONFLICT, "message": "username is already exists"}accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})with open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": username, "status_code": HTTPStatus.OK, "message": "register username successfully"}@app.route("/login", methods=["POST"])
def login():"""用户登录"""username = request.form.get("username")password = request.form.get("password")if not username or not password:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "invalid parameters"}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)current_user = Nonefor account in accounts:if account["username"] == username:current_user = accountbreakif current_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}if md5(password.encode()).hexdigest() != current_user["password"]:  # 是否用户名密码正确return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "password is not correct"}global SESSION_IDSfor session_id, session_info in SESSION_IDS.items():  # 判断用户是否已经登陆if session_info["user_info"].get("username") == username: # 如果已经登录则更新时间戳并返回已登陆的sessionIDsession_info["timestamp"] = time.time()return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}session_id = md5((password + str(time.time())).encode()).hexdigest()  # 生成会话IDSESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}  # 记录会话信息return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles:  # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None:  # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)global SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolereturn {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}@app.route("/cmdb", methods=["POST"])
@permission(roles=[Role.CMDB])
def cmdb():passreturn "success"if __name__ == "__main__":app.run(host="127.0.0.1", port=5000, debug=True)

【总结】

当用户范围足够广的时候,角色的定义就会变得复杂,可能会涉及到角色的继承,或者先将多个用户分到同一个用户组,然后再给这个用户组赋予一个角色,等等。除此之外除了对接口进行鉴权,有时候还需要对访问的资源进行鉴权,比如访问的数据,或者操作的设备等。

鉴权如果复杂可以做得很复杂,想要简单同样也可以很简单,我们这一章节其实就是简化鉴权逻辑,让大家能够了解到鉴权的底层原理和实现方式,希望大家可以仔细阅读体会。


欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容

这篇关于【自动化运维新手村】Flask-权限校验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/702877

相关文章

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念

如何使用Ansible实现CI/CD流水线的自动化

如何使用Ansible实现CI/CD流水线的自动化 持续集成(CI)和持续交付(CD)是现代软件开发过程中的核心实践,它们帮助团队更快地交付高质量的软件。Ansible,作为一个强大的自动化工具,可以在CI/CD流水线中发挥关键作用。本文将详细介绍如何使用Ansible实现CI/CD流水线的自动化,包括设计流水线的结构、配置管理、自动化测试、部署、以及集成Ansible与CI/CD工具(如Jen

校验码:奇偶校验,CRC循环冗余校验,海明校验码

文章目录 奇偶校验码CRC循环冗余校验码海明校验码 奇偶校验码 码距:任何一种编码都由许多码字构成,任意两个码字之间最少变化的二进制位数就称为数据检验码的码距。 奇偶校验码的编码方法是:由若干位有效信息(如一个字节),再加上一个二进制位(校验位)组成校验码。 奇校验:整个校验码中1的个数为奇数 偶校验:整个校验码中1的个数为偶数 奇偶校验,可检测1位(奇数位)的错误,不可纠错。

Golang进程权限调度包runtime

关于 runtime 包几个方法: Gosched:让当前线程让出 cpu 以让其它线程运行,它不会挂起当前线程,因此当前线程未来会继续执行GOMAXPROCS:设置最大的可同时使用的 CPU 核数Goexit:退出当前 goroutine(但是defer语句会照常执行)NumGoroutine:返回正在执行和排队的任务总数GOOS:目标操作系统NumCPU:返回当前系统的 CPU 核数量 p

BIRT 报表的自动化测试

来源:http://www.ibm.com/developerworks/cn/opensource/os-cn-ecl-birttest/如何为 BIRT 报表编写自动化测试用例 BIRT 是一项很受欢迎的报表制作工具,但目前对其的测试还是以人工测试为主。本文介绍了如何对 BIRT 报表进行自动化测试,以及在实际项目中的一些测试实践,从而提高了测试的效率和准确性 -------

android java.io.IOException: open failed: ENOENT (No such file or directory)-api23+权限受权

问题描述 在安卓上,清单明明已经受权了读写文件权限,但偏偏就是创建不了目录和文件 调用mkdirs()总是返回false. <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/><uses-permission android:name="android.permission.READ_E

文章解读与仿真程序复现思路——电力自动化设备EI\CSCD\北大核心《考虑燃料电池和电解槽虚拟惯量支撑的电力系统优化调度方法》

本专栏栏目提供文章与程序复现思路,具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源程序擅长文章解读,论文与完整源程序,等方面的知识,电网论文源程序关注python

Android6.0以上权限申请

说明: 部分1:出自:http://jijiaxin89.com/2015/08/30/Android-s-Runtime-Permission/ android M 的名字官方刚发布不久,最终正式版即将来临! android在不断发展,最近的更新 M 非常不同,一些主要的变化例如运行时权限将有颠覆性影响。惊讶的是android社区鲜有谈论这事儿,尽管这事很重要或许在不远的将来会引

自动化表格处理的革命:智能文档系统技术解析

在当今数据驱动的商业环境中,表格数据的自动化处理成为了企业提高效率、降低成本的关键。企业智能文档系统在智能表格识别方面展现出卓越的性能,通过精准识别和处理各种通用表格,显著提升了企业文档管理的智能化水平。本文将深入探讨该系统在表格识别方面的关键技术和应用优势,以及如何通过行业定制化服务满足不同行业的需求。 1. 通用表格识别 智能文档系统通过先进的OCR技术和表格结构识别算法,能够精准