Python+Pytest+Yaml+Request+Allure框架源代码之(一)common公共方法封装

本文主要是介绍Python+Pytest+Yaml+Request+Allure框架源代码之(一)common公共方法封装,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

common模块:

在这里插入图片描述

  • get_path.py:获取路径方法
# -*- coding: UTF-8 -*-
import os# 项目根目录
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))# 配置文件目录
CONFIG_DIR = os.path.join(BASE_DIR,'config')# 测试用例文件目录
TESTCASES_DIR = os.path.join(BASE_DIR,'testcases')#data文件目录
DATA_DIR = os.path.join(BASE_DIR,'data')#日志文件目录
LOGS_DIR=os.path.join(BASE_DIR,'logs')if __name__ == '__main__':print(LOGS_DIR)
  • logger_util.py:日志封装
import logging
import time
from common.get_path import *
from common.yaml_util import read_fileclass LoggerUitl:def create_log(self,logger_name='log'):# 创建一个日志对象self.logger = logging.getLogger(logger_name)# 设置全局的日志级别(DEBUG<INFO<WARNING<ERROR<CRITICAL)self.logger.setLevel(logging.DEBUG)# 防止日志重复if not self.logger.handlers:#------------文件日志--------------# 获取日志文件的名称self.file_log_path = LOGS_DIR+'/'+ read_file('/config/config.yml','log','log_name') + str(int(time.time()))+".log"# 创建文件日志的控制器self.file_handler = logging.FileHandler(self.file_log_path,encoding='utf-8')# 设置文件日志的级别file_log_level= str(read_file('/config/config.yml','log','log_level')).lower()if file_log_level == 'debug':self.file_handler.setLevel(logging.DEBUG)elif file_log_level == 'info':self.file_handler.setLevel(logging.INFO)elif file_log_level == 'waring':self.file_handler.setLevel(logging.WARNING)elif file_log_level == 'error':self.file_handler.setLevel(logging.ERROR)elif file_log_level == 'critical':self.file_handler.setLevel(logging.CRITICAL)# 设置文件日志的格式self.file_handler.setFormatter(logging.Formatter(read_file('/config/config.yml','log','log_format')))# 将控制器加入到日志对象self.logger.addHandler(self.file_handler)#------------控制台日志--------------# 创建控制台日志的控制器self.console_handler = logging.StreamHandler()# 设置控制台日志的级别console_log_level= read_file('/config/config.yml','log','log_level').lower()if console_log_level == 'debug':self.console_handler.setLevel(logging.DEBUG)elif console_log_level == 'info':self.console_handler.setLevel(logging.INFO)elif console_log_level == 'waring':self.console_handler.setLevel(logging.WARNING)elif console_log_level == 'error':self.console_handler.setLevel(logging.ERROR)elif console_log_level == 'critical':self.console_handler.setLevel(logging.CRITICAL)# 设置控制台日志的格式self.console_handler.setFormatter(logging.Formatter(read_file('/config/config.yml','log','log_format')))# 将控制器加入到日志对象self.logger.addHandler(self.console_handler)return self.logger# 函数:输出正常日志
def my_log(log_massage):LoggerUitl().create_log().info(log_massage)# 函数:输出错误日志
def error_log(log_massage):LoggerUitl().create_log().error(log_massage)raise Exception(log_massage)if __name__ == '__main__':my_log('zhangweixu')

- parameters_until.py:传参方式方法封装

