C#實現萬物皆可排序的隊列方法詳解
需求
產品中需要向不同的客戶推送數據,原來的實現是每條數據產生後就立即向客戶推送數據,走的的是HTTP協議。因為每條數據都比較小,而數據生成的頻次也比較高,這就會頻繁的建立HTTP連接,而且每次HTTP傳輸中攜帶的業務數據都很小,對網絡的實際利用率不高。希望能夠提高網絡的利用率,並降低系統的負載。
分析
一個很自然的想法就是將多條數據一起發送,這裡有幾個關鍵點:
1、多條數據的聚合邏輯: 是攢夠幾條發送,還是按照時間周期發送。如果是攢夠幾條發送,在數據比較稀疏或者產生頻率不那麼穩定的時候,攢夠需要的數據條數可能比較困難,這時候還得需要一個過期時間,因為客戶可能接受不瞭太多的延遲。既然不管怎樣都需要使用時間進行控制,我這裡索性就選擇按照時間周期發送瞭。思路是:自上次發送時間起,經過瞭某個時長之後,就發送客戶在這段時間內產生的所有數據。
2、數據到期判斷方法:既然選擇瞭按照時間周期發送,那麼就必須有辦法判斷是否到瞭發送時間。一個很簡單的想法就是輪詢,把所有客戶輪詢一遍,看看誰的數據到期瞭,就發送誰的。這個算法的時間復雜度是O(N),如果客戶比較多,就會消耗過多的時間在這上邊。還有一個辦法:如果客戶按照時間排序好瞭,那麼隻需要取時間最早的客戶的數據時間判斷就好瞭,滿足就發送,一直向後找,直到獲取的客戶數據時間不符合條件,則退出處理,然後等一會再進行判斷處理。這就需要有一個支持排序的數據結構,寫入數據時自動排序,這種數據結構的時間復雜度一般可以做到O(log(n))。對於這個數據結構的讀寫操作原理上就是隊列的操作方式,隻不過是個可排序的隊列。
3、區分客戶:不同客戶的數據接收地址不同,向具體某個客戶發送數據時,應該能比較方便的聚合他的數據,最好是直接就能拿到需要發送的數據。可以使用字典數據結構來滿足這個需求,取某個客戶數據的時間復雜度可以降低到O(1)。
4、數據的安全性問題:如果程序在數據發送成功之前退出瞭,未發送的數據怎麼辦?是還能繼續發送,還是就丟掉不管瞭。如果要在程序重啟後恢復未發送成功的數據,則必須將數據同步到別的地方,比如持久化到磁盤。因為我這裡的數據安全性要求不高,丟失一些數據也是允許的,所以要發送的數據收到之後放到內存就行瞭。
實現
上文提到可排序的數據結構,可以使用SortedList<TKey,TValue>,鍵是時間,值是這個時間產生瞭數據的客戶標識列表。不過它的讀寫操作不是線程安全的,需要自己做同步,這裡簡單點就使用lock瞭。
對於不同客戶的數據,為瞭方便獲取,使用Dictionary<TKey,TValue>來滿足,鍵是客戶的標識,值是累積的未發送客戶數據。這個數據讀寫也不是線程安全的,可以和SortedList的讀寫放到同一個lock中。
下邊是它們的定義:
SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>(); Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>(); readonly object _lock = new object();
插入數據的時候,需要先寫入SortedList,然後再寫入Dictionary。代碼邏輯比較簡單,請看:
public void Publish(TKey key, TValue value) { DateTime now = DateTime.Now; lock (_lock) { if (_queue.TryGetValue(now, out List<TKey>? keys)) { if (!keys!.Contains(key)) { keys.Add(key); } } else { _queue.Add(now, new List<TKey> { key }); } if (_data.TryGetValue(key, out List<TValue>? values)) { values.Add(value); } else { _data.Add(key, new List<TValue> { value }); } } }
對於消費數據,這裡采用拉數據的模式。最開始寫的方法邏輯是:讀取一條數據,處理它,然後從隊列中刪除。但是這個邏輯需要對隊列進行讀寫,所以必須加鎖。一般處理數據比較耗時,比如這裡要通過HTTP發送數據,加鎖的話就可能導致寫數據到隊列時阻塞的時間比較長。所以這裡實現的是把可以發送的數據全部提取出來,然後就釋放鎖,數據的處理放到鎖的外部實現,這樣隊列的讀寫性能就比較好瞭。
public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages) { List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>(); DateTime now = DateTime.Now; lock (_lock) { int messageCount = 0; while (true) { if (!_queue.Any()) { break; } var first = _queue.First(); var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds; if (diffMillseconds < _valueDequeueMillseconds) { break; } var keys = first.Value; foreach (var key in keys) { if (_data.TryGetValue(key, out List<TValue>? keyValues)) { result.Add((key, keyValues)); _data.Remove(key); messageCount += keyValues!.Count; } } _queue.RemoveAt(0); if (messageCount >= maxNumberOfMessages) { break; } } } return result; }
這段代碼比較長一些,我梳理下邏輯:取隊列的第一條數據,判斷時間是否達到發送周期,未達到則直接退出,方法返回空列表。如果達到發送周期,則取出第一條數據中存儲的客戶標識,然後根據這些標識獲取對應的客戶未發送數據,將這些數據按照客戶維度添加到返回列表中,將這些客戶及其數據從隊列中移除,返回有數據的列表。這裡還增加瞭一個拉取數據的條數限制,方便根據業務實際情況進行控制。
再來看一下怎麼使用這個隊列,這裡模擬多個生產者加一個消費者,其實可以任意多個生產者和消費者:
TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000); List<Task> publishTasks = new List<Task>(); for (int i = 0; i < 4; i++) { var j = i; publishTasks.Add(Task.Factory.StartNew(() => { int k = 0; while (true) { queue.Publish($"key_{k}", $"value_{j}_{k}"); Thread.Sleep(15); k++; } }, TaskCreationOptions.LongRunning)); } Task.Factory.StartNew(() => { while (true) { var list = queue.Pull(100); if (list.Count <= 0) { Thread.Sleep(100); continue; } foreach (var item in list) { Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}"); } } }, TaskCreationOptions.LongRunning); Task.WaitAll(publishTasks.ToArray());
以上就是針對這個特定需求實現的一個按照時間進行排序的隊列。
萬物皆可排序的隊列
我們很容易想到,既然可以按照時間排序,那麼按照別的數據類型排序也是可以的。這個數據結構可以應用的場景很多,比如按照權重排序的隊列、按照優先級排序的隊列、按照年齡排序的隊列、按照銀行存款排序的隊列,等等。這就是一個萬物皆可排序的隊列。
我這裡把主要代碼貼出來(完整代碼和示例請看文末):
public class SortedQueue<TSortKey, TKey, TValue> where TSortKey : notnull, IComparable where TKey : notnull where TValue : notnull { Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>(); SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>(); readonly object _lock = new object(); /// <summary> /// Create a new instance of SortedQueue /// </summary> public SortedQueue(int maxNumberOfMessageConsumedOnce) { } /// <summary> /// Publish a message to queue /// </summary> /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param> /// <param name="key">The message key.</param> /// <param name="value">The message value.</param> public void Publish(TSortKey sortKey, TKey key, TValue value) { lock (_lock) { if (_queue.TryGetValue(sortKey, out List<TKey>? keys)) { keys.Add(key); } else { _queue.Add(sortKey, new List<TKey> { key }); } if (_data.TryGetValue(key, out List<TValue>? values)) { values.Add(value); } else { _data.Add(key, new List<TValue> { value }); } } } /// <summary> /// Pull a batch of messages. /// </summary> /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param> /// <returns></returns> public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages) { List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>(); lock (_lock) { int messageCount = 0; while (true) { if (!_queue.Any()) { break; } var keys = _queue.First().Value; foreach (var key in keys) { if (_data.TryGetValue(key, out List<TValue>? keyValues)) { result.Add((key, keyValues)); _data.Remove(key); messageCount += keyValues!.Count; } } _queue.RemoveAt(0); if (messageCount >= maxNumberOfMessages) { break; } } } return result; } }
代碼邏輯還是比較簡單的,就不羅嗦瞭,如有問題歡迎留言交流。
再說數據安全
因為在這個實現中所有待處理的數據都在內存中,丟失數據會帶來一定的風險,因為我這個程序前邊還有一個隊列,即使程序崩潰瞭,也隻損失沒處理的一小部分數據,業務上可以接受,所以這樣做沒有問題。如果你對這個程序感興趣,需要慎重考慮你的應用場景。
來看看數據丟失可能發生的兩種情況:
一是數據還在隊列中時程序重啟瞭:對於這種情況,前文提到將數據同步到其它地方,比如寫入Redis、寫入數據庫、寫入磁盤等等。不過因為網絡IO、磁盤IO較慢,這往往會帶來吞吐量的大幅下降,想要保證一定的吞吐量,還得引入一些分片機制,又因為分佈式的不可靠,可能還得增加一些容錯容災機制,比較復雜,可以參考Kafka。
二是數據處理的時候失敗瞭:對於這種情況,可以讓程序重試;但是如果異常導致程序崩潰瞭,數據已經從內存或者其它存儲中移除瞭,數據還是會發生丟失。這時候可以采用一個ACK機制,處理成功後向隊列發送一個ACK,攜帶已經處理的數據標識,隊列根據標識刪除數據。否則消費者還能消費到這些數據。
這些問題並不一定要完全解決,還是得看業務場景,有可能你把數據持久化到Redis就夠瞭,或者你也不用引入ACK機制,記錄下處理到哪一條瞭就行瞭。
以上就是本文的全部內容,希望對大傢的學習有所幫助,也希望大傢多多支持WalkonNet。
推薦閱讀:
- 詳解C#對Dictionary內容的通用操作
- C#實現Dictionary字典賦值的方法
- C#使用struct類型作為泛型Dictionary<TKey,TValue>的鍵
- C# 通過ServiceStack 操作Redis
- 分析C# Dictionary的實現原理