Python中websockets服务端从客户端接收消息并发送给多线程

本文主要是介绍Python中websockets服务端从客户端接收消息并发送给多线程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

思路:

1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程

2.消息中心需要独立出来,websockets服务端放消息,主线程去消息

3.根据思路设计模块:

                        1.消息库

                        2.服务端

                        3.主线程

                        4.多线程

先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name

修改websocket_client.py中data的信息,发送不同消息

一、消息处理中心 message_base.py

根据设备,创建储存设备消息,提取设备消息的功能

from queue import Queueclass MessageBase:def __init__(self):self.data = dict()def add(self, device, data):if device in self.data:self.data[device].put(data)else:self.data[device] = Queue()self.data[device].put(data)def get(self, device):data_queue: Queue = self.data.get(device)if not data_queue or data_queue.empty():return Nonedata = data_queue.get()return dataif __name__ == '__main__':mb = MessageBase()mb.add("a", "asdasd")mb.add("a", "11111111")print(mb.data)data = mb.get("a")print(data)

二、服务端:websocket_server.py

从客户端接收消息,并存到消息中心

import asyncio
import json
import threading
import websockets
##
from message_base import MessageBaseclass WebServer:def __init__(self, host, port, message_base: MessageBase):self.host = hostself.port = portself.clients = []self.message_base = message_baseasync def echo(self, websocket, path):self.clients.append(websocket)client_ip, client_port = websocket.remote_addresswhile True:try:recv_text = await websocket.recv()data = json.loads(recv_text)device = data.get("device")if device:self.message_base.add(device, data)else:continueexcept websockets.ConnectionClosed:print("ConnectionClosed...")  # 链接断开self.clients.remove(websocket)breakexcept websockets.InvalidState:print("InvalidState...")  # 无效状态self.clients.remove(websocket)breakexcept Exception as e:print(e)def connect(self):print("连接成功!")asyncio.set_event_loop(asyncio.new_event_loop())start_server = websockets.serve(self.echo, self.host, self.port)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()def run(self):t = threading.Thread(target=self.connect)t.start()print("已启动!")if __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8001, mb)ws.run()

三、设备功能 device_function.py

每个设备对应的线程功能,可以统一也可以写多个功能

class DeviceFunc:def __init__(self, device_name, data):self.device_name = device_nameself.data = datadef show_data(self):if self.data:print(self.device_name, "收到消息:", self.data.get("value"))

四、主线程 main.py

初始化所有功能模块,并运行主线程

from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFuncclass MainThread:def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):self.message_base = message_baseself.websocket_server = websocket_serverself.device_list = device_listdef run_server(self):self.websocket_server.run()def run(self):self.run_server()while True:for device in self.device_list:try:# 开始根据设备即功能处理消息data = self.message_base.get(device)if not data:continuedf = DeviceFunc(device, data)df.show_data()except Exception as err:passif __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8000, mb)device_list = ["aa", "bb", "cc"]mt = MainThread(mb, ws, device_list)mt.run()

五、客户端

给服务端发送消息,测试用

import jsonimport websocketclass WebClient:def __init__(self, host, port):self.host = hostself.port = portself.conn = Noneself.flag = Falsedef connect(self):try:url = f"ws://{self.host}:{self.port}"self.conn = websocket.create_connection(url)self.flag = Trueprint("连接成功")except Exception as err:self.flag = Falseprint("连接失败", err)def close(self):self.conn.close()def recv(self):data = self.conn.recv(1024)print(data)def send(self, data):self.conn.send(data)print("发送成功")if __name__ == '__main__':host = "192.168.6.28"port = 8000ws = WebClient(host, port)if not ws.flag:ws.connect()data = {"device": "bb", "value": "123"}data = {"device": "cc", "value": "123"}data = json.dumps(data)ws.send(data)

这篇关于Python中websockets服务端从客户端接收消息并发送给多线程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/747042

相关文章

Python将博客内容html导出为Markdown格式

《Python将博客内容html导出为Markdown格式》Python将博客内容html导出为Markdown格式,通过博客url地址抓取文章,分析并提取出文章标题和内容,将内容构建成html,再转... 目录一、为什么要搞?二、准备如何搞?三、说搞咱就搞!抓取文章提取内容构建html转存markdown

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Python Websockets库的使用指南

《PythonWebsockets库的使用指南》pythonwebsockets库是一个用于创建WebSocket服务器和客户端的Python库,它提供了一种简单的方式来实现实时通信,支持异步和同步... 目录一、WebSocket 简介二、python 的 websockets 库安装三、完整代码示例1.

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

Python使用自带的base64库进行base64编码和解码

《Python使用自带的base64库进行base64编码和解码》在Python中,处理数据的编码和解码是数据传输和存储中非常普遍的需求,其中,Base64是一种常用的编码方案,本文我将详细介绍如何使... 目录引言使用python的base64库进行编码和解码编码函数解码函数Base64编码的应用场景注意

Python基于wxPython和FFmpeg开发一个视频标签工具

《Python基于wxPython和FFmpeg开发一个视频标签工具》在当今数字媒体时代,视频内容的管理和标记变得越来越重要,无论是研究人员需要对实验视频进行时间点标记,还是个人用户希望对家庭视频进行... 目录引言1. 应用概述2. 技术栈分析2.1 核心库和模块2.2 wxpython作为GUI选择的优

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