文章目录

一、背景

分布式压测1台主控机,15台从机,每台测试机1W长连接客户端,进行混合场景MQTT稳定性PubSub测试,发现服务器在跑了一天以后被重启了。

(1)排除GC问题

之前的PET测试中,发现偶尔会有长时间的Full GC,导致Service Mesh探针几次没有响应而强行重启了服务器(那个是由于OpenTracing批量kafka消费的时候没有通过try-resource来释放资源造成堆内存泄漏)。因此看了下GC,发现Full GC耗时很短,次数很少,而且堆内存并没有爆炸。

(2)发现内存增长有问题

通过观察由Prometheus提供数据、Grafana绘制的图像,发现内存以1MB/分钟的速度几乎是线性增长到8G,然后由于达到了k8s的pod限制,被强行重启了。这种不正常的现象明显就是有内存泄漏,那么只能是堆外内存泄漏了。

通过ELK查询ERROR日志,发现被Netty抽样出了一个错:

i.n.u.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:113)
	io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:529)
	io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:89)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	********
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:748)

ByteBuf在被垃圾回收器回收的时候,没有调用Release释放掉所有的引用。

二、深入分析原因

Netty的版本是4.1.31.Final。

这个时候我怀疑之前一直纠结的MqttMessage要不要回收的问题:

public class MqttHandler extends ChannelInboundHandlerAdapter {

	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //todo can check "send other packet before CONNECT packet" at here to replace "already offline" implement.

        MqttMessage mqttMessage = (MqttMessage) msg;
        if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) {
		}
		// todo check whether it will cause memory leak
        // cannot release count refer because of async mqtt processor
        /*
        finally {
            ReferenceCountUtil.release(mqttMessage);
        }
        */
	}

}

我看到很多其他开源Broker都做了这条回收语句,但我总是认为,为什么这个不是Java GC自动回收的呢?所以就注释掉了。

另外一个原因是,很多开源Broker都是默认在Netty线程里面去同步调用业务执行,我是用了单独的业务线程池,每个MQTT报文都不在Netty线程里,如果在这里进行了释放,业务线程获得的MqttMesage的payload就会有异常。

Java NIO的Buffer

Java常用的网络I/O模型就三个:

最早的阻塞网络I/O:BIO(Blocking I/O),jdk1.4以前用的就是这个,服务端和客户端线程数1:1,同步读写,简单可靠,但很明显效率低,服务端有瓶颈。

然后出现了非阻塞网络I/O:NIO(Non-blocking I/O),由数据缓冲区Buffer、通道Channel、多路复用器Selector组成。通过Selector轮询Channel的状态是否是可读写,如果可以就进行读写。

最新的就是异步网络I/O:AIO(Asynchronous I/O),jdk7以上支持,通过注册回调去实现读写,也就是每个Channel注册一个回调方法,当有读写的时候就会自动调用回调方法去读写。

NIO有两类著名Buffer,DirectByteBuffer,HeapByteBuffer。HeapByteBuffer在堆上分配,会被GC回收。而DirectByteBuffer直接利用系统接口进行内存申请,一旦不回收,就会引起OOM。

Netty的Buffer和释放引用计数场景

同样的,Netty使用NIO模型,在io.netty.buffer包内封装了PooledByteBuf抽象类,以及主要的两个实现类PooledDirectByteBuf、PooledHeapByteBuf。和名字一样,多了池化功能,降低分配开销。

Netty通过引用计数的方式来管理ByteBuf,当数据传递到ChannelHandler了之后,则由对应消费处理链(chain)的最后一个Handler在channelRead()方法里负责释放Bytebuf的引用计数。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf buf = (ByteBuf) msg;
    ...
    ctx.fireChannelRead(buf);
}

事实上,不仅仅是Bytebuf需要这样手动释放,你如果用了编解码器,也是需要释放的,比如我们这里的:

MqttMessage mqttMessage = (MqttMessage) msg

