服务远程暴露 - 创建Exporter与启动netty服务端(1)

勿忘初心2018-11-18 11:49

此文已由作者赵计刚薪授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


为了安全:服务启动的ip全部使用10.10.10.10

远程服务的暴露总体步骤:

  • 将ref封装为invoker
  • 将invoker转换为exporter
  • 启动netty
  • 注册服务到zookeeper
  • 订阅
  • 返回新的exporter实例

服务远程暴露的代码:

 1             //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
 2             if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
 3                 if (logger.isInfoEnabled()) {
 4                     logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
 5                 }
 6                 if (registryURLs != null && registryURLs.size() > 0
 7                         && url.getParameter("register", true)) {
 8                     for (URL registryURL : registryURLs) {
 9                         url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
10                         URL monitorUrl = loadMonitor(registryURL);
11                         if (monitorUrl != null) {
12                             url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
13                         }
14                         if (logger.isInfoEnabled()) {
15                             logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
16                         }
17                         Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
18                         Exporter<?> exporter = protocol.export(invoker);
19                         exporters.add(exporter);
20                     }
21                 } else {
22                     Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
23                     Exporter<?> exporter = protocol.export(invoker);
24                     exporters.add(exporter);
25                 }
26             }

首先将实现类ref封装为Invoker,之后将invoker转换为exporter,最后将exporter放入缓存List<Exporter> exporters中。

 

一 将实现类ref封装为Invoker

1 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

1  为registryURL拼接export=providerUrl参数

一开始的registryURL:

registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&pid=887&registry=zookeeper&timestamp=1507096022072

registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())这句代码为registryURL添加了参数并编码:(这里给出没有编码的样子)

1 export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=887&side=provider&timestamp=1507096024334

2  ProxyFactory$Adaptive.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)

 1     public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
 2         if (arg2 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg2;
 5         String extName = url.getParameter("proxy", "javassist");//结果是javassist
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
 8         com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
 9         return extension.getInvoker(arg0, arg1, arg2);
10     }

这里,本来是调用JavassistProxyFactory的getInvoker方法,但是JavassistProxyFactory被StubProxyFactoryWrapper给aop了。

3  StubProxyFactoryWrapper.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)

1     public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
2         return proxyFactory.getInvoker(proxy, type, url);
3     }

4  JavassistProxyFactory.getInvoker(DemoServiceImpl实例, Class<DemoService>, registryURL)

 1     public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
 2         // TODO Wrapper类不能正确处理带$的类名
 3         final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
 4         return new AbstractProxyInvoker<T>(proxy, type, url) {
 5             @Override
 6             protected Object doInvoke(T proxy, String methodName,
 7                                       Class<?>[] parameterTypes,
 8                                       Object[] arguments) throws Throwable {
 9                 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
10             }
11         };
12     }

首先是创建Wrapper类:Wrapper.getWrapper(Class<DemoServiceImpl>)。该类记录了DemoServiceImpl的属性名称,方法名称等信息。关键代码如下:(完整代码见:7.2 服务本地暴露

 1 import com.alibaba.dubbo.common.bytecode.Wrapper;
 2 import java.util.HashMap;
 3 
 4 public class Wrapper1 extends Wrapper {
 5 
 6     public static String[] pns;//property name array
 7     public static java.util.Map pts = new HashMap();//<property key, property value>
 8     public static String[] mns;//method names
 9     public static String[] dmns;//
10     public static Class[] mts0;
55     /**
56      * @param o  实现类
57      * @param n  方法名称
58      * @param p  参数类型
59      * @param v  参数名称
60      * @return
61      * @throws java.lang.reflect.InvocationTargetException
62      */
63     public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
64         com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
65         try {
66             w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
67         } catch (Throwable e) {
68             throw new IllegalArgumentException(e);
69         }
70         try {
71             if ("sayHello".equals(n) && p.length == 1) {
72                 return ($w) w.sayHello((java.lang.String) v[0]);
73             }
74         } catch (Throwable e) {
75             throw new java.lang.reflect.InvocationTargetException(e);
76         }
77         throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
78     }
79 }

创建完DemoServiceImpl的Wrapper类之后(实际上该实例在本地暴露的时候已经存入缓存了,这里只是从缓存中拿出来而已),创建一个AbstractProxyInvoker实例。

 1     private final T proxy;
 2     private final Class<T> type;
 3     private final URL url;
 4 
 5     public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
 6         if (proxy == null) {
 7             throw new IllegalArgumentException("proxy == null");
 8         }
 9         if (type == null) {
10             throw new IllegalArgumentException("interface == null");
11         }
12         if (!type.isInstance(proxy)) {
13             throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
14         }
15         this.proxy = proxy;
16         this.type = type;
17         this.url = url;
18     }

最后创建完成的AbstractProxyInvoker实例属性如下:

  • proxy:DemoServiceImpl实例
  • type:Class<com.alibaba.dubbo.demo.DemoService>
  • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830

