本文主要是介绍python_locust 集合点并发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
集合点:用以同步一定数量的虚拟用户,以便能够在同一时刻来执行任务。例如,设置1000个虚拟用户集合点,当虚拟用户运行到提交数据的集合点时,检查当前同时有多少用户运行到集合点,如果不到1000 人,已经到集合点的用户在此等待,当在集合点等待的用户达到1000 人时,1000 人同时去提交数据,从而达到测试计划中的需求。
import os
from gevent._semaphore import Semaphore
from locust import HttpUser, task, between, SequentialTaskSet, tag, events
import randomall_locust_spawned = Semaphore()
all_locust_spawned.acquire() # 阻塞线程def on_hatch_complete(**kwargs):# 创建钩子方法all_locust_spawned.release()# 挂在到locust钩子函数
events.spawning_complete.add_listener(on_hatch_complete)class JgTaskCase(SequentialTaskSet):# 初始化方法,相当于 setupdef on_start(self):all_locust_spawned.wait() # 同步锁等待print("***开始压测***")@tag('login')@task(1) # 装饰器# 登录接口def userLogin(self):all_locust_spawned.wait()url = '/userManage/userLogin'data = {"username": "admin","password": "123456"}with self.client.post(url,data=data,catch_response=True) as rsp:if rsp.status_code == 200 and rsp.json()['resCode'] == '00000':rsp.success()else:rsp.failure('login_failure')@tag('pluginLog')@task(1) # 装饰器# 日志上传接口def pluginLog(self):all_locust_spawned.wait()url = '/pluginInfo/pluginLog'data = '/P2024022200001_FrsjbjP5F2otG='with self.client.post(url,data=data,catch_response=True) as rsp:if rsp.status_code == 200 and rsp.json()['resCode'] == '00000':rsp.success()else:rsp.failure('pluginLog_failure')# 结束方法, 相当于teardowndef on_stop(self):print("***压测结束***")# 定义一个运行类 继承HttpUser类, 所以要从locust中引入 HttpUser类
class UserRun(HttpUser):tasks = [JgTaskCase]host = 'http://www.baidu.com'# 设置运行过程中间隔时间 需要从locust中 引入 between# wait_time = between(0.1, 3)# min_wait = 1000# max_wait = 3000if __name__ == '__main__':file_path = os.path.abspath(__file__)os.system(f"locust -f {file_path} --tags login -t 5m")
--tags login:执行标记为login的任务;
-t 5m:执行5分钟压测。
这篇关于python_locust 集合点并发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!