import csv
import json
import traceback
import yaml
from common.get_path import *
from common.logger_util import error_log# 读取csv文件
def read_csv_file(csv_file):'''c参数说明'''csv_list = []path = BASE_DIR+"/"+csv_filewith open(path,encoding='utf-8') as f:csv_data = csv.reader(f)for row in csv_data:csv_list.append(row)return csv_list# 读取yaml文件
def read_file(yml_file):try:path = BASE_DIR+yml_filewith open(path,encoding='utf-8') as f:caseinfo = yaml.load(f,Loader=yaml.FullLoader)if len(caseinfo)>=2:return caseinfoelse:caseinfo_keys = dict(*caseinfo).keys()if 'parameters' in caseinfo_keys:new_caseinfo = analysis_parameters(*caseinfo)return new_caseinfoelse:return caseinfoexcept Exception as f:error_log("读取用例文件报错:异常信息:%s"%str(traceback.format_exc()))# 分析参数化
def analysis_parameters(caseinfo):try:caseinfo_keys = dict(caseinfo).keys()if 'parameters' in caseinfo_keys:for key, value in dict(caseinfo['parameters']).items():caseinfo_str = json.dumps(caseinfo)key_list = str(key).split('-')# 规范csv数据的写法length_flag = Truecsv_list = read_csv_file(value)one_row_data = csv_list[0]for csv_data in csv_list:if len(csv_data) != len(one_row_data):length_flag = Falsebreak# 解析new_caseinfo = []if length_flag:for x in range(1, len(csv_list)):  # x代表行temp_caseinfo = caseinfo_strfor y in range(0, len(csv_list[x])):  # y代表列if csv_list[0][y] in key_list:temp_caseinfo = temp_caseinfo.replace("$csv{" + csv_list[0][y] + "}", csv_list[x][y])new_caseinfo.append(json.loads(temp_caseinfo))return new_caseinfoelse:return caseinfoexcept Exception as f:error_log("分析parameters参数异常:异常信息:%s"%str(traceback.format_exc()))if __name__ == '__main__':print(read_file('/testcases/weixin/get_token.yml'))
  • requests_util.py:请求方式方法封装
