阅山

  • WIN
    • CSharp
    • JAVA
    • OAM
    • DirectX
    • Emgucv
  • UNIX
    • FFmpeg
    • QT
    • Python
    • Opencv
    • Openwrt
    • Twisted
    • Design Patterns
    • Mysql
    • Mycat
    • MariaDB
    • Make
    • OAM
    • Supervisor
    • Nginx
    • KVM
    • Docker
    • OpenStack
  • WEB
    • ASP
    • Node.js
    • PHP
    • Directadmin
    • Openssl
    • Regex
  • APP
    • Android
  • AI
    • Algorithm
    • Deep Learning
    • Machine Learning
  • IOT
    • Device
    • MSP430
  • DIY
    • Algorithm
    • Design Patterns
    • MATH
    • X98 AIR 3G
    • Tucao
    • fun
  • LIFE
    • 美食
    • 关于我
  • LINKS
  • ME
Claves
长风破浪会有时,直挂云帆济沧海
  1. 首页
  2. Programming
  3. Python
  4. 正文

Python从MQTT中订阅数据,并存储到本地文件的代码

2025-11-10
import json
import time
import logging
import paho.mqtt.client as mqtt
import queue
import os
import json
import gzip
import time
import threading
import datetime
from pathlib import Path

# Configure the root logger once at import time: INFO level, with a
# timestamp and the level name in front of every message.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)




