Python中websockets服务端从客户端接收消息并发送给多线程

本文主要是介绍Python中websockets服务端从客户端接收消息并发送给多线程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

思路:

1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程

2.消息中心需要独立出来,websockets服务端放消息,主线程去消息

3.根据思路设计模块:

                        1.消息库

                        2.服务端

                        3.主线程

                        4.多线程

先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name

修改websocket_client.py中data的信息,发送不同消息

一、消息处理中心 message_base.py

根据设备,创建储存设备消息,提取设备消息的功能

from queue import Queueclass MessageBase:def __init__(self):self.data = dict()def add(self, device, data):if device in self.data:self.data[device].put(data)else:self.data[device] = Queue()self.data[device].put(data)def get(self, device):data_queue: Queue = self.data.get(device)if not data_queue or data_queue.empty():return Nonedata = data_queue.get()return dataif __name__ == '__main__':mb = MessageBase()mb.add("a", "asdasd")mb.add("a", "11111111")print(mb.data)data = mb.get("a")print(data)

二、服务端:websocket_server.py

从客户端接收消息,并存到消息中心

import asyncio
import json
import threading
import websockets
##
from message_base import MessageBaseclass WebServer:def __init__(self, host, port, message_base: MessageBase):self.host = hostself.port = portself.clients = []self.message_base = message_baseasync def echo(self, websocket, path):self.clients.append(websocket)client_ip, client_port = websocket.remote_addresswhile True:try:recv_text = await websocket.recv()data = json.loads(recv_text)device = data.get("device")if device:self.message_base.add(device, data)else:continueexcept websockets.ConnectionClosed:print("ConnectionClosed...")  # 链接断开self.clients.remove(websocket)breakexcept websockets.InvalidState:print("InvalidState...")  # 无效状态self.clients.remove(websocket)breakexcept Exception as e:print(e)def connect(self):print("连接成功!")asyncio.set_event_loop(asyncio.new_event_loop())start_server = websockets.serve(self.echo, self.host, self.port)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()def run(self):t = threading.Thread(target=self.connect)t.start()print("已启动!")if __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8001, mb)ws.run()

三、设备功能 device_function.py

每个设备对应的线程功能,可以统一也可以写多个功能

class DeviceFunc:def __init__(self, device_name, data):self.device_name = device_nameself.data = datadef show_data(self):if self.data:print(self.device_name, "收到消息:", self.data.get("value"))

四、主线程 main.py

初始化所有功能模块,并运行主线程

from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFuncclass MainThread:def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):self.message_base = message_baseself.websocket_server = websocket_serverself.device_list = device_listdef run_server(self):self.websocket_server.run()def run(self):self.run_server()while True:for device in self.device_list:try:# 开始根据设备即功能处理消息data = self.message_base.get(device)if not data:continuedf = DeviceFunc(device, data)df.show_data()except Exception as err:passif __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8000, mb)device_list = ["aa", "bb", "cc"]mt = MainThread(mb, ws, device_list)mt.run()

五、客户端

给服务端发送消息,测试用

import jsonimport websocketclass WebClient:def __init__(self, host, port):self.host = hostself.port = portself.conn = Noneself.flag = Falsedef connect(self):try:url = f"ws://{self.host}:{self.port}"self.conn = websocket.create_connection(url)self.flag = Trueprint("连接成功")except Exception as err:self.flag = Falseprint("连接失败", err)def close(self):self.conn.close()def recv(self):data = self.conn.recv(1024)print(data)def send(self, data):self.conn.send(data)print("发送成功")if __name__ == '__main__':host = "192.168.6.28"port = 8000ws = WebClient(host, port)if not ws.flag:ws.connect()data = {"device": "bb", "value": "123"}data = {"device": "cc", "value": "123"}data = json.dumps(data)ws.send(data)

这篇关于Python中websockets服务端从客户端接收消息并发送给多线程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/747042

相关文章

使用Python将PDF表格自动提取并写入Word文档表格

