分析Netty直接內存原理及應用

一、通常的內存模型概述

一般地,系統為瞭保證系統本身的安全性和健壯性,會將內存從邏輯上隔離成內核區域和用戶區域,這很容易理解。因為用戶行為不可控性太強,暴露得太多,就容易導致各種神奇的用法,超出系統的控制范圍。當然,有的語言是支持直接控制內存的,比如C, 你可以用一個指針,訪問內存中的幾乎任意位置的數據(除瞭一些硬件地址)。而像匯編,則可以訪問任意地址。而這些底層的語言,已經離我們越來越遠瞭,它基本上和普通程序員關系不大瞭。

用戶很多時候的編程控制,都是在用戶區域進行的,比如我做一些加減乘除,如 Integer a = 2; Integer b = 3; Integer c = a * b; 這種操作, 所有操作就是在用戶空間上完成的。這些操作,不會有內核區域的介入。但是有些操作,則必須由內核進行,比如對文件的讀寫,就是不同設備之間的數據交換,也就是io類操作。這類操作因為有非常的難度實現,所以一定是由操作系統來完成底層的操作的。那麼,第一手的數據必定要經過內核區域。然而我們的代碼是跑在用戶區的,那麼,通常情況下,就會存在內核區數據,拷貝到用戶區數據的這麼一個過程。這是一個讀的過程,而寫的過程則是一個相反的操作,從用戶區拷貝數據到內核區,然後再由內核完成io操作。

直接將內存劃分為內核區與用戶區,實在是太泛瞭,不能說錯,但有一種說瞭等於沒說的感覺。

所以,對內存的劃分,還需要再細點,即所謂的內存模型或者內存區域。各語言各場景各實現自然是百傢爭鳴,無可厚非。但大致就是按照一定的規則,切分成不同用途的區域,然後在需要的時候向該區域進行內存分配,並保存到相應的表或者標識中,以便後續可讀或不可再分配。而這其中,還有個非常重要的點是,除瞭知道如何分配內存之外,還要知道如何回收內存。另外,如何保證內存的可見性,也是一個內存模型需要考慮的重要話題。

具體實現就不用說瞭,因為沒有一個放之四海而皆準的說法,我也沒那能耐講清楚這事情。大傢自行腦補吧。

二、Java中的直接內存原理

首先,來說說為什麼java中會有直接內存這個概念?我們知道,java中有很重要的一個內存區域,即堆內存,幾乎所有的對象都堆上進行分配,所以,大部分的GC工作,也是針對堆進行的。關聯上一節所講的事,堆內存我們可以劃分到用戶空間內存區域去。應該說,java隻要將這一塊內存管理好瞭,基本上就可以管理好java的對象的生命周期瞭。那麼,到底什麼直接內存?和堆內存又有啥關系?

直接內存是脫離掉堆空間的,它不屬於java的堆,其他區域也不屬於,即直接內存不受jvm管控。它屬於受系統直接控制的一段內存區域。

為什麼直接內存要脫離jvm的管控呢?因為jvm管控的是用戶空間,而有的場景則必須要內核空間的介入,整個過程才能完成。而如果用戶空間想要獲取數據,則必須要像內核中請求復制數據,數據才對用戶空間可見。而很多這種場景,復制數據的目的,僅僅是為瞭使用一次其數據,做瞭相應的轉換後,就不再使用有關系,比如流數據的接入過程。這個復制的過程,則必定有不少的性能損耗,所以就有直接內存的出現。它的目的在於避免內核空間和用戶空間之間進行無意義的數據復制,從而提升程序性能。

直接內存不受jvm管控,那麼它受誰的管控呢?實際上,是由操作系統的底層進行管控的,在進行內存分配請求時,系統會申請一段共享區域。由內核和用戶代碼共享這裡的數據寫入,即內核寫入的數據,用戶代碼可以直接訪問,用戶代碼寫入的數據,內核可以直接使用。在底層,是由mmap這種函數接口來實現的共享內存的。

而在java層面,則是使用DirectByteBuffer來呈現的,它的創建、使用、刪除如下:

