解決Netty解碼http請求獲取URL亂碼問題
Netty解碼http請求獲取URL亂碼
解決方案
獲取URI時,使用URLDecoder進行解碼
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest fhr = (FullHttpRequest) msg; String uri = URLDecoder.decode(fhr.uri().trim().replace("/", "") .replace("\\", ""), "UTF-8"); }
原因
1、URLEncoder.encode和URLDecoder.decode
URL隻能使用英文字母、阿拉伯數字和某些標點符號,不能使用其他文字和符號,即
隻有字母和數字[0-9a-zA-Z]、一些特殊符號$-_.+!*'()[不包括雙引號]、以及某些保留字(空格轉換為+),才可以不經過編碼直接用於URL,如果URL中有漢字,就必須編碼後使用。
URLDecoder
類包含一個decode(String s,String enc)靜態方法,它可以將application/x-www-form-urlencoded MIME字符串轉成編碼前的字符串;URLEncoder
類包含一個encode(String s,String enc)靜態方法,它可以將中文字符及特殊字符用轉換成application/x-www-form-urlencoded MIME字符串。
2、使用URLEncoder.encode編碼
public static String urlEncode(String urlToken) { String encoded = null; try { //用URLEncoder.encode方法會把空格變成加號(+),encode之後在替換一下 encoded = URLEncoder.encode(urlToken, "UTF-8").replace("+", "%20"); } catch (UnsupportedEncodingException e) { logger.error("URLEncode error {}", e); } return encoded; }
3、使用URLEncoder.encode解碼
public static String urlEncode(String urlToken) { String decoded = null; try { decoded =URLDecoder.decode(urlToken, "UTF-8"); } catch (UnsupportedEncodingException e) { logger.error("URLEncode error {}", e); } return decoded; }
Netty—編解碼(原理)
1.ByteToMessageDecoder
用於將ByteBuf解碼成為POJO對象
重要字段:
ByteBuf cumulation; //緩存 private Cumulator cumulator = MERGE_CUMULATOR; //累計器 private boolean singleDecode; private boolean first; //是否第一次解碼 private boolean firedChannelRead; //狀態碼 private byte decodeState = STATE_INIT; private int discardAfterReads = 16; //解碼次數閾值,用來刪除已讀數據 private int numReads; //解碼次數
介紹一下累計器:Cumulator類是幹什麼的
它的本類中的內部類,而且還是一個接口,隻提供瞭方法。它的實現,隻有匿名類,所以就是開頭的靜態兩個字段瞭。
public interface Cumulator { ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in); }
也就是我們默認使用的cumulator->MEGRE_CUMULATOR,我們看看它是如何實現的cumulator接口
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { //參數:ByteBuf的分配器,本類中的ByteBuf,傳遞過來的ByteBuf @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { if (!cumulation.isReadable() && in.isContiguous()) { 累加的不可讀(比如空緩存),且新的是連續的 cumulation.release(); //釋放 return in; } try { final int required = in.readableBytes(); //返回可讀區域 //可讀區域,大於累加器中的可寫區域, 或者累加器隻能讀 if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) { return expandCumulation(alloc, cumulation, in); //擴充累計器 } //寫入到累計器中 cumulation.writeBytes(in, in.readerIndex(), required); in.readerIndex(in.writerIndex()); //調整in的讀指針到寫的位置,那麼可讀區域為0 return cumulation; } finally { in.release(); //釋放ByteBuf } } };
這個類的實現方法,很重要,因為下面的ChannelRead()方法的核心就是調用上面的方法,
重要方法:channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { //判斷傳入的 是否是ByteBuf對象 CodecOutputList out = CodecOutputList.newInstance(); try { first = cumulation == null; //如果為null,說明是第一次 cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); //判斷解碼器是否緩存瞭沒有解碼完成的半包信息 callDecode(ctx, cumulation, out); //如果為空,說明第一次解析,或者上一次的已經解析完成。 }... } finally { try { if (cumulation != null && !cumulation.isReadable()) { //不為空,不可讀,要釋放 numReads = 0; cumulation.release(); cumulation = null; } else if (++numReads >= discardAfterReads) {//讀取數據的次數大於閾值,則嘗試丟棄已讀數據 numReads = 0; discardSomeReadBytes(); } int size = out.size(); firedChannelRead |= out.insertSinceRecycled(); //有被添加或者設置,表示已經讀過瞭 fireChannelRead(ctx, out, size); //嘗試傳遞數據 } finally { out.recycle(); } } } else { ctx.fireChannelRead(msg); //其他類型進行傳遞 } }
先看ctx.alloc()方法就得到的什麼,它對應上面cumulator()的第一個參數,返回的自然是Bytebuf的分配器
public ByteBufAllocator alloc() { return channel().config().getAllocator(); //返回ByteBufAllocator,要嘛是池化的,要嘛是非池化 }
如何對msg中的信息,進行轉移到本地的cumulator中,
之後調用callDecode進行解碼
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) {//可讀 int outSize = out.size(); //數量 if (outSize > 0) { //一個一個的把解析出來的結果,傳遞下去 fireChannelRead(ctx, out, outSize); //傳遞 out.clear(); //已經傳播 的,要清理掉。 if (ctx.isRemoved()) { //上下文被移除瞭,就不處理瞭 break; } outSize = 0; } //繼續編解碼, int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out); //解碼 ★ if (ctx.isRemoved()) { break; } if (outSize == out.size()) { //沒有新生成的消息, if (oldInputLength == in.readableBytes()) { //沒有讀取數據 break; } else { continue; } } if (oldInputLength == in.readableBytes()) { //解碼器沒有讀取數據 ... } if (isSingleDecode()) { //是否每次隻解碼一條,就返回 break; ... }
這個方法具體的邏輯就是解碼+傳播解碼出的pojo,傳播pojo就是調用context.fire..方法,沒什麼好看的,我們之前的pipline講解的時候,已經講過瞭事件傳播的邏輯,這裡我們重點看解碼方法
decodeRemovalReentryProtection(),它其實也沒有實現解碼,功能,我們前面說過,本類隻是一個抽象類,具體的解碼要交給它的子類,實現類,比如我們之前 章節,解碼器的使用部分,我們自定義的Handler繼承這個類,它的裡面才真正實現瞭解碼的功能。!
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { decodeState = STATE_CALLING_CHILD_DECODE; //狀態,調用子類 解碼 try { decode(ctx, in, out); //調用子類解碼 } finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; //處理完瞭,設置為初始化 if (removePending) { fireChannelRead(ctx, out, out.size()); out.clear(); handlerRemoved(ctx); } } }
再來看,丟棄已讀部分的ByteBuf
protected final void discardSomeReadBytes() { if (cumulation != null && !first && cumulation.refCnt() == 1) { cumulation.discardSomeReadBytes(); } }
它其實是一個入口,具體的實現是在AbstractByteBuf中
public ByteBuf discardSomeReadBytes() { if (readerIndex > 0) { if (readerIndex == writerIndex) { ensureAccessible(); adjustMarkers(readerIndex); writerIndex = readerIndex = 0; return this; } if (readerIndex >= capacity() >>> 1) { setBytes(0, this, readerIndex, writerIndex - readerIndex); writerIndex -= readerIndex; adjustMarkers(readerIndex); readerIndex = 0; return this; } } ensureAccessible(); return this; }
2.FixedLengthFrameDecoder
它是ByteToMessageDecoder的子類,也就是實現瞭具體的decode,解決半包,粘包問題,通過固定長度的手法。
它的字段隻有一個,frameLength,固定的長度大小,
方法也就是構造方法+decoder()
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
調用重載的方法,簡單判斷一下長度,然後讀取
protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); //AbstracByteBuf實現的方法 } }
3.MessageToByteEncoder
位於outbound中,功能是將pojo編碼成為Byte[]組,
兩個字段:
private final TypeParameterMatcher matcher; //類型參數匹配器,針對范型的 private final boolean preferDirect;
第一個字段更重要,是以前沒見過的類型,用來處理范型進行匹配的,主要運用在構造方法中。
3.1 TypeParameterMatcher
先看字段,就一個成員Noop,匿名類,實現的是自己!也就實現瞭match方法,返回true。邏輯簡單。
private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() { @Override public boolean match(Object msg) { return true; } };
常用方法:
get(),跟回傳進來的Class對象,判斷是哪個類型,如果是Object,就是上面NOOP,
public static TypeParameterMatcher get(final Class<?> parameterType) { final Map<Class<?>, TypeParameterMatcher> getCache = InternalThreadLocalMap.get().typeParameterMatcherGetCache(); TypeParameterMatcher matcher = getCache.get(parameterType); //緩存中獲取 if (matcher == null) { //未擊中 if (parameterType == Object.class) { matcher = NOOP; } else { //內部類,封裝Class,match匹配的時候,利用反射,判斷是否是這個類的實例 matcher = new ReflectiveMatcher(parameterType); } getCache.put(parameterType, matcher); //放入緩存中 } return matcher; }
內部類,和上面的NOOP邏輯相似
private static final class ReflectiveMatcher extends TypeParameterMatcher { private final Class<?> type; ReflectiveMatcher(Class<?> type) { this.type = type; } @Override //判斷 msg是否是type的實現類 public boolean match(Object msg) { return type.isInstance(msg); } }
3.2 write()方法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { if (acceptOutboundMessage(msg)) { //類型匹配 @SuppressWarnings("unchecked") I cast = (I) msg; //類型轉換 buf = allocateBuffer(ctx, cast, preferDirect); //分配空間 try { encode(ctx, cast, buf); //調用子類編碼方法 } finally { ReferenceCountUtil.release(cast); //釋放 } if (buf.isReadable()) { //可讀 ctx.write(buf, promise); //傳播 } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } ...釋放 }
if中的方法,就會調用上方的matcher進行匹配
public boolean acceptOutboundMessage(Object msg) throws Exception { return matcher.match(msg); }
然後分配一個空間,作為ByteBuf
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg, boolean preferDirect) throws Exception { if (preferDirect) { //是否是直接內存 return ctx.alloc().ioBuffer(); } else { return ctx.alloc().heapBuffer(); } }
再調用子類,實現類的encode()方法,進行編碼,同樣也就是調用ByteBuf的寫入方法,將對象寫進去。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- 一文學習Java NIO的ByteBuffer工作原理
- 又又叕出BUG啦!理智分析Java NIO的ByteBuffer到底有多難用
- 親自教你在netty中使用TCP協議請求DNS服務器的詳細過程
- 分析Netty直接內存原理及應用
- SpringBoot Webflux創建TCP/UDP server並使用handler解析數據