《使用Python将PDF表格自动提取并写入Word文档表格》在实际办公与数据处理场景中,PDF文件里的表格往往无法直接复制到Word中,本文将介绍如何使用Python从PDF文件中提取表格数据,并将... 目录引言1. 加载 PDF 文件并准备 Word 文档2. 提取 PDF 表格并创建 Word 表格

使用Python实现局域网远程监控电脑屏幕的方法

《使用Python实现局域网远程监控电脑屏幕的方法》文章介绍了两种使用Python在局域网内实现远程监控电脑屏幕的方法,方法一使用mss和socket,方法二使用PyAutoGUI和Flask,每种方... 目录方法一:使用mss和socket实现屏幕共享服务端(被监控端)客户端(监控端)方法二:使用PyA

Python列表的创建与删除的操作指南

《Python列表的创建与删除的操作指南》列表(list)是Python中最常用、最灵活的内置数据结构之一,它支持动态扩容、混合类型、嵌套结构,几乎无处不在,但你真的会创建和删除列表吗,本文给大家介绍... 目录一、前言二、列表的创建方式1. 字面量语法(最常用)2. 使用list()构造器3. 列表推导式

Python使用Matplotlib和Seaborn绘制常用图表的技巧

《Python使用Matplotlib和Seaborn绘制常用图表的技巧》Python作为数据科学领域的明星语言,拥有强大且丰富的可视化库,其中最著名的莫过于Matplotlib和Seaborn,本篇... 目录1. 引言:数据可视化的力量2. 前置知识与环境准备2.1. 必备知识2.2. 安装所需库2.3

Python数据验证神器Pydantic库的使用和实践中的避坑指南

《Python数据验证神器Pydantic库的使用和实践中的避坑指南》Pydantic是一个用于数据验证和设置的库,可以显著简化API接口开发,文章通过一个实际案例,展示了Pydantic如何在生产环... 目录1️⃣ 崩溃时刻:当你的API接口又双叒崩了!2️⃣ 神兵天降:3行代码解决验证难题3️⃣ 深度

Python+FFmpeg实现视频自动化处理的完整指南

《Python+FFmpeg实现视频自动化处理的完整指南》本文总结了一套在Python中使用subprocess.run调用FFmpeg进行视频自动化处理的解决方案,涵盖了跨平台硬件加速、中间素材处理... 目录一、 跨平台硬件加速:统一接口设计1. 核心映射逻辑2. python 实现代码二、 中间素材处

python中的flask_sqlalchemy的使用及示例详解

《python中的flask_sqlalchemy的使用及示例详解》文章主要介绍了在使用SQLAlchemy创建模型实例时,通过元类动态创建实例的方式,并说明了如何在实例化时执行__init__方法,... 目录@orm.reconstructorSQLAlchemy的回滚关联其他模型数据库基本操作将数据添

Python实现快速扫描目标主机的开放端口和服务

《Python实现快速扫描目标主机的开放端口和服务》这篇文章主要为大家详细介绍了如何使用Python编写一个功能强大的端口扫描器脚本,实现快速扫描目标主机的开放端口和服务,感兴趣的小伙伴可以了解下... 目录功能介绍场景应用1. 网络安全审计2. 系统管理维护3. 网络故障排查4. 合规性检查报错处理1.

Python轻松实现Word到Markdown的转换

《Python轻松实现Word到Markdown的转换》在文档管理、内容发布等场景中,将Word转换为Markdown格式是常见需求,本文将介绍如何使用FreeSpire.DocforPython实现... 目录一、工具简介二、核心转换实现1. 基础单文件转换2. 批量转换Word文件三、工具特性分析优点局

Python中4大日志记录库比较的终极PK

《Python中4大日志记录库比较的终极PK》日志记录框架是一种工具,可帮助您标准化应用程序中的日志记录过程,:本文主要介绍Python中4大日志记录库比较的相关资料,文中通过代码介绍的非常详细,... 目录一、logging库1、优点2、缺点二、LogAid库三、Loguru库四、Structlogphp