此文已由作者赵计刚薪授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
到此为止,NettyServer就创建成功了。 之后,终于执行到了:
new HeaderExchangeServer(Server NettyServer)。
1 private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("dubbo-remoting-server-heartbeat",true)); 2 private final Server server; 3 // 心跳定时器 4 private ScheduledFuture<?> heatbeatTimer; 5 // 心跳超时,毫秒。缺省0,不会执行心跳。 6 private int heartbeat; 7 private int heartbeatTimeout; 8 private AtomicBoolean closed = new AtomicBoolean(false); 9 10 public HeaderExchangeServer(Server server) { 11 if (server == null) { 12 throw new IllegalArgumentException("server == null"); 13 } 14 this.server = server; 15 this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//60000 在createServer(URL providerUrl)中拼接了heartbeat参数 16 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//3*60000 17 if (heartbeatTimeout < heartbeat * 2) { 18 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); 19 } 20 startHeatbeatTimer(); 21 }
说明:
startHeatbeatTimer()
1 private void startHeatbeatTimer() { 2 stopHeartbeatTimer(); 3 if (heartbeat > 0) { 4 heatbeatTimer = scheduled.scheduleWithFixedDelay( 5 new HeartBeatTask(new HeartBeatTask.ChannelProvider() { 6 public Collection<Channel> getChannels() { 7 return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); 8 } 9 }, heartbeat, heartbeatTimeout), 10 heartbeat, 11 heartbeat, 12 TimeUnit.MILLISECONDS); 13 } 14 } 15 16 private void stopHeartbeatTimer() { 17 try { 18 ScheduledFuture<?> timer = heatbeatTimer; 19 if (timer != null && !timer.isCancelled()) { 20 timer.cancel(true); 21 } 22 } catch (Throwable t) { 23 logger.warn(t.getMessage(), t); 24 } finally { 25 heatbeatTimer = null; 26 } 27 }
首先停掉之前的计时器,之后在线程创建开始heartbeat毫秒(60s)后执行第一次HeartBeatTask任务,之后每隔heartbeat毫秒(60s)执行一次HeartBeatTask任务。来看一下HeartBeatTask:
HeartBeatTask
1 final class HeartBeatTask implements Runnable { 2 private ChannelProvider channelProvider; 3 private int heartbeat;//60s 4 private int heartbeatTimeout;//180s 5 6 HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { 7 this.channelProvider = provider; 8 this.heartbeat = heartbeat; 9 this.heartbeatTimeout = heartbeatTimeout; 10 } 11 12 public void run() { 13 try { 14 long now = System.currentTimeMillis(); 15 for (Channel channel : channelProvider.getChannels()) { 16 if (channel.isClosed()) { 17 continue; 18 } 19 try { 20 Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);//"READ_TIMESTAMP" 21 Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);//"WRITE_TIMESTAMP" 22 //如果最后一次读和写在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,需要程序发送心跳 23 if ((lastRead != null && now - lastRead > heartbeat) 24 || (lastWrite != null && now - lastWrite > heartbeat)) { 25 Request req = new Request(); 26 req.setVersion("2.0.0"); 27 req.setTwoWay(true); 28 req.setEvent(Request.HEARTBEAT_EVENT); 29 channel.send(req); 30 if (logger.isDebugEnabled()) { 31 logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() 32 + ", cause: The channel has no>); 33 } 34 } 35 //如果最后一次读的时间距离现在已经超过heartbeatTimeout了,我们认为channel已经断了(因为在这个过程中,发送了三次心跳都没反应),此时channel进行重连 36 if (lastRead != null && now - lastRead > heartbeatTimeout) { 37 logger.warn("Close channel " + channel 38 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); 39 if (channel instanceof Client) { 40 try { 41 ((Client) channel).reconnect(); 42 } catch (Exception e) { 43 //do nothing 44 } 45 } else { 46 channel.close(); 47 } 48 } 49 } catch (Throwable t) { 50 logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); 51 } 52 } 53 } catch (Throwable t) { 54 logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); 55 } 56 } 57 58 interface ChannelProvider { 59 Collection<Channel> getChannels(); 60 } 61 }
说明:
到现在一个完整的ExchangeServer就OK了。之后我们将创建出来的ExchangeServer实例存放在DubboProtocol的Map<String, ExchangeServer> serverMap属性中:
{ "10.10.10.10:20880" : ExchangeServer实例 }
最后,DubboProtocol.export(Invoker<T> invoker)将之前创建的DubboExporter实例返回。
2.4 创建RegistryProtocol.ExporterChangeableWrapper来封装Exporter和originInvoker
1 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
1 private class ExporterChangeableWrapper<T> implements Exporter<T> { 2 private final Invoker<T> originInvoker; 3 private Exporter<T> exporter; 4 5 public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) { 6 this.exporter = exporter; 7 this.originInvoker = originInvoker; 8 } 9 10 public Invoker<T> getOriginInvoker() { 11 return originInvoker; 12 } 13 14 public Invoker<T> getInvoker() { 15 return exporter.getInvoker(); 16 } 17 18 public void setExporter(Exporter<T> exporter) { 19 this.exporter = exporter; 20 } 21 22 public void unexport() { 23 String key = getCacheKey(this.originInvoker); 24 bounds.remove(key); 25 exporter.unexport(); 26 } 27 }
ExporterChangeableWrapper类是RegistryProtocol的私有内部类。
最后,将<providerUrl, ExporterChangeableWrapper实例>放入RegistryProtocol的属性Map<String, ExporterChangeableWrapper<?>> bounds中。
到此为止,RegistryProtocol.export(final Invoker<T> originInvoker)的第一行代码就完成了。
1 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 2 //export invoker 3 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 4 //registry provider 5 final Registry registry = getRegistry(originInvoker); 6 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 7 registry.register(registedProviderUrl); 8 // 订阅override数据 9 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 10 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 11 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 12 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 13 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 14 //保证每次export都返回一个新的exporter实例 15 return new Exporter<T>() { 16 public Invoker<T> getInvoker() { 17 return exporter.getInvoker(); 18 } 19 20 public void unexport() { 21 try { 22 exporter.unexport(); 23 } catch (Throwable t) { 24 logger.warn(t.getMessage(), t); 25 } 26 try { 27 registry.unregister(registedProviderUrl); 28 } catch (Throwable t) { 29 logger.warn(t.getMessage(), t); 30 } 31 try { 32 overrideListeners.remove(overrideSubscribeUrl); 33 registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); 34 } catch (Throwable t) { 35 logger.warn(t.getMessage(), t); 36 } 37 } 38 }; 39 }
更多网易技术、产品、运营经验分享请点击。