Netty的ByteBuf主要用于网络传输,有读写两个index。
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
我们可以看出,大致分为几类ByteBuf,一类是Pooled,UnPooled。另一个是Heap,Direct。
public abstract class AbstractByteBuf extends ByteBuf { static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class); int readerIndex; //读的索引 int writerIndex;//写的索引 private int markedReaderIndex;//标记读的索引 private int markedWriterIndex;//标记写的索引 private int maxCapacity;//最大容量 }
PooledByteBuf,通过capacity和deallocate方法我们可以发现都是用 chunk.arena来分配内存和释放内存的
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf { private final Recycler.Handle recyclerHandle; protected PoolChunk<T> chunk; protected long handle; protected T memory; protected int offset; protected int length; int maxLength; private ByteBuffer tmpNioBuf; @Override public final ByteBuf capacity(int newCapacity) { ensureAccessible(); // If the request capacity does not require reallocation, just update the length of the memory. if (chunk.unpooled) { if (newCapacity == length) { return this; } } else { if (newCapacity > length) { if (newCapacity <= maxLength) { length = newCapacity; return this; } } else if (newCapacity < length) { if (newCapacity > maxLength >>> 1) { if (maxLength <= 512) { if (newCapacity > maxLength - 16) { length = newCapacity; setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); return this; } } else { // > 512 (i.e. >= 1024) length = newCapacity; setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); return this; } } } else { return this; } } // Reallocation required. chunk.arena.reallocate(this, newCapacity, true); return this; } @Override protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; chunk.arena.free(chunk, handle, maxLength); recycle(); } } }
PooledDirectByteBuf 是缓存的DirectByteBuf. 用RECYCLER来做回收重复利用。调用memory的get方法获取数据。memory是一个DirectByteBuffer对象。
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> { private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() { @Override protected PooledDirectByteBuf newObject(Handle handle) { return new PooledDirectByteBuf(handle, 0); } }; static PooledDirectByteBuf newInstance(int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; } @Override protected byte _getByte(int index) { return memory.get(idx(index)); } @Override protected short _getShort(int index) { return memory.getShort(idx(index)); } }
PooledUnsafeDirectByteBuf和PooledDirectByteBuf的区别是没有使用DirectByteBuffer的接口方法去获取数据,而是直接用PlatformDependent的方法获取数据
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> { private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() { @Override protected PooledUnsafeDirectByteBuf newObject(Handle handle) { return new PooledUnsafeDirectByteBuf(handle, 0); } }; static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) { PooledUnsafeDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; } private long memoryAddress; private void initMemoryAddress() { memoryAddress = PlatformDependent.directBufferAddress(memory) + offset; } @Override protected byte _getByte(int index) { return PlatformDependent.getByte(addr(index)); } @Override protected short _getShort(int index) { short v = PlatformDependent.getShort(addr(index)); return NATIVE_ORDER? v : Short.reverseBytes(v); } }
PooledHeapByteBuf是直接用堆内分配的byte数组来存储数据,因此可以直接通过偏移量来读取数据
final class PooledHeapByteBuf extends PooledByteBuf<byte[]> { private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { @Override protected PooledHeapByteBuf newObject(Handle handle) { return new PooledHeapByteBuf(handle, 0); } }; static PooledHeapByteBuf newInstance(int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get(); buf.setRefCnt(1); buf.maxCapacity(maxCapacity); return buf; } @Override protected byte _getByte(int index) { return memory[idx(index)]; } @Override protected short _getShort(int index) { index = idx(index); return (short) (memory[index] << 8 | memory[index + 1] & 0xFF); } }
UnpooledHeapByteBuf和PooledHeapByteBuf的区别是UnpooledHeapByteBuf不会用RECYCLER回收器
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private byte[] array; private ByteBuffer tmpNioBuf; /** * Creates a new heap buffer with a newly allocated byte array. * * @param initialCapacity the initial capacity of the underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { this(alloc, new byte[initialCapacity], 0, 0, maxCapacity); } }
Netty会调用PoolArea的allocate方法创建ByteBuf对象
abstract class PoolArena<T> { static final int numTinySubpagePools = 512 >>> 4; final PooledByteBufAllocator parent; private final int maxOrder; final int pageSize; final int pageShifts; final int chunkSize; final int subpageOverflowMask; final int numSmallSubpagePools; private final PoolSubpage<T>[] tinySubpagePools; private final PoolSubpage<T>[] smallSubpagePools; private final PoolChunkList<T> q050; private final PoolChunkList<T> q025; private final PoolChunkList<T> q000; private final PoolChunkList<T> qInit; private final PoolChunkList<T> q075; private final PoolChunkList<T> q100; private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage<T>[] table; if (isTiny(normCapacity)) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } synchronized (this) { final PoolSubpage<T> head = table[tableIdx]; final PoolSubpage<T> s = head.next; if (s != head) { assert s.doNotDestroy && s.elemSize == normCapacity; long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); return; } } } else if (normCapacity <= chunkSize) { if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; } } else { // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, reqCapacity); return; } allocateNormal(buf, reqCapacity, normCapacity); } }
相关推荐
对于 Netty ByteBuf 的零拷贝(Zero Copy) 的理解1
类似于Netty ByteBuf的ByteBuf的Rust实现
netty通信时经常和底层数据交互,C语言和java的数据类型和范围不同,通信时需要转化或兼容,附件为字节码、进制常用的转换类。
将Netty ByteBuf的相关类,进行梳理总结、源码分析,通过思维导图的注释看源码!
再根据本人实际学习体验总结而成。本部分内容可能不那么全面,但是我尽量挑选Netty中我认为比较重要的部分做讲解。
81_Netty引用计数的实现机制与自旋锁的使用技巧 82_Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 83_AtomicIntegerFieldUpdater实例演练与volatile关键字分析 84_Netty引用计数注意事项与内存泄露...
第81讲:Netty引用计数的实现机制与自旋锁的使用技巧 第82讲:Netty引用计数原子更新揭秘与AtomicIntegerFieldUpdater深度剖析 第83讲:AtomicIntegerFieldUpdater实例演练与volatile关键字分析 第84讲:Netty...
Netty是一个异步事件驱动的网络应用...只需要理解并熟练运用ByteBuf、Channel、Pipeline、Event模型等相关技术,就可以进行Netty的开发。同时,Netty对事件的处理也非常灵活,支持用户自定义各种类型的ChannelHandler。
Netty是一个异步事件驱动的网络应用...只需要理解并熟练运用ByteBuf、Channel、Pipeline、Event模型等相关技术,就可以进行Netty的开发。同时,Netty对事件的处理也非常灵活,支持用户自定义各种类型的ChannelHandler。
Netty框架中LineBasedFrameDecoder分隔符解码器解决考虑TCP的粘包与拆包问题。依次编译bytebuf中的可读字符,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成...
6.数据传输载体ByteBuf介绍 12 7.客户端与服务端通信协议编解码 14 8.实战:Netty实现客户端登录 15 9.实战:实现客户端与服务端收发消息 16 10.Pipeline与ChannelHandler 17 11.实战:构建客户端与服务端 Pipeline 18 ...
第15章 ByteBuf 和相关辅助类 第16章 Channel 和Unsafe 第17章 ChannelPipeline 和ChannelHandler 第18章 EvenLoop 和EventLoopGroup 第19章 Future 和Promise 第20章 Netty 架构解析 第21章j ava多线程编程在netty...
76_Netty项目开发过程中常见且重要事项分析 77_Java NIO Buffer总结回顾与难点拓展 78_Netty数据容器ByteBuf底层数据结构深度剖析 79_Netty的ByteBuf底层实现大揭秘 80_Netty复合缓冲区详解与3种缓冲区适用场景分析 ...
第15 章 ByteBuf 和相关辅助类...... 288 第16 章 Channel 和Unsafe ...... 338 第17 章 ChannelPipeline 和ChannelHandler...... 388 第18 章 EventLoop 和EventLoopGroup...... 419 ...
第5章 ByteBuf 第6章 ChannelHandler和ChannelPipeline 第7章 EventLoop和线程模型 第8章 引导 第9章 单元测试 第二部分 编解码器 第10章 编解码器框架 第11章 预置的ChannelHandler和编解码器 第三部分 ...
UdpServerSocketChannel: ...import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.Ch
今天学习Netty堆缓存和直接缓存遇到一个问题,明明使用的是堆缓存,这么读取不到数据呢?打印日志一看heapBuf.hasArray()直接返回false。来下面我们来看看源码,到底是怎么回事。 2、问题分析 首先写一个...
Netty中频道的分类 NioServerSocketChannel服务端的Channel,在服务启动的时候创建,用于接收新连接 NioSocketChannel客服端Channel,在新连接进入时创建,并负责监听数据读写 不安全,用于实现两个频道临时的协议...
org.apache.mina.core.buffer.IoBuffer mina core 包
Netty 4.x写一个分区服务器 写个特定服务器 世上最简单的协议不是而是DISCARD(替代)。这个协议将会丢掉任何收到的数据,而不响应。... (( ByteBuf ) msg) . release(); } @Override public voi