import json
import time
import logging
import paho.mqtt.client as mqtt
import queue
import os
import json
import gzip
import time
import threading
import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


class SimpleMQTT:
    """Small wrapper around a paho-mqtt client with topic -> callback dispatch.

    Constructing an instance immediately attempts to connect and starts paho's
    background network loop. Every subscription requested through
    :meth:`subscribe` is remembered and (re-)issued on each successful connect.

    Attributes:
        client: The underlying ``paho.mqtt.client.Client``.
        connected: ``True`` between a successful CONNACK and the next disconnect.
        subscriptions: Mapping of topic filter -> callback ``(topic, payload_str)``.
    """

    def __init__(self, host="127.0.0.1", port=1883, username=None, password=None):
        """Create the client, register callbacks and start connecting.

        Args:
            host: Broker host name or IP address.
            port: Broker TCP port.
            username: Optional user name; credentials are sent whenever it is set.
            password: Optional password sent together with ``username``.
        """
        self.host, self.port = host, port
        self.username, self.password = username, password
        self.client = self._new_client()
        if username:
            # A user name without a password is valid MQTT, so only the user
            # name decides whether credentials are configured.
            self.client.username_pw_set(username, password)
        self.connected = False
        self.subscriptions = {}
        # QoS requested per topic filter, so a reconnect re-subscribes with the
        # same QoS instead of paho's default of 0.
        self._subscription_qos = {}
        self.client.on_connect = self._on_connected
        self.client.on_disconnect = self._on_disconnected
        self.client.on_message = self._on_message
        self._connect()

    @staticmethod
    def _new_client():
        """Build a paho client that works with both paho-mqtt 1.x and 2.x."""
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is None:
            return mqtt.Client()  # paho-mqtt 1.x
        # paho-mqtt 2.x needs an explicit callback API version; VERSION1 keeps
        # the (client, userdata, flags, rc) / (client, userdata, rc) signatures
        # used by the callbacks below.
        return mqtt.Client(api_versions.VERSION1)

    def _on_connected(self, client, userdata, flags, rc):
        """paho ``on_connect`` callback: mark connected and restore subscriptions."""
        if rc != 0:
            logger.warning(f"[MQTT] Connection failed, code={rc}")
            return
        self.connected = True
        logger.info(f"[MQTT] Connected to {self.host}:{self.port}")
        # Iterate over a snapshot: subscribe() may add entries from another
        # thread while this runs on paho's network thread.
        for topic in list(self.subscriptions):
            client.subscribe(topic, qos=self._subscription_qos.get(topic, 1))
            logger.info(f"[MQTT] Re-subscribed to topic: {topic}")

    def _on_disconnected(self, client, userdata, rc):
        """paho ``on_disconnect`` callback: mark the client as disconnected."""
        self.connected = False
        logger.info(f"[MQTT] Disconnected (rc={rc})")

    def _on_message(self, client, userdata, msg):
        """paho ``on_message`` callback: decode the payload and dispatch it.

        The payload is decoded as UTF-8 (undecodable bytes are dropped). An
        exact topic match is tried first; otherwise the first registered
        wildcard filter (``+`` / ``#``) that matches the topic is used.
        Exceptions raised by the user callback are logged, not propagated.
        """
        topic = msg.topic
        payload = msg.payload.decode("utf-8", errors="ignore")
        cb = self.subscriptions.get(topic)
        if cb is None:
            for pattern, candidate in list(self.subscriptions.items()):
                if candidate is None or not ("+" in pattern or "#" in pattern):
                    continue
                if mqtt.topic_matches_sub(pattern, topic):
                    cb = candidate
                    break
        if cb is None:
            return
        try:
            cb(topic, payload)
        except Exception as e:
            logger.exception(f"[MQTT] Error in callback for {topic}: {e}")

    def _connect(self):
        """Connect to the broker and start the network loop; log on failure."""
        try:
            self.client.connect(self.host, self.port, keepalive=60)
            self.client.loop_start()
        except Exception as e:
            logger.error(f"[MQTT] Connect failed: {e}")
            self.connected = False

    def wait_connected(self, timeout=5):
        """Block until connected or until ``timeout`` seconds have passed.

        Returns:
            bool: The connection state when the wait ended.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while not self.connected and time.monotonic() < deadline:
            time.sleep(0.1)
        if not self.connected:
            logger.warning(f"[MQTT] Wait connected timeout ({timeout}s)")
        return self.connected

    def publish(self, payload, topic=None, qos=1, retain=False):
        """Publish ``payload`` to ``topic``.

        ``dict``/``list`` payloads are JSON-encoded, ``bytes``/``bytearray``
        are sent unchanged, anything else is converted with ``str()``.
        If the client is disconnected, one reconnect attempt is made and the
        call waits up to one second for the connection to come up.

        Returns:
            bool: ``True`` if the message was handed to the client, ``False``
            if it was dropped (not connected, no topic, or client error).
        """
        if not self.connected:
            logger.warning("[MQTT] Not connected, retrying...")
            self._connect()
            # Return as soon as the connection is up instead of always
            # sleeping for a full second.
            self.wait_connected(timeout=1)
        if isinstance(payload, (dict, list)):
            payload_str = json.dumps(payload, ensure_ascii=False)
        elif isinstance(payload, (bytes, bytearray)):
            payload_str = payload  # already wire-ready; str() would send its repr
        else:
            payload_str = str(payload)
        if not topic:
            logger.warning("[MQTT] Publish skipped: no topic given.")
            return False
        if not self.connected:
            logger.warning(f"[MQTT] Publish to {topic} dropped: not connected.")
            return False
        info = self.client.publish(topic, payload_str, qos=qos, retain=retain)
        if info.rc != 0:  # 0 is paho's MQTT_ERR_SUCCESS
            logger.warning(f"[MQTT] Publish to {topic} failed, rc={info.rc}")
            return False
        logger.info(f"[MQTT] Published to {topic}: {payload_str}")
        return True

    def subscribe(self, topic, callback=None, qos=1):
        """Subscribe to ``topic`` and route its messages to ``callback``.

        The subscription is recorded first, so it is applied by the connect
        handler even if the broker is unreachable right now. If the client is
        disconnected, the call waits up to five seconds for a connection.

        Args:
            topic: Topic filter; may contain MQTT wildcards.
            callback: Callable invoked as ``callback(topic, payload_str)``.
            qos: Requested quality of service, also used when re-subscribing.
        """
        self.subscriptions[topic] = callback
        self._subscription_qos[topic] = qos
        if not self.connected:
            logger.info(f"[MQTT] Waiting for connection before subscribing {topic}...")
            self.wait_connected(timeout=5)
        if self.connected:
            self.client.subscribe(topic, qos=qos)
            logger.info(f"[MQTT] Subscribed to topic: {topic}")
        else:
            logger.warning(
                f"[MQTT] Still not connected; subscription to {topic} "
                "will be applied on the next successful connect."
            )

    def stop(self):
        """Disconnect from the broker and stop the background network loop."""
        # Disconnect while the loop is still running so the DISCONNECT is
        # processed, then stop the loop thread.
        self.client.disconnect()
        self.client.loop_stop()
        # The disconnect callback may not have run before the loop stopped.
        self.connected = False
        logger.info("[MQTT] Client stopped.")
data_queue = queue.Queue()  # records buffered for the minute bucket being collected
last_storage_time_lock = threading.Lock()  # guards last_storage_time and the flush
last_storage_time = ""  # "%H-%M-00" label of the current bucket; "" until the first record


def _drain_queue(q):
    """Remove and return every item currently in ``q`` without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items
        q.task_done()


