.NET6環境下實現MQTT通信及詳細代碼演示
前言: MQTT廣泛應用於工業物聯網、智能傢居、各類智能制造或各類自動化場景等。MQTT是一個基於客戶端-服務器的消息發佈/訂閱傳輸協議,在很多受限的環境下,比如說機器與機器通信、機器與物聯網通信等。好瞭,科普的廢話不多說,下面直接通過.NET環境來實現一套MQTT通信demo,實現服務端與客戶端的雙邊消息發佈與訂閱的功能和演示。
開發環境:
VS2022 + .NET 6 + Webapi / 控制臺
1、新建一個webapi項目,用來後面做測試使用
2、新建一個繼承自IHostedService的服務,用於隨著webapi程序的啟動而自動執行。(最終代碼在文末)
3、引入 MQTTNet 包,該項目提供瞭.net環境下的MQTT通信協議支持,這款框架很優秀,此處直接引用它來進行使用。
4、在上面的MqttHostService類裡面,開始方法裡面新增初始化MQTT服務端的一些功能,例如 IP、端口號、事件等等。
5、mqtt服務端支持的一系列功能很多,大佬們可以自行去嘗試一些新發現,此處隻使用若幹個簡單功能。
6、添加客戶端連接事件、連接關閉事件
7、由於事件要用的可能有點多,此處就不一一例舉瞭,可以直接看以下的代碼,以及有關註釋來理解。
8、事件觸發時候,打印輸出
9、輸出之前,記錄一個當前事件名稱標記一下,用於可以更加清楚看出是哪個事件輸出的。
10、對MqttHostService類進行註冊,用於程序啟動時候跟隨啟動。
11、上面貌似設計的不是很友好,所以把mqtt服務實例單獨弄出來,寫入到單獨的類裡面做成屬性,供方便調用。
12、把先前的一些東西改一下,換成使用上面步驟的屬性來直接調用使用。
13、運行一下,看看是否可以成功,顯示服務已啟動,說明服務啟動時OK的瞭.
14、新增一個控制臺程序 MqttClient,用於模擬客戶端。
15、創建客戶端啟動以及有關配置信息和有關事件,如圖。具體使用可以看代碼註釋,就不過多解釋瞭。
16、在program類裡面,調用客戶端啟動方法,用於測試使用。
17、上面客戶端對應的三個事件的實現如圖,同時進行有關信息的打印輸出。
18、啟動服務端,然後啟動客戶端,可以看到服務端有一個連接失敗的消息,這個是因為上面配置的客戶端用戶名是admin,密碼是1234567,而服務端配置的規則是,用戶名是admin 密碼是123456
19、密碼改回正常匹配項以後,再重新運行試試看,可以看到客戶端與服務端連接上瞭。
20、如果關閉客戶端,也可以看到服務端會進入客戶端關閉事件內。
21、把上面主題訂閱的內容寫到連接成功以後的事件裡面,不然客戶端連接期間,可能就執行瞭主題訂閱,會存在訂閱失敗的情況。改為寫入到連接成功以後的事件裡面,可以保證主題訂閱肯定是在客戶端連接成功以後才執行的。
22、接下來測試服務端消息推送,在MqttService服務裡面,新增一個方法,用來執行mqtt服務端發佈主題消息使用。有關配置信息和消息格式,如圖所示。
23、新增一個API控制器,用來測試使用。API參數直接拿來進行消息的推送使用。
24、運行服務端和客戶端,並訪問剛剛新增的api接口,手動隨意輸入一條消息,可以看到客戶端訂閱的主題消息已經被實時接收到瞭。
25、接下來對客戶端新增一個消息推送的方法,用來測試客戶端消息發佈的功能。有關消息格式和調用,如圖所示,以及註釋部分的說明。
26、客戶端program類裡面,客戶端連接以後,通過手動回車,來執行客戶端發佈消息。
27、再次啟動服務端和客戶端
28、然後客戶端內按一下回車,執行消息發佈功能。可以看到,服務端成功接收到瞭客戶端發過來的主題消息。
29、接下來測試客戶端與客戶端之間的消息發佈與訂閱,為瞭模擬多客戶端效果,把上面客戶端已經編譯好的文件拷貝一份出來。
30、然後本地的代碼進行一些修改,用來當做第二個客戶端程序。所以客戶端id也進行變更為 testclient02
31、對客戶端訂閱的主題,也改成 topic_02
32、啟動服務端,以及拷貝出來的客戶端1,和上面修改瞭部分代碼的客戶端2,保證都已經連接上服務端。
33、調用服務端的api接口,由於服務端發佈的消息是發佈給topic_01的,所以隻有客戶端1可以接收到消息。
34、客戶端1執行回車,用於發佈一段消息給主題 topic_02,可以看到客戶端01發佈的消息,同時被服務端和客戶端02接收到瞭。因為服務端是總指揮,所以客戶端發佈的消息都會經過服務端,從而服務端都可以接收到連接的客戶端發佈的所有消息。
35、測試數據保持,下面先對客戶端1進行斷開,然後再重新連接客戶端1,可以看到客戶端1直接接收到瞭它訂閱的主題的上一次最新的消息內容,這個就是消息裡面,Retain屬性設為True的結果,用於讓服務端記憶該主題消息使用的。如果設為false,就沒有這個效果瞭,大佬們也可以自己嘗試。
36、最終的服務端代碼:
MqttHostService:
public class MqttHostService : IHostedService, IDisposable { public void Dispose() { } const string ServerClientId = "SERVER"; public Task StartAsync(CancellationToken cancellationToken) { MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder(); optionsBuilder.WithDefaultEndpoint(); optionsBuilder.WithDefaultEndpointPort(10086); // 設置 服務端 端口號 optionsBuilder.WithConnectionBacklog(1000); // 最大連接數 MqttServerOptions options = optionsBuilder.Build(); MqttService._mqttServer = new MqttFactory().CreateMqttServer(options); MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客戶端連接事件 MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客戶端關閉事件 MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件 MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客戶端訂閱主題事件 MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客戶端取消訂閱事件 MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 啟動後事件 MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 關閉後事件 MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件 MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用戶名和密碼驗證有關 MqttService._mqttServer.StartAsync(); return Task.CompletedTask; } /// <summary> /// 客戶端訂閱主題事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg) { Console.WriteLine($"ClientSubscribedTopicAsync:客戶端ID=【{arg.ClientId}】訂閱的主題=【{arg.TopicFilter}】 "); return Task.CompletedTask; } /// <summary> /// 關閉後事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_StoppedAsync(EventArgs arg) { Console.WriteLine($"StoppedAsync:MQTT服務已關閉……"); return Task.CompletedTask; } /// <summary> /// 用戶名和密碼驗證有關 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg) { arg.ReasonCode = MqttConnectReasonCode.Success; if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456") { arg.ReasonCode = MqttConnectReasonCode.Banned; Console.WriteLine($"ValidatingConnectionAsync:客戶端ID=【{arg.ClientId}】用戶名或密碼驗證錯誤 "); } return Task.CompletedTask; } /// <summary> /// 消息接收事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg) { if (string.Equals(arg.ClientId, ServerClientId)) { return Task.CompletedTask; } Console.WriteLine($"InterceptingPublishAsync:客戶端ID=【{arg.ClientId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } /// <summary> /// 啟動後事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_StartedAsync(EventArgs arg) { Console.WriteLine($"StartedAsync:MQTT服務已啟動……"); return Task.CompletedTask; } /// <summary> /// 客戶端取消訂閱事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg) { Console.WriteLine($"ClientUnsubscribedTopicAsync:客戶端ID=【{arg.ClientId}】已取消訂閱的主題=【{arg.TopicFilter}】 "); return Task.CompletedTask; } private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg) { Console.WriteLine($"ApplicationMessageNotConsumedAsync:發送端ID=【{arg.SenderId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } /// <summary> /// 客戶端斷開時候觸發 /// </summary> /// <param name="arg"></param> /// <returns></returns> /// <exception cref="NotImplementedException"></exception> private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg) { Console.WriteLine($"ClientDisconnectedAsync:客戶端ID=【{arg.ClientId}】已斷開, 地址=【{arg.Endpoint}】 "); return Task.CompletedTask; } /// <summary> /// 客戶端連接時候觸發 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg) { Console.WriteLine($"ClientConnectedAsync:客戶端ID=【{arg.ClientId}】已連接, 用戶名=【{arg.UserName}】地址=【{arg.Endpoint}】 "); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } }
MqttService:
public class MqttService { public static MqttServer _mqttServer { get; set; } public static void PublishData(string data) { var message = new MqttApplicationMessage { Topic = "topic_01", Payload = Encoding.Default.GetBytes(data), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = true // 服務端是否保留消息。true為保留,如果有新的訂閱者連接,就會立馬收到該消息。 }; _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 發送消息給有訂閱 topic_01的客戶端 { SenderClientId = "Server_01" }).GetAwaiter().GetResult(); } }
37、最終的客戶端代碼:
MqttClientService:
public class MqttClientService { public static IMqttClient _mqttClient; public void MqttClientStart() { var optionsBuilder = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1", 10086) // 要訪問的mqtt服務端的 ip 和 端口號 .WithCredentials("admin", "123456") // 要訪問的mqtt服務端的用戶名和密碼 .WithClientId("testclient02") // 設置客戶端id .WithCleanSession() .WithTls(new MqttClientOptionsBuilderTlsParameters { UseTls = false // 是否使用 tls加密 }); var clientOptions = optionsBuilder.Build(); _mqttClient = new MqttFactory().CreateMqttClient(); _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客戶端連接成功事件 _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客戶端連接關閉事件 _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件 _mqttClient.ConnectAsync(clientOptions); } /// <summary> /// 客戶端連接關閉事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { Console.WriteLine($"客戶端已斷開與服務端的連接……"); return Task.CompletedTask; } /// <summary> /// 客戶端連接成功事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { Console.WriteLine($"客戶端已連接服務端……"); // 訂閱消息主題 // MqttQualityOfServiceLevel: (QoS): 0 最多一次,接收者不確認收到消息,並且消息不被發送者存儲和重新發送提供與底層 TCP 協議相同的保證。 // 1: 保證一條消息至少有一次會傳遞給接收方。發送方存儲消息,直到它從接收方收到確認收到消息的數據包。一條消息可以多次發送或傳遞。 // 2: 保證每條消息僅由預期的收件人接收一次。級別2是最安全和最慢的服務質量級別,保證由發送方和接收方之間的至少兩個請求/響應(四次握手)。 _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce); return Task.CompletedTask; } /// <summary> /// 收到消息事件 /// </summary> /// <param name="arg"></param> /// <returns></returns> private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { Console.WriteLine($"ApplicationMessageReceivedAsync:客戶端ID=【{arg.ClientId}】接收到消息。 Topic主題=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】"); return Task.CompletedTask; } public void Publish(string data) { var message = new MqttApplicationMessage { Topic = "topic_02", Payload = Encoding.Default.GetBytes(data), QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce, Retain = true // 服務端是否保留消息。true為保留,如果有新的訂閱者連接,就會立馬收到該消息。 }; _mqttClient.PublishAsync(message); } }
38、後記:MQTT以上演示已經完畢,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加靈活。其實,實際上MQTT的客戶端在現實生產環境場景下,並不需要咱們開發者進行開發,很多硬件設備都支持提供MQTT協議的通信客戶端,所以隻需要自己搭建一個服務端,就可以實現實時監控各種設備推送過來的各種信號數據。同時客戶端支持發佈消息給其他客戶端,所以就實現瞭設備與設備之間的一對一信號通信的效果瞭。如果需要下發信號給硬件設備,MQTT服務端也可以直接下發給某個指定設備來進行實現即可。上面案例隻提供入門方案,如果有感興趣的大佬,可以自己去拓展一下,來達到更好的效果。
以上就是本文的全部內容,希望對大傢的學習有所幫助,也希望大傢多多支持WalkonNet。
推薦閱讀:
- C#中對象與JSON字符串互相轉換的三種方式
- 使用 Node-RED對 MQTT 數據流處理
- 前端與RabbitMQ實時消息推送未讀消息小紅點實現示例
- 基於ABP框架實現數據字典開發
- 詳解C#中普通緩存的使用