服务远程暴露 - 创建Exporter与启动netty服务端(4)

勿忘初心2018-11-18 11:53

此文已由作者赵计刚薪授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


到此为止,NettyServer就创建成功了。 之后,终于执行到了:

new HeaderExchangeServer(Server NettyServer)

 1     private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, new NamedThreadFactory("dubbo-remoting-server-heartbeat",true));
 2     private final Server server;
 3     // 心跳定时器
 4     private ScheduledFuture<?> heatbeatTimer;
 5     // 心跳超时,毫秒。缺省0,不会执行心跳。
 6     private int heartbeat;
 7     private int heartbeatTimeout;
 8     private AtomicBoolean closed = new AtomicBoolean(false);
 9 
10     public HeaderExchangeServer(Server server) {
11         if (server == null) {
12             throw new IllegalArgumentException("server == null");
13         }
14         this.server = server;
15         this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);//60000 在createServer(URL providerUrl)中拼接了heartbeat参数
16         this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);//3*60000
17         if (heartbeatTimeout < heartbeat * 2) {
18             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
19         }
20         startHeatbeatTimer();
21     }

说明:

  • 属性
    • scheduled:是一个有1个名字为dubbo-remoting-server-heartbeat的后台线程的定时线程池;
    • server:之前创建出来的NettyServer实例;
    • heartbeatTimer:心跳计时器
    • heartbeat:心跳时间,该参数会在HeaderExchangeServer的构造器中进行赋值,60000
    • heartbeatTimeout:心跳超时时间,超过该时间,会进行channel重连,180000
  • 启动心跳计时器

startHeatbeatTimer()

 1     private void startHeatbeatTimer() {
 2         stopHeartbeatTimer();
 3         if (heartbeat > 0) {
 4             heatbeatTimer = scheduled.scheduleWithFixedDelay(
 5                     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
 6                         public Collection<Channel> getChannels() {
 7                             return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
 8                         }
 9                     }, heartbeat, heartbeatTimeout),
10                     heartbeat,
11                     heartbeat,
12                     TimeUnit.MILLISECONDS);
13         }
14     }
15 
16     private void stopHeartbeatTimer() {
17         try {
18             ScheduledFuture<?> timer = heatbeatTimer;
19             if (timer != null && !timer.isCancelled()) {
20                 timer.cancel(true);
21             }
22         } catch (Throwable t) {
23             logger.warn(t.getMessage(), t);
24         } finally {
25             heatbeatTimer = null;
26         }
27     }

首先停掉之前的计时器,之后在线程创建开始heartbeat毫秒(60s)后执行第一次HeartBeatTask任务,之后每隔heartbeat毫秒(60s)执行一次HeartBeatTask任务。来看一下HeartBeatTask:

HeartBeatTask

 1 final class HeartBeatTask implements Runnable {
 2     private ChannelProvider channelProvider;
 3     private int heartbeat;//60s
 4     private int heartbeatTimeout;//180s
 5 
 6     HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
 7         this.channelProvider = provider;
 8         this.heartbeat = heartbeat;
 9         this.heartbeatTimeout = heartbeatTimeout;
10     }
11 
12     public void run() {
13         try {
14             long now = System.currentTimeMillis();
15             for (Channel channel : channelProvider.getChannels()) {
16                 if (channel.isClosed()) {
17                     continue;
18                 }
19                 try {
20                     Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);//"READ_TIMESTAMP"
21                     Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);//"WRITE_TIMESTAMP"
22                     //如果最后一次读和写在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,需要程序发送心跳
23                     if ((lastRead != null && now - lastRead > heartbeat)
24                             || (lastWrite != null && now - lastWrite > heartbeat)) {
25                         Request req = new Request();
26                         req.setVersion("2.0.0");
27                         req.setTwoWay(true);
28                         req.setEvent(Request.HEARTBEAT_EVENT);
29                         channel.send(req);
30                         if (logger.isDebugEnabled()) {
31                             logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
32                                     + ", cause: The channel has no>);
33                         }
34                     }
35                     //如果最后一次读的时间距离现在已经超过heartbeatTimeout了,我们认为channel已经断了(因为在这个过程中,发送了三次心跳都没反应),此时channel进行重连
36                     if (lastRead != null && now - lastRead > heartbeatTimeout) {
37                         logger.warn("Close channel " + channel
38                                 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
39                         if (channel instanceof Client) {
40                             try {
41                                 ((Client) channel).reconnect();
42                             } catch (Exception e) {
43                                 //do nothing
44                             }
45                         } else {
46                             channel.close();
47                         }
48                     }
49                 } catch (Throwable t) {
50                     logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
51                 }
52             }
53         } catch (Throwable t) {
54             logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
55         }
56     }
57 
58     interface ChannelProvider {
59         Collection<Channel> getChannels();
60     }
61 }

