Source code walkthrough: how the server receives a request message and sends the response message

叁叁肆 2018-11-18 11:24

This article is published on the NetEase Cloud Community with the authorization of its author, 赵计刚 (Zhao Jigang).



I. Overall flow

Server receives the request message
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
  -->HeartbeatHandler.received(Channel channel, Object message)
    -->AllChannelHandler.received(Channel channel, Object message)
      -->ExecutorService cexecutor = getExecutorService()
      -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
        -->ChannelEventRunnable.run()
          -->DecodeHandler.received(Channel channel, Object message)
            -->decode(Object message)
            -->HeaderExchangeHandler.received(Channel channel, Object message)
              -->Response response = handleRequest(exchangeChannel, request)
                -->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//message here is the RpcInvocation from above
                  //first get the exporter, then get the invoker from it
                  -->getInvoker(Channel channel, Invocation inv)//builds serviceKey=com.alibaba.dubbo.demo.DemoService:20880
                    -->(DubboExporter<?>) exporterMap.get(serviceKey)//look up the DubboExporter instance by serviceKey in Map<String, Exporter<?>> exporterMap
                    -->exporter.getInvoker()//returns the RegistryProtocol$InvokerDelegete instance
                  //run the filter chain
                  -->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
                    -->ClassLoaderFilter.invoke(Invoker<?> invoker, Invocation invocation)
                      -->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
                        -->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
                          -->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
                            -->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
                              -->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                -->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                  //perform the real invoker call
                                  -->AbstractProxyInvoker.invoke(Invocation invocation)
                                    -->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
                                      -->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
                                        -->DemoServiceImpl.sayHello(String name)
                                    -->new RpcResult(Object result)//wrap the return value result in an RpcResult (which is eventually wrapped in a Response)
              Server sends the response message
              -->channel.send(response)//NettyChannel
                -->NioAcceptedSocketChannel.write(Object message)//this is already Netty code; message here is the Response instance, whose most important part is RpcResult [result=Hello world, response form provider: 10.211.55.2:20880, exception=null]

 

II. Source code analysis

With Netty, messages are received, processed, and sent inside Netty handlers. Let's look at the handlers that NettyServer installs.

    protected void doOpen() throws Throwable {
        ...
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        ...
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        ...
    }

NettyHandler.messageReceived

    private final ChannelHandler handler;//NettyServer

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

The call first lands in the received method of AbstractPeer, the parent class of NettyServer, which in turn calls MultiMessageHandler.received:

    protected ChannelHandler handler;//HeartbeatHandler
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }

HeartbeatHandler.received(Channel channel, Object message)

    protected ChannelHandler handler;//AllChannelHandler
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel);
        if (isHeartbeatRequest(message)) {
            ...
            return;
        }
        if (isHeartbeatResponse(message)) {
            ...
            return;
        }
        handler.received(channel, message);
    }
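
MultiMessageHandler and HeartbeatHandler above follow the same decorator pattern: each one holds the next handler, deals with its own concern, and delegates everything else. The following is a minimal sketch of that idea only. The Handler interface, the HEARTBEAT marker, and the use of a List in place of MultiMessage are simplifications made up for this illustration, not Dubbo types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class HandlerChainSketch {
    // Hypothetical, simplified stand-in for Dubbo's ChannelHandler.
    interface Handler {
        void received(Object message);
    }

    // Hypothetical marker object standing in for a heartbeat message.
    static final Object HEARTBEAT = new Object();

    // Unpacks a batch (a List here, MultiMessage in Dubbo) and forwards each element.
    static Handler multiMessage(Handler next) {
        return message -> {
            if (message instanceof List) {
                for (Object obj : (List<?>) message) {
                    next.received(obj);
                }
            } else {
                next.received(message);
            }
        };
    }

    // Swallows heartbeats and forwards everything else.
    static Handler heartbeat(Handler next) {
        return message -> {
            if (message == HEARTBEAT) {
                return;
            }
            next.received(message);
        };
    }

    // Sends the given messages through the chain and returns what reached the end of it.
    static List<Object> run(Object... incoming) {
        List<Object> seen = new ArrayList<>();
        Handler chain = multiMessage(heartbeat(seen::add));
        for (Object message : incoming) {
            chain.received(message);
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints [a, b, c]: the batch is unpacked and both heartbeats are dropped
        System.out.println(run("a", Arrays.asList("b", HEARTBEAT, "c"), HEARTBEAT));
    }
}
```

Because every layer only knows the next handler, layers can be added or removed without touching the others, which is why the chain in section I reads as a straight line.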

AllChannelHandler.received(Channel channel, Object message)

    protected final ExecutorService executor;//ThreadPoolExecutor
    protected final ChannelHandler handler;//DecodeHandler

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            ...
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }

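This first creates a ChannelEventRunnable task and then submits it to the thread pool for execution, so the message is processed on a business thread rather than on the Netty I/O thread.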
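
The hand-off to the thread pool, including the fallback to a shared executor, can be sketched in isolation as follows. This is an illustration under stated assumptions: DispatchSketch, SHARED, and the thread names are invented for the example, and the sketch blocks on the result only so that it has something to return, whereas the listing above submits the task and returns immediately.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DispatchSketch {
    // Hypothetical stand-in for the shared fallback pool (SHARED_EXECUTOR in the listing above).
    static final ExecutorService SHARED =
            Executors.newCachedThreadPool(r -> daemon(r, "shared-worker"));

    private final ExecutorService executor;

    DispatchSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Daemon threads so that the sketch never keeps the JVM alive.
    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        return t;
    }

    // Same decision as getExecutorService(): use our own pool unless it is missing or shut down.
    ExecutorService pick() {
        ExecutorService chosen = executor;
        if (chosen == null || chosen.isShutdown()) {
            chosen = SHARED;
        }
        return chosen;
    }

    // Hands the message to the chosen pool and reports which thread processed it.
    String receivedOn(Object message) {
        Callable<String> task = () -> Thread.currentThread().getName() + " handled " + message;
        try {
            return pick().submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException("error when processing received event", e);
        }
    }

    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> daemon(r, "biz-worker"));
        try {
            return new DispatchSketch(pool).receivedOn("request-1");
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "biz-worker handled request-1"
    }
}
```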

ChannelEventRunnable.run()

    private final ChannelHandler handler;//DecodeHandler
    public void run() {
        switch (state) {
            case CONNECTED:
                ...
                break;
            case DISCONNECTED:
                ...
                break;
            case SENT:
                ...
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                ...
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

DecodeHandler.received(Channel channel, Object message)

    protected ChannelHandler handler;//HeaderExchangeHandler
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());//decode the request body
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

HeaderExchangeHandler.received(Channel channel, Object message)

    private final ExchangeHandler handler;//DubboProtocol$ExchangeHandler

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception(...);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            return res;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
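
Two properties of handleRequest are worth spelling out: the response reuses the request id, so the client can match it to the pending call, and a failure inside the handler becomes an error status on the response instead of an exception on the server thread. The sketch below illustrates only that mapping. The Status enum, the Response class, and the Function-based handler are simplified stand-ins invented for this example, not the Dubbo classes.

```java
import java.util.function.Function;

final class HandleRequestSketch {
    // Hypothetical status values; the real constants live on Dubbo's Response class.
    enum Status { OK, BAD_REQUEST, SERVICE_ERROR }

    // Hypothetical, simplified response carrying the id of the request it answers.
    static final class Response {
        final long id;
        Status status;
        Object result;
        String errorMessage;

        Response(long id) {
            this.id = id;
        }
    }

    static Response handleRequest(long requestId, boolean broken, Object data,
                                  Function<Object, Object> replyHandler) {
        Response res = new Response(requestId);
        if (broken) {
            // Decoding failed earlier: answer with an error without calling the handler.
            res.status = Status.BAD_REQUEST;
            res.errorMessage = "Fail to decode request due to: " + data;
            return res;
        }
        try {
            res.result = replyHandler.apply(data);
            res.status = Status.OK;
        } catch (Throwable e) {
            // A handler failure is reported to the caller through the response.
            res.status = Status.SERVICE_ERROR;
            res.errorMessage = String.valueOf(e);
        }
        return res;
    }

    public static void main(String[] args) {
        Response ok = handleRequest(7L, false, "world", d -> "Hello " + d);
        System.out.println(ok.id + " " + ok.status + " " + ok.result); // prints "7 OK Hello world"

        Response failed = handleRequest(8L, false, "world", d -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(failed.id + " " + failed.status); // prints "8 SERVICE_ERROR"
    }
}
```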

DubboProtocol$ExchangeHandler.reply(ExchangeChannel channel, Object message)

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                ...
                return invoker.invoke(inv);
            }
            throw new RemotingException(...);
        }

It first obtains the Invoker and then uses that invoker to perform the actual call.

    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        int port = channel.getLocalAddress().getPort();//20880
        String path = inv.getAttachments().get(Constants.PATH_KEY);
        ...
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(...);

        return exporter.getInvoker();
    }

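Here serviceKey is com.alibaba.dubbo.demo.DemoService:20880. The general form is group/serviceName:serviceVersion:port; the group is left out when it is empty, and the version is left out when it is empty or "0.0.0".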

    public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && serviceGroup.length() > 0) {
            buf.append(serviceGroup);
            buf.append("/");
        }
        buf.append(serviceName);
        if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
            buf.append(":");
            buf.append(serviceVersion);
        }
        buf.append(":");
        buf.append(port);
        return buf.toString();
    }
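
To see what keys the serviceKey method above produces, here is a standalone copy of its logic with two sample calls. The class name ServiceKeyDemo and the group and version values in the second call are made up for the illustration.

```java
final class ServiceKeyDemo {

    // Same logic as the serviceKey listing above, copied so the example runs on its own.
    static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        StringBuilder buf = new StringBuilder();
        if (serviceGroup != null && serviceGroup.length() > 0) {
            buf.append(serviceGroup).append("/");
        }
        buf.append(serviceName);
        if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
            buf.append(":").append(serviceVersion);
        }
        buf.append(":").append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        String service = "com.alibaba.dubbo.demo.DemoService";

        // No group and the default version: only the service name and port remain.
        // prints com.alibaba.dubbo.demo.DemoService:20880
        System.out.println(serviceKey(20880, service, "0.0.0", null));

        // Hypothetical group and version values.
        // prints demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        System.out.println(serviceKey(20880, service, "1.0.0", "demoGroup"));
    }
}
```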

Map<String, Exporter<?>> exporterMap is already populated when the service is exported: "com.alibaba.dubbo.demo.DemoService:20880" -> DubboExporter instance. That instance holds an Invoker wrapped by the filter chain: a RegistryProtocol$InvokerDelegete instance.
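
An Invoker "wrapped by the filter chain" can be pictured as nested invokers, each calling one filter and passing along the next invoker. The sketch below is a simplified illustration of that wrapping, not Dubbo's implementation: the Invoker and Filter interfaces are cut down to strings, and only three of the filter names from section I are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {
    // Hypothetical, simplified stand-ins for Dubbo's Invoker and Filter.
    interface Invoker {
        String invoke(String invocation);
    }

    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    // Wraps from the last filter to the first, so filters.get(0) ends up outermost.
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    // A filter that only records its name before passing the call on.
    static Filter named(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }

    // Builds a three-filter chain, invokes it once, and returns the order of execution.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Invoker target = invocation -> {
            trace.add("DemoServiceImpl." + invocation);
            return "Hello";
        };
        Invoker chain = buildChain(target, Arrays.asList(
                named("EchoFilter", trace),
                named("ClassLoaderFilter", trace),
                named("ExceptionFilter", trace)));
        chain.invoke("sayHello");
        return trace;
    }

    public static void main(String[] args) {
        // prints [EchoFilter, ClassLoaderFilter, ExceptionFilter, DemoServiceImpl.sayHello]
        System.out.println(demo());
    }
}
```

Calling the outermost invoker is all the caller has to do; each filter decides whether and when to pass the call inward.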

Next the filter chain runs until it finally reaches RegistryProtocol$InvokerDelegete.invoke. That method is actually executed in InvokerWrapper, the parent class of RegistryProtocol$InvokerDelegete, and InvokerWrapper calls AbstractProxyInvoker.invoke(Invocation invocation).

    private final T proxy;//the DemoServiceImpl instance

    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

This first calls doInvoke in the subclass JavassistProxyFactory$AbstractProxyInvoker and then wraps the return value in an RpcResult and returns it.

            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
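
One detail of AbstractProxyInvoker.invoke above is easy to miss: an exception thrown by the service method arrives wrapped in an InvocationTargetException and is returned inside the result, while any other failure is thrown. The following sketch isolates that distinction. Result and Target are hypothetical simplifications of RpcResult and the proxied call, and IllegalStateException stands in for RpcException.

```java
import java.lang.reflect.InvocationTargetException;

final class ProxyInvokeSketch {
    // Hypothetical, simplified stand-in for RpcResult: either a value or an exception.
    static final class Result {
        final Object value;
        final Throwable exception;

        Result(Object value, Throwable exception) {
            this.value = value;
            this.exception = exception;
        }
    }

    // Hypothetical stand-in for the doInvoke call.
    interface Target {
        Object call() throws Throwable;
    }

    static Result invoke(Target target) {
        try {
            return new Result(target.call(), null);
        } catch (InvocationTargetException e) {
            // The service method itself failed: hand the cause back as part of the result.
            return new Result(null, e.getTargetException());
        } catch (Throwable e) {
            // Anything else is an invocation failure (RpcException in the listing above).
            throw new IllegalStateException("Failed to invoke remote proxy method", e);
        }
    }

    static Result demoSuccess() {
        return invoke(() -> "Hello world");
    }

    static Result demoBusinessFailure() {
        return invoke(() -> {
            throw new InvocationTargetException(new IllegalArgumentException("bad name"));
        });
    }

    public static void main(String[] args) {
        System.out.println(demoSuccess().value);                        // prints Hello world
        System.out.println(demoBusinessFailure().exception.getMessage()); // prints bad name
    }
}
```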

doInvoke calls the invokeMethod method of the Wrapper class. Wrapper is a dynamically generated class; the generated source looks like this:

import com.alibaba.dubbo.common.bytecode.Wrapper;
import java.util.HashMap;

public class Wrapper1 extends Wrapper {

    public static String[] pns;//property name array
    public static java.util.Map pts = new HashMap();//<property name, property type>
    public static String[] mns;//method names
    public static String[] dmns;//declared method names
    public static Class[] mts0;
    /**
     * @param o  the implementation instance
     * @param n  method name
     * @param p  parameter types
     * @param v  argument values
     * @return
     * @throws java.lang.reflect.InvocationTargetException
     */
    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
        try {
            w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals(n) && p.length == 1) {
                // ($w) is Javassist syntax, not plain Java: it boxes primitive return values and does nothing for reference types
                return ($w) w.sayHello((java.lang.String) v[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
    }
}
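
Since the generated source uses Javassist's ($w) cast, it does not compile as ordinary Java. A hand-written equivalent, given here only as an illustration, shows the same idea of dispatching by method name with a direct call instead of reflection. The nested DemoServiceImpl is a stand-in invented for the example, and java.lang.NoSuchMethodException replaces Dubbo's own exception type.

```java
import java.lang.reflect.InvocationTargetException;

final class WrapperSketch {
    // Hypothetical stand-in for the real service implementation.
    static class DemoServiceImpl {
        String sayHello(String name) {
            return "Hello " + name;
        }
    }

    // Dispatches on the method name and calls the target directly, without java.lang.reflect.Method.
    static Object invokeMethod(Object o, String n, Class<?>[] p, Object[] v)
            throws InvocationTargetException, NoSuchMethodException {
        DemoServiceImpl w;
        try {
            w = (DemoServiceImpl) o;
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals(n) && p.length == 1) {
                return w.sayHello((String) v[0]);
            }
        } catch (Throwable e) {
            // Failures of the service method are wrapped, as reflection would do.
            throw new InvocationTargetException(e);
        }
        throw new NoSuchMethodException("Not found method \"" + n + "\" in class DemoServiceImpl.");
    }

    // Convenience wrapper that converts the checked exceptions for the demo.
    static Object call(String methodName, Object argument) {
        try {
            return invokeMethod(new DemoServiceImpl(), methodName,
                    new Class<?>[] {String.class}, new Object[] {argument});
        } catch (InvocationTargetException | NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("sayHello", "world")); // prints Hello world
    }
}
```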

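At this point execution has reached the sayHello(String name) method of DemoServiceImpl. The return value is then wrapped in an RpcResult and returned all the way back up to HeaderExchangeHandler.received(Channel channel, Object message):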

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

The response is then returned to the client. The channel here is a NettyChannel: its send method is executed, which calls NioAcceptedSocketChannel.write(Object message) to write the message back to the client. Done!

