C#中使用async和await實現異步Udp通訊的示例代碼

在之前的C#版本中, 如果我們想要進行異步的Udp, 需要單開線程接收消息, C#7.1開始, 我們可以使用async/await關鍵字來編寫異步代碼, 我們今天一起來探索怎麼實現.

C/S架構

我們要實現兩個app, 一個客戶端和一個服務器, 各自都可以發消息和收消息.
發消息很簡單, 收消息的話需要一直在端口上監聽.

udp相比tcp來說簡單瞭很多, 不需要一直保持連接, 也不需要處理發送回調, 因為udp不可靠, 隻要發瞭就不管, 丟瞭也與我無關. 而且因為不需要保證順序, 所以沒有發送緩存, 隻要請求發送, 立馬就發, 收到的包也不會堆積, 肯定是整包, 所以我們也不需要處理粘包問題.

整個實現的關鍵點有:

  • Sockets.socket: socket類, tcp和udp共用.
  • System.Net.IPEndPoint: 端口類, tcp和udp共用.
  • Sockets.socket.Bind: 綁定本地端口方法, 主要是服務器使用.
  • Sockets.socket.Create: 綁定遠端端口方法, 主要是客戶端使用.
  • Sockets.socket.SendTo: 向指定端口發送數據, 主要是服務器使用.
  • Sockets.socket.ReceiveFrom: 從指定端口接收數據, 主要是服務器使用.
  • Sockets.socket.Send: 從綁定的端口發送數據, 主要是客戶端使用.
  • Sockets.socket.Receive: 從綁定的端口接收數據, 主要是客戶端使用.
  • async 關鍵字: 標識方法為異步方法.
  • await 關鍵字: 標識異步執行方法, 等待返回.
  • System.Threading.Tasks.Task: 異步任務類

客戶端實現

我們先來研究客戶端, 服務器的實現大致相同, 隻是有細微的差別.

客戶端主流程和實現

// 構建socket對象
Socket udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

// 連接遠端口, 用於向遠端發送消息, 這裡是自己的機器同時當服務器和客戶端, 所以都是127...
// 註意這裡的連接隻是將`socket`對象與ip和端口綁定, 不是tcp中的連接概念.
// 內部會分配新的本地端口, 發送給遠端, 供遠端使用
var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8060);
udpSocket.Connect(endPoint);

// 發送消息
SendMessageToServer("客戶端說:Hello Server!");

// 監聽消息
StartRecvMessage();
Console.ReadKey();

客戶端發送消息實現

static void SendMessageToServer(string message)
{
    udpSocket.Send(Encoding.UTF8.GetBytes(message));
}

因為之前已經和遠端口綁定瞭, 所以客戶端可以直接發送消息, 在內部會自動分配一個客戶端自己的本地端口, 服務器端使用這個端口來向本客戶端發送消息, 我們會在服務器實現中看到.

客戶端監聽消息實現

// 從byte中轉換string
static string ConverBytesToString(Decoder decoder, byte[] bytes, int len)
{
    var nchar = decoder.GetCharCount(bytes, 0, len);

    var bytesChar = new char[nchar];
    nchar = decoder.GetChars(bytes, 0, len, bytesChar, 0);

    var result = new string(bytesChar, 0, nchar);
    return result;
}

// 從連接的端口接收消息, 返回讀取到的字節數
static int SocketRecvMessage()
{
    var nread = udpSocket.Receive(buffer);
    return nread;
}

// 開始異步接收消息
static async void StartRecvMessage()
{
    Console.WriteLine("客戶端開始監聽: " + udpSocket.LocalEndPoint);
    var decoder8 = Encoding.UTF8.GetDecoder();

    while (true)
    {
        var nread = await Task.Run<int>(SocketRecvMessage);
        var message = ConverBytesToString(decoder8, buffer, nread);

        Console.WriteLine($"收到來自服務器的消息: {message}");
    }
}

上面的代碼中, 主要的部分是:

