更新交互-队列监听-数据并发-程序执行

2024-03-19 18:36

本文主要是介绍更新交互-队列监听-数据并发-程序执行,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

需求(每次数据库算法配置数据更改程序重新执行)

1 . 因为是数据库交互,所以采用队列形式来生产消费

 2 . 每次接口数据发生变化执行入队操作

3 . 使用进程池进行算法配置数据并行操作

4 . 算法程序进行消费执行 (代码案例仅供参考)


需求(每次数据库算法配置数据更改程序重新执行)

1 . 因为是数据库交互,所以采用队列形式来生产消费

# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1]  # 返回数据else:return None  # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)

 2 . 每次接口数据发生变化执行入队操作

# redis队列 当算法配置修改完成时,数据写入队列,redis监听到数据重新执行
queue_redis = TestQueue()
queue_redis.push('1','1')

3 . 使用进程池进行算法配置数据并行操作

# 创建进程池
pool = multiprocessing.Pool()data_results = get_data()  # 每次 pop 的时候更新数据
print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据
pool.map(process_data, data_results)# 关闭进程池
pool.close()# 等待所有进程结束
pool.join()

4 . 算法程序进行消费执行 (代码案例仅供参考)根据消费场景加锁进行限制

# -*- coding: gbk -*-
import cv2
import numpy as np
import os
import random
import pymysql
import copy
import ctypes
import time
import paramiko
import redisfrom ctypes import *
from dbutils.pooled_db import PooledDB# 数据库连接池
pool = PooledDB(pymysql, maxconnections=10, host='192.168.14.93', user='root', password='abc123',database='seal_system',charset='utf8')# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1]  # 返回数据else:return None  # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)# 服务器k类
class SSH_Func(object):def __init__(self):# 寒武纪盒子self.Folder_SSH = '/var/car_image/model_algorithm_package/'  # 华为云上传文件默认地址self.ssh_host = '114.116.48.77'self.ssh_user = 'root'self.ssh_password = 'hmwp1!YZHHab'self.ssh = paramiko.SSHClient()# ssh连接盒子服务器def connect_ssh(self):# SSH连接信息# 标识符connected = False# 最大连接次数attempts = 0while connected == False and attempts < 180:try:self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.ssh.connect(self.ssh_host, username=self.ssh_user, password=self.ssh_password)print("SSH连接成功")connected = Trueexcept Exception as e:print("SSH连接失败:", e)print("正在进行连接重试...")attempts += 1print(attempts, '次数')time.sleep(1)if connected == False:print('连接超过最大次数限制,无法连接到SSH服务器')return Nonereturn self.ssh# 华为云文件夹创建相关逻辑def Add_Folder(self):ssh = self.connect_ssh()if ssh is None:returnsftp = self.ssh.open_sftp()# 判断文件夹是否存在,如果不存在进行创建remote_folder_1st_level = os.path.dirname(self.Folder_SSH)try:sftp.mkdir(remote_folder_1st_level)print('成功创建算法模型目录:', remote_folder_1st_level)except Exception as e:print('算法模型目录已经存在,无法创建目录:', remote_folder_1st_level, e)sftp.close()ssh.close()# 上传文件方法def upload_send_file(self, img0, Conf_id, ii):ssh = self.connect_ssh()sftp = ssh.open_sftp()try:output_dir = self.Folder_SSH + 'output/{}/'.format(Conf_id)# 新增两个目录main_output_dir = self.Folder_SSH + 'output/'# 如果云服务不存在这个路径。那么进行创建try:sftp.chdir(main_output_dir)except IOError:sftp.mkdir(main_output_dir)try:sftp.chdir(output_dir)except IOError:sftp.mkdir(output_dir)finally:sftp.chdir(output_dir)# 路径赋值filename = '{:04d}.jpg'.format(ii)# 将图像数据直接写入到远程服务器with sftp.file(filename, 'wb') as f:# 使用OpenCV将图像数据写入文件对象_, img_data = cv2.imencode('.jpg', img0)f.write(img_data.tobytes())return output_dir + filenameexcept Exception as e:print('文件上传失败:', e)finally:sftp.close()ssh.close()# 获取视频为录像机子集的返回格式  参数为  父级id  子集通道code
def get_children_rtsp(id, code):# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()try:sql = "SELECT equipment_uname, equipment_password, equipment_ip FROM t_equipment WHERE id = %s"cursor.execute(sql, (id,))parent_data = cursor.fetchone()if parent_data:user = parent_data[0]password = parent_data[1]ip = parent_data[2]result = 'rtsp://{}:{}@{}:554/Streaming/Unicast/Channels/{}'.format(user, password, ip, code)else:result = Nonefinally:conn.close()return result# 获取所需数据
def get_data():# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()sql = ("SELECT c.id AS Mine_id, c.mine_name, d.id AS Equipment_id, ""d.equipment_name, d.equipment_type, d.equipment_ip, d.equipment_uname, d.equipment_password, d.code, d.parent_id, ""a.id AS Conf_id, a.Algorithm_library_id, a.conf_area, e.id AS test_type_id, e.test_type_ename, ""b.algorithm_status, b.algorithm_path, b.algorithm_name ""FROM `t_algorithm_config` AS a ""JOIN `t_algorithm_library` AS b ON a.Algorithm_library_id = b.id ""JOIN `t_mine` AS c ON a.Mine_id = c.id ""JOIN `t_equipment` AS d ON a.Equipment_id = d.id ""JOIN `t_algorithm_test_type` AS e ON a.Algorithm_test_type_id = e.id ""WHERE b.algorithm_status = 1;")cursor.execute(sql)# 获取查询结果集的列名columns = [i[0] for i in cursor.description]results = cursor.fetchall()# 构建字典列表result_dict_list = [dict(zip(columns, i)) for i in results]# 在列表推导式中使用了浅拷贝 copy.deepcopy() 来复制每个字典,更新 'rtsp_url' 字段。# 这样做可以保留原始数据的所有字段,同时只对 'rtsp_url' 进行修改。result_dict_list = [{**copy.deepcopy(i),'conf_area': list(np.ravel(eval(i['conf_area']))),'rtsp_url': 'rtsp://{}:{}@{}'.format(i['equipment_uname'], i['equipment_password'], i['equipment_ip'])}if i['equipment_type'] == '摄像头'else {**copy.deepcopy(i),'rtsp_url': get_children_rtsp(i['parent_id'], i['code']),'conf_area': list(np.ravel(eval(i['conf_area']))),}for i in result_dict_list]result_dict_list = [{**copy.deepcopy(i),'max_x': max(d['x'] for d in i['conf_area']),'max_y': max(d['y'] for d in i['conf_area']),'min_x': min(d['x'] for d in i['conf_area']),'min_y': min(d['y'] for d in i['conf_area'])} for i in result_dict_list]return result_dict_listnames = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', \'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', \'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', \'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', \'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', \'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', \'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', \'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table', \'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', \'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier','toothbrush']def clip_coords(boxes, shape):# Clip bounding xyxy bounding boxes to image shape (height, width)boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1])  # x1, x2boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0])  # y1, y2def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):# Rescale coords (xyxy) from img1_shape to img0_shapeif ratio_pad is None:  # calculate from img0_shapegain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain  = old / newpad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh paddingelse:gain = ratio_pad[0][0]pad = ratio_pad[1]coords[:, [0, 2]] -= pad[0]  # x paddingcoords[:, [1, 3]] -= pad[1]  # y paddingcoords[:, :4] /= gainclip_coords(coords, img0_shape)return coordsdef letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):# Resize and pad image while meeting stride-multiple constraintsshape = im.shape[:2]  # current shape [height, width]if isinstance(new_shape, int):new_shape = (new_shape, new_shape)# Scale ratio (new / old)r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])if not scaleup:  # only scale down, do not scale up (for better val mAP)r = min(r, 1.0)# Compute paddingratio = r, r  # width, height ratiosnew_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh paddingif auto:  # minimum rectangledw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh paddingelif scaleFill:  # stretchdw, dh = 0.0, 0.0new_unpad = (new_shape[1], new_shape[0])ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratiosdw /= 2  # divide padding into 2 sidesdh /= 2if shape[::-1] != new_unpad:  # resizeim = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))left, right = int(round(dw - 0.1)), int(round(dw + 0.1))im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add borderreturn im, ratio, (dw, dh)def plot_one_box(x, img, color=None, label=None, line_thickness=None):# Plots one bounding box on image imgtl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thicknesscolor = color or [random.randint(0, 255) for _ in range(3)]c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)if label:tf = max(tl - 1, 1)  # font thicknesst_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filledcv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)def get_boxes(prediction, batch_size=1, img_size=608):"""Returns detections with shape:(x1, y1, x2, y2, object_conf, class_score, class_pred)"""reshape_value = np.reshape(prediction, (-1, 1))num_boxes_final = reshape_value[0]print('num_boxes_final: ', num_boxes_final)all_list = [[] for _ in range(batch_size)]for i in range(int(num_boxes_final)):batch_idx = int(reshape_value[64 + i * 7 + 0])if batch_idx >= 0 and batch_idx < batch_size:bl = reshape_value[64 + i * 7 + 3]br = reshape_value[64 + i * 7 + 4]bt = reshape_value[64 + i * 7 + 5]bb = reshape_value[64 + i * 7 + 6]if bt - bl > 0 and bb - br > 0:all_list[batch_idx].append(bl)all_list[batch_idx].append(br)all_list[batch_idx].append(bt)all_list[batch_idx].append(bb)all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])# all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])all_list[batch_idx].append(reshape_value[64 + i * 7 + 1])output = [np.array(all_list[i]).reshape(-1, 6) for i in range(batch_size)]return outputdef detect(img0, ip, model, ii, img_size, stride, api_lib, handle, save_img, colors, max_x, max_y, min_x, min_y,Conf_id, test_type_ename):print('ip:%s, ii:%s, model:%s' % (ip, ii, model))img = letterbox(img0, new_shape=img_size, stride=stride, auto=False)[0]image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)image = image[np.newaxis, :].astype(np.uint8)data = np.asarray(image, dtype=np.uint8)input = data.ctypes.data_as(ctypes.c_void_p)# 3.inferenceoutput = api_lib.cnpyInference(handle, input)pred0 = output.contents[0:7232]pred0 = np.array(pred0).reshape(1, 7232)pred = get_boxes(pred0)print(pred)for i, det in enumerate(pred):if det is not None and len(det):det[:, :4] = scale_coords(img.shape[:2], det[:, :4], img0.shape).round()for *xyxy, conf, cls in det:cls = int(cls)  # 标签索引,对应 name_list = ['opencar_model', 'opencar_num', 'flatcar_model', 'flatcar_num', 'container_model', 'container_num']xmin = int(xyxy[0])  # 横坐标最小值ymin = int(xyxy[1])  # 纵坐标最小值xmax = int(xyxy[2])  # 横坐标最大值ymax = int(xyxy[3])  # 纵坐标最小值if min_x < xmin < max_x and min_y < ymin < max_y and min_x < xmax < max_x and min_y < ymax < max_y:# if xmax < max_x and xmin > min_x  and ymax < max_y and ymin > min_y:# if names[int(cls)] == 'person':if save_img:label = '%s %.2f' % (names[int(cls)], conf)plot_one_box(xyxy, img0, label=label, color=colors[int(cls)], line_thickness=2)print(names[int(cls)], '检测类型')if test_type_ename == names[int(cls)]:# 上传服务器类ssh = SSH_Func()ssh.Add_Folder()ssh.upload_send_file(img0, Conf_id, ii)# dirs = './output/%s/' % (Conf_id)## if not os.path.exists(dirs):  # 如果不存在路径,则创建这个路径#     os.makedirs(dirs)## # 路径赋值# dir_path = dirs + '/%04d.jpg' % ii## cv2.imwrite(dir_path, img0)# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()# 构建 SQL 语句和要插入的值sql = "INSERT INTO `t_algorithm_result` (`Algorithm_config_id`, `res_type`, `res_image`) VALUES (%s, %s, %s)"values = (Conf_id, 1, 'http://114.116.48.77:9001' + ssh.upload_send_file(img0, Conf_id, ii))# 执行 SQL 语句并提交事务cursor.execute(sql, values)conn.commit()else:print('检测类型不符,数据不予添加')print('---------------------------------------------------------------------------------------------------')returnimport multiprocessing
import timedef process_data(data):# 配置算法数据# 是否保存图片  yolov5save_img = True# 框与标注颜色colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]# 1.initll = ctypes.cdll.LoadLibraryapi_lib = ll("./lib/libcnrtapi.so")# 2.data preparationstride = 32img_size = 640offlinemodel_yolov5s = data['algorithm_path'].encode()handle_yolov5s = api_lib.cnpyInit(offlinemodel_yolov5s)api_lib.cnpyInference.restype = POINTER(POINTER(c_float))cap = cv2.VideoCapture(data['rtsp_url'])ii = 0# 创建队列对象queue = TestQueue()if cap.isOpened():while True:ret, frame = cap.read()if ii % 30 == 0 and queue.get_all() == []:  # 抽帧if ret:ip = 34detect(frame, ip, 'model_yolov5s', ii, img_size, stride, api_lib, handle_yolov5s, save_img,colors, data['max_x'], data['max_y'], data['min_x'], data['min_y'], data['Conf_id'],data['test_type_ename'])else:cap = cv2.VideoCapture(data['rtsp_url'])if ii % 30 == 0 and queue.get_all() != []:queue.pop()breakii += 1def main():while True:# 创建进程池pool = multiprocessing.Pool()data_results = get_data()  # 每次 pop 的时候更新数据print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据pool.map(process_data, data_results)# 关闭进程池pool.close()# 等待所有进程结束pool.join()if __name__ == "__main__":main()

这篇关于更新交互-队列监听-数据并发-程序执行的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/826913

相关文章

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每