本文主要是介绍【自动化运维新手村】Flask-权限校验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
【摘要】
上一章节,我们主要对Web应用的用户认证做了详细的讲解,包括使用Flask实现用户注册,登录,并通过Session机制实现用户保持登录。那么在了解了用户认证之后,这一章节我们就着重介绍一下权限校验的原理以及实现方式。
【为什么需要鉴权】
用户在通过认证之后,已经可以正常访问我们的后端应用,但当后端应用越来越完善,功能越来越丰富,并且牵扯的资源以及用户的范围都足够广的时候,用户的权限校验就显得尤为重要。例如:
1.是否所有用户都可以通过接口获取CMDB的数据信息;
2.是否所有的用户都可以调用接口对设备进行操作;
3.是否拥有获取CMDB信息权限的用户也拥有对设备进行操作的权限;
4.是否拥有对设备操作权限的用户,就可以对全部的设备进行操作;
诸如上述的权限问题还有很多,下面就一起来看看如何在Flask应用中进行鉴权。
【Flask实现】
对用户鉴权相信大家都可以理解,其实就是判断用户是否有权限访问某个API,在实现上也相对比较简单。
上一章节中,我们通过装饰器实现了登录认证,伪代码如下:
def permission(func):@wraps(func)def inner():if not auth():return Failreturn func()return inner@app.route("/index")
@permission
def index():return "success"
登录认证的具体实现其实就是将该装饰器加在需要认证的路由函数上,这样就可以在调用该路由函数前进行一系列的认证过程。
基于用户鉴权
如果想要在登录认证的前提下再进行用户鉴权,则只需要修改permission
装饰器,使其可以对用户进行判断,然后决定是否允许该用户访问。但是我们需要先明确具体的路由函数允许哪个用户访问,并且将这个限制传入装饰器中,这时候就需要用到【自动化运维新手村】装饰器-进阶中的带参数的装饰器,伪代码如下:
def permission(permit_users):def login_acquired(func):@wraps(func)def inner():if not auth() or current_user not in permit_users:return Failreturn func()return innerreturn login_acquired@app.route("/index")
@permission(["ethan", "john", "jack"])
def index():return "success"
经过改进的装饰器加在需要鉴权的路由函数上,并且传入该路由函数允许访问的用户列表,这样在登录认证时,通过session_id
获取到当前用户,判断该用户是否在允许访问的用户列表中即可。
其实不难看出,通过用户去区分权限显然是不太现实的,用户数量增多的时候,可能会让权限控制变得十分难以维护。
那么最先想到的改进方法,应该就是将不同的用户赋予不同的角色,这样在权限控制的时候,鉴权粒度就由用户变成了角色。
基于角色鉴权
关于通过角色鉴权目前业内已经有一套成熟的规范 —— RBAC(Role-Based Access Control, 基于角色的访问控制),就是用户通过角色与权限进行关联。
RBCA本质上是对用户进行分组管理,赋予角色,对权限进行合理的划分,最终实现一个用户拥有若干角色,每一个角色拥有若干权限。并且RBCA具有十分完善的权限模型设计,对于大型系统的权限管理是非常重要的,但这一章节目的就是化繁为简,学会其权限管理的基本原理和底层实现。
定义角色
这里定义角色时引入了一个新的概念,叫做枚举类型,枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等,那么我们这里的角色显然也适合用枚举类型来定义。
from enum import Enumclass Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"
由于到目前为止,我们的后端应用还没有引入数据库的概念,所以权限信息可以和用户信息一起暂时保存在JSON文件中,这里用户的权限信息可以通过在用户信息中新增一个role
字段来进行标识。模型如下:
[{"username": "","password": "","role": ""}
]
鉴权逻辑
上文中已经提到需要将装饰器修改为可传参的装饰器,修改后整体逻辑如下:
1.允许传入参数roles
,可以是多个角色或单个角色,参数类型为列表,如果不传默认为None,表示不限制角色
2.判断用户是否登录的逻辑保持不变
3.根据session_id
获取当前已登陆用户
4.判断该用户的角色是否包含在传入的参数roles
中
代码如下:
from http import HTTPStatus # 引入了http包中的状态码def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS: # 是否存在会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT: # 是否会话仍有效SESSION_IDS.pop(session_id) # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time() # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required
为用户授权
现在新增一个为用户授权的路由函数,但这个函数同样应该设置权限,只允许管理员角色调用它,所以一开始需要在用户信息中初始化一个管理员账户,如下:
[{"username": "yuefeiyu","password": "af058879880f293b3b9b4a7072e5d0bf","role": "admin"}
]
为用户授权的大致逻辑如下:
1.通过POST请求传入username
和role
表单参数
2.判断参数是否合法,role
是否属于枚举类型中已定义的角色
3.获取已注册的用户信息
4.判断被授权用户是否已注册
5.修改该用户的角色信息并保存
6.如果授权用户已登陆则修改session中该用户的角色信息
代码如下:
@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles: # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE): # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = roleglobal SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}
接口演示如下:
1.登录管理员用户
2.为用户jack授予guest权限
3.jack登录后携带jack的session_id访问cmdb接口
完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from enum import Enum
from http import HTTPStatus
from flask import Flask, requestapp = Flask(__name__)ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")SESSION_IDS = {}LOGIN_TIMEOUT = 60 * 60 * 24class Role(Enum):ADMIN = "admin"CMDB = "cmdb"GUEST = "guest"def permission(roles=None):def login_required(func):@wraps(func)def inner():session_id = request.headers.get("session_id", "")global SESSION_IDSif session_id not in SESSION_IDS: # 是否存在会话信心return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username not login"}if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT: # 是否会话仍有效SESSION_IDS.pop(session_id) # 如果失效则移除会话信息return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "username login timeout"}SESSION_IDS[session_id]["timestamp"] = time.time() # 更新会话时间current_user = SESSION_IDS[session_id]role_values = [role.value for role in roles]if roles is not None and current_user["user_info"].get("role") not in role_values:return {"data": None, "status_code": HTTPStatus.FORBIDDEN, "message": "user has no permission"}return func()return innerreturn login_required@app.route("/register", methods=["POST"])
def register():"""注册用户信息"""username = request.form.get("username")password = request.form.get("password")if not username or not password: # 判断用户输入的参数return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "must have username and password"}if not os.path.exists(ACCOUNTS_FILE): # 判断是否存在指定文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)for account in accounts:if account["username"] == username: # 判断是否用户已存在return {"data": None, "status_code": HTTPStatus.CONFLICT, "message": "username is already exists"}accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})with open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)return {"data": username, "status_code": HTTPStatus.OK, "message": "register username successfully"}@app.route("/login", methods=["POST"])
def login():"""用户登录"""username = request.form.get("username")password = request.form.get("password")if not username or not password:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST, "message": "invalid parameters"}if not os.path.exists(ACCOUNTS_FILE): # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)current_user = Nonefor account in accounts:if account["username"] == username:current_user = accountbreakif current_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}if md5(password.encode()).hexdigest() != current_user["password"]: # 是否用户名密码正确return {"data": None, "status_code": HTTPStatus.UNAUTHORIZED, "message": "password is not correct"}global SESSION_IDSfor session_id, session_info in SESSION_IDS.items(): # 判断用户是否已经登陆if session_info["user_info"].get("username") == username: # 如果已经登录则更新时间戳并返回已登陆的sessionIDsession_info["timestamp"] = time.time()return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}session_id = md5((password + str(time.time())).encode()).hexdigest() # 生成会话IDSESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()} # 记录会话信息return {"data": {"session_id": session_id}, "status_code": HTTPStatus.OK, "message": "login successfully"}@app.route("/permission_manage", methods=["POST"])
@permission(roles=[Role.ADMIN])
def permission_manage():username = request.form.get("username")role = request.form.get("role")if not username or not role:return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}roles = [role.value for role in Role]if role not in roles: # 判断输入的角色名称是否合法return {"data": None, "status_code": HTTPStatus.BAD_REQUEST}if not os.path.exists(ACCOUNTS_FILE): # 是否存在用户信息文件return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "not found accounts file"}with open("accounts.json", "r+") as f:accounts = json.load(f)permit_user = Nonefor account in accounts: # 查找被授权用户if account.get("username", "") == username:permit_user = accountbreakif permit_user is None: # 是否用户已注册return {"data": None, "status_code": HTTPStatus.NOT_FOUND, "message": "username is not exists"}permit_user["role"] = rolewith open("accounts.json", "w") as f:json.dump(accounts, f, indent=2)global SESSION_IDSfor _, session_info in SESSION_IDS.items(): # 如果授权用户已登陆则修改session中该用户的角色信息if session_info["user_info"].get("username") == username:session_info["user_info"]["role"] = rolereturn {"data": "", "status_code": HTTPStatus.OK, "message": "successfully"}@app.route("/cmdb", methods=["POST"])
@permission(roles=[Role.CMDB])
def cmdb():passreturn "success"if __name__ == "__main__":app.run(host="127.0.0.1", port=5000, debug=True)
【总结】
当用户范围足够广的时候,角色的定义就会变得复杂,可能会涉及到角色的继承,或者先将多个用户分到同一个用户组,然后再给这个用户组赋予一个角色,等等。除此之外除了对接口进行鉴权,有时候还需要对访问的资源进行鉴权,比如访问的数据,或者操作的设备等。
鉴权如果复杂可以做得很复杂,想要简单同样也可以很简单,我们这一章节其实就是简化鉴权逻辑,让大家能够了解到鉴权的底层原理和实现方式,希望大家可以仔细阅读体会。
欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容
这篇关于【自动化运维新手村】Flask-权限校验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!