78 lines
2.9 KiB
Python
78 lines
2.9 KiB
Python
import json
|
||
import threading
|
||
import time
|
||
from json import JSONDecodeError
|
||
from typing import Callable, Optional
|
||
|
||
import websocket
|
||
|
||
|
||
class ServerException(Exception):
|
||
pass
|
||
|
||
|
||
class Device:
|
||
def __init__(self, device_id: str, device_key: str,
|
||
on_msg_callback: Optional[Callable[[str, any], None]] = lambda point, value : print(point, value),
|
||
on_error_callback: Optional[Callable[[Exception], None]] = lambda error: print(error),
|
||
base_url: str = "wss://api.makergen.cn"):
|
||
"""
|
||
创建一个设备对象。需要调用connect方法才能真正连接。
|
||
:param device_id: 设备ID,在控制台设备管理中显示
|
||
:param device_key: Device Key,在控制台设备管理中显示
|
||
:param on_msg_callback: 当收到消息后的回调。接收两个参数,第一个参数为控制点,字符串型;第二个参数为控制值,任意类型
|
||
:param on_error_callback: 当遇到错误时的回调。接收一个参数,为错误对象;若遇到服务器或页面发送的消息不合规,传入ServerException对象
|
||
:param base_url: makergen的基础URL,通常为wss://api.makergen.cn
|
||
"""
|
||
url = f"{base_url if base_url[-1] == '/' else base_url + "/"}ws/device/{device_id}?key={device_key}"
|
||
print(url)
|
||
self.ws = websocket.WebSocketApp(
|
||
url,
|
||
on_open=self._on_open,
|
||
on_message=self._on_message,
|
||
on_close=self._on_close,
|
||
on_error=self._on_error,
|
||
)
|
||
self.on_msg_callback = on_msg_callback if on_msg_callback is not None else lambda _, __ : None
|
||
self.on_error_callback = on_error_callback if on_error_callback is not None else lambda _ : None
|
||
|
||
def _on_open(self, ws):
|
||
print("open")
|
||
def _on_close(self, ws, close_status_code, close_msg):
|
||
print("close")
|
||
|
||
def _on_message(self, ws, message):
|
||
try:
|
||
message_str = json.loads(message)["payload"]
|
||
self.on_msg_callback(message_str["point"], message_str["value"])
|
||
except KeyError or JSONDecodeError:
|
||
self.on_error_callback(ServerException("Page or server sent an invalid message"))
|
||
except Exception as e:
|
||
self.on_error_callback(e)
|
||
|
||
def _on_error(self, ws, error):
|
||
self.on_error_callback(error)
|
||
|
||
def send_data(self, point: str, value: str):
|
||
self.ws.send(json.dumps({
|
||
"type": "data",
|
||
"payload": {
|
||
"point": point,
|
||
"value": value
|
||
},
|
||
"timestamp": int(time.time() * 1000)
|
||
}))
|
||
|
||
def connect(self):
|
||
"""
|
||
在新线程中运行客户端循环(推荐)
|
||
"""
|
||
wst = threading.Thread(target=self.ws.run_forever)
|
||
wst.daemon = True
|
||
wst.start()
|
||
|
||
def main_loop(self):
|
||
"""
|
||
在当前线程中运行客户端循环(不推荐)
|
||
"""
|
||
self.ws.run_forever() |