async/await/Task.Run<int>(xxx):

  • async:標識方法StartRecvMessage將采用異步方式執行
  • await: 標識要等待的操作, 而這種操作是需要耗時的, 比如socket, io等, 也可以是單純就是要等待多久(Task.Delay(500); // 等待500ms).
  • Task.Run<int>(xxx): 將耗時的操作包裝為異步任務(類似開瞭一個線程來執行該操作).

udpSocket.Receive(buffer): 從連接好的遠端口接收消息, 這是一個阻斷性的操作, 在消息回來之前會停留在這裡不動.

上面的異步還能寫成下面的形式, 隻是將耗時操作推遲到瞭更具體的操作而已:

// 從連接的端口接收消息, 返回讀取到的字節數
static async Task<int> SocketRecvMessage()
{
    var nread = await Task.Run<int>(() => udpSocket.Receive(buffer));
    return nread;
}

// 開始異步接收消息
static async void StartRecvMessage()
{
    Console.WriteLine("客戶端開始監聽: " + udpSocket.LocalEndPoint);
    var decoder8 = Encoding.UTF8.GetDecoder();

    while (true)
    {
        var nread = await SocketRecvMessage();
        var message = ConverBytesToString(decoder8, buffer, nread);

        Console.WriteLine($"收到來自服務器的消息: {message}");
    }
}

我們還能進一步簡化代碼:

// 開始異步接收消息
static async void StartRecvMessage()
{
    Console.WriteLine("客戶端開始監聽: " + udpSocket.LocalEndPoint);
    var decoder8 = Encoding.UTF8.GetDecoder();

    while (true)
    {
        var nread = await Task.Run<int>(() => udpSocket.Receive(buffer));
        var message = ConverBytesToString(decoder8, buffer, nread);

        Console.WriteLine($"收到來自服務器的消息: {message}");
    }
}

服務器實現

服務器和客戶端的實現差別很小.

主要區別在於服務器針對的是很多客戶端, 所以在收發消息上對於端口的處理不一樣.

服務器主流程和實現

// 構建socket對象
Socket udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

// 綁定本地端口, 監聽來自於各個客戶端的消息
var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8060);
udpSocket.Bind(endPoint);

// 監聽消息
StartRecvMessage();
Console.ReadKey();

服務器發送消息實現

// 向指定的客戶端端口發送消息
// 註意這裡和客戶端的實現不一樣, 還是因為服務器會對應多個客戶端, 所以每次發送都需要指明目的地
static void SendMessageToClient(EndPoint endPoint, string message)
{
    udpSocket.SendTo(Encoding.UTF8.GetBytes(message), endPoint);
}

服務器監聽消息實現

static (int, EndPoint) SocketRecvMessage()
{
    EndPoint endPoint = new IPEndPoint(IPAddress.Any, 0);

    var nread = udpSocket.ReceiveFrom(buffer, ref endPoint);
    return (nread, endPoint);
}

static async void StartRecvMessage()
{
    Console.WriteLine("服務器開始監聽: " + udpSocket.LocalEndPoint);
    var decoder8 = Encoding.UTF8.GetDecoder();

    while(true)
    {
        var (nread, endPoint) = await Task.Run<(int, EndPoint)>(SocketRecvMessage);
        var message = ConverBytesToString(decoder8, buffer, nread);

        Console.WriteLine($"收到來自客戶端[{endPoint}]的消息: {message}");

        SendMessageToClient(endPoint, "服務器對你說Hi!");
    }
}

上面的代碼中, 主要的差別在對於端口的處理上:

  • SocketRecvMessage返回的是一個元組(int, EndPoint): 即讀取到的字節數, 還有客戶端的端口信息.
  • ReceiveFrom: 接收消息指定瞭端口, 服務器每次接收消息都要使用端口信息用來標識發送消息的客戶端.

優化過後的代碼為:

static async void StartRecvMessage()
{
    Console.WriteLine("服務器開始監聽: " + udpSocket.LocalEndPoint);
    var decoder8 = Encoding.UTF8.GetDecoder();

    while(true)
    {
        EndPoint endPoint = new IPEndPoint(IPAddress.Any, 0);
        var nread = await Task.Run<int>(() => udpSocket.ReceiveFrom(buffer, ref endPoint));
        var message = ConverBytesToString(decoder8, buffer, nread);

        Console.WriteLine($"收到來自客戶端[{endPoint}]的消息: {message}");

        SendMessageToClient(endPoint, "服務器對你說Hi!");
    }
}

下面是完整的代碼:

