Python中使用matplotlib繪制mqtt數據實時圖像功能
效果圖
mqtt發佈
本代碼中publish
是一個死循環,數據一直往外發送。
import random import time from paho.mqtt import client as mqtt_client import json from datetime import datetime broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt/li" client_id = f'python-mqtt-{random.randint(0, 1000)}' # 隨機生成客戶端id def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): while True: time.sleep(0.01) msg = json.dumps({"MAC": "0123456789", "samplerate": 12, "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]), "battery": 0.5, "acc": [ [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)], ]}) result = client.publish(topic, msg) status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
mqtt訂閱
from paho.mqtt import client as mqtt_client import time import os broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt/li" def connect_mqtt(client_id): """ MQTT 連接函數。 """ def on_connect(client, userdata, flags, rc): """ 連接回調函數 在客戶端連接後被調用,在該函數中可以依據 rc 來判斷客戶端是否連接成功。 """ if rc == 0: print("Connected to MQTT Broker! return code %d" % rc) else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) # client.username_pw_set('uname', 'upwd') # 鏈接mqtt所需的用戶名和密碼,沒有可不寫 client.on_connect = on_connect client.connect(broker , port) return client def subscribe(client: mqtt_client, a_topic): """ 訂閱消息 """ def on_message(client, userdata, msg): """ 消息回調函數 在客戶端從 MQTT Broker 收到消息後被調用,在該函數中我們將打印出訂閱的 topic 名稱以及接收到的消息內容。 * 這裡可添加自定義數據處理程序 """ print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode())) client.subscribe(topic) client.on_message = on_message def run(client_id, topic): client = connect_mqtt(client_id) subscribe(client, topic) client.loop_forever() if __name__ == '__main__': run('test_eartag-003-python-li', 'zk100/gw/#')
matplotlib繪制動態圖
import matplotlib.pyplot as plt import numpy as np count = 100 # 圖中最多數據量 ax = list(range(count)) # 保存圖1數據 ay = [0] * 100 bx = list(range(count)) # 保存圖2數據 by = [0] * 100 num = count # 計數 plt.ion() # 開啟一個畫圖的窗口進入交互模式,用於實時更新數據 plt.rcParams['figure.figsize'] = (10, 10) # 圖像顯示大小 plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文標簽亂碼,還有通過導入字體文件的方法 plt.rcParams['axes.unicode_minus'] = False plt.rcParams['lines.linewidth'] = 0.5 # 設置曲線線條寬度 plt.tight_layout() while True: plt.clf() # 清除刷新前的圖表,防止數據量過大消耗內存 plt.suptitle("總標題", fontsize=30) # 添加總標題,並設置文字大小 g1 = np.random.random() # 生成隨機數畫圖 # 圖表1 ax.append(num) # 追加x坐標值 ay.append(g1) # 追加y坐標值 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子圖表標題1') # 添加子標題 agraphic.set_xlabel('x軸', fontsize=10) # 添加軸標簽 agraphic.set_ylabel('y軸', fontsize=20) plt.plot(ax[-count:], ay[-count:], 'g-') # 等於agraghic.plot(ax,ay,'g-') # 圖表2 bx.append(num) by.append(g1) bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子圖表標題2') bgraghic.plot(bx[-count:], by[-count:], 'r^') plt.pause(0.001) # 設置暫停時間,太快圖表無法正常顯示 num = num + 1
matplotlib繪制mqtt數據實時圖像
- 單線程
先啟動mqtt訂閱服務
mqtt訂閱中有阻塞,更新數據後因訂閱服務沒有結束,導致繪圖程序無法繪圖
先啟動繪圖程序
繪圖程序本身也是個循環,拿不到mqtt的實時數據,圖像無法更新
- 兩個服務加入協程,也不行。具體原因還不知道,容後補充。
- mqtt作為線程啟動,可解決上述問題
import json import random from paho.mqtt import client as mqtt_client import time import datetime from math import ceil, floor import matplotlib.pyplot as plt import _thread # 公共變量 broker = 'broker.emqx.io' topic = "/python/mqtt/li" port = 1883 client_id = f'python-mqtt-li-{random.randint(0, 100)}' show_num = 300 x_num = [-1] # 計數 acc1 = [] acc2 = [] acc3 = [] acc4 = [] acc5 = [] acc6 = [] stime = [] """mqtt subscribe topic""" def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime): """將字符串型【毫秒級】格式化時間 轉為 【13位】整型時間戳""" datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f") obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0 return obj_stamp def int2datetime(int_float_timestamp): """ 有小數點:分離小數點,整數轉為格式化時間,小數點直接跟在後面 無小數點:從第10位進行分離, 所以本函數隻適用於時間戳整數位數大於9且小於11. """ if '.' in str(int_float_timestamp): int_float = str(int_float_timestamp).split('.') date = time.localtime(int(int_float[0])) tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date) secondafter = '.' + str(int_float[1]) return str(tempDate) + secondafter def parse_mqttmsg(msg): """解析mqt頭數據 MAC samplerate sampletime battery acc""" content = json.loads(msg.payload.decode()) span = 1000 / content['samplerate'] * 10 time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000] sampletime = content['sampletime'] sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime) acc = content['acc'] for i in range(len(acc)): x_num.append(x_num[-1] + 1) acc1.append(acc[i][0]) acc2.append(acc[i][1]) acc3.append(acc[i][2]) acc4.append(acc[i][3]) acc5.append(acc[i][4]) acc6.append(acc[i][5]) if i != 0: sampletime_int += time_span[i % 2] stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000)) else: stime.append(sampletime) print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1]) def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) pass client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): # print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") parse_mqttmsg(msg) client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() """ draw figures """ def draw_figure(): plt.ion() # 開啟一個畫圖的窗口進入交互模式,用於實時更新數據 plt.rcParams['figure.figsize'] = (10, 10) # 圖像顯示大小 plt.rcParams['font.sans-serif'] = ['SimHei'] # 防止中文標簽亂碼,還有通過導入字體文件的方法 plt.rcParams['axes.unicode_minus'] = False plt.rcParams['lines.linewidth'] = 0.5 # 設置曲線線條寬度 count = 0 while True: plt.clf() # 清除刷新前的圖表,防止數據量過大消耗內存 plt.suptitle("總標題", fontsize=30) # 添加總標題,並設置文字大小 plt.tight_layout() # 圖表1 agraphic = plt.subplot(2, 1, 1) agraphic.set_title('子圖表標題1') # 添加子標題 agraphic.set_xlabel('x軸', fontsize=10) # 添加軸標簽 agraphic.set_ylabel('y軸', fontsize=20) plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-') try: xtricks = list(range(len(acc1) - show_num, len(acc1), 10)) # **1** xlabels = [stime[i] for i in xtricks] # **2** plt.xticks(xtricks, xlabels, rotation=15) except: pass # 圖表2 bgraghic = plt.subplot(2, 1, 2) bgraghic.set_title('子圖表標題2') bgraghic.set_xlabel('x軸', fontsize=10) # 添加軸標簽 bgraghic.set_ylabel('y軸', fontsize=20) bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^') plt.pause(0.001) # 設置暫停時間,太快圖表無法正常顯示 count = count + 1 if __name__ == '__main__': # 多線程 _thread.start_new_thread(run, ()) draw_figure()
到此這篇關於Python中使用matplotlib繪制mqtt數據實時圖像的文章就介紹到這瞭,更多相關matplotlib繪制mqtt數據實時圖像內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 在Django中使用MQTT的方法
- 使用 Node-RED對 MQTT 數據流處理
- springboot+rabbitmq實現智能傢居實例詳解
- 前端與RabbitMQ實時消息推送未讀消息小紅點實現示例
- 消息中間件詳解以及比較選擇