Python中websockets服务端从客户端接收消息并发送给多线程

本文主要是介绍Python中websockets服务端从客户端接收消息并发送给多线程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

思路:

1.websockets需要从客户端接收消息,由于websockets创建服务端只能绑定一个端口,所以需要单独占用一个线程。收到的消息,我们需要共享给主线程,然后主线程根据设备(多线程)分发消息给各线程

2.消息中心需要独立出来,websockets服务端放消息,主线程去消息

3.根据思路设计模块:

                        1.消息库

                        2.服务端

                        3.主线程

                        4.多线程

先运行Main.py,再运行websocket_client.py(客户端),客户端发送的消息可能不一样,所以统一消息里面必须有device_id 或者device_name

修改websocket_client.py中data的信息,发送不同消息

一、消息处理中心 message_base.py

根据设备,创建储存设备消息,提取设备消息的功能

from queue import Queueclass MessageBase:def __init__(self):self.data = dict()def add(self, device, data):if device in self.data:self.data[device].put(data)else:self.data[device] = Queue()self.data[device].put(data)def get(self, device):data_queue: Queue = self.data.get(device)if not data_queue or data_queue.empty():return Nonedata = data_queue.get()return dataif __name__ == '__main__':mb = MessageBase()mb.add("a", "asdasd")mb.add("a", "11111111")print(mb.data)data = mb.get("a")print(data)

二、服务端:websocket_server.py

从客户端接收消息,并存到消息中心

import asyncio
import json
import threading
import websockets
##
from message_base import MessageBaseclass WebServer:def __init__(self, host, port, message_base: MessageBase):self.host = hostself.port = portself.clients = []self.message_base = message_baseasync def echo(self, websocket, path):self.clients.append(websocket)client_ip, client_port = websocket.remote_addresswhile True:try:recv_text = await websocket.recv()data = json.loads(recv_text)device = data.get("device")if device:self.message_base.add(device, data)else:continueexcept websockets.ConnectionClosed:print("ConnectionClosed...")  # 链接断开self.clients.remove(websocket)breakexcept websockets.InvalidState:print("InvalidState...")  # 无效状态self.clients.remove(websocket)breakexcept Exception as e:print(e)def connect(self):print("连接成功!")asyncio.set_event_loop(asyncio.new_event_loop())start_server = websockets.serve(self.echo, self.host, self.port)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()def run(self):t = threading.Thread(target=self.connect)t.start()print("已启动!")if __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8001, mb)ws.run()

三、设备功能 device_function.py

每个设备对应的线程功能,可以统一也可以写多个功能

class DeviceFunc:def __init__(self, device_name, data):self.device_name = device_nameself.data = datadef show_data(self):if self.data:print(self.device_name, "收到消息:", self.data.get("value"))

四、主线程 main.py

初始化所有功能模块,并运行主线程

from message_base import MessageBase
from websocket_server import WebServer
from device_function import DeviceFuncclass MainThread:def __init__(self, message_base: MessageBase, websocket_server: WebServer, device_list):self.message_base = message_baseself.websocket_server = websocket_serverself.device_list = device_listdef run_server(self):self.websocket_server.run()def run(self):self.run_server()while True:for device in self.device_list:try:# 开始根据设备即功能处理消息data = self.message_base.get(device)if not data:continuedf = DeviceFunc(device, data)df.show_data()except Exception as err:passif __name__ == '__main__':mb = MessageBase()ws = WebServer("192.168.6.28", 8000, mb)device_list = ["aa", "bb", "cc"]mt = MainThread(mb, ws, device_list)mt.run()

五、客户端

给服务端发送消息,测试用

