import json import threading import time from json import JSONDecodeError from typing import Callable, Optional import websocket class ServerException(Exception): pass class Device: def __init__(self, device_id: str, device_key: str, on_msg_callback: Optional[Callable[[str, any], None]] = lambda point, value : print(point, value), on_error_callback: Optional[Callable[[Exception], None]] = lambda error: print(error), base_url: str = "wss://api.makergen.cn"): """ 创建一个设备对象。需要调用connect方法才能真正连接。 :param device_id: 设备ID,在控制台设备管理中显示 :param device_key: Device Key,在控制台设备管理中显示 :param on_msg_callback: 当收到消息后的回调。接收两个参数,第一个参数为控制点,字符串型;第二个参数为控制值,任意类型 :param on_error_callback: 当遇到错误时的回调。接收一个参数,为错误对象;若遇到服务器或页面发送的消息不合规,传入ServerException对象 :param base_url: makergen的基础URL,通常为wss://api.makergen.cn """ url = f"{base_url if base_url[-1] == '/' else base_url + '/'}ws/device/{device_id}?key={device_key}" print(url) self.ws = websocket.WebSocketApp( url, on_open=self._on_open, on_message=self._on_message, on_close=self._on_close, on_error=self._on_error, ) self.on_msg_callback = on_msg_callback if on_msg_callback is not None else lambda _, __ : None self.on_error_callback = on_error_callback if on_error_callback is not None else lambda _ : None self.is_connected = False self._connect_event = threading.Event() def _on_open(self, ws): self.is_connected = True self._connect_event.set() def _on_close(self, ws, close_status_code, close_msg): self.is_connected = False self._connect_event.clear() def _on_message(self, ws, message): try: message_str = json.loads(message)["payload"] self.on_msg_callback(message_str["point"], message_str["value"]) except (KeyError, JSONDecodeError): self.on_error_callback(ServerException("Page or server sent an invalid message")) except Exception as e: self.on_error_callback(e) def _on_error(self, ws, error): self.on_error_callback(error) def send_data(self, point: str, value: str): self.ws.send(json.dumps({ "type": "data", "payload": { "point": point, "value": value }, "timestamp": int(time.time() * 1000) })) def connect(self, timeout: float = 5.0): """ 在新线程中运行客户端循环(推荐) """ self._connect_event.clear() wst = threading.Thread(target=self.ws.run_forever) wst.daemon = True wst.start() start_time = time.time() if not self._connect_event.wait(timeout=timeout): self.on_error_callback(TimeoutError(f"WebSocket connection timed out after {timeout} seconds")) def main_loop(self): """ 在当前线程中运行客户端循环(不推荐) """ self.ws.run_forever()