就是进行了编码,需要我们自己去释放。当然,你可以考虑继承io.netty.channel.SimpleChannelInboundHandler,实现一个channelRead0的方法,由这个抽象类去帮你做释放:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;

        try {
            if (this.acceptInboundMessage(msg)) {
                this.channelRead0(ctx, msg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (this.autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }

        }

    }

注意我们这里并不适合这样去做!因为我们的业务线程是分离的,不能去用SimpleChannelInboundHandler在业务线程还没处理的时候把它释放掉了,所以只能用channelRead并且自己释放。

Netty没有释放引用计数会怎么样

DirectBuffer在堆上分配对象,并用long指向一块用户态的内存区域(也就是所谓的native memory),这样I/O时只会进行一次拷贝(堆外内存-内核),避免了HeapBuffer在I/O时发生2次内存拷贝(堆-堆外-内核)的开销。

由于Netty的引用计数是自己的设计,而不是JVM的设计,JVM进行GC的时候只会判断是否可达,而不会判断Netty的引用计数是否为0,如果这样的Buffer对象被回收掉,不能再回到池中去,那么这个对象指向的直接内存就无法再次使用了,也就是造成了内存泄漏。

Netty提供了一个抽样方法,默认会抽样1%的buffer去检测GC回收时是否引用计数也为0,从而判断是否有内存泄漏。一共有4个策略:

  • Disabled:关闭抽样检测
  • Simple:抽样1%判断是否有内存泄漏,默认会采取这个策略
  • Advanced:打印泄漏buffer的详细信息
  • Paranoid:检测全部Buffer,会影响性能。

所以,我们的问题就是由于没有释放引用计数造成内存泄漏了。

三、问题解决和测试检验

1、问题解决

【方法一】

首先需要解决问题,既然业务线程没有释放,那么一个方法是,发给业务线程的是一个堆上分配的Java对象,也就是把MqttMessage的数据赋值给一个new出来的对象,然后再发给java线程。然后再在channelRead里面去释放掉引用计数:

try {
   CustomMqttMessage msg = CustomMqttMessage.builder.xxx.build();
   xxxExecutor.submit(()-> xxx.process(msg));
} finally {
  ReferenceCountUtil.release(mqttMessage);
}

【方法二】

这样的方法不喜欢,因为多了一次复制拷贝。我们还可以利用AOP去在业务线程结束后释放掉它:

@Aspect
@Component
@Slf4j
public class NettyAspect {

    @Pointcut("execution(public * xxxx.mqtt.processor.MqttProcessor.processRequest(..))")
    public void process() {
        // pointcut definition
    }

    @Around("process()")
    public Object aroundProcess(ProceedingJoinPoint pjp) throws Throwable {
        Object res = null;
        MqttMessage mqttMessage = null;
        Object[] args = pjp.getArgs();

        try {
            if (args != null && args.length > 0) {
                mqttMessage = (MqttMessage) args[1];
            }
            res = pjp.proceed();
        } finally {
            if (mqttMessage != null) {
                ReferenceCountUtil.release(mqttMessage);
            }
        }
        return res;
    }

}

这样没有数据拷贝,同时释放掉了Netty的引用计数。

2、测试检验

我们需要一个指标去测试。Netty的“io.netty.util.internal.PlatformDependent”的

private static final AtomicLong DIRECT_MEMORY_COUNTER

是一个不错的指标,但是它是private的,我们拿不到。

于是可以写一个反射去拿到:

@Slf4j
@Service
public class NettyDirectMemoryReporterImpl implements NettyDirectMemoryReporter {

    private AtomicLong directMemory;

    @PostConstruct
    public void init() {
        Field field = ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_COUNTER");
        if (field != null) {
            try {
                field.setAccessible(true);
                directMemory = (AtomicLong) field.get(PlatformDependent.class);
                log.info("netty direct memory will be metric.");
            } catch (Exception e){
                log.error("cannot get direct memory!");
            } finally {
                field.setAccessible(false);
            }
        } else {
            log.error("cannot reflect PlatformDependent class!");
        }
    }

    @Override
    public long getNettyDirectMemory() {
        return directMemory.longValue();
    }

}

在这里先设置了Accessible为true,但非常不安全,于是拿到地址后赋给本地private对象,再把Accessible设置回false。

然后提供一个public方法给外部调用,外部只能获取,不能修改,避免外部直接修改这个值造成Netty逻辑异常。

最后用prometheus暴露出去

@Bean
    public Gauge nettyDirectMemoryCounterGauge(PrometheusMeterRegistry registry, NettyDirectMemoryReporter nettyDirectMemoryReporter) {
        Gauge gauge = Gauge.builder(METRIC_NETTY_DIRECT_MEMORY, nettyDirectMemoryReporter, NettyDirectMemoryReporter::getNettyDirectMemory)
                .tags(TAG_METRIC_TYPE, REALTIME)
                .description("netty direct memory count")
                .register(registry);
        return gauge;
    }

可以通过

curl http://localhost:8080/actuator/metrics/{监测变量名}

观察metric数据输出是否正常。

把它部署到PET性能测试环境继续跑,再次对比观察。

(1)DIRECT_MEMORY_COUNTER

旧的一直往上涨到100+MB(图上没有),新的从启动开始一直保持70MB(图中的2台机器),没有增长:

(2)内存变化

新的内存不再线性增长,而是平稳增长(因为堆分配和热点代码),甚至还有下降(因为GC):

跑了一天之后也没有掉线和报错了。

参考资料

《Netty精粹之玩转NIO缓冲区》

《Netty ByteBuf 谁负责谁释放》

《Netty.io:Reference counted objects》

《netty 堆外内存泄露排查盛宴》


转载请注明出处http://www.bewindoweb.com/291.html | 三颗豆子
分享许可方式知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
重大发现:转载注明原文网址的同学刚买了彩票就中奖,刚写完代码就跑通,刚转身就遇到了真爱。
你可能还会喜欢
具体问题具体杠