def on_message_callback(topic, payloads, storage_root="c:/stc/zhengdong"):
    """Buffer incoming records and archive them once per minute bucket.

    Each record is queued in ``data_queue``. When a record arrives whose
    minute label differs from the bucket being collected, everything queued
    so far is written as one gzip-compressed JSON array to
    ``<storage_root>/<YYYY-MM-DD>/<HH-MM-00>.json.gz`` and a new bucket starts.
    The bucket still being collected when the program exits is not written.

    Args:
        topic: MQTT topic the message arrived on (unused).
        payloads: JSON text holding a list of records, or a single record
            object. Each record must have a ``'time'`` value accepted by
            ``datetime.fromtimestamp`` (presumably epoch seconds; confirm
            against the publisher).
        storage_root: Directory under which the per-day folders are created.

    Raises:
        ValueError: If ``payloads`` is not valid JSON.
        KeyError: If a record has no ``'time'`` field.
    """
    global last_storage_time
    records = json.loads(payloads)
    if isinstance(records, dict):
        # A single record object is treated as a one-element batch.
        records = [records]
    for record in records:
        record_time = datetime.datetime.fromtimestamp(record['time'])
        bucket_label = record_time.strftime("%H-%M-00")
        with last_storage_time_lock:
            if not last_storage_time:
                last_storage_time = bucket_label
            if last_storage_time != bucket_label:
                buffered = _drain_queue(data_queue)
                # Take the folder date from the buffered data, not from the
                # record that triggered the flush; otherwise the 23:59 bucket
                # would be filed under the following day.
                if buffered:
                    bucket_day = datetime.datetime.fromtimestamp(buffered[0]['time'])
                else:
                    bucket_day = record_time
                target = (
                    Path(storage_root)
                    / bucket_day.strftime('%Y-%m-%d')
                    / f"{last_storage_time}.json.gz"
                )
                target.parent.mkdir(parents=True, exist_ok=True)
                with gzip.open(target, "wb") as f:
                    f.write(json.dumps(buffered, ensure_ascii=False).encode('utf-8'))
                last_storage_time = bucket_label
            # Enqueue under the lock so the bucket decision and the enqueue
            # cannot be interleaved with a flush from another thread.
            data_queue.put(record)
if __name__ == "__main__":
    # Demo entry point: subscribe to the sensor topic, archive what arrives,
    # and publish a sample result every three seconds until interrupted.
    mqtt_client = SimpleMQTT(host="192.168.1.1", port=2771, username='user', password='123')
    recv_topic = "/iios/coco/mqtt-send/session/04f5c05f-db68-4ec4-9368-ed0ef9c60dda/msg"
    send_topic = "/iios/coco/mqtt-send/session/86e92aaa-1e6c-4eb3-823e-8a018e88a288/msg"
    # Subscribe to the messages collected by the vibration sensor.
    mqtt_client.wait_connected(timeout=5)
    mqtt_client.subscribe(recv_topic, on_message_callback)
    # Inference code goes here.
    # Send the inference result to the platform. The keys of `data` can be
    # chosen freely.
    try:
        while True:
            data = {"device": "demo01", "value": round(time.time() % 100, 2)}
            mqtt_client.publish(data, topic=send_topic)
            time.sleep(3)
    except KeyboardInterrupt:
        pass  # Ctrl+C is the normal way to end the demo loop.
    finally:
        # Always release the connection, including when the loop dies with an
        # unexpected exception (which still propagates after cleanup).
        mqtt_client.stop()
        logger.info("[MQTT] 程序结束。")