这样我们就将ref实现类转换成了Invoker,之后在调用该invoker.invoke(Invocation invocation)的时候,会调用invoker.doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments)的时候,就会调用相应的实现类proxy的wrapper类的invokeMethod(proxy, methodName, parameterTypes, arguments),该方法又会调用真实的实现类methodName方法。这里可以先给出AbstractProxyInvoker.invoke(Invocation invocation)源码:

1     public Result invoke(Invocation invocation) throws RpcException {
2         try {
3             return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
4         } catch (InvocationTargetException e) {
5             return new RpcResult(e.getTargetException());
6         } catch (Throwable e) {
7             throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
8         }
9     }

这里的proxy就是上边赋好值的proxy:DemoServiceImpl实例。而方法信息会封装在Invocation对象中,该对象在服务引用时介绍。

 

二  将Invoker转换为Exporter

1 Exporter<?> exporter = protocol.export(invoker)

1  Protocol$Adaptive.export(com.alibaba.dubbo.rpc.Invoker AbstractProxyInvoker实例)

 1     public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
 4         if (arg0.getUrl() == null)
 5             throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
 6         com.alibaba.dubbo.common.URL url = arg0.getUrl();
 7         String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//registry
 8         if(extName == null)
 9             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
10         com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
11         return extension.export(arg0);
12     }

这里,由于aop的原因,首先调用了ProtocolListenerWrapper的export(Invoker<T> invoker),如下:

1     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2         if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
3             return protocol.export(invoker);
4         }
5         return new ListenerExporterWrapper<T>(protocol.export(invoker),
6                                               Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
7     }

由于协议是“registry”,所以不做任何处理,继续调用ProtocolFilterWrapper的export(Invoker<T> invoker),如下:

1     public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
2         if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
3             return protocol.export(invoker);
4         }
5         return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
6     }

同理,由于协议是“registry”,所以不做任何处理,继续调用RegistryProtocol.export(final Invoker<T> originInvoker),如下:

 1     public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
 2         //export invoker
 3         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
 4         //registry provider
 5         final Registry registry = getRegistry(originInvoker);
 6         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
 7         registry.register(registedProviderUrl);
 8         // 订阅override数据
 9         // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
10         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
11         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
12         overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
13         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
14         //保证每次export都返回一个新的exporter实例
15         return new Exporter<T>() {
16             public Invoker<T> getInvoker() {
17                 return exporter.getInvoker();
18             }
19 
20             public void unexport() {
21                 try {
22                     exporter.unexport();
23                 } catch (Throwable t) {
24                     logger.warn(t.getMessage(), t);
25                 }
26                 try {
27                     registry.unregister(registedProviderUrl);
28                 } catch (Throwable t) {
29                     logger.warn(t.getMessage(), t);
30                 }
31                 try {
32                     overrideListeners.remove(overrideSubscribeUrl);
33                     registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
34                 } catch (Throwable t) {
35                     logger.warn(t.getMessage(), t);
36                 }
37             }
38         };
39     }

该方法完成了远程暴露的全部流程。

  • 将invoker转换为exporter
  • 启动netty
  • 注册服务到zookeeper
  • 订阅
  • 返回新的exporter实例

2  将invoker转换为exporter并启动netty服务

1 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

doLocalExport(final Invoker<T> originInvoker)

 1     /**
 2      * 1 从invoker的URL中的Map<String, String> parameters中获取key为export的地址providerUrl,该地址将是服务注册在zk上的节点
 3      * 2 从 Map<String, ExporterChangeableWrapper<?>> bounds 缓存中获取key为上述providerUrl的exporter,如果有,直接返回,如果没有,创建并返回
 4      * @return
 5      */
 6     @SuppressWarnings("unchecked")
 7     private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
 8         String key = getCacheKey(originInvoker);//根据originInvoker获取providerUrl
 9         ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
10         if (exporter == null) {
11             synchronized (bounds) {
12                 exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
13                 if (exporter == null) {
14                     final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));//存储originInvoker和providerUrl
15                     exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
16                     bounds.put(key, exporter);
17                 }
18             }
19         }
20         return exporter;
21     }

2.1 从originInvoker中获取providerUrl

该方法直接首先调用getCacheKey(final Invoker<?> originInvoker)中获取providerUrl,这里的originInvoker就是上述创建出来的AbstractProxyInvoker实例,注意他的url是registry协议的,该url的export参数的value就是我们要获取的providerUrl。获取providerUrl的源码如下:

 1     private String getCacheKey(final Invoker<?> originInvoker) {
 2         URL providerUrl = getProviderUrl(originInvoker);
 3         String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
 4         return key;
 5     }
 6 
 7     private URL getProviderUrl(final Invoker<?> origininvoker) {
 8         String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
 9         if (export == null || export.length() == 0) {
10             throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
11         }
12 
13         URL providerUrl = URL.valueOf(export);
14         return providerUrl;
15     }

之后一系列的操作,就是获取该providerUrl对应的exporter,之后放入缓存Map<String, ExporterChangeableWrapper<?>> bounds中,所以一个providerUrl只会对应一个exporter。


免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击