// 創建直接內存空間實例
    ByteBuffer buffer = ByteBuffer.allocateDirect(1600);
    for (int i = 0; i < 90_0000; i++) {
        for (int j = 0; j < 199; j++) {
            // 數據的寫入
            buffer.putInt(j);
        }
        buffer.flip();
        for (int j = 0; j < 199; j++) {
            // 數據的讀取
            buffer.get();
        }
        // 數據清理
        buffer.clear();
    }

三、Netty中使用直接內存

知道瞭直接內存的使用過程,那麼如何找到更好的場景,則是需要我們去發現的。netty作為一個高性能網絡通信框架,重要的工作就是在處理網絡io問題。那麼,在它的場景裡,使用上直接內存這一大殺器,則是再好不過瞭。那麼,netty是如何利用它的呢?

兩個場景:1. 向應用傳遞網絡數據時(讀過程); 2. 應用向遠端傳遞數據時(寫過程);

// 寫過程,將msg轉換為直接內存存儲的二進制數據
    // io.netty.handler.codec.MessageToByteEncoder#write
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                // 默認 preferDirect = true;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // 調用子類的實現,編碼數據,以便實現私有協議
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    // 寫數據到遠端
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
    // io.netty.handler.codec.MessageToByteEncoder#allocateBuffer
    /**
     * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
     * Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.
     */
    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception {
        if (preferDirect) {
            // PooledByteBufAllocator
            return ctx.alloc().ioBuffer();
        } else {
            return ctx.alloc().heapBuffer();
        }
    }
    // io.netty.buffer.AbstractByteBufAllocator#ioBuffer()
    @Override
    public ByteBuf ioBuffer() {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(DEFAULT_INITIAL_CAPACITY);
        }
        return heapBuffer(DEFAULT_INITIAL_CAPACITY);
    }
    // io.netty.buffer.AbstractByteBufAllocator#directBuffer(int)
    @Override
    public ByteBuf directBuffer(int initialCapacity) {
        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
    }
    @Override
    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return emptyBuf;
        }
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }
    // io.netty.buffer.PooledByteBufAllocator#newDirectBuffer
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    // io.netty.buffer.PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int)
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }
        // io.netty.buffer.PoolArena.DirectArena#newByteBuf
        @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
    // io.netty.util.internal.PlatformDependent0#newDirectBuffer
    static ByteBuffer newDirectBuffer(long address, int capacity) {
        ObjectUtil.checkPositiveOrZero(capacity, "capacity");

        try {
            return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
        } catch (Throwable cause) {
            // Not expected to ever throw!
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new Error(cause);
        }
    }

向ByteBuffer中寫入數據過程, 即是向直接內存中寫入數據的過程,它可能不像普通的堆對象一樣簡單咯。

// io.netty.buffer.AbstractByteBuf#writeBytes(byte[])
    @Override
    public ByteBuf writeBytes(byte[] src) {
        writeBytes(src, 0, src.length);
        return this;
    }

    @Override
    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
        ensureWritable(length);
        setBytes(writerIndex, src, srcIndex, length);
        writerIndex += length;
        return this;
    }
    
    // io.netty.buffer.PooledUnsafeDirectByteBuf#setBytes(int, byte[], int, int)
    @Override
    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
        // addr() 將會得到一個內存地址
        UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
        return this;
    }
    // io.netty.buffer.PooledUnsafeDirectByteBuf#addr
    private long addr(int index) {
        return memoryAddress + index;
    }

    // io.netty.buffer.UnsafeByteBufUtil#setBytes(io.netty.buffer.AbstractByteBuf, long, int, byte[], int, int)
    static void setBytes(AbstractByteBuf buf, long addr, int index, byte[] src, int srcIndex, int length) {
        buf.checkIndex(index, length);
        if (length != 0) {
            // 將字節數據copy到DirectByteBuffer中
            PlatformDependent.copyMemory(src, srcIndex, addr, length);
        }
    }
    // io.netty.util.internal.PlatformDependent#copyMemory(byte[], int, long, long)
    public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) {
        PlatformDependent0.copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddr, length);
    }
    // io.netty.util.internal.PlatformDependent0#copyMemory(java.lang.Object, long, java.lang.Object, long, long)
    static void copyMemory(Object src, long srcOffset, Object dst, long dstOffset, long length) {
        //UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, length);
        while (length > 0) {
            long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
            // 最終由jvm的本地方法,進行內存的copy, 此處dst為null, 即數據隻會copy到對應的 dstOffset 中
            // 偏移基數就是: 各種基礎地址 ARRAY_OBJECT_BASE_OFFSET...
            UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
            length -= size;
            srcOffset += size;
            dstOffset += size;
        }
    }

