本文主要是介绍Python中websockets服务端给多线程发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
思路:
1.由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程
2.消息中心需要独立出来,一边放一边取
模块功能都在下面,先运行Main.py,再运行websocket_client.py
修改websocket_client.py中data的信息,可以看到不同消息
一、消息处理中心 message_base.py
根据设备,创建储存设备消息,提取设备消息的功能
from queue import Queueclass MessageBase:def __init__(self):self.data = dict()def add(self, device, data):if device in self.data:self.data[device].put(data)else:self.data[device] = Queue()self.data[device].put(data)def get(self, device):data_queue: Queue = self.data.get(device)if not data_queue or data_queue.empty():return Nonedata = data_queue.get()return dataif __name__ == '__main__':mb = MessageBase()mb.add("a", "asdasd")mb.add("a", "11111111")print(mb.data)data = mb.get("a")print(data)
二、服务端:websocket_server.py
给消息中心存消息
import asyncio
import json
import threading
import websockets
##
from message_base import MessageBaseclass WebServer:def __init__(self, host, port, message_base: MessageBase):self.host = hostself.port = portself.clients = []self.message_base = message_baseasync def echo(self, websocket, path):self.clients.append(websocket)client_ip, client_port = websocket.remote_addresswhile True:try:recv_text = await websocket.recv()data = json.loads(recv_text)device = data.get("device")if device:self.message_base.add(device, data)else:continueexcept websockets.ConnectionClosed:print("ConnectionClosed...") # 链接断开self.clients.remove(websocket)breakexcept websockets.InvalidState:print("InvalidState...") # 无效状态self.clients.remove(websocket)breakexcept Exception as e:print(e)def connect(self):print("连接成功!")asyncio.set_event_loop(asyncio.new_event_loop())start_server = websockets.serve(self.echo, self.host, self.port)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()def run(self):t = threading.Thread(target=self.connect)t.start()print("已启动!")if __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8001, mb)ws.run()
三、设备功能 device_function.py
每个设备对应的线程功能,可以同意也可以写多个功能
class DeviceFunc:def __init__(self, device_name, data):self.device_name = device_nameself.data = datadef show_data(self):if self.data:print(self.device_name, "收到消息:", self.data.get("value"))
四、主线程 main.py
初始化所有功能,并运行主线程
from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFuncclass MainThread:def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):self.message_base = message_baseself.websocket_server = websocket_serverself.device_list = device_listdef run_server(self):self.websocket_server.run()def run(self):self.run_server()while True:for device in self.device_list:try:data = self.message_base.get(device)df = DeviceFunc(device, data)df.show_data()except Exception as err:passif __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8000, mb)device_list = ["aa", "bb", "cc"]mt = MainThread(mb, ws, device_list)mt.run()
五、客户端
给服务端发送消息,测试用
import jsonimport websocketclass WebClient:def __init__(self, host, port):self.host = hostself.port = portself.conn = Noneself.flag = Falsedef connect(self):try:url = f"ws://{self.host}:{self.port}"self.conn = websocket.create_connection(url)self.flag = Trueprint("连接成功")except Exception as err:self.flag = Falseprint("连接失败", err)def close(self):self.conn.close()def recv(self):data = self.conn.recv(1024)print(data)def send(self, data):self.conn.send(data)print("发送成功")if __name__ == '__main__':host = "192.168.6.28"port = 8000ws = WebClient(host, port)if not ws.flag:ws.connect()data = {"device": "bb", "value": "123"}data = {"device": "cc", "value": "123"}data = json.dumps(data)ws.send(data)
这篇关于Python中websockets服务端给多线程发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!