`

Netty ByteBuf的使用

阅读更多

Netty的ByteBuf主要用于网络传输,有读写两个index。

 *      +-------------------+------------------+------------------+

 *      | discardable bytes |  readable bytes  |  writable bytes  |

 *      |                   |     (CONTENT)    |                  |

 *      +-------------------+------------------+------------------+

 *      |                   |                  |                  |

 *      0      <=      readerIndex   <=   writerIndex    <=    capacity


 我们可以看出,大致分为几类ByteBuf,一类是Pooled,UnPooled。另一个是Heap,Direct。

public abstract class AbstractByteBuf extends ByteBuf {

    static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);

    int readerIndex; //读的索引
    int writerIndex;//写的索引
    private int markedReaderIndex;//标记读的索引
    private int markedWriterIndex;//标记写的索引

    private int maxCapacity;//最大容量
}

   PooledByteBuf,通过capacity和deallocate方法我们可以发现都是用 chunk.arena来分配内存和释放内存的

abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {

    private final Recycler.Handle recyclerHandle;

    protected PoolChunk<T> chunk;
    protected long handle;
    protected T memory;
    protected int offset;
    protected int length;
    int maxLength;

    private ByteBuffer tmpNioBuf;

   @Override
    public final ByteBuf capacity(int newCapacity) {
        ensureAccessible();

        // If the request capacity does not require reallocation, just update the length of the memory.
        if (chunk.unpooled) {
            if (newCapacity == length) {
                return this;
            }
        } else {
            if (newCapacity > length) {
                if (newCapacity <= maxLength) {
                    length = newCapacity;
                    return this;
                }
            } else if (newCapacity < length) {
                if (newCapacity > maxLength >>> 1) {
                    if (maxLength <= 512) {
                        if (newCapacity > maxLength - 16) {
                            length = newCapacity;
                            setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                            return this;
                        }
                    } else { // > 512 (i.e. >= 1024)
                        length = newCapacity;
                        setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                        return this;
                    }
                }
            } else {
                return this;
            }
        }

        // Reallocation required.
        chunk.arena.reallocate(this, newCapacity, true);
        return this;
    }

    @Override
    protected final void deallocate() {
        if (handle >= 0) {
            final long handle = this.handle;
            this.handle = -1;
            memory = null;
            chunk.arena.free(chunk, handle, maxLength);
            recycle();
        }
    }
}

    PooledDirectByteBuf 是缓存的DirectByteBuf. 用RECYCLER来做回收重复利用。调用memory的get方法获取数据。memory是一个DirectByteBuffer对象。

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        @Override
        protected PooledDirectByteBuf newObject(Handle handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    };

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }

    @Override
    protected byte _getByte(int index) {
        return memory.get(idx(index));
    }

    @Override
    protected short _getShort(int index) {
        return memory.getShort(idx(index));
    }
}

  PooledUnsafeDirectByteBuf和PooledDirectByteBuf的区别是没有使用DirectByteBuffer的接口方法去获取数据,而是直接用PlatformDependent的方法获取数据

final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {

    private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;

    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }
    private long memoryAddress;

  private void initMemoryAddress() {
        memoryAddress = PlatformDependent.directBufferAddress(memory) + offset;
    }
  
    @Override
    protected byte _getByte(int index) {
        return PlatformDependent.getByte(addr(index));
    }

    @Override
    protected short _getShort(int index) {
        short v = PlatformDependent.getShort(addr(index));
        return NATIVE_ORDER? v : Short.reverseBytes(v);
    }
}

 PooledHeapByteBuf是直接用堆内分配的byte数组来存储数据,因此可以直接通过偏移量来读取数据

final class PooledHeapByteBuf extends PooledByteBuf<byte[]> {

    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        @Override
        protected PooledHeapByteBuf newObject(Handle handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };

    static PooledHeapByteBuf newInstance(int maxCapacity) {
        PooledHeapByteBuf buf = RECYCLER.get();
        buf.setRefCnt(1);
        buf.maxCapacity(maxCapacity);
        return buf;
    }
    @Override
    protected byte _getByte(int index) {
        return memory[idx(index)];
    }

    @Override
    protected short _getShort(int index) {
        index = idx(index);
        return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
    }
}

   UnpooledHeapByteBuf和PooledHeapByteBuf的区别是UnpooledHeapByteBuf不会用RECYCLER回收器

public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {

    private final ByteBufAllocator alloc;
    private byte[] array;
    private ByteBuffer tmpNioBuf;

    /**
     * Creates a new heap buffer with a newly allocated byte array.
     *
     * @param initialCapacity the initial capacity of the underlying byte array
     * @param maxCapacity the max capacity of the underlying byte array
     */
    protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
    }
}

   Netty会调用PoolArea的allocate方法创建ByteBuf对象

abstract class PoolArena<T> {

    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            if (isTiny(normCapacity)) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            synchronized (this) {
                final PoolSubpage<T> head = table[tableIdx];
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    return;
                }
            }
        } else if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    }
}

 

 

  

 

  • 大小: 74.4 KB
分享到:
评论

相关推荐

    对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解1

    对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解1

    bytebuf-rs:类似于Netty ByteBuf的ByteBuf的Rust实现

    类似于Netty ByteBuf的ByteBuf的Rust实现

    netty 在java中的字节码转换

    netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。

    ByteBuf源码分析

    将Netty ByteBuf的相关类,进行梳理总结、源码分析,通过思维导图的注释看源码!

    Netty简介 Netty线程模型和EventLoop Codec编码与解码 ByteBuf容器

    再根据本人实际学习体验总结而成。本部分内容可能不那么全面,但是我尽量挑选Netty中我认为比较重要的部分做讲解。

    精通并发与netty视频教程(2018)视频教程

    81_Netty引用计数的实现机制与自旋锁的使用技巧 82_Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 83_AtomicIntegerFieldUpdater实例演练与volatile关键字分析 84_Netty引用计数注意事项与内存泄露...

    精通并发与netty 无加密视频

    第81讲:Netty引用计数的实现机制与自旋锁的使用技巧 第82讲:Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 第83讲:AtomicIntegerFieldUpdater实例演练与volatile关键字分析 第84讲:Netty...

    netty3.10.6稳定版

    Netty是一个异步事件驱动的网络应用...只需要理解并熟练运用ByteBuf、Channel、Pipeline、Event模型等相关技术,就可以进行Netty的开发。同时,Netty对事件的处理也非常灵活,支持用户自定义各种类型的ChannelHandler。

    netty4.0.56稳定版本

    Netty是一个异步事件驱动的网络应用...只需要理解并熟练运用ByteBuf、Channel、Pipeline、Event模型等相关技术,就可以进行Netty的开发。同时,Netty对事件的处理也非常灵活,支持用户自定义各种类型的ChannelHandler。

    netty 分隔符解码器使用实例

    Netty框架中LineBasedFrameDecoder分隔符解码器解决考虑TCP的粘包与拆包问题。依次编译bytebuf中的可读字符,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成...

    Netty 入门与实战:仿写微信 IM 即时通讯系统.rar

    6.数据传输载体ByteBuf介绍 12 7.客户端与服务端通信协议编解码 14 8.实战:Netty实现客户端登录 15 9.实战:实现客户端与服务端收发消息 16 10.Pipeline与ChannelHandler 17 11.实战:构建客户端与服务端 Pipeline 18 ...

    netty权威指南第2版

    第15章 ByteBuf 和相关辅助类 第16章 Channel 和Unsafe 第17章 ChannelPipeline 和ChannelHandler 第18章 EvenLoop 和EventLoopGroup 第19章 Future 和Promise 第20章 Netty 架构解析 第21章j ava多线程编程在netty...

    精通并发与 netty 视频教程(2018)视频教程

    76_Netty项目开发过程中常见且重要事项分析 77_Java NIO Buffer总结回顾与难点拓展 78_Netty数据容器ByteBuf底层数据结构深度剖析 79_Netty的ByteBuf底层实现大揭秘 80_Netty复合缓冲区详解与3种缓冲区适用场景分析 ...

    Netty权威指南(第2版)(李林峰)-书签目录-完整版.zip

    第15 章 ByteBuf 和相关辅助类...... 288 第16 章 Channel 和Unsafe ...... 338 第17 章 ChannelPipeline 和ChannelHandler...... 388 第18 章 EventLoop 和EventLoopGroup...... 419 ...

    Netty In Action.zip

    第5章 ByteBuf 第6章 ChannelHandler和ChannelPipeline 第7章 EventLoop和线程模型 第8章 引导 第9章 单元测试 第二部分 编解码器 第10章 编解码器框架 第11章 预置的ChannelHandler和编解码器 第三部分 ...

    UdpServerSocketChannel:Netty udp服务器套接字通道,它为每个远程地址分配单独的通道

    UdpServerSocketChannel: ...import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.Ch

    Netty堆缓存问题

      今天学习Netty堆缓存和直接缓存遇到一个问题,明明使用的是堆缓存,这么读取不到数据呢?打印日志一看heapBuf.hasArray()直接返回false。来下面我们来看看源码,到底是怎么回事。 2、问题分析   首先写一个...

    netty-study:看Netty原始记录

    Netty中频道的分类 NioServerSocketChannel服务端的Channel,在服务启动的时候创建,用于接收新连接 NioSocketChannel客服端Channel,在新连接进入时创建,并负责监听数据读写 不安全,用于实现两个频道临时的协议...

    mina core 包

    org.apache.mina.core.buffer.IoBuffer mina core 包

    netty-learn:Netty4.X社区配套原始码,博客地址:https

    Netty 4.x写一个分区服务器 写个特定服务器 世上最简单的协议不是而是DISCARD(替代)。这个协议将会丢掉任何收到的数据,而不响应。... (( ByteBuf ) msg) . release(); } @Override public voi

Global site tag (gtag.js) - Google Analytics