# -*- coding: UTF-8 -*-
import json
import re
import traceback
import jsonpath
import requests
# from common.parameters_until import read_file
from common.logger_util import my_log, error_log
from common.yaml_util import *
from debugtalk import DebugTalkclass Requestutil:session = requests.session()def __init__(self):self.base_url =""self.last_headers={}# 规范功能测试YAML测试用例文件的写法def analysis_yaml(self,caseinfo):try:# 1.必须有的四个一级关键字:name,base_url,requests,validatecaseinfo_keys = dict(caseinfo).keys()if 'name' in caseinfo_keys and 'base_url' in caseinfo and 'request' in caseinfo and 'validate' in caseinfo:# 2.request关键字必须包含两个二级关键字:method,urlrequest_keys = dict(caseinfo['request']).keys()if 'method' in request_keys and 'url' in request_keys:# 参数(params,data,json),请求头,文件上传这些都不能约束.name = caseinfo['name']self.base_url = caseinfo['base_url']method = caseinfo['request']['method']del caseinfo['request']['method']url = caseinfo['request']['url']del caseinfo['request']['url']headers = Noneif jsonpath.jsonpath(caseinfo,'$..headers'):headers = caseinfo['request']['headers']del caseinfo['request']['headers']files = Noneif jsonpath.jsonpath(caseinfo, '$..files'):files = caseinfo['request']['files']for key,value in dict(files).items():files[key] = open(value,'rb')del caseinfo['request']['files']# 把method,url,headers,files这四个数据从caseinfo['request']去掉之后再把剩下的传给kwargsres = self.send_request(name=name,method=method,url=url,headers=headers,files=files,**caseinfo['request'])return_text = res.textstatus_code = res.status_codemy_log("响应文本信息:%s"%return_text)my_log("响应json信息:%s"%res.json())# 提取接口关联的变量,既要支持正则表达式,又要支持json提取if 'extract' in caseinfo_keys:for key,value in dict(caseinfo['extract']).items():# 正则表达式提取if '(.*?)' in value or '(.+?)' in value:ze_value = re.search(value,return_text)if ze_value:extract_data = {key:ze_value.group(1)}write_file('/config/extract.yml',extract_data)print(extract_data)else:   # json提取return_json = res.json()  # 前提是要返回json格式extract_data = {key:return_json[value]}write_file('/config/extract.yml', extract_data)print(extract_data)# 断言的封装yq_result = caseinfo['validate']sj_result = res.json()self.validate_result(yq_result, sj_result, status_code)else:error_log('request关键字必须包含两个二级关键字:method,url')else:error_log('必须有的四个一级关键字:name,base_url,request,validate')except Exception as f:error_log("分析YAML文件异常:异常信息:%s" % str(traceback.format_exc()))#  统一替换方法,data可以是url(string),也可以是参数(字典,字典中包含有列表),也可以是请求头(字典).def replace_value(self,data):# 字典类型转换成字符串if data and isinstance(data,dict):  # 如果data不为空并且数据类型为字典str_data = json.dumps(data)else:str_data = data# 替换值for i in range(1, str_data.count('{{') + 1):if "{{" in str_data and "}}" in str_data:start_index = str_data.index("{{")end_index = str_data.index("}}",start_index)old_value = str_data[start_index:end_index + 2]new_value = read_file("/config/extract.yml", old_value[2:-2])str_data = str_data.replace(old_value, new_value)# 还原数据类型if data and isinstance(data,dict):  # 如果data不为空并且数据类型为字典data = json.loads(str_data)else:data = str_datareturn data#  统一替换方法,data可以是url(string),也可以是参数(字典,字典中包含有列表),也可以是请求头(字典).def replace_load(self, data):# 字典类型转换成字符串if data and isinstance(data, dict): # 如果data不为空并且数据类型为字典str_data = json.dumps(data)else:str_data = data# 替换值for i in range(1, str_data.count('${') + 1):if "${" in str_data and "}" in str_data:start_index = str_data.index("${")end_index = str_data.index("}", start_index)old_value = str_data[start_index:end_index + 1]function_name = old_value[2:old_value.index('(')]args_value = old_value[old_value.index('(')+1:old_value.index(')')]# 反射(通过一个函数的字符串直接去调用这个方法)new_value = getattr(DebugTalk(),function_name)(*args_value.split(','))str_data = str_data.replace(old_value, str(new_value))# 还原数据类型if data and isinstance(data, dict):   # 如果data不为空并且数据类型为字典data = json.loads(str_data)else:data = str_datareturn data# 统一发送请求def send_request(self,name,method,url,headers=None,files=None,**kwargs):try:# 处理methodself.last_method = str(method).lower()# 处理基础路径self.url=self.replace_load(self.base_url) + self.replace_value(url)# 处理请求头if headers and isinstance(headers,dict):self.last_headers=self.replace_value(headers)# 最核心的地方:请求数据如何去替换:可能是params,data,jsonfor key,value in kwargs.items():if key in ['params','data','json']:# 替换{{}}格式value = self.replace_value(value)# 替换${}格式value = self.replace_load(value)kwargs[key] = value# 收集日志my_log('-----------------接口请求开始-----------------')my_log("接口名称:%s"%name)my_log("请求方式:%s"%self.last_method)my_log("请求路径:%s"%self.url)my_log("请求头:%s"%self.last_headers)if 'params' in kwargs.keys():my_log("请求参数:%s"%kwargs['params'])elif 'data' in kwargs.keys():my_log("请求参数:%s"%kwargs['data'])elif 'json' in kwargs.keys():my_log("请求参数:%s"%kwargs['json'])my_log("文件上传:%s"%files)# 发送请求res = Requestutil.session.request(method=self.last_method,url=self.url,headers=self.last_headers,**kwargs)# print(res.request.headers)# print(res.text)# print(res.json())return resexcept Exception as f:error_log("发送请求异常:异常信息:%s"%str(traceback.format_exc()))# 断言封装def validate_result(self,yq_result,sj_result,status_code):try:''':param yq_result:预期结果:param sj_result:实际结果:param status_code:实际状态码:return:'''# 收集日志my_log("预期结果:%s"%yq_result)my_log("实际结果:%s"%sj_result)#判断是否断言成功,0成功,1失败flag = 0# 解析ƒif yq_result and isinstance(yq_result,list):for yq in yq_result:for key,value in dict(yq).items():# 判断断言方式if key=='equals':for assert_key,assert_value in dict(value).items():if assert_key=='status_code':if status_code!=assert_value:flag=flag+1error_log("断言失败:"+assert_key+"不等于"+str(assert_value)+"")else:key_list = jsonpath.jsonpath(sj_result,'$..%s'%assert_key)if key_list:if assert_value not in key_list:flag = flag + 1error_log("断言失败:"+assert_key+"不等于"+str(assert_value)+"")else:flag = flag + 1error_log("断言失败:返回结果中不存在"+assert_key+"")elif key=='contains':if value not in json.dumps(sj_result):flag = flag + 1error_log("断言失败:返回结果中不包含字符串"+value+"")else:error_log('框架不支持此断言方式')assert flag==0my_log('接口请求成功')my_log('-----------------接口请求结束-----------------\n')except Exception as f:my_log('接口请求失败')my_log('-----------------接口请求结束-----------------\n')error_log("断言异常:异常信息:%s" % str(traceback.format_exc()))if __name__ == '__main__':# url = "/cgi-bin/tags/update?access_token={{access_token}}&a=c{{csrf_token}}"# for i in range(1,url.count("{{")+1):#     if "{{" in url and "}}" in url:#         start_index = url.index("{{")#         end_index = url.index("}}")#         old_value = url[start_index:end_index+2]#         new_value = read_file('/config/extract.yml',old_value[2:-2])#         url = url.replace(old_value,new_value)##         print(old_value,new_value)#         print(url)# dict_data = {'name': '获取access_token统一鉴权码', 'base_url': 'https://api.weixin.qq.com', 'request': {'method': 'GET', 'url': '/cgi-bin/token', 'params': {'grant_type': 'client_credential', 'appid': 'wx9b755d429f6fb216', 'secret': 'b963db0b97c8487b0cb920a240bd78e3'}}, 'validate': [{'eq': ['status_code', 200]}]}# # print(dict_data.pop('name'))# del dict_data['name']# print(dict_data)json_data = {"tag": {"id": 100, "name": "CesareCheung${get_random_number(100000,999999)}" }}result = Requestutil('base', 'base_weixin_url').replace_load(json_data)print(result)
  • yaml_util.py:文件读取写入方法