可以看到,最後直接內存的寫入,是通過 Unsafe 類,對操作系統進行內存數據的寫入的。

最後,來看下它如何將寫數據到遠端:

// io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }
    
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
        // io.netty.channel.DefaultChannelPipeline.HeadContext#write
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }
        // io.netty.channel.AbstractChannel.AbstractUnsafe#write
        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                // 轉換msg為直接內存,如有必要
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
            // 將msg放入outboundBuffer中,即相當於寫完瞭數據
            outboundBuffer.addMessage(msg, size, promise);
        }
    // io.netty.channel.nio.AbstractNioByteChannel#filterOutboundMessage
    @Override
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
    // io.netty.channel.ChannelOutboundBuffer#addMessage
    /**
     * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
     * the message was written.
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        // 如有必要,立即觸發 fireChannelWritabilityChanged 事件,從而使立即向網絡寫入數據
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

大概就是說,通過直接內存寫好的數據,隻需要再調用下內核的接入接口,將直接內存的數據放入緩沖,就可以被發送到遠端瞭。

最後,我們來看下簡要netty對於網絡數據的接入讀取過程,以辨別是否使用瞭直接內存,以及是如何使用的。

// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
            // 分配創建ByteBuffer, 此處實際就是直接內存的體現
                    byteBuf = allocHandle.allocate(allocator);
            // 將數據讀取到ByteBuffer中
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
            // 讀取到一部分數據,就向pipeline的下遊傳遞,而非全部完成後再傳遞
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    // io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }
    // io.netty.buffer.AbstractByteBufAllocator#ioBuffer(int)
    @Override
    public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe()) {
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

可見同樣,在接入數據時,仍然使用直接內存進行數據接收,從而達到內核與用戶共享,無需拷貝的目的。

以上,就是netty對整個直接內存的操作方式瞭。看起來有點復雜,主要netty到處都是其設計哲學的體現,無論是一個寫事件、讀事件、或者是狀態變更事件,都是一長串的流水線操作。當然瞭,我們此處討論的是,其如何使用直接內存的。它通過使用一個 PooledUnsafeDirectByteBuf , 最終引用jdk的 direct = ByteBuffer.allocateDirect(1); 使用 DirectByteBuffer 實現直接內存的使用。並使用其構造方法 DirectByteBuffer(long addr, int cap) 進行直接內存對象創建。

四、總結

從整體上來說,直接內存減少瞭進行io時的內存復制操,但其僅為內核與用戶空間的內存復制,因為用戶空間的數據復制是並不可少的,因為最終它們都必須要轉換為二進制流,才能被不同空間的程序讀取。但創建直接內存對象的開銷要高於創建普通內存對象,因為它可能需要維護更復雜的關系環境。事實上,直接內存可以做到不同進程間的內存共享,而這在普通對象內存中是無法做到的(不過java是單進程的,不care此場景)。java的直接內存的使用,僅為使用系統提供的一個便捷接口,適應更好的場景。

直接內存實際上也可以叫共享內存,它可以實現不同進程之間的通信,即不同進程可以看到其他進程對本塊內存地址的修改。這是一種高效的進程間通信方式,這對於多進程應用很有幫助。但對於多線程應用則不是必須,因為多線程本身就是共享內存的。而類似於nginx之類的應用,則非常有用瞭。因為對於一些全局計數器,必然需要多進程維護,通過共享內存完美解決。

而netty作為一個網絡通信框架,則是為瞭更好處理具體場景,更合理的使用瞭直接內存,從而成就瞭所謂的零拷貝,高性能的基石之一。所以,一個好的框架,一定是解決某類問題的翹楚,它不一定是功能開創者,但一定是很好的繼承者。

另外,內存管理是個非常復雜的問題。 但又很重要,值得我們花大量時間去研究。

以上就是分析Netty直接內存原理及應用的詳細內容,更多關於Netty 直接內存原理的資料請關註WalkonNet其它相關文章!

推薦閱讀: