更新交互-队列监听-数据并发-程序执行

2024-03-19 18:36

本文主要是介绍更新交互-队列监听-数据并发-程序执行,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

需求(每次数据库算法配置数据更改程序重新执行)

1 . 因为是数据库交互,所以采用队列形式来生产消费

 2 . 每次接口数据发生变化执行入队操作

3 . 使用进程池进行算法配置数据并行操作

4 . 算法程序进行消费执行 (代码案例仅供参考)


需求(每次数据库算法配置数据更改程序重新执行)

1 . 因为是数据库交互,所以采用队列形式来生产消费

# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1]  # 返回数据else:return None  # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)

 2 . 每次接口数据发生变化执行入队操作

# redis队列 当算法配置修改完成时,数据写入队列,redis监听到数据重新执行
queue_redis = TestQueue()
queue_redis.push('1','1')

3 . 使用进程池进行算法配置数据并行操作

# 创建进程池
pool = multiprocessing.Pool()data_results = get_data()  # 每次 pop 的时候更新数据
print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据
pool.map(process_data, data_results)# 关闭进程池
pool.close()# 等待所有进程结束
pool.join()

4 . 算法程序进行消费执行 (代码案例仅供参考)根据消费场景加锁进行限制

# -*- coding: gbk -*-
import cv2
import numpy as np
import os
import random
import pymysql
import copy
import ctypes
import time
import paramiko
import redisfrom ctypes import *
from dbutils.pooled_db import PooledDB# 数据库连接池
pool = PooledDB(pymysql, maxconnections=10, host='192.168.14.93', user='root', password='abc123',database='seal_system',charset='utf8')# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1]  # 返回数据else:return None  # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)# 服务器k类
class SSH_Func(object):def __init__(self):# 寒武纪盒子self.Folder_SSH = '/var/car_image/model_algorithm_package/'  # 华为云上传文件默认地址self.ssh_host = '114.116.48.77'self.ssh_user = 'root'self.ssh_password = 'hmwp1!YZHHab'self.ssh = paramiko.SSHClient()# ssh连接盒子服务器def connect_ssh(self):# SSH连接信息# 标识符connected = False# 最大连接次数attempts = 0while connected == False and attempts < 180:try:self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.ssh.connect(self.ssh_host, username=self.ssh_user, password=self.ssh_password)print("SSH连接成功")connected = Trueexcept Exception as e:print("SSH连接失败:", e)print("正在进行连接重试...")attempts += 1print(attempts, '次数')time.sleep(1)if connected == False:print('连接超过最大次数限制,无法连接到SSH服务器')return Nonereturn self.ssh# 华为云文件夹创建相关逻辑def Add_Folder(self):ssh = self.connect_ssh()if ssh is None:returnsftp = self.ssh.open_sftp()# 判断文件夹是否存在,如果不存在进行创建remote_folder_1st_level = os.path.dirname(self.Folder_SSH)try:sftp.mkdir(remote_folder_1st_level)print('成功创建算法模型目录:', remote_folder_1st_level)except Exception as e:print('算法模型目录已经存在,无法创建目录:', remote_folder_1st_level, e)sftp.close()ssh.close()# 上传文件方法def upload_send_file(self, img0, Conf_id, ii):ssh = self.connect_ssh()sftp = ssh.open_sftp()try:output_dir = self.Folder_SSH + 'output/{}/'.format(Conf_id)# 新增两个目录main_output_dir = self.Folder_SSH + 'output/'# 如果云服务不存在这个路径。那么进行创建try:sftp.chdir(main_output_dir)except IOError:sftp.mkdir(main_output_dir)try:sftp.chdir(output_dir)except IOError:sftp.mkdir(output_dir)finally:sftp.chdir(output_dir)# 路径赋值filename = '{:04d}.jpg'.format(ii)# 将图像数据直接写入到远程服务器with sftp.file(filename, 'wb') as f:# 使用OpenCV将图像数据写入文件对象_, img_data = cv2.imencode('.jpg', img0)f.write(img_data.tobytes())return output_dir + filenameexcept Exception as e:print('文件上传失败:', e)finally:sftp.close()ssh.close()# 获取视频为录像机子集的返回格式  参数为  父级id  子集通道code
def get_children_rtsp(id, code):# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()try:sql = "SELECT equipment_uname, equipment_password, equipment_ip FROM t_equipment WHERE id = %s"cursor.execute(sql, (id,))parent_data = cursor.fetchone()if parent_data:user = parent_data[0]password = parent_data[1]ip = parent_data[2]result = 'rtsp://{}:{}@{}:554/Streaming/Unicast/Channels/{}'.format(user, password, ip, code)else:result = Nonefinally:conn.close()return result# 获取所需数据
def get_data():# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()sql = ("SELECT c.id AS Mine_id, c.mine_name, d.id AS Equipment_id, ""d.equipment_name, d.equipment_type, d.equipment_ip, d.equipment_uname, d.equipment_password, d.code, d.parent_id, ""a.id AS Conf_id, a.Algorithm_library_id, a.conf_area, e.id AS test_type_id, e.test_type_ename, ""b.algorithm_status, b.algorithm_path, b.algorithm_name ""FROM `t_algorithm_config` AS a ""JOIN `t_algorithm_library` AS b ON a.Algorithm_library_id = b.id ""JOIN `t_mine` AS c ON a.Mine_id = c.id ""JOIN `t_equipment` AS d ON a.Equipment_id = d.id ""JOIN `t_algorithm_test_type` AS e ON a.Algorithm_test_type_id = e.id ""WHERE b.algorithm_status = 1;")cursor.execute(sql)# 获取查询结果集的列名columns = [i[0] for i in cursor.description]results = cursor.fetchall()# 构建字典列表result_dict_list = [dict(zip(columns, i)) for i in results]# 在列表推导式中使用了浅拷贝 copy.deepcopy() 来复制每个字典,更新 'rtsp_url' 字段。# 这样做可以保留原始数据的所有字段,同时只对 'rtsp_url' 进行修改。result_dict_list = [{**copy.deepcopy(i),'conf_area': list(np.ravel(eval(i['conf_area']))),'rtsp_url': 'rtsp://{}:{}@{}'.format(i['equipment_uname'], i['equipment_password'], i['equipment_ip'])}if i['equipment_type'] == '摄像头'else {**copy.deepcopy(i),'rtsp_url': get_children_rtsp(i['parent_id'], i['code']),'conf_area': list(np.ravel(eval(i['conf_area']))),}for i in result_dict_list]result_dict_list = [{**copy.deepcopy(i),'max_x': max(d['x'] for d in i['conf_area']),'max_y': max(d['y'] for d in i['conf_area']),'min_x': min(d['x'] for d in i['conf_area']),'min_y': min(d['y'] for d in i['conf_area'])} for i in result_dict_list]return result_dict_listnames = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', \'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', \'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', \'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', \'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', \'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', \'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', \'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table', \'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', \'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier','toothbrush']def clip_coords(boxes, shape):# Clip bounding xyxy bounding boxes to image shape (height, width)boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1])  # x1, x2boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0])  # y1, y2def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):# Rescale coords (xyxy) from img1_shape to img0_shapeif ratio_pad is None:  # calculate from img0_shapegain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain  = old / newpad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh paddingelse:gain = ratio_pad[0][0]pad = ratio_pad[1]coords[:, [0, 2]] -= pad[0]  # x paddingcoords[:, [1, 3]] -= pad[1]  # y paddingcoords[:, :4] /= gainclip_coords(coords, img0_shape)return coordsdef letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):# Resize and pad image while meeting stride-multiple constraintsshape = im.shape[:2]  # current shape [height, width]if isinstance(new_shape, int):new_shape = (new_shape, new_shape)# Scale ratio (new / old)r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])if not scaleup:  # only scale down, do not scale up (for better val mAP)r = min(r, 1.0)# Compute paddingratio = r, r  # width, height ratiosnew_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh paddingif auto:  # minimum rectangledw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh paddingelif scaleFill:  # stretchdw, dh = 0.0, 0.0new_unpad = (new_shape[1], new_shape[0])ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratiosdw /= 2  # divide padding into 2 sidesdh /= 2if shape[::-1] != new_unpad:  # resizeim = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))left, right = int(round(dw - 0.1)), int(round(dw + 0.1))im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add borderreturn im, ratio, (dw, dh)def plot_one_box(x, img, color=None, label=None, line_thickness=None):# Plots one bounding box on image imgtl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thicknesscolor = color or [random.randint(0, 255) for _ in range(3)]c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)if label:tf = max(tl - 1, 1)  # font thicknesst_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA)  # filledcv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)def get_boxes(prediction, batch_size=1, img_size=608):"""Returns detections with shape:(x1, y1, x2, y2, object_conf, class_score, class_pred)"""reshape_value = np.reshape(prediction, (-1, 1))num_boxes_final = reshape_value[0]print('num_boxes_final: ', num_boxes_final)all_list = [[] for _ in range(batch_size)]for i in range(int(num_boxes_final)):batch_idx = int(reshape_value[64 + i * 7 + 0])if batch_idx >= 0 and batch_idx < batch_size:bl = reshape_value[64 + i * 7 + 3]br = reshape_value[64 + i * 7 + 4]bt = reshape_value[64 + i * 7 + 5]bb = reshape_value[64 + i * 7 + 6]if bt - bl > 0 and bb - br > 0:all_list[batch_idx].append(bl)all_list[batch_idx].append(br)all_list[batch_idx].append(bt)all_list[batch_idx].append(bb)all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])# all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])all_list[batch_idx].append(reshape_value[64 + i * 7 + 1])output = [np.array(all_list[i]).reshape(-1, 6) for i in range(batch_size)]return outputdef detect(img0, ip, model, ii, img_size, stride, api_lib, handle, save_img, colors, max_x, max_y, min_x, min_y,Conf_id, test_type_ename):print('ip:%s, ii:%s, model:%s' % (ip, ii, model))img = letterbox(img0, new_shape=img_size, stride=stride, auto=False)[0]image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)image = image[np.newaxis, :].astype(np.uint8)data = np.asarray(image, dtype=np.uint8)input = data.ctypes.data_as(ctypes.c_void_p)# 3.inferenceoutput = api_lib.cnpyInference(handle, input)pred0 = output.contents[0:7232]pred0 = np.array(pred0).reshape(1, 7232)pred = get_boxes(pred0)print(pred)for i, det in enumerate(pred):if det is not None and len(det):det[:, :4] = scale_coords(img.shape[:2], det[:, :4], img0.shape).round()for *xyxy, conf, cls in det:cls = int(cls)  # 标签索引,对应 name_list = ['opencar_model', 'opencar_num', 'flatcar_model', 'flatcar_num', 'container_model', 'container_num']xmin = int(xyxy[0])  # 横坐标最小值ymin = int(xyxy[1])  # 纵坐标最小值xmax = int(xyxy[2])  # 横坐标最大值ymax = int(xyxy[3])  # 纵坐标最小值if min_x < xmin < max_x and min_y < ymin < max_y and min_x < xmax < max_x and min_y < ymax < max_y:# if xmax < max_x and xmin > min_x  and ymax < max_y and ymin > min_y:# if names[int(cls)] == 'person':if save_img:label = '%s %.2f' % (names[int(cls)], conf)plot_one_box(xyxy, img0, label=label, color=colors[int(cls)], line_thickness=2)print(names[int(cls)], '检测类型')if test_type_ename == names[int(cls)]:# 上传服务器类ssh = SSH_Func()ssh.Add_Folder()ssh.upload_send_file(img0, Conf_id, ii)# dirs = './output/%s/' % (Conf_id)## if not os.path.exists(dirs):  # 如果不存在路径,则创建这个路径#     os.makedirs(dirs)## # 路径赋值# dir_path = dirs + '/%04d.jpg' % ii## cv2.imwrite(dir_path, img0)# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()# 构建 SQL 语句和要插入的值sql = "INSERT INTO `t_algorithm_result` (`Algorithm_config_id`, `res_type`, `res_image`) VALUES (%s, %s, %s)"values = (Conf_id, 1, 'http://114.116.48.77:9001' + ssh.upload_send_file(img0, Conf_id, ii))# 执行 SQL 语句并提交事务cursor.execute(sql, values)conn.commit()else:print('检测类型不符,数据不予添加')print('---------------------------------------------------------------------------------------------------')returnimport multiprocessing
import timedef process_data(data):# 配置算法数据# 是否保存图片  yolov5save_img = True# 框与标注颜色colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]# 1.initll = ctypes.cdll.LoadLibraryapi_lib = ll("./lib/libcnrtapi.so")# 2.data preparationstride = 32img_size = 640offlinemodel_yolov5s = data['algorithm_path'].encode()handle_yolov5s = api_lib.cnpyInit(offlinemodel_yolov5s)api_lib.cnpyInference.restype = POINTER(POINTER(c_float))cap = cv2.VideoCapture(data['rtsp_url'])ii = 0# 创建队列对象queue = TestQueue()if cap.isOpened():while True:ret, frame = cap.read()if ii % 30 == 0 and queue.get_all() == []:  # 抽帧if ret:ip = 34detect(frame, ip, 'model_yolov5s', ii, img_size, stride, api_lib, handle_yolov5s, save_img,colors, data['max_x'], data['max_y'], data['min_x'], data['min_y'], data['Conf_id'],data['test_type_ename'])else:cap = cv2.VideoCapture(data['rtsp_url'])if ii % 30 == 0 and queue.get_all() != []:queue.pop()breakii += 1def main():while True:# 创建进程池pool = multiprocessing.Pool()data_results = get_data()  # 每次 pop 的时候更新数据print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据pool.map(process_data, data_results)# 关闭进程池pool.close()# 等待所有进程结束pool.join()if __name__ == "__main__":main()

这篇关于更新交互-队列监听-数据并发-程序执行的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/826913

相关文章

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

浅析如何保证MySQL与Redis数据一致性

《浅析如何保证MySQL与Redis数据一致性》在互联网应用中,MySQL作为持久化存储引擎,Redis作为高性能缓存层,两者的组合能有效提升系统性能,下面我们来看看如何保证两者的数据一致性吧... 目录一、数据不一致性的根源1.1 典型不一致场景1.2 关键矛盾点二、一致性保障策略2.1 基础策略:更新数

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指