class SimpleMQTT:
    """Small convenience wrapper around a paho-mqtt client with a background loop.

    The constructor immediately tries to connect and starts the network loop.
    Subscriptions are remembered (topic filter -> callback) so they can be
    re-issued every time the connection is (re-)established.

    Attributes:
        host, port: Broker address passed to the constructor.
        username, password: Credentials passed to the constructor.
        client: The underlying ``paho.mqtt.client.Client``.
        connected: ``True`` between a successful CONNACK and the next disconnect.
        subscriptions: Mapping of topic filter -> callback ``cb(topic, payload_str)``
            (the callback may be ``None``).
    """

    def __init__(self, host="127.0.0.1", port=1883, username=None, password=None):
        """Create the client, install the callbacks and start connecting.

        Args:
            host: Broker host name or IP address.
            port: Broker TCP port.
            username: Optional user name; credentials are set when it is truthy.
            password: Optional password sent together with ``username``.
        """
        self.host, self.port = host, port
        self.username, self.password = username, password

        # paho-mqtt 2.x takes a callback API version. The callbacks below use
        # the version-1 signatures, so request VERSION1 explicitly when the
        # installed library knows about it; older releases take no such argument.
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is not None:
            self.client = mqtt.Client(api_version.VERSION1)
        else:
            self.client = mqtt.Client()

        # A user name without a password is still passed on to the client.
        if username:
            self.client.username_pw_set(username, password)

        self.connected = False
        self.client.on_connect = self._on_connected
        self.client.on_disconnect = self._on_disconnected
        self.client.on_message = self._on_message
        self.subscriptions = {}
        # QoS requested per topic filter, kept separately so that the shape of
        # ``subscriptions`` (filter -> callback) stays unchanged.
        self._subscription_qos = {}
        self._connect()

    def _on_connected(self, client, userdata, flags, rc):
        """Connect callback: mark the link as up and (re-)issue all subscriptions."""
        if rc == 0:
            self.connected = True
            logger.info(f"[MQTT] Connected to {self.host}:{self.port}")
            # Iterate over a snapshot: subscribe() may add entries from another
            # thread while this callback is running.
            for topic in list(self.subscriptions):
                client.subscribe(topic, qos=self._subscription_qos.get(topic, 1))
                logger.info(f"[MQTT] Re-subscribed to topic: {topic}")
        else:
            logger.warning(f"[MQTT] Connection failed, code={rc}")

    def _on_disconnected(self, client, userdata, rc):
        """Disconnect callback: mark the link as down."""
        self.connected = False
        logger.info(f"[MQTT] Disconnected (rc={rc})")

    def _on_message(self, client, userdata, msg):
        """Message callback: decode the payload and hand it to the registered callback.

        The payload is decoded as UTF-8 with undecodable bytes dropped. An exact
        topic match is tried first; otherwise the registered filters are checked
        with ``mqtt.topic_matches_sub`` so that wildcard subscriptions are
        dispatched too. Exceptions raised by the user callback are logged and
        not propagated.
        """
        topic = msg.topic
        payload = msg.payload.decode("utf-8", errors="ignore")
        cb = self.subscriptions.get(topic)
        if cb is None:
            for topic_filter, candidate in list(self.subscriptions.items()):
                if candidate is not None and mqtt.topic_matches_sub(topic_filter, topic):
                    cb = candidate
                    break
        if cb:
            try:
                cb(topic, payload)
            except Exception as e:
                logger.exception(f"[MQTT] Error in callback for {topic}: {e}")

    def _connect(self):
        """Connect to the broker and start the background network loop.

        Any exception is logged and leaves ``connected`` set to ``False``.
        """
        try:
            self.client.connect(self.host, self.port, keepalive=60)
            self.client.loop_start()
        except Exception as e:
            logger.error(f"[MQTT] Connect failed: {e}")
            self.connected = False

    def wait_connected(self, timeout=5):
        """Block until the connection is up or ``timeout`` seconds have passed.

        Returns:
            The value of ``connected`` when the wait ends.
        """
        # A monotonic clock keeps the timeout correct if the wall clock jumps.
        deadline = time.monotonic() + timeout
        while not self.connected and time.monotonic() < deadline:
            time.sleep(0.1)
        if not self.connected:
            logger.warning(f"[MQTT] Wait connected timeout ({timeout}s)")
        return self.connected

    def publish(self, payload, topic=None, qos=1, retain=False):
        """Publish ``payload`` to ``topic``.

        ``dict`` and ``list`` payloads are serialised as JSON (non-ASCII kept
        as-is); anything else is converted with ``str``. If the client is not
        connected, one reconnect attempt is made and up to one second is spent
        waiting for it. The message is dropped, with a warning, when no topic is
        given or the client is still not connected.

        Args:
            payload: Data to send.
            topic: Destination topic; nothing is sent when it is empty/``None``.
            qos: MQTT quality-of-service level.
            retain: Whether the broker should retain the message.
        """
        if not self.connected:
            logger.warning("[MQTT] Not connected, retrying...")
            self._connect()
            # Return as soon as the connection comes up instead of always
            # sleeping for the full second.
            self.wait_connected(timeout=1)

        payload_str = (
            json.dumps(payload, ensure_ascii=False)
            if isinstance(payload, (dict, list))
            else str(payload)
        )
        if not topic:
            logger.warning("[MQTT] Publish skipped: no topic given.")
            return
        if not self.connected:
            logger.warning(f"[MQTT] Publish to {topic} dropped: not connected.")
            return
        self.client.publish(topic, payload_str, qos=qos, retain=retain)
        logger.info(f"[MQTT] Published to {topic}: {payload_str}")

    def subscribe(self, topic, callback=None, qos=1):
        """Subscribe to ``topic`` and register ``callback`` for its messages.

        Waits up to five seconds for the connection. The subscription is always
        recorded, so if the client is still offline it is issued by the connect
        callback once the connection succeeds.

        Args:
            topic: Topic filter to subscribe to.
            callback: Callable invoked as ``callback(topic, payload_str)``.
            qos: MQTT quality-of-service level for the subscription.
        """
        if not self.connected:
            logger.info(f"[MQTT] Waiting for connection before subscribing {topic}...")
            self.wait_connected(timeout=5)

        self.subscriptions[topic] = callback
        self._subscription_qos[topic] = qos

        if self.connected:
            self.client.subscribe(topic, qos=qos)
            logger.info(f"[MQTT] Subscribed to topic: {topic}")
        else:
            logger.warning(
                f"[MQTT] Not connected; subscription to {topic} deferred until the next successful connect."
            )

    def stop(self):
        """Stop the background network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
        logger.info("[MQTT] Client stopped.")

# Shared state used by the message callback below.

# Bucket label of the data currently being collected; empty until the
# first record has been seen.
last_storage_time = ""
# Guards reads and writes of ``last_storage_time``.
last_storage_time_lock = threading.Lock()
# Unbounded FIFO holding the records waiting to be written to disk.
data_queue = queue.Queue(maxsize=0)

def on_message_callback(topic, payloads, base_dir="c:/stc/zhengdong"):
    """Buffer incoming records and write each finished minute to a gzip JSON file.

    ``payloads`` is a JSON document holding either a list of records or a
    single record object; every record must carry a ``time`` key with a Unix
    timestamp. Records are queued in ``data_queue``. When a record belongs to a
    different minute than the one currently being collected, everything queued
    so far is written to ``<base_dir>/<YYYY-MM-DD>/<HH-MM-00>.json.gz``, where
    date and time are those of the minute that has just finished.

    Args:
        topic: Topic the message arrived on (unused).
        payloads: JSON text as described above.
        base_dir: Root directory for the output files.

    Raises:
        ValueError: If ``payloads`` is not valid JSON.
        KeyError: If a record has no ``time`` key.
    """
    global last_storage_time

    records = json.loads(payloads)
    # A single JSON object is treated as a one-element batch; iterating the
    # dict itself would yield its keys instead of records.
    if isinstance(records, dict):
        records = [records]

    for payload in records:
        data_time = datetime.datetime.fromtimestamp(payload['time'])
        # The bucket label contains the date as well as the minute, so a file
        # completed just before midnight is stored under its own day rather
        # than under the date of the record that triggered the flush.
        bucket = data_time.strftime("%Y-%m-%d/%H-%M-00")
        with last_storage_time_lock:
            if not last_storage_time:
                last_storage_time = bucket
            if last_storage_time != bucket:
                # The minute changed: persist everything collected so far.
                target = Path(base_dir) / f"{last_storage_time}.json.gz"
                target.parent.mkdir(parents=True, exist_ok=True)

                # Drain the queue without blocking.
                all_data = []
                while True:
                    try:
                        all_data.append(data_queue.get_nowait())
                    except queue.Empty:
                        break
                    data_queue.task_done()

                with gzip.open(target, "wb") as f:
                    f.write(json.dumps(all_data, ensure_ascii=False).encode('utf-8'))
                last_storage_time = bucket
        data_queue.put(payload)



if __name__ == "__main__":
    # Demo entry point: subscribe to the sensor topic and publish a sample
    # value every three seconds until interrupted.
    # NOTE(review): broker address and credentials are hard-coded here;
    # consider loading them from configuration or the environment.
    mqtt_client = SimpleMQTT(host="192.168.1.1", port=2771, username='user', password='123')

    recv_topic = "/iios/coco/mqtt-send/session/04f5c05f-db68-4ec4-9368-ed0ef9c60dda/msg"

    send_topic = "/iios/coco/mqtt-send/session/86e92aaa-1e6c-4eb3-823e-8a018e88a288/msg"

    # Subscribe to the messages collected from the vibration sensor.
    mqtt_client.wait_connected(timeout=5)
    mqtt_client.subscribe(recv_topic, on_message_callback)

    # Inference code goes here.

    # Send the inference result to the platform; the keys of ``data`` can be
    # chosen freely.
    try:
        while True:
            data = {"device": "demo01", "value": round(time.time() % 100, 2)}
            mqtt_client.publish(data, topic=send_topic)
            time.sleep(3)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to end the demo.
        pass
    finally:
        # Stop the client on every exit path, not only on Ctrl+C.
        mqtt_client.stop()
        logger.info("[MQTT] 程序结束。")
标签: 暂无
最后更新:2025-11-10

代号山岳

知之为知之 不知为不知

点赞
< 上一篇

COPYRIGHT © 2099 登峰造极境. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

蜀ICP备14031139号-5

川公网安备51012202000587号