- ByteBuf API
- ByteBuf write
- writeInt
- writeChar
- writeBytes
- writeBytes
- writeBytes
- writeFloat
- writeByte
- writeShort
- writeDouble
- writeBoolean
- writeLong
- writeBytes
- ByteBuf read
- readInt
- readChar
- readBytes
- readFloat
- readLong
- readByte
- readShort
- readBoolean
- readDouble
- readUnsignedByte
- readUnsignedShort
- readerIndex
- readByte
- readUnsignedInt
- readSlice
- readInt
- discard bytes
- clear
- mark reset
- 查找
- derived buffers
- get set
- 内存池
- ButeBuf 类型
- AbstractByteBuf
- ResourceLeakDetector
- SwappedByteBuf
- AbstractReferenceCountedByteBuf
- UnPooledHeapByteBuf
首先我们来看一下netty buffer包的继承结构
接下来我会对几个类进行代码测试.
首先我们来看一下如何使用Netty提供的工具类构建一个ByteBuf
1 2
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); Assert.assertEquals(1024, buf.capacity());
|
我们使用ByteBufAllocator
这个工具类构建了一个1024
大小的ByteBuf
出来.
ByteBuf提供了 readerIndex
和 writerIndex
进行缓冲区的顺序读写操作.
readerIndex
标志读取索引
writerIndex
标志写入索引
- [0, readerIndex] 已经读取多的缓冲区区间
- [readerIndex, writerIndex] 可读的缓冲区区间
- [writerIndex, capacity] 可写的缓冲区区间
每个索引移动的单位是bytes
, 在下例中我们向ByteBuf写入一个int数值, writerIdex
会移动4个bytes
ByteBuf API
我们首先看一下ByteBuf提供的API
ByteBuf write
接下来我们看一下向ByteBuf缓冲区写入数据的API
writeInt
1 2 3 4 5 6
| public void testWriteInt() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeInt(1); Assert.assertEquals(4, buf.writerIndex()); }
|
writeChar
1 2 3 4 5 6
| public void testWriteChar() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeChar('a'); Assert.assertEquals(2, buf.writerIndex()); }
|
writeBytes
1 2 3 4 5 6 7 8
| public void testWriteBytes() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); byte[] bytes = new byte[]{100}; buf.writeBytes(bytes); Assert.assertEquals(1, buf.writerIndex()); }
|
writeBytes
1 2 3 4 5 6 7 8 9
| public void testWriteBytesWithStartEndIndex() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); byte[] bytes = new byte[]{100, 1, 3}; buf.writeBytes(bytes, 1, 1); Assert.assertEquals(1, buf.writerIndex()); }
|
writeBytes
1 2 3 4 5 6 7 8
| public void testWriteBytes3() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(1024); buf1.writeInt(1); buf.writeBytes(buf1); Assert.assertEquals(4, buf.writerIndex()); }
|
writeFloat
1 2 3 4 5 6 7
| public void testWriteFloat() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeFloat(0.1f); Assert.assertEquals(4, buf.writerIndex()); }
|
writeByte
1 2 3 4 5 6 7 8
| public void testWriteByte() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeByte(1); Assert.assertEquals(1, buf.writerIndex()); buf.writeByte(1000); Assert.assertEquals(2, buf.writerIndex()); }
|
writeShort
1 2 3 4 5 6
| public void testWriteShort() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeShort(1000); Assert.assertEquals(2, buf.writerIndex()); }
|
writeDouble
1 2 3 4 5 6
| public void testWriteDouble() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeDouble(1000.0d); Assert.assertEquals(8, buf.writerIndex()); }
|
writeBoolean
1 2 3 4 5 6
| public void testWriteBoolean() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBoolean(false); Assert.assertEquals(1, buf.writerIndex()); }
|
writeLong
1 2 3 4 5 6 7
| public void testWriteLong() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeLong(100l); Assert.assertEquals(8, buf.writerIndex()); }
|
writeBytes
1 2 3 4 5 6 7
| public void testWriteOverLoadMaxCapacity() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(5); buf.writeBytes("123456".getBytes()); Assert.assertEquals(6, buf.writerIndex()); }
|
ByteBuf read
刚才我们看了向ByteBuf缓冲区写入数据的API,接下来我们看一下从ByteBuf缓冲区读取数据的API
readInt
1 2 3 4 5 6 7 8
| public void testReadInt() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeInt(1); int read = buf.readInt(); Assert.assertEquals(4, buf.readerIndex()); Assert.assertEquals(1, read); }
|
readChar
1 2 3 4 5 6 7 8 9
| public void testReadChar() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeChar('1'); char read = buf.readChar(); Assert.assertEquals(2, buf.readerIndex()); Assert.assertEquals('1', read); }
|
readBytes
1 2 3 4 5 6 7 8 9
| public void testReadBytes() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); byte[] read = new byte[10]; buf.readBytes(read); Assert.assertEquals(10, buf.readerIndex()); Assert.assertEquals(0, read[9]); }
|
1 2 3 4 5 6 7 8 9
| public void testReadBytesWithStartEndIndex() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); byte[] read = new byte[10]; buf.readBytes(read, 2, 3); Assert.assertEquals(3, buf.readerIndex()); Assert.assertEquals(3, read[0]); }
|
1 2 3 4 5 6 7
| public void testRead3Bytes() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); buf.readBytes(3); Assert.assertEquals(3, buf.readerIndex()); }
|
readFloat
1 2 3 4 5 6 7 8 9
| public void testReadFloat() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeFloat(10.0f); float read = buf.readFloat(); Assert.assertEquals(4, buf.readerIndex()); Assert.assertEquals(10.f, read); }
|
readLong
1 2 3 4 5 6 7 8
| public void testReadLong() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeLong(10l); buf.readLong(); Assert.assertEquals(8, buf.readerIndex());
}
|
readByte
1 2 3 4 5 6 7
| public void testReadByte() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); buf.readByte(); Assert.assertEquals(1, buf.readerIndex()); }
|
readShort
1 2 3 4 5 6 7
| public void testReadShort() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeShort(10); buf.readShort(); Assert.assertEquals(2, buf.readerIndex()); }
|
readBoolean
1 2 3 4 5 6 7
| public void testReadBoolean() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBoolean(true); buf.readBoolean(); Assert.assertEquals(1, buf.readerIndex()); }
|
readDouble
1 2 3 4 5 6 7
| public void testReadDouble() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeDouble(10.0d); buf.readDouble(); Assert.assertEquals(8, buf.readerIndex()); }
|
readUnsignedByte
1 2 3 4 5 6 7 8 9
| public void testReadUnsignedByte() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeByte(-10); short read = buf.readUnsignedByte(); Assert.assertEquals(1, buf.readerIndex()); Assert.assertEquals(246, read); }
|
readUnsignedShort
1 2 3 4 5 6 7 8 9 10
| public void testReadUnsignedShort() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeShort(-1024); int read = buf.readUnsignedShort(); Assert.assertEquals(2, buf.readerIndex()); Assert.assertEquals(64512, read); }
|
readerIndex
1 2 3 4 5 6
| public void testReaderIndex() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); Assert.assertEquals(0, buf.readerIndex()); }
|
readByte
1 2 3 4 5 6 7 8 9
| public void testReadableBytes() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); Assert.assertEquals(10, buf.readableBytes()); buf.readByte(); Assert.assertEquals(9, buf.readableBytes()); }
|
readUnsignedInt
1 2 3 4 5 6 7 8
| public void testReadUnsignedInt() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeInt(10); long read = buf.readUnsignedInt(); Assert.assertEquals(4, buf.readerIndex()); Assert.assertEquals(10, read); }
|
readSlice
1 2 3 4 5 6 7 8 9 10 11
| public void testReadSlice() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); ByteBuf read = buf.readSlice(5); Assert.assertEquals(5, buf.readerIndex()); Assert.assertEquals(1, read.readByte()); Assert.assertEquals(6, buf.readByte());
}
|
readInt
1 2 3 4 5 6 7
| public void testWriteBytesReadInt() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(1024); buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}); int read = buf.readInt(); Assert.assertEquals(16909060, read); }
|
discard bytes
在前面的测试中我们看到了,当向ByteBuf写入数据时,当超出分配内存大小时,ByteBuf会进行自动拓容(重新生成一个数组缓冲区,然后将原先的缓冲区内容拷贝到新的缓冲区中),这样一来ByteBuf占用的内从会越来越大. 我们可以是discardReadBytes()
这个方法重用以前的缓冲区, 它会将[0, readerIndex]区间的内存舍弃掉(内部也是数组复制), 这么着就节间的重用了以前的缓冲区,但是这种方式有一点就是如果频繁的调用这个方法会带来性能问题.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(50); buf.writeBytes("123456789".getBytes()); buf.readBytes(3); System.out.println(buf.readerIndex()); System.out.println(buf.writerIndex()); System.out.println(buf.readableBytes()); System.out.println(buf.writableBytes());
buf.discardReadBytes(); System.out.println(buf.readerIndex()); System.out.println(buf.writerIndex()); System.out.println(buf.readableBytes()); System.out.println(buf.writableBytes());
|
clear
这个操作并不会情况缓冲区的内容只是用来将readerIndex和writerIndex重置为0. 但是缓冲区的内容我们是仍然可以读到的.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(50); buf.writeBytes("123456789".getBytes()); buf.readBytes(3); System.out.println(buf.readerIndex()); System.out.println(buf.writerIndex()); System.out.println(buf.readableBytes()); System.out.println(buf.writableBytes());
buf.clear(); System.out.println(buf.readerIndex()); System.out.println(buf.writerIndex()); System.out.println(buf.readableBytes()); System.out.println(buf.writableBytes());
buf.writerIndex(6); System.out.println(buf.readerIndex()); System.out.println(buf.writerIndex()); System.out.println(buf.readableBytes()); System.out.println(buf.writableBytes()); System.out.println(buf.readByte());
|
mark reset
mark reset相关的四个方法也是对指针位置的操作
markReaderIndex()
记录readerIndex
markWriterIndex()
记录writerIndex
resetReaderIndex()
将记录的readerIndex重置到当前的readerIndex值
resetWriterIndex()
将记录的writerIndex重置到当前的writerIndex值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void testReaderIndex() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(50); buf.writeBytes("123456789".getBytes()); buf.readBytes(3); buf.markReaderIndex(); buf.readBytes(1); Assert.assertEquals(4, buf.readerIndex()); buf.resetReaderIndex(); Assert.assertEquals(3, buf.readerIndex()); }
```java public void testWriterIndex() { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(50); buf.writeBytes("123456789".getBytes()); buf.markWriterIndex(); buf.writeByte(1); Assert.assertEquals(10, buf.writerIndex()); buf.resetWriterIndex(); Assert.assertEquals(9, buf.writerIndex()); }
|
查找
ByteBuf提供丰富的API让我查找某个Byte
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(50); buf.writeBytes(new byte[]{1, 2, 3, 4 ,5, 6, 7, 8, 9});
int idx = buf.indexOf(0, buf.writerIndex(), (byte)2); System.out.println(idx);
idx = buf.indexOf(3, buf.writerIndex(), (byte)2); System.out.println(idx);
idx = buf.bytesBefore((byte)2); System.out.println(idx);
buf.readBytes(3); idx = buf.bytesBefore((byte)2); System.out.println(idx);
idx = buf.forEachByte(b -> b == (byte) 6); System.out.println(idx);
|
derived buffers
ByteBuf提供多种API用于创建某个ByteBuf的视图或者复制版本
duplicate()
复制ByteBuf对象, 俩个对象共享同一个缓冲区,但是各自维护自己的索引(readerIndex, writerIndex)
copy()
复制ByteBuf对象, 俩个对象共享有自己的缓冲区, 缓冲区和索引都不共享
slice()
复制Bytebuf对象,但是只复制[readerIndex, writerIndex]区间的缓冲区, 俩个对象的缓冲区是共享的,但是维护各自的索引
get set
ByteBuf不仅仅支持read, write的顺序读写还支持get,set的随机读取。 但是get/set不会进行自动拓容.
1 2 3 4 5 6 7
| ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(50); buf.writeBytes(new byte[]{1, 2, 3, 4 ,5, 6, 7, 8, 9});
byte b = buf.getByte(2); System.out.println(buf.readableBytes()); System.out.println(buf.readerIndex()); System.out.println(b);
|
内存池
Netty的内存池由PoolArea
. PoolArea
由多个PoolChunk
组成.
ButeBuf 类型
看完ByteBuf的API操作我们来看一下ByteBuf的分类,在内存使用种类上ByteBuf分为以下俩类
- DirectByteBuf : 使用JVM堆外内存分配. 虽然分配和回收速度慢一些,但是从SocketChannel中写入或者读取数据由于少了一次内存复制,因此速度较快.(SocketIO通信时适合使用)
- HeapByteBuf: 使用JVM堆内内存分配. 内存分配和回收速度较快,但是读写Socket IO的时候由于会额外进行一次内存复制,堆内存对应的缓冲区复制到内核Channel中,性能会有下降.(后端业务在编解码时适合使用)
在内存使用种类上由分为以下俩类
- PooledByteBuf: 基于内存对象池的ByteBuf,
- UnpooledByteBuf:
UnpooledDirectByteBuf, UnpooledHeapByteBuf, UnpooledUnsafeDirectByteBuf ,PooledDirectByteBuf, PooledHeapByteBuf
AbstractByteBuf
AbstractByteBuf
继承自ByteBuf
, 它内部并没有定义ByteBuf的缓冲区实现,只是通过定义readerIndex
, writerIndex
, capacity
等实现ByteBuf接口中的各种API, 具体的缓冲区实现则由子类实现
1 2 3 4 5 6 7 8 9 10
| static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
int readerIndex; private int writerIndex; private int markedReaderIndex; private int markedWriterIndex;
private int maxCapacity;
private SwappedByteBuf swappedBuf;
|
除了操作具体缓冲区API没有实现之外 AbstractByteBuf
为我们实现了大量的API,首先我们看一下读数据的API
1 2 3 4 5 6 7 8 9 10
| @Override public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); getBytes(readerIndex, dst, dstIndex, length); readerIndex += length; return this; }
|
下面我们看一下写数据的API实现
1 2 3 4 5 6 7
| @Override public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { ensureWritable(length); setBytes(writerIndex, src, srcIndex, length); writerIndex += length; return this; }
|
同样的setBytes();
是由子类具体实现, 我们着重看一下ensureWritable()
方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); }
if (minWritableBytes <= writableBytes()) { return this; }
if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); }
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
capacity(newCapacity); return this; }
|
ResourceLeakDetector
ResourceLeakDetector
用于检测内存泄漏. 它被所有ByteBuf实例共享.
SwappedByteBuf
AbstractReferenceCountedByteBuf
UnPooledHeapByteBuf
不使用对象池的基于堆内存分配的字节缓冲区. 每次IO读写的时候都会创建一个新的UnPooledHeapByteBuf.