解決Netty解碼http請求獲取URL亂碼問題

Netty解碼http請求獲取URL亂碼

解決方案

獲取URI時,使用URLDecoder進行解碼

    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest fhr = (FullHttpRequest) msg;
        String uri = URLDecoder.decode(fhr.uri().trim().replace("/", "")
                .replace("\\", ""), "UTF-8");
    }

原因

1、URLEncoder.encode和URLDecoder.decode

URL隻能使用英文字母、阿拉伯數字和某些標點符號,不能使用其他文字和符號,即

隻有字母和數字[0-9a-zA-Z]、一些特殊符號$-_.+!*'()[不包括雙引號]、以及某些保留字(空格轉換為+),才可以不經過編碼直接用於URL,如果URL中有漢字,就必須編碼後使用。

  • URLDecoder類包含一個decode(String s,String enc)靜態方法,它可以將application/x-www-form-urlencoded MIME字符串轉成編碼前的字符串;
  • URLEncoder類包含一個encode(String s,String enc)靜態方法,它可以將中文字符及特殊字符用轉換成application/x-www-form-urlencoded MIME字符串。

2、使用URLEncoder.encode編碼

public static String urlEncode(String urlToken) {
    String encoded = null;
    try {
        //用URLEncoder.encode方法會把空格變成加號(+),encode之後在替換一下
        encoded = URLEncoder.encode(urlToken, "UTF-8").replace("+", "%20");
    } catch (UnsupportedEncodingException e) {
        logger.error("URLEncode error {}", e);
    }
    return encoded;
}

3、使用URLEncoder.encode解碼

public static String urlEncode(String urlToken) {
    String decoded = null;
    try {
        decoded =URLDecoder.decode(urlToken, "UTF-8"); 
    } catch (UnsupportedEncodingException e) {
        logger.error("URLEncode error {}", e);
    }
    return decoded;
}

Netty—編解碼(原理) 

1.ByteToMessageDecoder

用於將ByteBuf解碼成為POJO對象

重要字段:

ByteBuf cumulation;     //緩存
private Cumulator cumulator = MERGE_CUMULATOR; //累計器
private boolean singleDecode;  
private boolean first; //是否第一次解碼
private boolean firedChannelRead;
//狀態碼
private byte decodeState = STATE_INIT;
private int discardAfterReads = 16; //解碼次數閾值,用來刪除已讀數據
private int numReads; //解碼次數

介紹一下累計器:Cumulator類是幹什麼的

它的本類中的內部類,而且還是一個接口,隻提供瞭方法。它的實現,隻有匿名類,所以就是開頭的靜態兩個字段瞭。

public interface Cumulator {
    ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}

也就是我們默認使用的cumulator->MEGRE_CUMULATOR,我們看看它是如何實現的cumulator接口

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    //參數:ByteBuf的分配器,本類中的ByteBuf,傳遞過來的ByteBuf
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (!cumulation.isReadable() && in.isContiguous()) {
            累加的不可讀(比如空緩存),且新的是連續的
            cumulation.release(); //釋放
            return in;
        }
        try {
            final int required = in.readableBytes(); //返回可讀區域
            //可讀區域,大於累加器中的可寫區域, 或者累加器隻能讀
            if (required > cumulation.maxWritableBytes() ||
                    (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                    cumulation.isReadOnly()) {
                return expandCumulation(alloc, cumulation, in); //擴充累計器
            }
            //寫入到累計器中
            cumulation.writeBytes(in, in.readerIndex(), required);
            in.readerIndex(in.writerIndex()); //調整in的讀指針到寫的位置,那麼可讀區域為0
            return cumulation;
        } finally {
            in.release();  //釋放ByteBuf
        }
    }
};

這個類的實現方法,很重要,因為下面的ChannelRead()方法的核心就是調用上面的方法,

重要方法:channelRead()

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) { //判斷傳入的 是否是ByteBuf對象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            first = cumulation == null;  //如果為null,說明是第一次
            cumulation = cumulator.cumulate(ctx.alloc(),
                    first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg); //判斷解碼器是否緩存瞭沒有解碼完成的半包信息
            callDecode(ctx, cumulation, out);                               //如果為空,說明第一次解析,或者上一次的已經解析完成。
        }...
        } finally {
            try {
                if (cumulation != null && !cumulation.isReadable()) { //不為空,不可讀,要釋放
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++numReads >= discardAfterReads) {//讀取數據的次數大於閾值,則嘗試丟棄已讀數據
                    numReads = 0;
                    discardSomeReadBytes();
                }
                int size = out.size();
                firedChannelRead |= out.insertSinceRecycled(); //有被添加或者設置,表示已經讀過瞭
                fireChannelRead(ctx, out, size);   //嘗試傳遞數據
            } finally {
                out.recycle();
            }
        }
    } else {
        ctx.fireChannelRead(msg);  //其他類型進行傳遞
    }
}

先看ctx.alloc()方法就得到的什麼,它對應上面cumulator()的第一個參數,返回的自然是Bytebuf的分配器

public ByteBufAllocator alloc() {
    return channel().config().getAllocator(); //返回ByteBufAllocator,要嘛是池化的,要嘛是非池化
}

如何對msg中的信息,進行轉移到本地的cumulator中,

