Python websocket 模块 WebSocketApp 长连接方法新老版本不兼容

本文主要是介绍Python websocket 模块 WebSocketApp 长连接方法新老版本不兼容,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

____tz_zs

websocket 库在 0.48.0 版本后对回调进行了修改。
新版本中,当我们将一个实例对象的方法作为 WebSocketApp 的回调时,WebSocketApp 将不再会返回他自己作为回调的第一个参数。

普通方法作为 WebSocketApp 回调

以下为官方示例的长连接用法 Long-lived connection,此种方式在新老版本中均能正常使用。

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websockettry:import thread
except ImportError:import _thread as thread
import timeurl = "ws://echo.websocket.org/"def on_message(ws, message):print("####### on_message #######")print(ws)print(message)def on_error(ws, error):print("####### on_error #######")print(ws)print(error)def on_close(ws):print("####### on_close #######")print(ws)print("####### closed #######")def on_open(ws):print("####### on_open #######")def run(*args):for i in range(3):time.sleep(1)ws.send("Hello %d" % i)time.sleep(1)ws.close()print("thread terminating...")thread.start_new_thread(run, ())if __name__ == '__main__':ws = websocket.WebSocketApp(url,on_message=on_message,on_error=on_error,on_close=on_close)ws.on_open = on_openws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)"""
####### on_open #######
####### on_message #######
<websocket._app.WebSocketApp object at 0x7f8ffe73eb38>
Hello 0
####### on_message #######
<websocket._app.WebSocketApp object at 0x7f8ffe73eb38>
Hello 1
####### on_message #######
<websocket._app.WebSocketApp object at 0x7f8ffe73eb38>
Hello 2
####### on_close #######
<websocket._app.WebSocketApp object at 0x7f8ffe73eb38>
thread terminating...
####### closed #######
"""

对象的方法作为 WebSocketApp 回调

注意,下方代码中的 Test 不是 WebSocketApp 的子类。
版本 0.48.0 之前能如下方式使用,这种方式比较灵活,

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websockettry:import thread
except ImportError:import _thread as thread
import timeclass Test(object):def __init__(self):self.url = "ws://echo.websocket.org/"def on_message(self, ws, message):print("on_message")print(self)print(ws)print(message)def on_error(self, ws, error):print("on_error")print(self)print(ws)print(error)def on_close(self, ws):print("on_close")print(self)print(ws)print("### closed ###")def on_open(self, ws):def run(*args):for i in range(3):time.sleep(1)ws.send("Hello %d" % i)time.sleep(1)ws.close()print("thread terminating...")thread.start_new_thread(run, ())def start(self):ws = websocket.WebSocketApp(self.url,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)ws.on_open = self.on_openws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)if __name__ == '__main__':Test().start()"""
on_message
<__main__.Test object at 0x7ffa6409e908>
<websocket._app.WebSocketApp object at 0x7ffa6409eb70>
Hello 0
on_message
<__main__.Test object at 0x7ffa6409e908>
<websocket._app.WebSocketApp object at 0x7ffa6409eb70>
Hello 1
on_message
<__main__.Test object at 0x7ffa6409e908>
<websocket._app.WebSocketApp object at 0x7ffa6409eb70>
Hello 2
on_close
thread terminating...
<__main__.Test object at 0x7ffa6409e908>
<websocket._app.WebSocketApp object at 0x7ffa6409eb70>
### closed ###
"""

但当升级为新版本后(0.48.0 版之后),这种方式不再兼容,具体原因:
新版本中,当我们将一个实例对象的方法作为 WebSocketApp 的回调时,WebSocketApp 将不再会返回他自己作为回调的第一个参数。

如果设置 log 等级为 DEBUG,可看到以下信息

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""from websocket import WebSocketApptry:import thread
except ImportError:import _thread as thread
import time
import logging
import syslogging.basicConfig(level=logging.DEBUG,format='asctime:        %(asctime)s \n'  # 时间'filename_line:  %(filename)s_[line:%(lineno)d] \n'  # 文件名_行号'level:          %(levelname)s \n'  # log级别'message:        %(message)s \n',  # log信息datefmt='%a, %d %b %Y %H:%M:%S',stream=sys.stdout,filemode='w')class Test(object):def __init__(self):self.url = "ws://echo.websocket.org/"def on_message(self, ws, message):print("on_message")print(self)print(ws)print(message)def on_error(self, ws, error):print("on_error")print(self)print(ws)print(error)def on_close(self, ws):print("on_close")print(self)print(ws)print("### closed ###")def on_open(self, ws):def run(*args):for i in range(3):time.sleep(1)ws.send("Hello %d" % i)time.sleep(1)ws.close()print("thread terminating...")thread.start_new_thread(run, ())def start(self):ws = WebSocketApp(self.url,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)ws.on_open = self.on_openws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)if __name__ == '__main__':Test().start()"""
asctime:        Thu, 11 Jul 2019 15:22:16 
filename_line:  _logging.py_[line:69] 
level:          DEBUG 
message:        Connecting proxy... File "/usr/local/lib/python3.5/dist-packages/websocket/_app.py", line 343, in _callbackcallback(*args)
asctime:        Thu, 11 Jul 2019 15:22:17 
filename_line:  _logging.py_[line:61] 
level:          ERROR 
message:        error from callback <bound method Test.on_open of <__main__.Test object at 0x7fb68a72ec88>>: on_open() missing 1 required positional argument: 'ws' asctime:        Thu, 11 Jul 2019 15:23:00 
filename_line:  _logging.py_[line:61] 
level:          ERROR 
message:        error from callback <bound method Test.on_error of <__main__.Test object at 0x7fb68a72ec88>>: on_error() missing 1 required positional argument: 'error' File "/usr/local/lib/python3.5/dist-packages/websocket/_app.py", line 343, in _callbackcallback(*args)
asctime:        Thu, 11 Jul 2019 15:23:00 
filename_line:  _logging.py_[line:61] 
level:          ERROR 
message:        error from callback <bound method Test.on_close of <__main__.Test object at 0x7fb68a72ec88>>: on_close() missing 1 required positional argument: 'ws' File "/usr/local/lib/python3.5/dist-packages/websocket/_app.py", line 343, in _callbackcallback(*args)
"""

对于新版本,我们可以采用以下几种方法

新版 对象的方法作为 WebSocketApp 回调

因为新版库不再返回 WebSocketApp 本身,所以参数不再包括 ws,我们保存 WebSocketApp 对象作为实例的一个参数 self.ws,如此,仍可在类中的任意位置使用。

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websocket
from websocket import WebSocketApptry:import thread
except ImportError:import _thread as thread
import timeclass Test(object):def __init__(self):super(Test, self).__init__()self.url = "ws://echo.websocket.org/"self.ws = Nonedef on_message(self, message):print("####### on_message #######")print(self)print(message)def on_error(self, error):print("####### on_error #######")print(self)print(error)def on_close(self):print("####### on_close #######")print(self)print("####### closed #######")def on_open(self):print(self)def run(*args):for i in range(3):time.sleep(1)self.ws.send("Hello %d" % i)time.sleep(1)self.ws.close()print("thread terminating...")thread.start_new_thread(run, ())def start(self):self.ws = WebSocketApp(self.url,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)self.ws.on_open = self.on_openself.ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)if __name__ == '__main__':Test().start()"""
<__main__.Test object at 0x7fb4e855cb70>
####### on_message #######
<__main__.Test object at 0x7fb4e855cb70>
Hello 0
####### on_message #######
<__main__.Test object at 0x7fb4e855cb70>
Hello 1
####### on_message #######
<__main__.Test object at 0x7fb4e855cb70>
Hello 2
thread terminating...
####### on_close #######
<__main__.Test object at 0x7fb4e855cb70>
####### closed #######
"""

静态方法作为 WebSocketApp 回调

缺点是回调方法中无法获得 self

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websocket
from websocket import WebSocketApptry:import thread
except ImportError:import _thread as thread
import timeclass Test(object):def __init__(self):super(Test, self).__init__()self.url = "ws://echo.websocket.org/"self.ws = None@staticmethoddef on_message(ws, message):print("####### on_message #######")print(ws)print(message)@staticmethoddef on_error(ws, error):print("####### on_error #######")print(ws)print(error)@staticmethoddef on_close(ws):print("####### on_close #######")print(ws)print("####### closed #######")@staticmethoddef on_open(ws):print(ws)def run(*args):for i in range(3):time.sleep(1)ws.send("Hello %d" % i)time.sleep(1)ws.close()print("thread terminating...")thread.start_new_thread(run, ())def start(self):self.ws = WebSocketApp(self.url,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)self.ws.on_open = self.on_openself.ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)if __name__ == '__main__':Test().start()"""
<websocket._app.WebSocketApp object at 0x7f05f2a580f0>
####### on_message #######
<websocket._app.WebSocketApp object at 0x7f05f2a580f0>
Hello 0
####### on_message #######
<websocket._app.WebSocketApp object at 0x7f05f2a580f0>
Hello 1
####### on_message #######
<websocket._app.WebSocketApp object at 0x7f05f2a580f0>
Hello 2
####### on_close #######
<websocket._app.WebSocketApp object at 0x7f05f2a580f0>
####### closed #######
"""

子类方式

class 继承 WebSocketApp,作为其子类。但这种方法不够灵活。

# -*- coding:utf-8 -*-"""
@author:    tz_zs
"""import websocket
from websocket import WebSocketApptry:import thread
except ImportError:import _thread as thread
import timeclass Test(WebSocketApp):def __init__(self):self.url = "ws://echo.websocket.org/"super(Test, self).__init__(url=self.url, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)def on_message(self, message):print("####### on_message #######")print(self)print(message)def on_error(self, error):print("####### on_error #######")print(self)print(error)def on_close(self):print("####### on_close #######")print(self)print("####### closed #######")def on_open(self):print(self)def run(*args):for i in range(3):time.sleep(1)self.send("Hello %d" % i)time.sleep(1)self.close()print("thread terminating...")thread.start_new_thread(run, ())# def start(self):#     ws = websocket.WebSocketApp(self.url,#                                 on_message=self.on_message,#                                 on_error=self.on_error,#                                 on_close=self.on_close)#     ws.on_open = self.on_open#     ws.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)if __name__ == '__main__':obj = Test()obj.run_forever(http_proxy_host="127.0.0.1", http_proxy_port=8118)"""
<__main__.Test object at 0x7f8a3d7e2908>
####### on_message #######
<__main__.Test object at 0x7f8a3d7e2908>
Hello 0
####### on_message #######
<__main__.Test object at 0x7f8a3d7e2908>
Hello 1
####### on_message #######
<__main__.Test object at 0x7f8a3d7e2908>
Hello 2
####### on_close #######
<__main__.Test object at 0x7f8a3d7e2908>
####### closed #######
"""

其他方法:版本回退到 0.48.0

从网站下载低版本
https://launchpad.net/ubuntu/+source/websocket-client/0.48.0-1
解压提取,使用以下命令安装
sudo python3 setup.py install

附:新老版本源码

版本 websocket-client 0.44.0

"""
websocket - WebSocket client library for PythonCopyright (C) 2010 Hiroki Ohtani(liris)This library is free software; you can redistribute it and/ormodify it under the terms of the GNU Lesser General PublicLicense as published by the Free Software Foundation; eitherversion 2.1 of the License, or (at your option) any later version.This library is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNULesser General Public License for more details.You should have received a copy of the GNU Lesser General PublicLicense along with this library; if not, write to the Free SoftwareFoundation, Inc., 51 Franklin Street, Fifth Floor,Boston, MA  02110-1335  USA""""""
WebSocketApp provides higher level APIs.
"""
import select
import sys
import threading
import time
import tracebackimport sixfrom ._abnf import ABNF
from ._core import WebSocket, getdefaulttimeout
from ._exceptions import *
from . import _logging__all__ = ["WebSocketApp"]class WebSocketApp(object):"""Higher level of APIs are provided.The interface is like JavaScript WebSocket object."""def __init__(self, url, header=None,on_open=None, on_message=None, on_error=None,on_close=None, on_ping=None, on_pong=None,on_cont_message=None,keep_running=True, get_mask_key=None, cookie=None,subprotocols=None,on_data=None):"""url: websocket url.header: custom header for websocket handshake.on_open: callable object which is called at opening websocket.this function has one argument. The argument is this class object.on_message: callable object which is called when received data.on_message has 2 arguments.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.on_error: callable object which is called when we get error.on_error has 2 arguments.The 1st argument is this class object.The 2nd argument is exception object.on_close: callable object which is called when closed the connection.this function has one argument. The argument is this class object.on_cont_message: callback object which is called when receive continuedframe data.on_cont_message has 3 arguments.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.The 3rd argument is continue flag. if 0, the data continueto next frame dataon_data: callback object which is called when a message received.This is called before on_message or on_cont_message,and then on_message or on_cont_message is called.on_data has 4 argument.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.The 4th argument is continue flag. if 0, the data continuekeep_running: a boolean flag indicating whether the app's main loopshould keep running, defaults to Trueget_mask_key: a callable to produce new mask keys,see the WebSocket.set_mask_key's docstring for more informationsubprotocols: array of available sub protocols. default is None."""self.url = urlself.header = header if header is not None else []self.cookie = cookieself.on_open = on_openself.on_message = on_messageself.on_data = on_dataself.on_error = on_errorself.on_close = on_closeself.on_ping = on_pingself.on_pong = on_pongself.on_cont_message = on_cont_messageself.keep_running = keep_runningself.get_mask_key = get_mask_keyself.sock = Noneself.last_ping_tm = 0self.last_pong_tm = 0self.subprotocols = subprotocolsdef send(self, data, opcode=ABNF.OPCODE_TEXT):"""send message.data: message to send. If you set opcode to OPCODE_TEXT,data must be utf-8 string or unicode.opcode: operation code of data. default is OPCODE_TEXT."""if not self.sock or self.sock.send(data, opcode) == 0:raise WebSocketConnectionClosedException("Connection is already closed.")def close(self, **kwargs):"""close websocket connection."""self.keep_running = Falseif self.sock:self.sock.close(**kwargs)def _send_ping(self, interval, event):while not event.wait(interval):self.last_ping_tm = time.time()if self.sock:try:self.sock.ping()except Exception as ex:_logging.warning("send_ping routine terminated: {}".format(ex))breakdef run_forever(self, sockopt=None, sslopt=None,ping_interval=0, ping_timeout=None,http_proxy_host=None, http_proxy_port=None,http_no_proxy=None, http_proxy_auth=None,skip_utf8_validation=False,host=None, origin=None):"""run event loop for WebSocket framework.This loop is infinite loop and is alive during websocket is available.sockopt: values for socket.setsockopt.sockopt must be tupleand each element is argument of sock.setsockopt.sslopt: ssl socket optional dict.ping_interval: automatically send "ping" commandevery specified period(second)if set to 0, not send automatically.ping_timeout: timeout(second) if the pong message is not received.http_proxy_host: http proxy host name.http_proxy_port: http proxy port. If not set, set to 80.http_no_proxy: host names, which doesn't use proxy.skip_utf8_validation: skip utf8 validation.host: update host header.origin: update origin header."""if not ping_timeout or ping_timeout <= 0:ping_timeout = Noneif ping_timeout and ping_interval and ping_interval <= ping_timeout:raise WebSocketException("Ensure ping_interval > ping_timeout")if sockopt is None:sockopt = []if sslopt is None:sslopt = {}if self.sock:raise WebSocketException("socket is already opened")thread = Noneclose_frame = Nonetry:self.sock = WebSocket(self.get_mask_key, sockopt=sockopt, sslopt=sslopt,fire_cont_frame=self.on_cont_message and True or False,skip_utf8_validation=skip_utf8_validation)self.sock.settimeout(getdefaulttimeout())self.sock.connect(self.url, header=self.header, cookie=self.cookie,http_proxy_host=http_proxy_host,http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,host=host, origin=origin)self._callback(self.on_open)if ping_interval:event = threading.Event()thread = threading.Thread(target=self._send_ping, args=(ping_interval, event))thread.setDaemon(True)thread.start()while self.sock.connected:r, w, e = select.select((self.sock.sock, ), (), (), ping_timeout or 10) # Use a 10 second timeout to avoid to wait forever on closeif not self.keep_running:breakif r:op_code, frame = self.sock.recv_data_frame(True)if op_code == ABNF.OPCODE_CLOSE:close_frame = framebreakelif op_code == ABNF.OPCODE_PING:self._callback(self.on_ping, frame.data)elif op_code == ABNF.OPCODE_PONG:self.last_pong_tm = time.time()self._callback(self.on_pong, frame.data)elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:self._callback(self.on_data, data,frame.opcode, frame.fin)self._callback(self.on_cont_message,frame.data, frame.fin)else:data = frame.dataif six.PY3 and op_code == ABNF.OPCODE_TEXT:data = data.decode("utf-8")self._callback(self.on_data, data, frame.opcode, True)self._callback(self.on_message, data)if ping_timeout and self.last_ping_tm \and time.time() - self.last_ping_tm > ping_timeout \and self.last_ping_tm - self.last_pong_tm > ping_timeout:raise WebSocketTimeoutException("ping/pong timed out")except (Exception, KeyboardInterrupt, SystemExit) as e:self._callback(self.on_error, e)if isinstance(e, SystemExit):# propagate SystemExit furtherraisefinally:if thread and thread.isAlive():event.set()thread.join()self.keep_running = Falseself.sock.close()close_args = self._get_close_args(close_frame.data if close_frame else None)self._callback(self.on_close, *close_args)self.sock = Nonedef _get_close_args(self, data):""" this functions extracts the code, reason from the close bodyif they exists, and if the self.on_close except three arguments """import inspect# if the on_close callback is "old", just return empty listif sys.version_info < (3, 0):if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:return []else:if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:return []if data and len(data) >= 2:code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])reason = data[2:].decode('utf-8')return [code, reason]return [None, None]def _callback(self, callback, *args):if callback:try:callback(self, *args)except Exception as e:_logging.error("error from callback {}: {}".format(callback, e))if _logging.isEnabledForDebug():_, _, tb = sys.exc_info()traceback.print_tb(tb)

版本 websocket-client 0.56.0

"""
websocket - WebSocket client library for PythonCopyright (C) 2010 Hiroki Ohtani(liris)This library is free software; you can redistribute it and/ormodify it under the terms of the GNU Lesser General PublicLicense as published by the Free Software Foundation; eitherversion 2.1 of the License, or (at your option) any later version.This library is distributed in the hope that it will be useful,but WITHOUT ANY WARRANTY; without even the implied warranty ofMERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNULesser General Public License for more details.You should have received a copy of the GNU Lesser General PublicLicense along with this library; if not, write to the Free SoftwareFoundation, Inc., 51 Franklin Street, Fifth Floor,Boston, MA  02110-1335  USA""""""
WebSocketApp provides higher level APIs.
"""
import inspect
import select
import sys
import threading
import time
import tracebackimport sixfrom ._abnf import ABNF
from ._core import WebSocket, getdefaulttimeout
from ._exceptions import *
from . import _logging__all__ = ["WebSocketApp"]class Dispatcher:def __init__(self, app, ping_timeout):self.app  = appself.ping_timeout = ping_timeoutdef read(self, sock, read_callback, check_callback):while self.app.sock.connected:r, w, e = select.select((self.app.sock.sock, ), (), (), self.ping_timeout)if r:if not read_callback():breakcheck_callback()class SSLDispacther:def __init__(self, app, ping_timeout):self.app  = appself.ping_timeout = ping_timeoutdef read(self, sock, read_callback, check_callback):while self.app.sock.connected:r = self.select()if r:if not read_callback():breakcheck_callback()def select(self):sock = self.app.sock.sockif sock.pending():return [sock,]r, w, e = select.select((sock, ), (), (), self.ping_timeout)return rclass WebSocketApp(object):"""Higher level of APIs are provided.The interface is like JavaScript WebSocket object."""def __init__(self, url, header=None,on_open=None, on_message=None, on_error=None,on_close=None, on_ping=None, on_pong=None,on_cont_message=None,keep_running=True, get_mask_key=None, cookie=None,subprotocols=None,on_data=None):"""url: websocket url.header: custom header for websocket handshake.on_open: callable object which is called at opening websocket.this function has one argument. The argument is this class object.on_message: callable object which is called when received data.on_message has 2 arguments.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.on_error: callable object which is called when we get error.on_error has 2 arguments.The 1st argument is this class object.The 2nd argument is exception object.on_close: callable object which is called when closed the connection.this function has one argument. The argument is this class object.on_cont_message: callback object which is called when receive continuedframe data.on_cont_message has 3 arguments.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.The 3rd argument is continue flag. if 0, the data continueto next frame dataon_data: callback object which is called when a message received.This is called before on_message or on_cont_message,and then on_message or on_cont_message is called.on_data has 4 argument.The 1st argument is this class object.The 2nd argument is utf-8 string which we get from the server.The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.The 4th argument is continue flag. if 0, the data continuekeep_running: this parameter is obsolete and ignored.get_mask_key: a callable to produce new mask keys,see the WebSocket.set_mask_key's docstring for more informationsubprotocols: array of available sub protocols. default is None."""self.url = urlself.header = header if header is not None else []self.cookie = cookieself.on_open = on_openself.on_message = on_messageself.on_data = on_dataself.on_error = on_errorself.on_close = on_closeself.on_ping = on_pingself.on_pong = on_pongself.on_cont_message = on_cont_messageself.keep_running = Falseself.get_mask_key = get_mask_keyself.sock = Noneself.last_ping_tm = 0self.last_pong_tm = 0self.subprotocols = subprotocolsdef send(self, data, opcode=ABNF.OPCODE_TEXT):"""send message.data: message to send. If you set opcode to OPCODE_TEXT,data must be utf-8 string or unicode.opcode: operation code of data. default is OPCODE_TEXT."""if not self.sock or self.sock.send(data, opcode) == 0:raise WebSocketConnectionClosedException("Connection is already closed.")def close(self, **kwargs):"""close websocket connection."""self.keep_running = Falseif self.sock:self.sock.close(**kwargs)self.sock = Nonedef _send_ping(self, interval, event):while not event.wait(interval):self.last_ping_tm = time.time()if self.sock:try:self.sock.ping()except Exception as ex:_logging.warning("send_ping routine terminated: {}".format(ex))breakdef run_forever(self, sockopt=None, sslopt=None,ping_interval=0, ping_timeout=None,http_proxy_host=None, http_proxy_port=None,http_no_proxy=None, http_proxy_auth=None,skip_utf8_validation=False,host=None, origin=None, dispatcher=None,suppress_origin = False, proxy_type=None):"""run event loop for WebSocket framework.This loop is infinite loop and is alive during websocket is available.sockopt: values for socket.setsockopt.sockopt must be tupleand each element is argument of sock.setsockopt.sslopt: ssl socket optional dict.ping_interval: automatically send "ping" commandevery specified period(second)if set to 0, not send automatically.ping_timeout: timeout(second) if the pong message is not received.http_proxy_host: http proxy host name.http_proxy_port: http proxy port. If not set, set to 80.http_no_proxy: host names, which doesn't use proxy.skip_utf8_validation: skip utf8 validation.host: update host header.origin: update origin header.dispatcher: customize reading data from socket.suppress_origin: suppress outputting origin header.Returns-------False if caught KeyboardInterruptTrue if other exception was raised during a loop"""if ping_timeout is not None and ping_timeout <= 0:ping_timeout = Noneif ping_timeout and ping_interval and ping_interval <= ping_timeout:raise WebSocketException("Ensure ping_interval > ping_timeout")if not sockopt:sockopt = []if not sslopt:sslopt = {}if self.sock:raise WebSocketException("socket is already opened")thread = Noneself.keep_running = Trueself.last_ping_tm = 0self.last_pong_tm = 0def teardown(close_frame=None):"""Tears down the connection.If close_frame is set, we will invoke the on_close handler with thestatusCode and reason from there."""if thread and thread.isAlive():event.set()thread.join()self.keep_running = Falseif self.sock:self.sock.close()close_args = self._get_close_args(close_frame.data if close_frame else None)self._callback(self.on_close, *close_args)self.sock = Nonetry:self.sock = WebSocket(self.get_mask_key, sockopt=sockopt, sslopt=sslopt,fire_cont_frame=self.on_cont_message is not None,skip_utf8_validation=skip_utf8_validation,enable_multithread=True if ping_interval else False)self.sock.settimeout(getdefaulttimeout())self.sock.connect(self.url, header=self.header, cookie=self.cookie,http_proxy_host=http_proxy_host,http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,host=host, origin=origin, suppress_origin=suppress_origin,proxy_type=proxy_type)if not dispatcher:dispatcher = self.create_dispatcher(ping_timeout)self._callback(self.on_open)if ping_interval:event = threading.Event()thread = threading.Thread(target=self._send_ping, args=(ping_interval, event))thread.setDaemon(True)thread.start()def read():if not self.keep_running:return teardown()op_code, frame = self.sock.recv_data_frame(True)if op_code == ABNF.OPCODE_CLOSE:return teardown(frame)elif op_code == ABNF.OPCODE_PING:self._callback(self.on_ping, frame.data)elif op_code == ABNF.OPCODE_PONG:self.last_pong_tm = time.time()self._callback(self.on_pong, frame.data)elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:self._callback(self.on_data, frame.data,frame.opcode, frame.fin)self._callback(self.on_cont_message,frame.data, frame.fin)else:data = frame.dataif six.PY3 and op_code == ABNF.OPCODE_TEXT:data = data.decode("utf-8")self._callback(self.on_data, data, frame.opcode, True)self._callback(self.on_message, data)return Truedef check():if (ping_timeout):has_timeout_expired = time.time() - self.last_ping_tm > ping_timeouthas_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeoutif (self.last_ping_tmand has_timeout_expiredand (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):raise WebSocketTimeoutException("ping/pong timed out")return Truedispatcher.read(self.sock.sock, read, check)except (Exception, KeyboardInterrupt, SystemExit) as e:self._callback(self.on_error, e)if isinstance(e, SystemExit):# propagate SystemExit furtherraiseteardown()return not isinstance(e, KeyboardInterrupt)def create_dispatcher(self, ping_timeout):timeout = ping_timeout or 10if self.sock.is_ssl():return SSLDispacther(self, timeout)return Dispatcher(self, timeout)def _get_close_args(self, data):""" this functions extracts the code, reason from the close bodyif they exists, and if the self.on_close except three arguments """# if the on_close callback is "old", just return empty listif sys.version_info < (3, 0):if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:return []else:if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:return []if data and len(data) >= 2:code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])reason = data[2:].decode('utf-8')return [code, reason]return [None, None]def _callback(self, callback, *args):if callback:try:if inspect.ismethod(callback):callback(*args)else:callback(self, *args)except Exception as e:_logging.error("error from callback {}: {}".format(callback, e))if _logging.isEnabledForDebug():_, _, tb = sys.exc_info()traceback.print_tb(tb)

相关讨论

https://stackoverflow.com/questions/26980966/using-a-websocket-client-as-a-class-in-python
Passing method of non-WebSocketApp object as callback does not receive the WebSocketApp object as an argument
why the function “WebSocketApp run_forever” doesn’t work in linux? But it’s OK in windows.

这篇关于Python websocket 模块 WebSocketApp 长连接方法新老版本不兼容的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/954314

相关文章

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

Python中win32包的安装及常见用途介绍

《Python中win32包的安装及常见用途介绍》在Windows环境下,PythonWin32模块通常随Python安装包一起安装,:本文主要介绍Python中win32包的安装及常见用途的相关... 目录前言主要组件安装方法常见用途1. 操作Windows注册表2. 操作Windows服务3. 窗口操作

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

一文详解Git中分支本地和远程删除的方法

《一文详解Git中分支本地和远程删除的方法》在使用Git进行版本控制的过程中,我们会创建多个分支来进行不同功能的开发,这就容易涉及到如何正确地删除本地分支和远程分支,下面我们就来看看相关的实现方法吧... 目录技术背景实现步骤删除本地分支删除远程www.chinasem.cn分支同步删除信息到其他机器示例步骤

python常用的正则表达式及作用

《python常用的正则表达式及作用》正则表达式是处理字符串的强大工具,Python通过re模块提供正则表达式支持,本文给大家介绍python常用的正则表达式及作用详解,感兴趣的朋友跟随小编一起看看吧... 目录python常用正则表达式及作用基本匹配模式常用正则表达式示例常用量词边界匹配分组和捕获常用re

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

python删除xml中的w:ascii属性的步骤

《python删除xml中的w:ascii属性的步骤》使用xml.etree.ElementTree删除WordXML中w:ascii属性,需注册命名空间并定位rFonts元素,通过del操作删除属... 可以使用python的XML.etree.ElementTree模块通过以下步骤删除XML中的w:as