dubbo线程池为什么耗尽
文章概述
大家可能都遇到过DUBBO线程池打满这个问题,报错如下,本文我们就一起分析DUBBO线程池打满这个问题。
cause: org.apache.dubbo.remoting.RemotingException: Server side(10.0.0.100,20881) thread pool is exhausted, detail msg:Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-10.0.0.100:20881, Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://10.0.0.100:20881!
1 DUBBO线程模型
先看一张图大概了解
** IO线程**
IO线程的工作实际上就是处理字节流的输入输出,对消息的读取,序列化,不涉及业务操作
NettyServer中启动netty服务端,初始化boss和work线程信息
protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker"); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
这里分别看线程数量
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker");
int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
boss线程设置为1
主要看work线程(IO线程)
从url中获取线程数,如果没设置的话,设置当前机器的线程数,最少设置为32个
这个配置是iothreads,如果配置的这样配置。但是线程池耗尽并不是io线程数量不够的原因
provider: iothreads: 100
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); }
@Override public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName()); } return; } handler.received(channel, message); }
消息的不同类型有不同的处理方式如果是心跳直接就发送回去了,
如果是业务请求那么交给业务线程池处理
@Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
业务线程池
初始化
不同线程池策略会创建不同特性的线程池:
dubbo提供了不同的线程池类型
fixed 包含固定个数线程 cached 线程空闲一分钟会被回收,当新请求到来时会创建新线程 limited 线程个数随着任务增加而增加,但不会超过最大阈值。空闲线程不会被回收 eager 当所有核心线程数都处于忙碌状态时,优先创建新线程执行任务,而不是立即放入队列
一般实际使用的就是fixed
public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues这里主要看两个参数,分别是线程数,和队列长度。默认的线程数是200,queue默认使用SynchronousQueue
SynchronousQueue由于其独有的线程一一配对通信机制,由于内部没有使用AQS,而是直接使用CAS,其并没有存储任务的队列就是将任务与线程进行匹配,如果任务进来,没用可用线程,那么将直接拒绝,这也是我们碰到拒绝策略的原因
如果需要配置
dubbo: protocol: threads: 800 queues: 10000业务线程线程池拒绝
这里就可以看到线程池拒绝AbortPolicyWithReport
@Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: " + "%d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); }也就是开头的那个报错,这里在发生问题会自动dump stack信息
线程池中的 getTaskCount 和 getCompletedTaskCount 是两个重要的方法,它们用于获取线程池的任务和已完成任务的统计信息。
- getTaskCount: 这个方法返回线程池中的当前任务数。它包括正在执行的任务和等待执行的任务。换句话说,它返回的是线程池中所有任务的总数,包括那些尚未开始执行的任务。
- getCompletedTaskCount: 这个方法返回线程池已完成的任务数量。它只计算那些已经完成执行的任务,而不包括正在执行或等待执行的任务。
再回头我们的那个报错。
Pool Size: 800 (active: 800, core: 800, max: 800, largest: 800), Task: 50397601 (completed: 50396801)
2、估算合适的线程数,寻找慢业务
我们知道DUBBO会选择线程池策略进行业务处理,那么如何估算可能产生的线程数呢?我们首先分析一个问题:一个公司有7200名员工,每天上班打卡时间是早上8点到8点30分,每次打卡时间系统耗时5秒。请问RT、QPS、并发量分别是多少?
RT表示响应时间,问题已经告诉了我们答案:
RT = 5
QPS表示每秒查询量,假设签到行为平均分布:
QPS = 7200 / (30 * 60) = 4
并发量表示系统同时处理的请求数量:
并发量 = QPS x RT = 4 x 5 = 20
根据上述实例引出如下公式:
并发量 = QPS x RT
如果系统为每一个请求分配一个处理线程,那么并发量可以近似等于线程数。基于上述公式不难看出并发量受QPS和RT影响,这两个指标任意一个上升就会导致并发量上升。
但是这只是理想情况,因为并发量受限于系统能力而不可能持续上升,例如DUBBO线程池就对线程数做了限制,超出最大线程数限制则会执行拒绝策略,而拒绝策略会提示线程池已满,这就是DUBBO线程池打满问题的根源。下面我们分别分析RT上升和QPS上升这两个原因。
注意上面仅仅是一个例子,实际上一个服务远比例子复杂,实践往往需要不断的调参数。才能找到合理的值
线程池耗尽,往往是因为某个业务慢导致,我们应该寻找执行缓慢的堆栈,例如使用arthas来监控。