说明:

  • 属性
    • channelProvider在startHeatbeatTimer()中创建,并且获取了当前的HeaderExchangeServer的所有channels
    • heartbeat:60s
    • heartbeatTimeout:180s
  • run()
    • 如果最后一次读和写的时间距离现在在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,发送心跳
    • 如果最后一次读的时间距离现在已经超过heartbeatTimeout了,认为channel已经断了(因为在这个过程中,发送了三次心跳都没反应),此时channel进行重连

到现在一个完整的ExchangeServer就OK了。之后我们将创建出来的ExchangeServer实例存放在DubboProtocol的Map<String, ExchangeServer> serverMap属性中:

{ "10.10.10.10:20880" : ExchangeServer实例 }

最后,DubboProtocol.export(Invoker<T> invoker)将之前创建的DubboExporter实例返回。

 

2.4  创建RegistryProtocol.ExporterChangeableWrapper来封装Exporter和originInvoker

1 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
 1     private class ExporterChangeableWrapper<T> implements Exporter<T> {
 2         private final Invoker<T> originInvoker;
 3         private Exporter<T> exporter;
 4 
 5         public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
 6             this.exporter = exporter;
 7             this.originInvoker = originInvoker;
 8         }
 9 
10         public Invoker<T> getOriginInvoker() {
11             return originInvoker;
12         }
13 
14         public Invoker<T> getInvoker() {
15             return exporter.getInvoker();
16         }
17 
18         public void setExporter(Exporter<T> exporter) {
19             this.exporter = exporter;
20         }
21 
22         public void unexport() {
23             String key = getCacheKey(this.originInvoker);
24             bounds.remove(key);
25             exporter.unexport();
26         }
27     }

ExporterChangeableWrapper类是RegistryProtocol的私有内部类

最后,将<providerUrl, ExporterChangeableWrapper实例>放入RegistryProtocol的属性Map<String, ExporterChangeableWrapper<?>> bounds中。

  • key:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=744&side=provider&timestamp=1507176748026
  • value:RegistryProtocol$ExporterChangeableWrapper实例
    • originInvoker:即AbstractProxyInvoker实例属性如下:
      • proxy:DemoServiceImpl实例
      • type:Class<com.alibaba.dubbo.demo.DemoService>
      • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830
    • DubboExporter实例
      • key:com.alibaba.dubbo.demo.DemoService:20880
      • invoker:"InvokerDelegete的filter对象"
      • exporterMap:{ "com.alibaba.dubbo.demo.DemoService:20880" -> 当前的DubboExporter实例 }

 

到此为止,RegistryProtocol.export(final Invoker<T> originInvoker)的第一行代码就完成了。

 1     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2         //export invoker
 3         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 4         //registry provider
 5         final Registry registry = getRegistry(originInvoker);
 6         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
 7         registry.register(registedProviderUrl);
 8         // 订阅override数据
 9         // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
10         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
11         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
12         overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
13         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
14         //保证每次export都返回一个新的exporter实例
15         return new Exporter<T>() {
16             public Invoker<T> getInvoker() {
17                 return exporter.getInvoker();
18             }
19 
20             public void unexport() {
21                 try {
22                     exporter.unexport();
23                 } catch (Throwable t) {
24                     logger.warn(t.getMessage(), t);
25                 }
26                 try {
27                     registry.unregister(registedProviderUrl);
28                 } catch (Throwable t) {
29                     logger.warn(t.getMessage(), t);
30                 }
31                 try {
32                     overrideListeners.remove(overrideSubscribeUrl);
33                     registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
34                 } catch (Throwable t) {
35                     logger.warn(t.getMessage(), t);
36                 }
37             }
38         };
39     }

 

免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击