import jsonimport websocketclass WebClient:def __init__(self, host, port):self.host = hostself.port = portself.conn = Noneself.flag = Falsedef connect(self):try:url = f"ws://{self.host}:{self.port}"self.conn = websocket.create_connection(url)self.flag = Trueprint("连接成功")except Exception as err:self.flag = Falseprint("连接失败", err)def close(self):self.conn.close()def recv(self):data = self.conn.recv(1024)print(data)def send(self, data):self.conn.send(data)print("发送成功")if __name__ == '__main__':host = "192.168.6.28"port = 8000ws = WebClient(host, port)if not ws.flag:ws.connect()data = {"device": "bb", "value": "123"}data = {"device": "cc", "value": "123"}data = json.dumps(data)ws.send(data)

这篇关于Python中websockets服务端从客户端接收消息并发送给多线程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/747042

相关文章

Python办公自动化实战之打造智能邮件发送工具

《Python办公自动化实战之打造智能邮件发送工具》在数字化办公场景中,邮件自动化是提升工作效率的关键技能,本文将演示如何使用Python的smtplib和email库构建一个支持图文混排,多附件,多... 目录前言一、基础配置:搭建邮件发送框架1.1 邮箱服务准备1.2 核心库导入1.3 基础发送函数二、

Javaee多线程之进程和线程之间的区别和联系(最新整理)

《Javaee多线程之进程和线程之间的区别和联系(最新整理)》进程是资源分配单位,线程是调度执行单位,共享资源更高效,创建线程五种方式:继承Thread、Runnable接口、匿名类、lambda,r... 目录进程和线程进程线程进程和线程的区别创建线程的五种写法继承Thread,重写run实现Runnab

Python包管理工具pip的升级指南

《Python包管理工具pip的升级指南》本文全面探讨Python包管理工具pip的升级策略,从基础升级方法到高级技巧,涵盖不同操作系统环境下的最佳实践,我们将深入分析pip的工作原理,介绍多种升级方... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

基于Python实现一个图片拆分工具

《基于Python实现一个图片拆分工具》这篇文章主要为大家详细介绍了如何基于Python实现一个图片拆分工具,可以根据需要的行数和列数进行拆分,感兴趣的小伙伴可以跟随小编一起学习一下... 简单介绍先自己选择输入的图片,默认是输出到项目文件夹中,可以自己选择其他的文件夹,选择需要拆分的行数和列数,可以通过

Python中反转字符串的常见方法小结

《Python中反转字符串的常见方法小结》在Python中,字符串对象没有内置的反转方法,然而,在实际开发中,我们经常会遇到需要反转字符串的场景,比如处理回文字符串、文本加密等,因此,掌握如何在Pyt... 目录python中反转字符串的方法技术背景实现步骤1. 使用切片2. 使用 reversed() 函

Python中将嵌套列表扁平化的多种实现方法

《Python中将嵌套列表扁平化的多种实现方法》在Python编程中,我们常常会遇到需要将嵌套列表(即列表中包含列表)转换为一个一维的扁平列表的需求,本文将给大家介绍了多种实现这一目标的方法,需要的朋... 目录python中将嵌套列表扁平化的方法技术背景实现步骤1. 使用嵌套列表推导式2. 使用itert

使用Docker构建Python Flask程序的详细教程

《使用Docker构建PythonFlask程序的详细教程》在当今的软件开发领域,容器化技术正变得越来越流行,而Docker无疑是其中的佼佼者,本文我们就来聊聊如何使用Docker构建一个简单的Py... 目录引言一、准备工作二、创建 Flask 应用程序三、创建 dockerfile四、构建 Docker

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

Python使用pip工具实现包自动更新的多种方法

《Python使用pip工具实现包自动更新的多种方法》本文深入探讨了使用Python的pip工具实现包自动更新的各种方法和技术,我们将从基础概念开始,逐步介绍手动更新方法、自动化脚本编写、结合CI/C... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

Conda与Python venv虚拟环境的区别与使用方法详解

《Conda与Pythonvenv虚拟环境的区别与使用方法详解》随着Python社区的成长,虚拟环境的概念和技术也在不断发展,:本文主要介绍Conda与Pythonvenv虚拟环境的区别与使用... 目录前言一、Conda 与 python venv 的核心区别1. Conda 的特点2. Python v