Dubbo Source Code Analysis, Part 9: Consumer Communication via NettyClient
Communication protocol overview
- When Dubbo runs on Netty 4 it sets up a three-layer handler pipeline: encoder, decoder, and the Netty handler (NettyClientHandler/NettyServerHandler)
- Dubbo's remoting stack has three layers: exchange, transport, and codec, that is, the information-exchange layer, the transport layer, and the encode/decode layer
- The exchange layer parses and maps RPC Request and Response messages, and turns synchronous calls into asynchronous ones
- The transport layer performs the actual communication, e.g., choosing between Netty and Mina
- The codec layer does the encoding/decoding; for the dubbo protocol this is DubboCountCodec (the first handler in the Netty pipeline)
Source code analysis
DubboProtocol.getClients
- By default a single client is shared, with multiple threads multiplexed over one connection
```java
private ExchangeClient[] getClients(URL url) {
    // whether to share the connection
    boolean useShareConnect = false;
    // number of connections to create
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    // shared clients
    List<ReferenceCountExchangeClient> shareClients = null;
    if (connections == 0) {
        // defaults to 0, so connections are shared
        useShareConnect = true;
        // number of shared connections, 1 by default
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr)
                ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
                : shareConnectionsStr);
        // build the shared client list
        shareClients = getSharedClient(url, connections);
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}
```
DubboProtocol.getSharedClient
- Each reference increments the client's reference count; a client whose count drops to 0 can be destroyed (a sketch of the counting idea follows the code below)
- Shared clients are built per ip:port
```java
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    String key = url.getAddress();
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    if (checkClientCanUse(clients)) {
        // every referencing Invoker increments the client's reference count;
        // when the count drops to <= 0 a close may be triggered
        batchClientRefIncr(clients);
        return clients;
    }
    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        clients = referenceClientMap.get(key);
        // double check
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }
        // create at least one connection as a safety net
        connectNum = Math.max(connectNum, 1);
        // create the clients and store them in referenceClientMap {ip:port -> client list}
        if (CollectionUtils.isEmpty(clients)) {
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);
        } else {
            // rebuild any client that is no longer usable
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }
                referenceCountExchangeClient.incrementAndGetCount();
            }
        }
        locks.remove(key);
        return clients;
    }
}
```
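The reference counting is a thin decorator over the real client. Below is a minimal sketch of the idea, abbreviated from Dubbo's ReferenceCountExchangeClient (the real class delegates every ExchangeClient method to the wrapped client, and on reaching a zero count it additionally swaps in a lazy "ghost" client rather than only closing):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the counting idea behind ReferenceCountExchangeClient.
// Only the counting logic is shown; delegation of the remaining
// ExchangeClient methods is omitted.
public class RefCountedClientSketch {
    private final AtomicInteger referenceCount = new AtomicInteger(0);
    private final ExchangeClient client;

    public RefCountedClientSketch(ExchangeClient client) {
        this.client = client;
        referenceCount.incrementAndGet();
    }

    // called whenever another Invoker starts sharing this client
    public void incrementAndGetCount() {
        referenceCount.incrementAndGet();
    }

    // only the last referrer actually closes the underlying connection
    public void close(int timeout) {
        if (referenceCount.decrementAndGet() <= 0) {
            client.close(timeout);
        }
    }
}
```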
buildReferenceCountExchangeClientList: building the clients
- Builds one client per configured connection
```java
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
    List<ReferenceCountExchangeClient> clients = new ArrayList<>();
    // in the default shared scenario there is a single shared connection
    for (int i = 0; i < connectNum; i++) {
        clients.add(buildReferenceCountExchangeClient(url));
    }
    return clients;
}

// building a single client
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);
    return new ReferenceCountExchangeClient(exchangeClient);
}
```
Building the ExchangeClient
This involves all three Dubbo remoting layers: exchange, transport, and codec.
The dubbo protocol fixes the codec layer and passes in the exchange layer's requestHandler; if the URL does not configure the transport layer, SPI selects the Netty implementation by default.
initClient
- Fixes the codec (encoding/serialization) layer implementation
- Specifies Netty as the transport layer
- Specifies ExchangeHandlerAdapter as the core exchange-layer implementation
- ExchangeHandlerAdapter maps requests to responses via a Future keyed on the unique request id (see the sketch after the code below)
```java
private ExchangeClient initClient(URL url) {
    // transport layer implementation, netty by default
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    // codec layer: DubboCountCodec
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // heartbeat interval, one minute by default
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
    // ...... other code omitted
    ExchangeClient client;
    try {
        // connection should be lazy
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // default: create the client right away
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}
```
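The Future-plus-request-id mapping works by registering a pending future under the request id before the request is written, and completing that future when the response carrying the same id arrives on the I/O thread. A minimal sketch of the idea with illustrative names (Dubbo implements this in DefaultFuture; the class and method names below are not Dubbo's):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: how a unique request id turns a synchronous call into an async one.
final class FutureRegistrySketch {
    // request id -> pending future (Dubbo keeps a similar static map in DefaultFuture)
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // consumer side: register before writing the request to the channel
    static CompletableFuture<Object> register(long requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        FUTURES.put(requestId, future);
        return future;
    }

    // I/O thread: complete the future when the matching response arrives
    static void received(long requestId, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }
}
```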
Exchangers.connect / HeaderExchanger.connect
- Builds the ExchangeClient: the handler passed in by DubboProtocol is wrapped/enhanced to form the complete exchange layer
- Selects the transport layer by calling Transporters.connect, Netty by default
```java
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // obtain the Exchanger via SPI; HeaderExchanger is the one and only implementation
    return getExchanger(url).connect(url, handler);
}
```
- Exposes the exchange, transport, and codec layers as a single unit
```java
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // build the exchange layer: wrap the core exchange handler, then hand it to the transport layer
        return new HeaderExchangeClient(Transporters.connect(url,
                new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}
```
Transporters.connect
- getTransporter selects NettyTransporter via SPI
- NettyTransporter performs the Netty skeleton orchestration
```java
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    // ...... other code omitted (the handlers array is reduced to a single handler)
    return getTransporter().connect(url, handler);
}
```
```java
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        // responsible for orchestrating the Netty skeleton
        return new NettyClient(url, listener);
    }
}
```
NettyClient orchestration
- Wraps/enhances the handler
- Resolves the codec from the URL => DubboCountCodec
- Orchestrates the Netty model; the Netty ChannelHandler pipeline uniformly uses the three-layer structure [encoder, decoder, bizHandler]
MultiMessageHandler | HeartbeatHandler | AllChannelHandler | DecodeHandler | HeaderExchangeHandler | ExchangeHandlerAdapter |
---|---|---|---|---|---|
multi-message handling | heartbeat handling | thread-pool dispatch | decode support | request/response mapping | method lookup and invocation |
```java
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    // before Netty is wired up, the handler is wrapped into the following chain:
    // MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler
    //   -> DecodeHandler -> HeaderExchangeHandler -> DubboProtocol.ExchangeHandlerAdapter
    super(url, wrapChannelHandler(url, handler));
}
```
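For reference, the wrapping done by wrapChannelHandler bottoms out in ChannelHandlers.wrapInternal, which looks roughly like the following (as of Dubbo 2.7.x; the Dispatcher SPI resolves to AllDispatcher by default, which is what wraps the handler in an AllChannelHandler):

```java
// Roughly org.apache.dubbo.remoting.transport.ChannelHandlers#wrapInternal:
// builds the MultiMessageHandler -> HeartbeatHandler -> (dispatcher handler) chain.
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(
            ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
}
```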
```java
public AbstractEndpoint(URL url, ChannelHandler handler) {
    // store the handler in the parent class's handler field
    super(url, handler);
    // set the codec field to the SPI extension named "dubbo": DubboCountCodec
    this.codec = getChannelCodec(url);
    this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
```
```java
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    // assigns the codec and handler fields from the url
    super(url, handler);
    // orchestrate the Netty framework
    doOpen();
    // open the connection
    connect();
    // set up the thread pool
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
}
```
NettyClient.doOpen: wiring up the Netty framework
- Handler structure: NettyClientHandler -> NettyClient -> NettyClient.handler
- Encoding chain: NettyCodecAdapter.InternalEncoder -> DubboCountCodec -> DubboCodec
- Decoding chain: NettyCodecAdapter.InternalDecoder -> DubboCountCodec -> DubboCodec
```java
protected void doOpen() throws Throwable {
    // wrap the handler
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    // bootstrap
    bootstrap = new Bootstrap();
    // event loop group
    bootstrap.group(nioEventLoopGroup)
            // keep the connection alive
            .option(ChannelOption.SO_KEEPALIVE, true)
            // disable Nagle's algorithm (no small-packet batching)
            .option(ChannelOption.TCP_NODELAY, true)
            // pooled direct memory managed by Netty (buddy-style allocation);
            // Netty internals are out of scope here and will be covered in a later post
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            // timeout configuration
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            // specify the SocketChannel type
            .channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
    // channel initializer: build the three-layer handler structure [decoder, encoder, handler]
    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    //.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    // idle-detection handler
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if (socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(
                        new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}
```
Summary
- How the consumer-side communication layers are set up
- How the Netty pipeline is orchestrated
- Handlers use the decorator design pattern
- With the Netty implementation, the three Dubbo remoting layers map to Netty as follows:
exchange | transport | codec |
---|---|---|
Netty handler | chooses Netty, Mina, etc. as the implementation | Netty encode/decode handlers |
- Dubbo maps its remoting logic onto handlers and selects Netty as the communication framework
Extension point: handler overview
Why do the handlers use the decorator pattern instead of Netty's chained pipeline model?
- Explanation quoted from the literature: a complete RPC call in Netty runs through a series of handlers; if they were all mounted directly on the underlying communication framework (Netty), the length of the chain would trigger a large number of chained lookups and events, which is inefficient and wastes resources
- Author's take 1: the quoted explanation is not wrong, but in terms of efficiency, Dubbo's scenario is RPC, and in real business scenarios the dominant cost on the call path is business I/O, not the CPU spent on chained lookups
- Author's take 2: if you were developing the Dubbo framework and had to adapt its handlers to different frameworks such as Netty and Mina, you would want the intrusion to be as small and the integration as simple as possible; with the decorator pattern the whole business handler chain is composed internally and only the outermost handler has to be attached to the communication framework's handler (e.g., Netty's ChannelHandler), so there is no need to arrange multiple handlers inside Netty, whereas with multiple handlers, Netty and Mina would each need their own arrangement template
- Author's take 3: following from take 2, exposing a single handler is better suited to keeping the model uniform (see the sketch below)
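To make take 2 concrete, here is a minimal, purely hypothetical sketch (none of these names are Dubbo's) of how a decorator chain exposes a single outer handler that one transport adapter can attach to:

```java
// Hypothetical sketch: a decorator chain exposes a single outer handler,
// so bridging to Netty or Mina only has to adapt ONE object.
interface RpcHandler {
    void received(Object message);
}

// each concern decorates the next handler instead of registering in a pipeline
class LoggingHandler implements RpcHandler {
    private final RpcHandler next;
    LoggingHandler(RpcHandler next) { this.next = next; }
    @Override public void received(Object message) {
        System.out.println("received: " + message);
        next.received(message);
    }
}

class BizHandler implements RpcHandler {
    @Override public void received(Object message) {
        // business processing goes here
    }
}

class Demo {
    public static void main(String[] args) {
        // only this single composed handler is handed to the transport adapter,
        // e.g. wrapped by a Netty ChannelInboundHandler or a Mina IoHandler
        RpcHandler handler = new LoggingHandler(new BizHandler());
        handler.received("hello");
    }
}
```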
Role of each handler
The responsibility of each handler in the chain is summarized in the table above (MultiMessageHandler through ExchangeHandlerAdapter).
Extension point: the dubbo protocol frame
- The first 16 bits (2 bytes) are the dubbo protocol magic number (0xdabb)
- Bits 32-95 (8 bytes) carry the REQUEST_ID that maps requests to responses
- A 4-byte field (bits 96-127) holds the message body length
- The message body carries the serialized invocation content (a sketch of encoding this header follows the list)
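A minimal sketch of laying out this 16-byte header, assuming the bit layout above (simplified; Dubbo's real encoding lives in ExchangeCodec, and the serializationId parameter here is illustrative):

```java
import java.nio.ByteBuffer;

// Sketch: laying out the 16-byte dubbo protocol header.
final class DubboHeaderSketch {
    static final short MAGIC = (short) 0xdabb;    // bits 0-15: magic number
    static final byte FLAG_REQUEST = (byte) 0x80; // request (vs response)
    static final byte FLAG_TWOWAY = (byte) 0x40;  // expects a response

    static byte[] encodeRequestHeader(long requestId, int bodyLength, byte serializationId) {
        ByteBuffer header = ByteBuffer.allocate(16);
        header.putShort(MAGIC);                                             // 2 bytes: magic
        header.put((byte) (FLAG_REQUEST | FLAG_TWOWAY | serializationId));  // 1 byte: flags + serialization id
        header.put((byte) 0);                                               // 1 byte: status (used in responses)
        header.putLong(requestId);                                          // bits 32-95: request id
        header.putInt(bodyLength);                                          // bits 96-127: body length
        return header.array();
    }
}
```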
Extension point: codec and the telnet codec
- AbstractCodec provides the basic capabilities, such as validating the payload length and looking up the serializer
- TransportCodec handles serialization/deserialization and automatic stream cleanup
- The telnet protocol shares one codec stack with the dubbo protocol, so telnet commands are supported
- ExchangeCodec is the core: it converts byte streams to and from Request/Response objects
- DubboCodec covers the details, such as the dubbo protocol's implicit attachment parameters
- DubboCountCodec adds multi-message handling (see the sketch below)
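Multi-message handling works roughly like this: DubboCountCodec repeatedly delegates to the inner codec until it reports that more input is needed, collecting every fully decoded frame so one read can yield several messages. A sketch close to DubboCountCodec.decode in Dubbo 2.7.x, with details abbreviated:

```java
// Sketch of DubboCountCodec's decode loop (abbreviated): decode as many
// complete frames as the buffer holds; roll back on a partial frame.
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer); // delegate to DubboCodec
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save); // partial frame: rewind and wait for more bytes
            break;
        }
        result.addMessage(obj);
        save = buffer.readerIndex();
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result; // MultiMessageHandler later iterates over these messages
}
```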
Author: 岁月人
Source: https://blog.csdn.net/qq_35529969/article/details/122519010