Dubbo 优雅停机修改方案
1. 服务端不能优雅停机的原因:
NettyServer在构造函数中会调用
ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))
方法将handler进行包装,包装成MultiMessageHandler的一个对象。在下面红色代码中会判断handler是否是WrappedChannelHandler对象,只有是的时候才会对executor对象复值。因为MultiMessageHandler对象不是WrappedChannelHandler的子类,所以executor为空。
当NettyServer的close方法被调用的时候,会调用ExecutorUtil.gracefulShutdown方法,gracefulShutdown方法只有executor不为空时才会等待线程池关闭
public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); } } public void close(int timeout) { ExecutorUtil.gracefulShutdown(executor ,timeout); close(); } } |
2. 服务端改动:
修改了AbstractServer类的红色部分
public abstract class AbstractServer extends AbstractEndpoint implements Server {
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
private int accepts;
private int idleTimeout = 600; //600 seconds
protected static final String SERVER_THREAD_POOL_NAME ="DubboServerHandler";
ExecutorService executor;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } try { recycleToGetExecutor(handler); } catch (Exception e) { logger.error("recycle to get executor failed"); } }
private void recycleToGetExecutor(ChannelHandler handler) throws NoSuchFieldException, IllegalAccessException { if(handler == null) { return; }
if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); }else if(handler instanceof AbstractChannelHandlerDelegate){ Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler"); field.setAccessible(true); recycleToGetExecutor((ChannelHandler) field.get(handler)); } }
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
public void reset(URL url) { if (url == null) { return; } try { if (url.hasParameter(Constants.ACCEPTS_KEY)) { int a = url.getParameter(Constants.ACCEPTS_KEY, 0); if (a > 0) { this.accepts = a; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) { int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0); if (t > 0) { this.idleTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.THREADS_KEY) && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; int threads = url.getParameter(Constants.THREADS_KEY, 0); int max = threadPoolExecutor.getMaximumPoolSize(); int core = threadPoolExecutor.getCorePoolSize(); if (threads > 0 && (threads != max || threads != core)) { if (threads < core) { threadPoolExecutor.setCorePoolSize(threads); if (core == max) { threadPoolExecutor.setMaximumPoolSize(threads); } } else { threadPoolExecutor.setMaximumPoolSize(threads); if (core == max) { threadPoolExecutor.setCorePoolSize(threads); } } } } } catch (Throwable t) { logger.error(t.getMessage(), t); } super.setUrl(getUrl().addParameters(url.getParameters())); }
public void send(Object message, boolean sent) throws RemotingException { Collection<Channel> channels = getChannels(); for (Channel channel : channels) { if (channel.isConnected()) { channel.send(message, sent); } } }
public void close() { if (logger.isInfoEnabled()) { logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } ExecutorUtil.shutdownNow(executor ,100); try { super.close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { doClose(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } }
public void close(int timeout) { ExecutorUtil.gracefulShutdown(executor ,timeout); close(); }
public InetSocketAddress getLocalAddress() { return localAddress; }
public InetSocketAddress getBindAddress() { return bindAddress; }
public int getAccepts() { return accepts; }
public int getIdleTimeout() { return idleTimeout; }
@Override public void connected(Channel ch) throws RemotingException { Collection<Channel> channels = getChannels(); if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts); ch.close(); return; } super.connected(ch); }
@Override public void disconnected(Channel ch) throws RemotingException { Collection<Channel> channels = getChannels(); if (channels.size() == 0){ logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now."); } super.disconnected(ch); }
} |
3. 客户端不能优雅停机的原因:
DubboInvoker停机的时候直接调用了client.close()方法应该调用client.close(long timeout)方法
public class DubboInvoker<T> extends AbstractInvoker<T> { public void destroy() { //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭 if (super.isDestroyed()){ return ; } else { //dubbo check ,避免多次关闭 destroyLock.lock(); try{ if (super.isDestroyed()){ return ; } super.destroy(); if (invokers != null){ invokers.remove(this); } for (ExchangeClient client : clients) { try { client.close(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
}finally { destroyLock.unlock(); } } } } |
HeaderExchangeChannel类中判断HeaderExchangeChannel.this是否在DefaultFuture中。实际上在处理请求的时候是将HeaderExchangeChannel对象中channel成员变量放到DefaultFuture中的
final class HeaderExchangeChannel implements ExchangeChannel { public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; }
public void close(int timeout) { if (closed) { return; } closed = true; if (timeout > 0) { long start = System.currentTimeMillis(); while (DefaultFuture.hasFuture(HeaderExchangeChannel.this) && System.currentTimeMillis() - start < timeout) { try { Thread.sleep(10); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } close(); } } |
4. 客户端的修改
修改下面高量代码:
public class DubboInvoker<T> extends AbstractInvoker<T> { public void destroy() { //防止client被关闭多次.在connect per jvm的情况下,client.close方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭 if (super.isDestroyed()){ return ; } else { //dubbo check ,避免多次关闭 destroyLock.lock(); try{ if (super.isDestroyed()){ return ; } super.destroy(); if (invokers != null){ invokers.remove(this); } for (ExchangeClient client : clients) { try { logger.info("start to shutdown:" + getServerShutdownTimeout()); client.close(getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
}finally { destroyLock.unlock(); } } }
protected static int getServerShutdownTimeout() { int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT; String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY); if (value != null && value.length() > 0) { try{ timeout = Integer.parseInt(value); }catch (Exception e) { } } else { value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY); if (value != null && value.length() > 0) { try{ timeout = Integer.parseInt(value) * 1000; }catch (Exception e) { } } }
return timeout; } } |
final class HeaderExchangeChannel implements ExchangeChannel { // graceful close public void close(int timeout) { if (closed) { return; } closed = true; if (timeout > 0) { long start = System.currentTimeMillis(); while (DefaultFuture.hasFuture(channel) && System.currentTimeMillis() - start < timeout) { try { Thread.sleep(10); } catch (InterruptedException e) { logger.warn(e.getMessage(), e); } } } close(); } } |
注意服务端和客户端都需要加上配置dubbo.service.shutdown.wait=60000,设定停机等待时间
相关推荐
dubbo注册中心服务ip和实际服务提供者ip不一致问题 网上收集的资料
Dubbo分布式服务架构,对于研究大型Web服务器的并发技术的同学们有帮助。
修改阿里官方脚本,保证脚本的可执行性,应用于dubbo服务的启动和停止,文档目录应该符合bin、log、lib、conf结构,bin中放置脚本
Dubbo超时机制导致的雪崩连接
dubbo服务监控 目录包含: dubbo-admin dubbo-monitor-simple dubbo-registry-simple pom.xml README.md
dubbo 服务提供方和服务消费方的代码
dubbo上传文件+oss上传文件服务
dubbo服务注册到eureka
服务治理工具dubbo,欢迎学习爱好者下载资源,共同学习。。
dubbo资源 dubbo-admin dubbo demo
Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现...
dubbo接口提供方服务demo
Dubbo+Maven消费方和服务费, Dubbo+Maven消费方和服务费
本地直接连接服务提供者,模拟发送数据测试,dubbo接口调试
dubbo的使用,其实只需要有注册中心,消费者,提供者这三个就可以使用了,但是并不能看到有哪些消费者和提供者,为了更好的调试,发现问题,解决问题,因此引入dubbo-admin。通过dubbo-admin可以对消费者和提供者...
1.SpringBoot聚合工程整合Dubbo,实现服务提供者与服务消费者的数据调用, 2.该项目提高了自己对Spring Boot整合Dubbo的理解,并深刻的认识到了服务者与消费者之间的调用及流程 4. Dubbo配置全部采用yml文件配置,...
在dubbo的注册服务下单元测试,然后扫描 注册上来的各种服务,轻松测试每个注册服务
今天小编就为大家分享一篇关于Dubbo无法访问远程Zookeeper已注册服务的问题解决方案,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
Dubbo分布式服务框架教程,介绍dubbo,dubbo的使用步骤,dubbo和spring框架整合调用的教程