之後調用callDecode進行解碼

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {//可讀
            int outSize = out.size();  //數量
            if (outSize > 0) { //一個一個的把解析出來的結果,傳遞下去
                fireChannelRead(ctx, out, outSize); //傳遞
                out.clear();  //已經傳播 的,要清理掉。
                if (ctx.isRemoved()) {  //上下文被移除瞭,就不處理瞭
                    break;
                }
                outSize = 0;
            }
            //繼續編解碼,
            int oldInputLength = in.readableBytes();
            decodeRemovalReentryProtection(ctx, in, out); //解碼  ★
            if (ctx.isRemoved()) {
                break;
            }
            if (outSize == out.size()) { //沒有新生成的消息,
                if (oldInputLength == in.readableBytes()) { //沒有讀取數據
                    break;
                } else {  continue;  }
            }
 
            if (oldInputLength == in.readableBytes()) { //解碼器沒有讀取數據
               ... }
 
            if (isSingleDecode()) { //是否每次隻解碼一條,就返回
                break;
        ...
}

這個方法具體的邏輯就是解碼+傳播解碼出的pojo,傳播pojo就是調用context.fire..方法,沒什麼好看的,我們之前的pipline講解的時候,已經講過瞭事件傳播的邏輯,這裡我們重點看解碼方法

decodeRemovalReentryProtection(),它其實也沒有實現解碼,功能,我們前面說過,本類隻是一個抽象類,具體的解碼要交給它的子類,實現類,比如我們之前 章節,解碼器的使用部分,我們自定義的Handler繼承這個類,它的裡面才真正實現瞭解碼的功能。!

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE; //狀態,調用子類 解碼
    try {
        decode(ctx, in, out); //調用子類解碼
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT; //處理完瞭,設置為初始化
        if (removePending) {
            fireChannelRead(ctx, out, out.size());
            out.clear();
            handlerRemoved(ctx);
        }
    }
}

再來看,丟棄已讀部分的ByteBuf

protected final void discardSomeReadBytes() {
    if (cumulation != null && !first && cumulation.refCnt() == 1) {
        cumulation.discardSomeReadBytes();
    }
}

它其實是一個入口,具體的實現是在AbstractByteBuf中

public ByteBuf discardSomeReadBytes() {
    if (readerIndex > 0) {
        if (readerIndex == writerIndex) {
            ensureAccessible();
            adjustMarkers(readerIndex);
            writerIndex = readerIndex = 0;
            return this;
        }
 
        if (readerIndex >= capacity() >>> 1) {
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
            return this;
        }
    }
    ensureAccessible();
    return this;
}

2.FixedLengthFrameDecoder

它是ByteToMessageDecoder的子類,也就是實現瞭具體的decode,解決半包,粘包問題,通過固定長度的手法。

它的字段隻有一個,frameLength,固定的長度大小,

方法也就是構造方法+decoder()

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

調用重載的方法,簡單判斷一下長度,然後讀取

protected Object decode(
        @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (in.readableBytes() < frameLength) {
        return null;
    } else {
        return in.readRetainedSlice(frameLength); //AbstracByteBuf實現的方法
    }
}

3.MessageToByteEncoder

位於outbound中,功能是將pojo編碼成為Byte[]組,

兩個字段:

private final TypeParameterMatcher matcher;  //類型參數匹配器,針對范型的
private final boolean preferDirect;

第一個字段更重要,是以前沒見過的類型,用來處理范型進行匹配的,主要運用在構造方法中。

3.1 TypeParameterMatcher

先看字段,就一個成員Noop,匿名類,實現的是自己!也就實現瞭match方法,返回true。邏輯簡單。

private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() {
    @Override
    public boolean match(Object msg) {
        return true;
    }
};

常用方法:

get(),跟回傳進來的Class對象,判斷是哪個類型,如果是Object,就是上面NOOP,

public static TypeParameterMatcher get(final Class<?> parameterType) {
    final Map<Class<?>, TypeParameterMatcher> getCache =
            InternalThreadLocalMap.get().typeParameterMatcherGetCache();
 
    TypeParameterMatcher matcher = getCache.get(parameterType); //緩存中獲取
    if (matcher == null) { //未擊中
        if (parameterType == Object.class) {
            matcher = NOOP;
        } else {    //內部類,封裝Class,match匹配的時候,利用反射,判斷是否是這個類的實例
            matcher = new ReflectiveMatcher(parameterType);
        }
        getCache.put(parameterType, matcher); //放入緩存中
    }
 
    return matcher;
}

內部類,和上面的NOOP邏輯相似

private static final class ReflectiveMatcher extends TypeParameterMatcher {
    private final Class<?> type;
    ReflectiveMatcher(Class<?> type) { this.type = type; }
    @Override  //判斷 msg是否是type的實現類
    public boolean match(Object msg) {
        return type.isInstance(msg);
    }
}

3.2 write()方法

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) { //類型匹配
            @SuppressWarnings("unchecked")
            I cast = (I) msg;  //類型轉換
            buf = allocateBuffer(ctx, cast, preferDirect); //分配空間
            try {
                encode(ctx, cast, buf); //調用子類編碼方法
            } finally {
                ReferenceCountUtil.release(cast); //釋放
            }
 
            if (buf.isReadable()) {  //可讀
                ctx.write(buf, promise); //傳播
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } ...釋放
}

if中的方法,就會調用上方的matcher進行匹配

public boolean acceptOutboundMessage(Object msg) throws Exception {
    return matcher.match(msg);
}

然後分配一個空間,作為ByteBuf

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                           boolean preferDirect) throws Exception {
    if (preferDirect) { //是否是直接內存
        return ctx.alloc().ioBuffer();
    } else {
        return ctx.alloc().heapBuffer();
    }
}

再調用子類,實現類的encode()方法,進行編碼,同樣也就是調用ByteBuf的寫入方法,將對象寫進去。 

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: