本文主要是介绍AirTest源码分析之运行器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
源码位置:airtest/cli/runner.py
使用:根据airtest文档说明,可以通过命令行来启动air脚本,需要传入一些参数如设备号,脚本名等,这样就可以不用通过AirTest IDE来运行了,可以集成,所以我们也可以写个脚本来控制air脚本的运行。
文档链接:https://airtest.readthedocs.io/en/latest/README_MORE.html#running-air-from-cli
这就是说你用airtest run 命令,可以指定运行某air脚本,指定设备,指定log输出地址
翻一下源码,找到runner.py,可以看得出这是一个unittest的子类AirtestCase,入口是run_script接口
1、程序的入口——run_script,传入参数parsed_args,进来以后传给全局变量args,给airtestcase里调用。
后面的三行代码,用过unittest的朋友应该都很熟悉了
def run_script(parsed_args, testcase_cls=AirtestCase):global args # make it global deliberately to be used in AirtestCase & test scriptsargs = parsed_argssuite = unittest.TestSuite()#创建一个测试套件suite.addTest(testcase_cls())#添加一条AirtestCase类型的case,因为接口入参默认testcase_cls=AirtestCaseresult = unittest.TextTestRunner(verbosity=0).run(suite)#运行它if not result.wasSuccessful():sys.exit(-1)#退出
2、AirtestCase类
这里定义好了setUpClass、setUp、runTest、tearDown、tearDownClass
分别做了什么呢,一个个看一下:
@classmethod
def setUpClass(cls):cls.args = args #runScrip传进来的参数setup_by_args(args) #设置参数,设备、log路径、脚本路径# setup script exec scopecls.scope = copy(globals())cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):if self.args.log and self.args.recording: #如果参数配置了log路径且recording为Turefor dev in G.DEVICE_LIST:try:dev.start_recording() #开始录制except:traceback.print_exc()
def tearDown(self):#停止录制if self.args.log and self.args.recording:for k, dev in enumerate(G.DEVICE_LIST):try:output = os.path.join(self.args.log, "recording_%d.mp4" % k)dev.stop_recording(output)except:traceback.print_exc()
def runTest(self):#运行脚本scriptpath = self.args.script #参数传入的脚本路径#分割路径最后的名字,替换.air为.py,也就是传入‘d:/aaa/bbb.air’,pyfilename就为bbb.pypyfilename = os.path.basename(scriptpath).replace(self.SCRIPTEXT, ".py")#再组装py文件的路径,d:/aaa/bbb.air/bbb.py,看过air脚本文件就知道,这才是脚本代码,其他是图片pyfilepath = os.path.join(scriptpath, pyfilename)pyfilepath = os.path.abspath(pyfilepath)self.scope["__file__"] = pyfilepath#读进来with open(pyfilepath, 'r', encoding="utf8") as f:code = f.read()pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())#运行读进来的脚本try:exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)except Exception as err:#出错处理,日志tb = traceback.format_exc()log("Final Error", tb)six.reraise(*sys.exc_info())
def exec_other_script(cls, scriptpath):#这个接口不分析了,因为已经用using代替了。
#这个接口就是在你的air脚本中如果用了exec_script就会调用这里,它会把子脚本的图片文件拷贝过来,并读取py文件运行
#参数设置
def setup_by_args(args):# init devicesif isinstance(args.device, list):#如果传入的设备参数是一个列表,所以命令行可以设置多个设备哦devices = args.deviceelif args.device:devices = [args.device]#不是列表就给转成列表else:devices = []print("do not connect device")# set base dir to find tplargs.script = decode_path(args.script)#脚本路径# set log dir日志路径if args.log is True:print("save log in %s/log" % args.script)args.log = os.path.join(args.script, "log")elif args.log:print("save log in '%s'" % args.log)args.log = decode_path(args.log)else:print("do not save log")# guess project_root to be basedir of current .air path把air脚本的路径设置为工程根目录project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else Noneauto_setup(args.script, devices, args.log, project_root)#这个接口很熟悉吧,在IDE里新建一个air脚本就会自动生成这句,里面就设备的初始化连接,设置工程路径,日志路径。
所以,上层的air脚本不用什么测试框架,是这个airtestCase在支撑,这种设计思路确实降低了脚本的上手门槛,跟那些用excel表格、自然语言脚本的框架有点像。
源码贴出来方便看
# -*- coding: utf-8 -*-import unittest
import os
import sys
import six
import re
import shutil
import traceback
import warnings
from io import open
from airtest.core.api import G, auto_setup, log
from airtest.core.settings import Settings as ST
from airtest.utils.compat import decode_path
from copy import copyclass AirtestCase(unittest.TestCase):PROJECT_ROOT = "."SCRIPTEXT = ".air"TPLEXT = ".png"@classmethoddef setUpClass(cls):cls.args = argssetup_by_args(args)# setup script exec scopecls.scope = copy(globals())cls.scope["exec_script"] = cls.exec_other_scriptdef setUp(self):if self.args.log and self.args.recording:for dev in G.DEVICE_LIST:try:dev.start_recording()except:traceback.print_exc()def tearDown(self):if self.args.log and self.args.recording:for k, dev in enumerate(G.DEVICE_LIST):try:output = os.path.join(self.args.log, "recording_%d.mp4" % k)dev.stop_recording(output)except:traceback.print_exc()def runTest(self):scriptpath = self.args.scriptpyfilename = os.path.basename(scriptpath).replace(self.SCRIPTEXT, ".py")pyfilepath = os.path.join(scriptpath, pyfilename)pyfilepath = os.path.abspath(pyfilepath)self.scope["__file__"] = pyfilepathwith open(pyfilepath, 'r', encoding="utf8") as f:code = f.read()pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())try:exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)except Exception as err:tb = traceback.format_exc()log("Final Error", tb)six.reraise(*sys.exc_info())@classmethoddef exec_other_script(cls, scriptpath):"""run other script in test script"""warnings.simplefilter("always")warnings.warn("please use using() api instead.", PendingDeprecationWarning)def _sub_dir_name(scriptname):dirname = os.path.splitdrive(os.path.normpath(scriptname))[-1]dirname = dirname.strip(os.path.sep).replace(os.path.sep, "_").replace(cls.SCRIPTEXT, "_sub")return dirnamedef _copy_script(src, dst):if os.path.isdir(dst):shutil.rmtree(dst, ignore_errors=True)os.mkdir(dst)for f in os.listdir(src):srcfile = os.path.join(src, f)if not (os.path.isfile(srcfile) and f.endswith(cls.TPLEXT)):continuedstfile = os.path.join(dst, f)shutil.copy(srcfile, dstfile)# find script in PROJECT_ROOTscriptpath = os.path.join(ST.PROJECT_ROOT, scriptpath)# copy submodule's images into sub_dirsub_dir = _sub_dir_name(scriptpath)sub_dirpath = os.path.join(cls.args.script, sub_dir)_copy_script(scriptpath, sub_dirpath)# read codepyfilename = os.path.basename(scriptpath).replace(cls.SCRIPTEXT, ".py")pyfilepath = os.path.join(scriptpath, pyfilename)pyfilepath = os.path.abspath(pyfilepath)with open(pyfilepath, 'r', encoding='utf8') as f:code = f.read()# replace tpl filepath with filepath in sub_dircode = re.sub("[\'\"](\w+.png)[\'\"]", "\"%s/\g<1>\"" % sub_dir, code)exec(compile(code.encode("utf8"), pyfilepath, 'exec'), cls.scope)def setup_by_args(args):# init devicesif isinstance(args.device, list):devices = args.deviceelif args.device:devices = [args.device]else:devices = []print("do not connect device")# set base dir to find tplargs.script = decode_path(args.script)# set log dirif args.log is True:print("save log in %s/log" % args.script)args.log = os.path.join(args.script, "log")elif args.log:print("save log in '%s'" % args.log)args.log = decode_path(args.log)else:print("do not save log")# guess project_root to be basedir of current .air pathproject_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else Noneauto_setup(args.script, devices, args.log, project_root)def run_script(parsed_args, testcase_cls=AirtestCase):global args # make it global deliberately to be used in AirtestCase & test scriptsargs = parsed_argssuite = unittest.TestSuite()suite.addTest(testcase_cls())result = unittest.TextTestRunner(verbosity=0).run(suite)if not result.wasSuccessful():sys.exit(-1)
这篇关于AirTest源码分析之运行器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!