// --- AsyncUdpClient.cs

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace test
{
    class AsyncUdpClient
    {
        static Socket udpSocket;
        static byte[] buffer = new byte[4096];

        public static void Main()
        {
            udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8060);
            //udpSocket.Bind(endPoint);
            udpSocket.Connect(endPoint);

            SendMessageToServer("客戶端說:Hello Server!");
            StartRecvMessage();

            Console.ReadKey();
        }

        static void SendMessageToServer(string message)
        {
            udpSocket.Send(Encoding.UTF8.GetBytes(message));
        }

        static async void StartRecvMessage()
        {
            Console.WriteLine("客戶端開始監聽: " + udpSocket.LocalEndPoint);
            var decoder8 = Encoding.UTF8.GetDecoder();

            while (true)
            {
                var nread = await Task.Run<int>(() => udpSocket.Receive(buffer));
                var message = ConverBytesToString(decoder8, buffer, nread);

                Console.WriteLine($"收到來自服務器的消息: {message}");

                #region 交互
                Console.WriteLine("是否繼續監聽?[yes|no]");
                var str = await Task.Run<string>(() => Console.ReadLine());
                if (str == "yes")
                {
                    Console.WriteLine("繼續監聽...");
                    continue;
                }

                Console.WriteLine("客戶端停止監聽.");
                return;
                #endregion
            }
        }

        static string ConverBytesToString(Decoder decoder, byte[] bytes, int len)
        {
            var nchar = decoder.GetCharCount(bytes, 0, len);

            var bytesChar = new char[nchar];
            nchar = decoder.GetChars(bytes, 0, len, bytesChar, 0);

            var result = new string(bytesChar, 0, nchar);
            return result;
        }
    }
}

// --- AsyncUdpServer.cs

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace test
{
    static class AsyncUdpServer
    {
        static Socket udpSocket;
        static byte[] buffer = new byte[4096];

        public static void Main()
        {
            udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

            var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 8060);
            udpSocket.Bind(endPoint);
            //udpSocket.Connect(endPoint);

            StartRecvMessage();
            Console.ReadKey();
        }

        static void SendMessageToClient(EndPoint endPoint, string message)
        {
            udpSocket.SendTo(Encoding.UTF8.GetBytes(message), endPoint);
        }

        static async void StartRecvMessage()
        {
            Console.WriteLine("服務器開始監聽: " + udpSocket.LocalEndPoint);
            var decoder8 = Encoding.UTF8.GetDecoder();

            while(true)
            {
                EndPoint endPoint = new IPEndPoint(IPAddress.Any, 0);
                var nread = await Task.Run<int>(() => udpSocket.ReceiveFrom(buffer, ref endPoint));
                var message = ConverBytesToString(decoder8, buffer, nread);

                Console.WriteLine($"收到來自客戶端[{endPoint}]的消息: {message}");

                SendMessageToClient(endPoint, "服務器對你說Hi!");

#region 交互
                Console.WriteLine("是否繼續監聽?[yes|no]");
                var str = await Task.Run<string>(()=> Console.ReadLine());
                if (str == "yes")
                {
                    Console.WriteLine("繼續監聽...");
                    continue;
                }

                Console.WriteLine("服務器停止監聽.");
                return;
#endregion

            }
        }

        static string ConverBytesToString(Decoder decoder, byte[] bytes, int len)
        {
            var nchar = decoder.GetCharCount(bytes, 0, len);

            var bytesChar = new char[nchar];
            nchar = decoder.GetChars(bytes, 0, len, bytesChar, 0);

            var result = new string(bytesChar, 0, nchar);
            return result;
        }
    }
}

總結

今天我們使用aync/await關鍵字實現瞭異步的udp通訊.

主要是瞭解和實踐異步關鍵字的知識和使用, 同時對傳統的單開線程來進行udp通訊方式進行瞭優化, 這

樣的好處是不需要自己維護多線程環境, 不需要保證線程安全, 各種鎖之類的操作.

udp通訊本身很簡單, 隻要搞清楚Bind, Connect還有端口的概念即可.

aync/await對於長期寫同步代碼或者使用異步callback形式回調的同學來說, 可能會有一定的理解困難,

但是其實也就那麼回事, 我們簡單理解為協程即可(隻是比協程使用起來更方便).

到此這篇關於C#中使用async和await實現異步Udp通訊的示例代碼的文章就介紹到這瞭,更多相關C# 異步Udp通訊內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: