本文主要是介绍更新交互-队列监听-数据并发-程序执行,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
需求(每次数据库算法配置数据更改程序重新执行)
1 . 因为是数据库交互,所以采用队列形式来生产消费
2 . 每次接口数据发生变化执行入队操作
3 . 使用进程池进行算法配置数据并行操作
4 . 算法程序进行消费执行 (代码案例仅供参考)
需求(每次数据库算法配置数据更改程序重新执行)
1 . 因为是数据库交互,所以采用队列形式来生产消费
# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1] # 返回数据else:return None # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)
2 . 每次接口数据发生变化执行入队操作
# redis队列 当算法配置修改完成时,数据写入队列,redis监听到数据重新执行
queue_redis = TestQueue()
queue_redis.push('1','1')
3 . 使用进程池进行算法配置数据并行操作
# 创建进程池
pool = multiprocessing.Pool()data_results = get_data() # 每次 pop 的时候更新数据
print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据
pool.map(process_data, data_results)# 关闭进程池
pool.close()# 等待所有进程结束
pool.join()
4 . 算法程序进行消费执行 (代码案例仅供参考)根据消费场景加锁进行限制
# -*- coding: gbk -*-
import cv2
import numpy as np
import os
import random
import pymysql
import copy
import ctypes
import time
import paramiko
import redisfrom ctypes import *
from dbutils.pooled_db import PooledDB# 数据库连接池
pool = PooledDB(pymysql, maxconnections=10, host='192.168.14.93', user='root', password='abc123',database='seal_system',charset='utf8')# redis队列
class TestQueue:def __init__(self):self.r = redis.Redis(host='192.168.14.93', port=6379, decode_responses=True, db=1)self.key = "queue"# 入队def push(self, item,item1):# 在尾部加入数据self.r.rpush(self.key, item,item1)return '队列数据添加成功'# 出队def pop(self):# 阻塞地从头部移除数据result = self.r.blpop(self.key, timeout=0)if result:return result[1] # 返回数据else:return None # 如果超时返回 None# 返回队列数据列表def get_all(self):return self.r.lrange(self.key, 0, -1)# 服务器k类
class SSH_Func(object):def __init__(self):# 寒武纪盒子self.Folder_SSH = '/var/car_image/model_algorithm_package/' # 华为云上传文件默认地址self.ssh_host = '114.116.48.77'self.ssh_user = 'root'self.ssh_password = 'hmwp1!YZHHab'self.ssh = paramiko.SSHClient()# ssh连接盒子服务器def connect_ssh(self):# SSH连接信息# 标识符connected = False# 最大连接次数attempts = 0while connected == False and attempts < 180:try:self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.ssh.connect(self.ssh_host, username=self.ssh_user, password=self.ssh_password)print("SSH连接成功")connected = Trueexcept Exception as e:print("SSH连接失败:", e)print("正在进行连接重试...")attempts += 1print(attempts, '次数')time.sleep(1)if connected == False:print('连接超过最大次数限制,无法连接到SSH服务器')return Nonereturn self.ssh# 华为云文件夹创建相关逻辑def Add_Folder(self):ssh = self.connect_ssh()if ssh is None:returnsftp = self.ssh.open_sftp()# 判断文件夹是否存在,如果不存在进行创建remote_folder_1st_level = os.path.dirname(self.Folder_SSH)try:sftp.mkdir(remote_folder_1st_level)print('成功创建算法模型目录:', remote_folder_1st_level)except Exception as e:print('算法模型目录已经存在,无法创建目录:', remote_folder_1st_level, e)sftp.close()ssh.close()# 上传文件方法def upload_send_file(self, img0, Conf_id, ii):ssh = self.connect_ssh()sftp = ssh.open_sftp()try:output_dir = self.Folder_SSH + 'output/{}/'.format(Conf_id)# 新增两个目录main_output_dir = self.Folder_SSH + 'output/'# 如果云服务不存在这个路径。那么进行创建try:sftp.chdir(main_output_dir)except IOError:sftp.mkdir(main_output_dir)try:sftp.chdir(output_dir)except IOError:sftp.mkdir(output_dir)finally:sftp.chdir(output_dir)# 路径赋值filename = '{:04d}.jpg'.format(ii)# 将图像数据直接写入到远程服务器with sftp.file(filename, 'wb') as f:# 使用OpenCV将图像数据写入文件对象_, img_data = cv2.imencode('.jpg', img0)f.write(img_data.tobytes())return output_dir + filenameexcept Exception as e:print('文件上传失败:', e)finally:sftp.close()ssh.close()# 获取视频为录像机子集的返回格式 参数为 父级id 子集通道code
def get_children_rtsp(id, code):# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()try:sql = "SELECT equipment_uname, equipment_password, equipment_ip FROM t_equipment WHERE id = %s"cursor.execute(sql, (id,))parent_data = cursor.fetchone()if parent_data:user = parent_data[0]password = parent_data[1]ip = parent_data[2]result = 'rtsp://{}:{}@{}:554/Streaming/Unicast/Channels/{}'.format(user, password, ip, code)else:result = Nonefinally:conn.close()return result# 获取所需数据
def get_data():# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()sql = ("SELECT c.id AS Mine_id, c.mine_name, d.id AS Equipment_id, ""d.equipment_name, d.equipment_type, d.equipment_ip, d.equipment_uname, d.equipment_password, d.code, d.parent_id, ""a.id AS Conf_id, a.Algorithm_library_id, a.conf_area, e.id AS test_type_id, e.test_type_ename, ""b.algorithm_status, b.algorithm_path, b.algorithm_name ""FROM `t_algorithm_config` AS a ""JOIN `t_algorithm_library` AS b ON a.Algorithm_library_id = b.id ""JOIN `t_mine` AS c ON a.Mine_id = c.id ""JOIN `t_equipment` AS d ON a.Equipment_id = d.id ""JOIN `t_algorithm_test_type` AS e ON a.Algorithm_test_type_id = e.id ""WHERE b.algorithm_status = 1;")cursor.execute(sql)# 获取查询结果集的列名columns = [i[0] for i in cursor.description]results = cursor.fetchall()# 构建字典列表result_dict_list = [dict(zip(columns, i)) for i in results]# 在列表推导式中使用了浅拷贝 copy.deepcopy() 来复制每个字典,更新 'rtsp_url' 字段。# 这样做可以保留原始数据的所有字段,同时只对 'rtsp_url' 进行修改。result_dict_list = [{**copy.deepcopy(i),'conf_area': list(np.ravel(eval(i['conf_area']))),'rtsp_url': 'rtsp://{}:{}@{}'.format(i['equipment_uname'], i['equipment_password'], i['equipment_ip'])}if i['equipment_type'] == '摄像头'else {**copy.deepcopy(i),'rtsp_url': get_children_rtsp(i['parent_id'], i['code']),'conf_area': list(np.ravel(eval(i['conf_area']))),}for i in result_dict_list]result_dict_list = [{**copy.deepcopy(i),'max_x': max(d['x'] for d in i['conf_area']),'max_y': max(d['y'] for d in i['conf_area']),'min_x': min(d['x'] for d in i['conf_area']),'min_y': min(d['y'] for d in i['conf_area'])} for i in result_dict_list]return result_dict_listnames = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', \'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', \'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', \'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', \'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', \'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', \'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', \'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table', \'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', \'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier','toothbrush']def clip_coords(boxes, shape):# Clip bounding xyxy bounding boxes to image shape (height, width)boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1]) # x1, x2boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0]) # y1, y2def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None):# Rescale coords (xyxy) from img1_shape to img0_shapeif ratio_pad is None: # calculate from img0_shapegain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / newpad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh paddingelse:gain = ratio_pad[0][0]pad = ratio_pad[1]coords[:, [0, 2]] -= pad[0] # x paddingcoords[:, [1, 3]] -= pad[1] # y paddingcoords[:, :4] /= gainclip_coords(coords, img0_shape)return coordsdef letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):# Resize and pad image while meeting stride-multiple constraintsshape = im.shape[:2] # current shape [height, width]if isinstance(new_shape, int):new_shape = (new_shape, new_shape)# Scale ratio (new / old)r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])if not scaleup: # only scale down, do not scale up (for better val mAP)r = min(r, 1.0)# Compute paddingratio = r, r # width, height ratiosnew_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh paddingif auto: # minimum rectangledw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh paddingelif scaleFill: # stretchdw, dh = 0.0, 0.0new_unpad = (new_shape[1], new_shape[0])ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratiosdw /= 2 # divide padding into 2 sidesdh /= 2if shape[::-1] != new_unpad: # resizeim = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))left, right = int(round(dw - 0.1)), int(round(dw + 0.1))im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add borderreturn im, ratio, (dw, dh)def plot_one_box(x, img, color=None, label=None, line_thickness=None):# Plots one bounding box on image imgtl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thicknesscolor = color or [random.randint(0, 255) for _ in range(3)]c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)if label:tf = max(tl - 1, 1) # font thicknesst_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3cv2.rectangle(img, c1, c2, color, -1, cv2.LINE_AA) # filledcv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)def get_boxes(prediction, batch_size=1, img_size=608):"""Returns detections with shape:(x1, y1, x2, y2, object_conf, class_score, class_pred)"""reshape_value = np.reshape(prediction, (-1, 1))num_boxes_final = reshape_value[0]print('num_boxes_final: ', num_boxes_final)all_list = [[] for _ in range(batch_size)]for i in range(int(num_boxes_final)):batch_idx = int(reshape_value[64 + i * 7 + 0])if batch_idx >= 0 and batch_idx < batch_size:bl = reshape_value[64 + i * 7 + 3]br = reshape_value[64 + i * 7 + 4]bt = reshape_value[64 + i * 7 + 5]bb = reshape_value[64 + i * 7 + 6]if bt - bl > 0 and bb - br > 0:all_list[batch_idx].append(bl)all_list[batch_idx].append(br)all_list[batch_idx].append(bt)all_list[batch_idx].append(bb)all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])# all_list[batch_idx].append(reshape_value[64 + i * 7 + 2])all_list[batch_idx].append(reshape_value[64 + i * 7 + 1])output = [np.array(all_list[i]).reshape(-1, 6) for i in range(batch_size)]return outputdef detect(img0, ip, model, ii, img_size, stride, api_lib, handle, save_img, colors, max_x, max_y, min_x, min_y,Conf_id, test_type_ename):print('ip:%s, ii:%s, model:%s' % (ip, ii, model))img = letterbox(img0, new_shape=img_size, stride=stride, auto=False)[0]image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)image = image[np.newaxis, :].astype(np.uint8)data = np.asarray(image, dtype=np.uint8)input = data.ctypes.data_as(ctypes.c_void_p)# 3.inferenceoutput = api_lib.cnpyInference(handle, input)pred0 = output.contents[0:7232]pred0 = np.array(pred0).reshape(1, 7232)pred = get_boxes(pred0)print(pred)for i, det in enumerate(pred):if det is not None and len(det):det[:, :4] = scale_coords(img.shape[:2], det[:, :4], img0.shape).round()for *xyxy, conf, cls in det:cls = int(cls) # 标签索引,对应 name_list = ['opencar_model', 'opencar_num', 'flatcar_model', 'flatcar_num', 'container_model', 'container_num']xmin = int(xyxy[0]) # 横坐标最小值ymin = int(xyxy[1]) # 纵坐标最小值xmax = int(xyxy[2]) # 横坐标最大值ymax = int(xyxy[3]) # 纵坐标最小值if min_x < xmin < max_x and min_y < ymin < max_y and min_x < xmax < max_x and min_y < ymax < max_y:# if xmax < max_x and xmin > min_x and ymax < max_y and ymin > min_y:# if names[int(cls)] == 'person':if save_img:label = '%s %.2f' % (names[int(cls)], conf)plot_one_box(xyxy, img0, label=label, color=colors[int(cls)], line_thickness=2)print(names[int(cls)], '检测类型')if test_type_ename == names[int(cls)]:# 上传服务器类ssh = SSH_Func()ssh.Add_Folder()ssh.upload_send_file(img0, Conf_id, ii)# dirs = './output/%s/' % (Conf_id)## if not os.path.exists(dirs): # 如果不存在路径,则创建这个路径# os.makedirs(dirs)## # 路径赋值# dir_path = dirs + '/%04d.jpg' % ii## cv2.imwrite(dir_path, img0)# 从连接池获取连接对象conn = pool.connection()cursor = conn.cursor()# 构建 SQL 语句和要插入的值sql = "INSERT INTO `t_algorithm_result` (`Algorithm_config_id`, `res_type`, `res_image`) VALUES (%s, %s, %s)"values = (Conf_id, 1, 'http://114.116.48.77:9001' + ssh.upload_send_file(img0, Conf_id, ii))# 执行 SQL 语句并提交事务cursor.execute(sql, values)conn.commit()else:print('检测类型不符,数据不予添加')print('---------------------------------------------------------------------------------------------------')returnimport multiprocessing
import timedef process_data(data):# 配置算法数据# 是否保存图片 yolov5save_img = True# 框与标注颜色colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]# 1.initll = ctypes.cdll.LoadLibraryapi_lib = ll("./lib/libcnrtapi.so")# 2.data preparationstride = 32img_size = 640offlinemodel_yolov5s = data['algorithm_path'].encode()handle_yolov5s = api_lib.cnpyInit(offlinemodel_yolov5s)api_lib.cnpyInference.restype = POINTER(POINTER(c_float))cap = cv2.VideoCapture(data['rtsp_url'])ii = 0# 创建队列对象queue = TestQueue()if cap.isOpened():while True:ret, frame = cap.read()if ii % 30 == 0 and queue.get_all() == []: # 抽帧if ret:ip = 34detect(frame, ip, 'model_yolov5s', ii, img_size, stride, api_lib, handle_yolov5s, save_img,colors, data['max_x'], data['max_y'], data['min_x'], data['min_y'], data['Conf_id'],data['test_type_ename'])else:cap = cv2.VideoCapture(data['rtsp_url'])if ii % 30 == 0 and queue.get_all() != []:queue.pop()breakii += 1def main():while True:# 创建进程池pool = multiprocessing.Pool()data_results = get_data() # 每次 pop 的时候更新数据print(data_results)# 使用进程池并行执行 process_data 函数,每个进程处理更新后的数据pool.map(process_data, data_results)# 关闭进程池pool.close()# 等待所有进程结束pool.join()if __name__ == "__main__":main()
这篇关于更新交互-队列监听-数据并发-程序执行的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!