Asp.Net Core7 preview4限流中間件新特性詳解
前言
限流是應對流量暴增或某些用戶惡意攻擊等場景的重要手段之一,然而微軟官方從未支持這一重要特性,AspNetCoreRateLimit這一第三方庫限流庫一般作為首選使用,然而其配置參數過於繁多,對使用者造成較大的學習成本。令人高興的是,在剛剛發佈的.NET 7 Preview 4中開始支持限流中間件。
UseRateLimiter嘗鮮
安裝.NET 7.0 SDK(v7.0.100-preview.4)
通過nuget包安裝Microsoft.AspNetCore.RateLimiting
創建.Net7網站應用,註冊中間件
全局限流並發1個
app.UseRateLimiter(new RateLimiterOptions { Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource => { return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter", _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); }) });
根據不同資源不同限制並發數,/api前綴的資源租約數2,等待隊列長度為2,其他默認租約數1,隊列長度1。
app.UseRateLimiter(new RateLimiterOptions() { // 觸發限流的響應碼 DefaultRejectionStatusCode = 500, OnRejected = async (ctx, rateLimitLease) => { // 觸發限流回調處理 }, Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource => { if (resource.Request.Path.StartsWithSegments("/api")) { return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter", _ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2)); } else { return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter", _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); } }) });
本地測試
新建一個webapi項目,並註冊限流中間件如下
using Microsoft.AspNetCore.RateLimiting; using System.Threading.RateLimiting; var builder = WebApplication.CreateBuilder(args); // Add services to the container. builder.Services.AddControllers(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); var app = builder.Build(); // Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } app.UseRateLimiter(new RateLimiterOptions { DefaultRejectionStatusCode = 500, OnRejected = async (ctx, lease) => { await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter")); }, Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource => { return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter", _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)); }) }); app.UseHttpsRedirection(); app.UseAuthorization(); app.MapControllers(); app.Run();
啟動項目,使用jmeter測試100並發,請求接口/WeatherForecast
所有請求處理成功,失敗0!
這個結果是不是有點失望,其實RateLimitPartition.CreateConcurrencyLimiter創建的限流器是
ConcurrencyLimiter,後續可以實現個各種策略的限流器進行替換之。
看瞭ConcurrencyLimiter的實現,其實就是令牌桶的限流思想,上面配置的new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)),第一個1代表令牌的個數,第二個1代表可以當桶裡的令牌為空時,進入等待隊列,而不是直接失敗,當前面的請求結束後,會歸還令牌,此時等待的請求就可以拿到令牌瞭,QueueProcessingOrder.NewestFirst代表最新的請求優先獲取令牌,也就是獲取令牌時非公平的,還有另一個枚舉值QueueProcessingOrder.OldestFirst老的優先,獲取令牌是公平的。隻要我們獲取到令牌的人幹活速度快,雖然我們令牌隻有1,並發就很高。
3. 測試觸發失敗場景
隻需要讓我們拿到令牌的人持有時間長點,就能輕易的觸發。
調整jmater並發數為10
相應內容也是我們設置的內容。
ConcurrencyLimiter源碼
令牌桶限流思想
獲取令牌
protected override RateLimitLease AcquireCore(int permitCount) { // These amounts of resources can never be acquired if (permitCount > _options.PermitLimit) { throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit)); } ThrowIfDisposed(); // Return SuccessfulLease or FailedLease to indicate limiter state if (permitCount == 0) { return _permitCount > 0 ? SuccessfulLease : FailedLease; } // Perf: Check SemaphoreSlim implementation instead of locking if (_permitCount >= permitCount) { lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) { return lease; } } } return FailedLease; }
嘗試獲取令牌核心邏輯
private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease) { ThrowIfDisposed(); // if permitCount is 0 we want to queue it if there are no available permits if (_permitCount >= permitCount && _permitCount != 0) { if (permitCount == 0) { // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available lease = SuccessfulLease; return true; } // a. if there are no items queued we can lease // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst)) { _idleSince = null; _permitCount -= permitCount; Debug.Assert(_permitCount >= 0); lease = new ConcurrencyLease(true, this, permitCount); return true; } } lease = null; return false; }
令牌獲取失敗後進入等待隊列
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default) { // These amounts of resources can never be acquired if (permitCount > _options.PermitLimit) { throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit)); } // Return SuccessfulLease if requestedCount is 0 and resources are available if (permitCount == 0 && _permitCount > 0 && !_disposed) { return new ValueTask<RateLimitLease>(SuccessfulLease); } // Perf: Check SemaphoreSlim implementation instead of locking lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) { return new ValueTask<RateLimitLease>(lease); } // Avoid integer overflow by using subtraction instead of addition Debug.Assert(_options.QueueLimit >= _queueCount); if (_options.QueueLimit - _queueCount < permitCount) { if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit) { // remove oldest items from queue until there is space for the newest request do { RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); if (!oldestRequest.Tcs.TrySetResult(FailedLease)) { // Updating queue count is handled by the cancellation code _queueCount += oldestRequest.Count; } } while (_options.QueueLimit - _queueCount < permitCount); } else { // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst return new ValueTask<RateLimitLease>(QueueLimitLease); } } CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); CancellationTokenRegistration ctr = default; if (cancellationToken.CanBeCanceled) { ctr = cancellationToken.Register(static obj => { ((CancelQueueState)obj!).TrySetCanceled(); }, tcs); } RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr); _queue.EnqueueTail(request); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); return new ValueTask<RateLimitLease>(request.Tcs.Task); } }
歸還令牌
private void Release(int releaseCount) { lock (Lock) { if (_disposed) { return; } _permitCount += releaseCount; Debug.Assert(_permitCount <= _options.PermitLimit); while (_queue.Count > 0) { RequestRegistration nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.PeekHead() : _queue.PeekTail(); if (_permitCount >= nextPendingRequest.Count) { nextPendingRequest = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); _permitCount -= nextPendingRequest.Count; _queueCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count); // Check if request was canceled if (!nextPendingRequest.Tcs.TrySetResult(lease)) { // Queued item was canceled so add count back _permitCount += nextPendingRequest.Count; // Updating queue count is handled by the cancellation code _queueCount += nextPendingRequest.Count; } nextPendingRequest.CancellationTokenRegistration.Dispose(); Debug.Assert(_queueCount >= 0); } else { break; } } if (_permitCount == _options.PermitLimit) { Debug.Assert(_idleSince is null); Debug.Assert(_queueCount == 0); _idleSince = Stopwatch.GetTimestamp(); } } }
總結
雖然這次官方對限流進行瞭支持,但貌似還不能支持對ip或client級別的限制支持,對於更高級的限流策略仍需要借助第三方庫或自己實現,期待後續越來越完善,更多關於Asp.Net Core7 preview限流中間件的資料請關註WalkonNet其它相關文章!