博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty5_内存管理_源码讲解
阅读量:6039 次
发布时间:2019-06-20

本文共 16834 字,大约阅读时间需要 56 分钟。

hot3.png

欢迎大家关注我的微博 会将发布的开源项目技术贴通过微博通知大家,希望大家能够互勉共进!谢谢!也很希望能够得到大家对我博文的反馈,写出更高质量的文章!!

read事件发生,Netty需要从内核中读取数据到自己内部可以管辖的缓冲区,怎么进行分配?使用完毕后,怎么释放?已经write方法调用,怎样将相应数据进行缓冲区分配,以及write事件发生,flush完成后,怎样将缓冲区释放?

  1. read内存分配

要知道read是怎样进行进行内存分配的首先要知道是什么进行分配的,分配完之后,怎么进行内存回收?每次分配新的ByteBuf大小是多少?

  • 分配内存:假设是初次进行分配(同一个socket多次进行分配的情况,后面会讲到.),我们看一下是什么时候进行分配的.上代码:

  •                 int byteBufCapacity = allocHandle.guess();                int totalReadAmount = 0;                do {                    //可能是 direct或者 heap  从与当前socket相关的allocator得到byteBuf数组//                    byteBuf =allocHandle.allocate(allocator);                    byteBuf = allocator.ioBuffer(byteBufCapacity);                    int writable = byteBuf.writableBytes(); //分一个多大的内存就从socket中读取多大的数据                    int localReadAmount = doReadBytes(byteBuf);//从socket中读取数据到bytebuf中                    if (localReadAmount <= 0) {                        // not was read release the buffer                        byteBuf.release();//释放到Thread Cache中                        close = localReadAmount < 0;//是否进行关闭,关键要看读取到的数据的长度是否为-1;                        break;                    }                    //发起读取事件---如果是第一次积累数据的话,那么就会将当前的bytebuf作为累积对象,供继续使用                    pipeline.fireChannelRead(byteBuf);                    byteBuf = null;//由pipeline进行byteBuf的释放                    //避免内存溢出,                    if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {                        // Avoid overflow.                        totalReadAmount = Integer.MAX_VALUE;                        break;                    }                    totalReadAmount += localReadAmount;                    if (localReadAmount < writable) {                        // Read less than what the buffer can hold,                        // which might mean we drained the recv buffer completely.                        break;                    }                } while (++ messages < maxMessagesPerRead);//每次读取的消息的数量                //读取完成---处理完一次 读取事件                pipeline.fireChannelReadComplete();                allocHandle.record(totalReadAmount);                if (close) {                    closeOnRead(pipeline);                    close = false;                }
  • 从中可以看出,就是通过ByteBufAllocator.alloc(capacity)进行分配的。(capacity参数的大小是不断变化的。具体的我们会稍后介绍.)。下面我们看一下ByteBufAllocator.alloc(capacity)的具体实现:上代码:

  •  public ByteBuf ioBuffer(int initialCapacity) {        if (PlatformDependent.hasUnsafe()) {            return directBuffer(initialCapacity);        }        return heapBuffer(initialCapacity);    }    public ByteBuf heapBuffer(int initialCapacity) {        return heapBuffer(initialCapacity, Integer.MAX_VALUE);    }    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {        if (initialCapacity == 0 && maxCapacity == 0) {            return emptyBuf;        }        validate(initialCapacity, maxCapacity);        return newHeapBuffer(initialCapacity, maxCapacity);//newHeapBuffer是个抽象方法    }
  • 由此可以看出,netty是鼓励使用直接内存。newHeapBuffer是一个抽象方法,这里我们仅仅关注他在类PooledByteBufAllocator的实现(另一个实现UnpooledByteBufAllocator我们这就不讲了,至于为什么自己去想--原因很简单,就是实际使用的情况很少,好歹前一个还是用了基于池的分配方式,避免了重复不断的分配,可以进行不断重复的利用。)。上PooledByteBufAllocator的newHeapBuffer实现:

  •  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {        //如果在线程中已经存在一个cache 没有的话,那么就会调用initialValue进行初始化        PoolThreadCache cache = threadCache.get();        PoolArena
     heapArena = cache.heapArena;        ByteBuf buf;        if (heapArena != null) {            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);        } else {//如果没有的话,那么就会分配一个不由当前的allocator管理的bytebuf            buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);        }        return toLeakAwareBuffer(buf);    }
  • 这里出现了一个基于ThreadLocal的变量,这个ThreadLocal存储的变量类型是PoolThreadCache。PoolThreadCache有个heap和direct的两个变量,这两个变量是用来分配direct和heap内存的。我们来看一下threadCache的初始化代码:

  •     private final PoolArena
    [] heapArenas;    private final PoolArena
    [] directArenas;    final ThreadLocal
     threadCache = new ThreadLocal
    () {        private final AtomicInteger index = new AtomicInteger();//为每一个线程都会分配一个heapArena和directArena        //在为某个线程初次调用get方法时,会调用以下此方法,用于初始化为当前线程要初始化的数据---一个线程中的内存池盛放的数据可以是直接内存或者堆内存        protected PoolThreadCache initialValue() {            final int idx = index.getAndIncrement();            final PoolArena
     heapArena;            final PoolArena
     directArena;            //为线程分配的区域可以是direct和heap的组合            if (heapArenas != null) {                heapArena = heapArenas[Math.abs(idx % heapArenas.length)];            } else {                heapArena = null;            }            if (directArenas != null) {                directArena = directArenas[Math.abs(idx % directArenas.length)];            } else {                directArena = null;            }            return new PoolThreadCache(heapArena, directArena);        }    };
  • initialValue方法就是为当前的thread生成PoolThreadCache对象的初始化代码。PoolThreadCache的的directArena和heapArena的赋值也是分别从数组directArenas和heapArenas中取摸得到index,分别摘取两个元素得到的。由此可以看出:

    • 假设是个socketChannel为ABCD都有自己的PooledByteBufAllocator(就是在config()进行设置呗)。不过ABCDsocketChannel有哪一个线程进行处理,他们的treadCache都是不可能相同的。因为treadCache没有static修饰符。但是这里需要注意一个问题,就是一个pipeline对应一个独立的PooledByteBufAllocator的时候,PooledByteBufAllocator的成员变量heapArenas和direcArenas数组的长度为1.否则会造成浪费。因为,threaCache一旦初始化完毕,就不会变化了,使用到的directArena和heapArena就是固定下来了,数组长度长度超过1,数组中的剩余元素是不会被使用到的(因为每一个pipeline对应一个PooledByteBufAllocator)。注意一下(可以从PooledByteBufAllocator的源代码中找到的): 怎么去调整一个PooledByteBufAllocator的变量heapArenas和direcArenas的数组长度呢?我们可以通过设置io.netty.allocator.numHeapArenas和io.netty.allocator.numDirectArenas来设置PooledByteBufAllocator中的heapArenas和direcArenas的数组长度(当然,也可以在初始化PooledByteBufAllocator调用构造函数,进行自定义)。这一点很重要哦!

    • A,B,C,D的pipeline使用同一个PooledByteBufAllocator,但是AB的事件有一个worker线程T1进行处理,但是cd的事件由另一个worker线程T2处理,那么cd和Ab使用的threadCache就是不同的,因为threadCache都是和线程进行绑定的。这个时候,就要将PooledByteBufAllocator中的heapArenas和direcArenas的数组长度设置的大一点。至于多少合适,具体应用具体对待。

  • 概述一下: 就是一个workerthread可以管理多个socket的读写事件,那么在进行内存分配时,内存的分配就要使用每一个socektChannel的PooledByteBufAllocator对象,为当前的thread分配的threadCache进行内存分配。PooledByteBufAllocator是基于内存池的形式进行使用的。至于好处,不进行多讲了!!

  • 到目前为止,我们已经讲述了内存分配的对象使用情况,可以看成是讲述了一下read事件的过程中,内存分配的对象图情况。下面我们再来看看,PooledByteBufAllocator的heapArenas和directArenas的初始化情况,上代码:

  •  public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {        super(preferDirect);//查看是否字节内存可用,如果可用,则生成一个空directMemory        final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);        int pageShifts = validateAndCalculatePageShifts(pageSize);        if (nHeapArena > 0) {//            heapArenas = newArenaArray(nHeapArena);            for (int i = 0; i < heapArenas.length; i ++) {                heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);            }        } else {            heapArenas = null;        }        if (nDirectArena > 0) {            directArenas = newArenaArray(nDirectArena);            for (int i = 0; i < directArenas.length; i ++) {                directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);            }        } else {            directArenas = null;        }    }
  • 从中可以看到PooledByteBufAllocator对象的heapArenas和directArenas分配都是通过直接调用PoolArena.HeapArena和PoolArena.DirectArena进行分配的。至于这两个静态方法的具体实现,我们这里就不讲了,网上资料也有不少。我粘贴一下我自己收集的材料:  这是我自己的网易云笔记收集的材料,应该够用了,很简单,不是多么复杂,就是一个分大小按组分配 的过程。大家自己看看吧!共享会永远存在的!!

2. read内存回收:

如果观察过netty的pipeline,肯定会注意到的一点就是第一个ChannelHandler肯定是ByteToMessageDecoder,每一次read事件发生,因此分配的byteBuf都是直接调用该Handler的channelRead()方法,至于handler对此bytebuf后续怎样的处理,上层调用是不进行管理的。也就是说,ByteBuf的一些别的操作(例如释放、合并等)都是在ByteToMessageDecoder内完成的。下面我们来看一下ByteToMessageDecoder的channelRead方法的具体实现,看看对byteBuf进行了什么操作(如果大家看过我的read事件处理的博客,那么也会提前了解).上代码

@Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof ByteBuf) {            RecyclableArrayList out = RecyclableArrayList.newInstance();            try {                ByteBuf data = (ByteBuf) msg;                first = cumulation == null;                if (first) {                    cumulation = data;                } else {                    //缓冲区的大小没有超过需要写入的数据的大小                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {                        expandCumulation(ctx, data.readableBytes());                    }                    cumulation.writeBytes(data);//将数据写入到积累对象中                    data.release();//释放bytebuffer(heap或者direct)--通过引用的方式进行释放缓冲区                }                //收集完毕之后解析收集到的字符串                callDecode(ctx, cumulation, out);            } catch (DecoderException e) {                throw e;            } catch (Throwable t) {                throw new DecoderException(t);            } finally {                if (cumulation != null && !cumulation.isReadable()) {//如果累积对象为null或者没有可读内容的话,那么就将累积对象释放掉(因为空了或者为null了)                    cumulation.release();                    cumulation = null;                }                int size = out.size();                decodeWasNull = size == 0;                //针对解析后的out结果,逐个调用message                for (int i = 0; i < size; i ++) {                    ctx.fireChannelRead(out.get(i));                }                out.recycle();            }        } else {            ctx.fireChannelRead(msg);        }    }    private void expandCumulation(ChannelHandlerContext ctx, int readable) {        ByteBuf oldCumulation = cumulation;//新的容量=旧的容量+可读取的数量  ---在此处的扩展和初次的分配都是通过同一个allocator进行分配的        cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);        cumulation.writeBytes(oldCumulation);//复制的过程        oldCumulation.release();//释放老的对象    }

从前面read事件处理流程博文可以知晓,ByteToMessageDecoder的cumulation对象起到bytebuf累积作用的对象。当前ByteToMessageDecoder.cumulation不能盛放传过来的bytebuf,那么就要调用expandCumulation进行缓冲扩展。在expandCumulation实现中,也是通过Allocator分配一个更大的(能够盛放原先的cumulation数据和即将添加的bytebuf数据之和),然后将原先老的cumulation中的数据复制一下就可以了。

小总结: 在进行bytebuf累积的过程中也要面临着容器cumulation的不断扩充。每一个decoderHandler都会有一个cumulation对象。一个socket对应一个decoderHandler对象。

上面我们讲到解析完数据之后,需要对累积对象bytebuf是怎样进行释放?  在channelReadComplete(每一次Read事件发生读取byte数据完成后)会调用cumulation.discardSomeReadBytes()释放空间)。再就是要说的就是,通过创建出来的bytebuf msg都是堆类型的,不用了就不用管了,gc会进行垃圾回收的。这个问题大家要记住哦!! 其实byte Msg 的是否为直接内存或者堆内存都要取决于decode的具体实现是怎样实现的(我看了一下具体的实现都是heap类型的msg,当然也可以通过使用(ByteBuf)msg.isDirect()进行判断)。我们自己想一下也应该知道,其实msg为heap类型的是最好的,因为msg会被后续的各种各样的handler使用。

讲一下累积对象的释放,其实讲的就是bytebuf的release方法。

欢迎大家吐槽! 本文仅仅是我本人自己的总结,不是太权威,如果有不同意的地方在oschina博客中吐槽

总结一下: 目前为止,讲解了一下,read事件中的分配和释放问题,总结一下,分配时,是从本pipeline的Allocator为当前线程分配的treadcache中获得direct或者heap缓冲,取得适合大小的一块,标记引用数目一下就行了,释放时,减1! 。一个Allocator的heaps和directs数组长度可以通过特定参数进行设置。累积对象会在channelReadComplete事件发生时,在ByteToMessageDecoder的channelReadComplete事件处理中调用discardSomeReadBytes(需要了解一下ByteBuf的数据结构,自己可以查看一下是怎么实现的!)释放部分数据的。解析出来的msg不用进行手动释放,因为都是基于非pool的heap类型的,由垃圾进行回收的,之所以netty这样设计msg的返回类型,依我个人的看法,就是因为msg还会被很多的后续的handler进行访问,二次解码等。

    write内存分配

其实write事件处理流程仅仅涉及到ms保存到entries数组中。没有内存分配问题。如果非得说是有内存分配问题得话,那么write(msg)的msg的类型可以说成是内存分配问题。msg可以是bytebuf或者对象类型。可以调用工具类或者Allocator直接进行内存申请即可。

flush事件内存管理

  1. 如果调用write(msg)类型为bytebuf并且此bytebuf为堆类型的话,那么就将其转换成direct内存。

    1. 在分配直接内存的时候,如果为当前socekt的Allocator为isDirectBufferPooled的话,那么就那么就分配一个直接内存bytebuf。

    2. 不是得话,就从线程中绑定的ThreadLocalPooledByteBuf生成一个(此方式我们后续还会讲解)

    3. 分配完成后,将原先的数据写入到此bytebuf,然后释放老的bytebuf。将新的bytebuf添加到entry的末尾。

    4. 上代码:

    5. if (alloc.isDirectBufferPooled()) {            directBuf = alloc.directBuffer(readableBytes);        } else {            directBuf = ThreadLocalPooledByteBuf.newInstance();        }        //将非直接内存的数据写入到直接内存中        directBuf.writeBytes(buf, readerIndex, readableBytes);        buf.release();//释放原先的非directbuffer        entry.msg = directBuf;//转换成direct类型的buffer        // cache ByteBuffer        ByteBuffer nioBuf = entry.buf = directBuf.internalNioBuffer(0, readableBytes);        entry.count = 1;        nioBuffers[nioBufferCount ++] = nioBuf;        return nioBufferCount;
  2. 半包写入成功,释放必要空间的过程:先上代码:

    1.  for (int i = msgCount; i > 0; i --) {                    final ByteBuf buf = (ByteBuf) in.current();//得到当前正在刷新的数据缓冲区                    final int readerIndex = buf.readerIndex();                    final int readableBytes = buf.writerIndex() - readerIndex;//得到当前的bytebuf中可以读取的数据字节数                    if (readableBytes < writtenBytes) {//如果写入的部分大于当前缓存指针的大小的话,那么就将当前的对象进行释放                        in.progress(readableBytes);                        in.remove();//移动指针,移动到下一个buffer中,通过refCount,安全的进行释放                        writtenBytes -= readableBytes;//将变量进行变更,为一下                    } else if (readableBytes > writtenBytes) {//该bytebuf刷出了一部分,没有全部刷出去                        buf.readerIndex(readerIndex + (int) writtenBytes);//重新设置当前的buffer的大小                        in.progress(writtenBytes);                        break;                    } else { //真好全部刷出                        in.progress(readableBytes);                        in.remove();//直接移除(其实是删除引用个数)                        break;                    }                }
    2. public boolean remove() {        if (isEmpty()) {            return false;        }        Entry e = buffer[flushed];        Object msg = e.msg;        if (msg == null) {            return false;        }        ChannelPromise promise = e.promise;        int size = e.pendingSize;        e.clear();        flushed = flushed + 1 & buffer.length - 1;        safeRelease(msg);//安全释放,就是将此msg的引用设置为0        promise.trySuccess();        decrementPendingOutboundBytes(size);        return true;    }

刷出数据时,调用 ch.write(nioBuffers, 0, nioBufferCnt)  不会对niobuffers中字节内容产生影响,所以刷出去之后,还要niobuffers中的已经刷出去的bytebuff的引用数设置为0(in.remove()).恢复其使用。

如果此次flush将所有的数据都刷出去了得话,那么就遍历niobuffers,逐个恢复其中每一个元素的nioBytebuff的引用情况.

总结:

flush事件: 就是将entries解析成niobuffers;并且niobuffers中元素都必须是direct类型;如果不是,还用调用Allocator分配一个direct类型,将heap数据写入到direct内存中,并添加到niobuffers中,恢复heap bytebuf的引用为0;处理flush事件的时候,要根据写入的数据量与niobuffers中的bytebuf的字节比较,判断当前的bytebuf是否已经完全刷出,如果刷出,就从niobuffers删除,恢复引用。否则progress方法呗调用。如果没有将所有的数据刷出得话,还有继续监听write事件。

--------------------------------------------------------------------------------------------------------------------------

博文评论回复:

问题1:

write 时,用户自己从池内分配的内存与释放时不在同一线程,你怎么释放?

回答: 虽然不再同一个线程,但是请记住一点就是 这些线程都会访问ChannelOutBoundBuffer,其中的buffers中每个元素都是通过refcount进行引用和释放的。将refcount设置为0之后,就释放!!设置为1,就被占用了!Netty为我们做好了释放工作! 当flush之后,remove一个bytebuf的时候,就会自己在核心代码中释放bytebuf的引用个数了!!后期我也会写一个Netty5中 promise和future的博文。希望兄弟可以关注一下!! 最好是通过微博-http://weibo.com/hotbain ,贴一下相关代码:

/**handler具体实现  */public class DiscardServerHandler extends ChannelHandlerAdapter {    @Override    public void channelRead(final ChannelHandlerContext ctx,final  Object msg) throws Exception {         new Thread(new MyRunner(ctx)).start();}public class MyRunner implements Runnable {    private ChannelHandlerContext context;        public MyRunner(ChannelHandlerContext context){        this.context =context;    }        @Override    public void run() {        final ByteBuf byteBuf =context.channel().config().getAllocator().ioBuffer();                if(context.channel().isOpen()){            ChannelFuture future = context.writeAndFlush(byteBuf.writeBytes("xxxxxxxxxxxx".getBytes()));            future.addListener(new ChannelFutureListener() {            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                  System.err.println("server write response error,client  host is: " + ((InetSocketAddress) context.channel().remoteAddress()).getHostName()+":"+((InetSocketAddress) context.channel().remoteAddress()).getPort());                  context.channel().close();                }else{                    System.out.println("writeAndFlush is completed");                }                /**                 * 因为在将outbuffer中删除数据的时候已经调用saferelease方法了                 具体请查看 ChannelOutboundBuffer.remove 方法  因为allcator是共享变量,所以在业务线程中释放refcount也是对Allocator内存的释放。                    如果不想使用原socket的Allocator,可以为当前的thread自定义生成一个Allocator,放到threadLocal中                 * */               // byteBuf.release(); 多此一举 但是,如果byteBuf是自己手动创建的,那么此处的释放代码就可以根据bytebuf是否耗费资源决定是否需要手动释放了!!              }            });        }            }}/**ChannelOutboundBuffer.remove 方法实现**/public boolean remove() {        if (isEmpty()) {            return false;        }        Entry e = buffer[flushed];        Object msg = e.msg;        if (msg == null) {            return false;        }        ChannelPromise promise = e.promise;        int size = e.pendingSize;        e.clear();        flushed = flushed + 1 & buffer.length - 1;                safeRelease(msg);//自动释放引用,是当前的msg bytebuf的引用恢复为0        promise.trySuccess(); //调用operationComplete回调        decrementPendingOutboundBytes(size);//调节一下 缓冲大小        return true;    }

本文仅仅代表自己个人对Netty5的看法,欢迎各位吐槽!!互相学习!!

转载于:https://my.oschina.net/hotbain/blog/422344

你可能感兴趣的文章
二叉树的六种遍历方法汇总(转)
查看>>
用wxpython制作可以用于 特征筛选gui程序
查看>>
【转载】 [你必须知道的.NET]目录导航
查看>>
数据存储小例
查看>>
Spring Boot 配置优先级顺序
查看>>
C++中构造函数详解
查看>>
电商网站中添加商品到购物车功能模块2017.12.8
查看>>
android 模拟器 hardWare 属性说明
查看>>
六款值得推荐的android(安卓)开源框架简介
查看>>
max_element( )
查看>>
java中的类
查看>>
Java并发_volatile实现可见性但不保证原子性
查看>>
百度地图添加带数字标注
查看>>
【luogu 1908】逆序对
查看>>
pthread_create线程创建的过程剖析(转)
查看>>
android存储访问框架Storage Access Framework
查看>>
周总结
查看>>
Spring Boot 要点--启动类和热部署
查看>>
Maven配置及本地仓库设置
查看>>
PAT L2-001 紧急救援 —— (多参数最短路)
查看>>