# -*- coding: UTF-8 -*-
import os
import yaml
from common.get_path import *# 读取yml文件
def read_file(yml_file,one_node=None,two_node=None):path = BASE_DIR+yml_filewith open(path,encoding='utf-8') as f:value = yaml.load(f,Loader=yaml.FullLoader)if one_node and two_node:return value[one_node][two_node]elif one_node:return value[one_node]else:return value# 写入yml文件
def write_file(yml_file,data):path = BASE_DIR+yml_filewith open(path,encoding='utf-8',mode='a') as f:yaml.dump(data, stream=f,allow_unicode=True)# 清空yml文件
def clean_file(yml_file):path = BASE_DIR+yml_filewith open(path,encoding='utf-8',mode='w') as f:f.truncate()if __name__ == '__main__':# print(read_file('/config.yml',"base",'base_info_url'))print(read_file('/config/config.yml','log','log_name'))

这篇关于Python+Pytest+Yaml+Request+Allure框架源代码之(一)common公共方法封装的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1078357

相关文章

Linux换行符的使用方法详解

《Linux换行符的使用方法详解》本文介绍了Linux中常用的换行符LF及其在文件中的表示,展示了如何使用sed命令替换换行符,并列举了与换行符处理相关的Linux命令,通过代码讲解的非常详细,需要的... 目录简介检测文件中的换行符使用 cat -A 查看换行符使用 od -c 检查字符换行符格式转换将

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Python Websockets库的使用指南

《PythonWebsockets库的使用指南》pythonwebsockets库是一个用于创建WebSocket服务器和客户端的Python库,它提供了一种简单的方式来实现实时通信,支持异步和同步... 目录一、WebSocket 简介二、python 的 websockets 库安装三、完整代码示例1.

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

Java中的String.valueOf()和toString()方法区别小结

《Java中的String.valueOf()和toString()方法区别小结》字符串操作是开发者日常编程任务中不可或缺的一部分,转换为字符串是一种常见需求,其中最常见的就是String.value... 目录String.valueOf()方法方法定义方法实现使用示例使用场景toString()方法方法

Java中List的contains()方法的使用小结

《Java中List的contains()方法的使用小结》List的contains()方法用于检查列表中是否包含指定的元素,借助equals()方法进行判断,下面就来介绍Java中List的c... 目录详细展开1. 方法签名2. 工作原理3. 使用示例4. 注意事项总结结论:List 的 contain

Python使用自带的base64库进行base64编码和解码

《Python使用自带的base64库进行base64编码和解码》在Python中,处理数据的编码和解码是数据传输和存储中非常普遍的需求,其中,Base64是一种常用的编码方案,本文我将详细介绍如何使... 目录引言使用python的base64库进行编码和解码编码函数解码函数Base64编码的应用场景注意

Python基于wxPython和FFmpeg开发一个视频标签工具

《Python基于wxPython和FFmpeg开发一个视频标签工具》在当今数字媒体时代,视频内容的管理和标记变得越来越重要,无论是研究人员需要对实验视频进行时间点标记,还是个人用户希望对家庭视频进行... 目录引言1. 应用概述2. 技术栈分析2.1 核心库和模块2.2 wxpython作为GUI选择的优