当前位置: 首页 >服务端 > dubbo源码分析第九篇一消费者通信NettyClient

dubbo源码分析第九篇一消费者通信NettyClient

通信协议图示

  • dubbo使用netty4时建立三层handler: encode,decode,和nettyhandler[NettyClientHandler,NettyServerHandler]
  • dubbo通信三层exchange transport,codec分别表示交换层,传输层,编码层
  • exchange层主要是完成RpcRequest和RpcResponse的解析映射处理,同步转异步
  • transportor层主要是完成通信,比如选择netty 还是mina等
  • codec编解码以dubbo协议来说对应dubboCountCodec [netty的第一个handler]

dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

源码分析

dubboProtocol.getClients

  • 默认采用共享单个客户端,多线程模型
private ExchangeClient[] getClients(URL url) { 	是否共享连接boolean useShareConnect = false;获取要创建的连接数int connections = url.getParameter(CONNECTIONS_KEY, 0);共享的客户端List<ReferenceCountExchangeClient> shareClients = null;if (connections == 0) {默认为零useShareConnect = true;获取共享连接数量 默认1String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);		构建共享客户端数组shareClients = getSharedClient(url, connections);}ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {if (useShareConnect) {clients[i] = shareClients.get(i);} else {clients[i] = initClient(url);}}retu clients;}

DubboProtocol.getSharedClient

  • 增加client被引用的数量,client引用数量为0则可以被销毁
  • ip:port为维度构建共享client
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {String key = url.getAddress();List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);if (checkClientCanUse(clients)) {	每个Invoker引用时候会增加客户端连接数 当客户端连接数<=0 可能会触发closebatchClientRefIncr(clients);retu clients;}locks.putIfAbsent(key, new Object());synchronized (locks.get(key)) {clients = referenceClientMap.get(key);二次检查if (checkClientCanUse(clients)) {batchClientRefIncr(clients);retu clients;}至少创建一个连接 兜底逻辑connectNum = Math.max(connectNum, 1);创建客户端 放置referenceClientMap {ip:port,Client数组}if (CollectionUtils.isEmpty(clients)) {clients = buildReferenceCountExchangeClientList(url, connectNum);referenceClientMap.put(key, clients);} else {	检查不可用client 进行重建for (int i = 0; i < clients.size(); i++) {ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {clients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}locks.remove(key);retu clients;}}

buildReferenceCountExchangeClientList构建客户端

  • 按照连接数构建客户端
  private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {List<ReferenceCountExchangeClient> clients = new ArrayList<>();		默认共享场景下:共享连接为1for (int i = 0; i < connectNum; i++) {clients.add(buildReferenceCountExchangeClient(url));}retu clients;}构建客户端实现部分 private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {ExchangeClient exchangeClient = initClient(url);retu new ReferenceCountExchangeClient(exchangeClient);}

构建 ExchangeClient

涉及dubbo远程通信三层 Exchange Transport codec

dubbo协议指定了codec 层 传入了Exchange层的requestHandler Transport层默认url不配置则根据spi指定为netty实现

initClient

  • 指定了codec编解码序列化层实现
  • 指定了传输层采用 netty
  • 指定了ExchangeHandlerAdapter作为exchange的核心实现
  • ExchangeHandlerAdapter通过Future加请求唯一id完成请求和响应映射
private ExchangeClient initClient(URL url) {指定transport层实现为nettyString str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));指定codec编解码层为DubboCountCodecurl = url.addParameter(CODEC_KEY, DubboCodec.NAME);指定心跳的时长为1分钟url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));...... 删除其他代码ExchangeClient client;try {connection should be lazy 是否懒连接if (url.getParameter(LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient(url, requestHandler);} else {	默认走直接创建Clientclient = Exchangers.connect(url, requestHandler);}} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}retu client;}

HeaderExchanger.connect

  • ExchangeClient构建 完成exchange层的handler增强 对dubboProtocol传入的handler进行增强形成完整的exchange层
  • 通过调用Transporters.bind完成传输层的选择,默认选择netty
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");基于spi获取Exchanger,有且只有HeaderExchangerretu getExchanger(url).connect(url, handler);}
  • 完成 exchange-transport-codec三层统一暴露
public class HeaderExchanger implements Exchanger {public static final String NAME = "header";@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {	构建exchange层 封装transport层和exchange层的核心 handlerretu new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}}

Transporters.connect

  • getTransporter通过spi选择NettyTransporter
  • NettyTransporter实现netty骨架编排
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {	...... 删除其他代码retu getTransporter().connect(url, handler);}
public class NettyTransporter implements Transporter {public static final String NAME = "netty";@Overridepublic Client connect(URL url, ChannelHandler listener) throws RemotingException {	负责netty骨架编排retu new NettyClient(url, listener);}}

nettyclient编排

  • 增强handler
  • 根据url获取codec=>DubboCountCodec
  • 编排netty模型 netty的channelhandle统一采用三层架构[encode decode bizHandler]
MultiMessageHandlerHeartbeatHandlerAllChannelHandlerDecodeHandlerHeaderExchangeHandlerExchangeHandlerAdapter
多消息处理心跳处理支持多线程注入支持编解码request和response 映射完成方法查找调用
  public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {  		netty框架编排前 handler增强成如下结构	MultiMessageHandler->HeartbeatHandler->AllChannelHandler	DecodeHandler->HeaderExchangeHandler->DubboProtocol.ExchangeHandlerAdapter	super(url, wrapChannelHandler(url, handler));}
public AbstractEndpoint(URL url, ChannelHandler handler) {	将handler设置到父类的handler属性super(url, handler);将codec属性设置为spi名为dubbo的编解码器DubboCountCodecthis.codec = getChannelCodec(url);this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);}
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {根据url赋值codec到codec属性handler赋值到handler属性super(url, handler);netty框架 编排doOpen();打开链接connect();设置线程池executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));}

NettyClient.doOpen完成netty框架编排

  • handler结构: NettyClientHandler-> NettyClient->NettyClient.handler
  • 编码结构: NettyCodecAdapter.IntealEncoder->DubboCountCodec->DubboCodec
  • 解码结构: NettyCodecAdapter.IntealDecoder->DubboCountCodec->DubboCodec
protected void doOpen() throws Throwable {		封装handlerfinal NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);引导启动器bootstrap = new Bootstrap();设置线程组bootstrap.group(nioEventLoopGroup)		保持长链接.option(ChannelOption.SO_KEEPALIVE, true) 关闭小包滞连 Nagle算法.option(ChannelOption.TCP_NODELAY, true) 池化直接内存管理netty内存,参加伙伴算法分配,本文不介绍netty,后续分享netty源码.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)超时配置.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) 指定SocketChannel.channel(NioSocketChannel.class);bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));设置channel初始化器 初始化handlerbootstrap.handler(new ChannelInitializer() {			构建三层handler架构模型[decoder,encoder,handler]@Overrideprotected void initChannel(Channel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug.addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())	固定空闲检测handler.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)).addLast("handler", nettyClientHandler);String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);if(socksProxyHost != null) {int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});}

总结

  • 消费端通信层架设
  • netty编排的实现
  • handler采用装饰者设计模式
  • 采用netty实现时,dubbo remote三层对应netty编排如下
exchangetransportcodec
netty handler选择netty mina 实现netty 编解码handler
  • dubbo 映射为 handler 选择netty通信框架

扩展点一handler概述

handler为什么采用装饰者模式,而不采用netty的管道链式模型

  • 引用相关书籍解释: Netty一次完整的RPC调用贯穿了一系列的Handler,如果直接挂载到底层通信框架(Netty ,因为整个链路比较长,则需要触发大量链式查找和事件,不仅低效,而且浪费资源
  • 作者理解1: 相关书籍解释并未问题,但就效率来说,dubbo的场景是rpc调用,在实际业务场景中,这个调用链路最耗时的是业务IO,而非链式查找的cpu消耗;
  • 作者理解2: 假设作为dubbo框架开发,你需要完成dubbo handler对不同框架的适配,比如netty和mina,你肯定希望入侵越小越好,结合越简单越好,我们通过装饰者模式完成整个业务handler处理,只需要将最外层的handler与通信框架的处理器结合即可,比如netty的channelhandler,这样无需将多个handler编排到netty,而如果是多个handler,编排netty和mina一定是两套适配模板
  • 作者理解3: 综合2,作者认为其一个handler的暴露方式更适合模型的统一

handler作用

dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

扩展点一 dubbo协议报文

  • 前16字节为dubbo协议魔术值[(0xdabb]
  • 32至64位表示映射请求响应的REQUEST_ID
  • 消息体长度
  • 消息体invocation内容
    dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

扩展点一 codec与telnet codec

  • AbstractCodec主要提供基础能力,比如校验报文长度和查找编解码器
  • transportCodec 主要负责序列化反序列化以及自动清理流
  • telnet协议和dubbo codec协议共用一套 支持telnet命令
  • ExchangeCodec核心 负责字节流到Request/Response转换
  • DubboCodec 负责一些细节部分,比如dubbo协议的隐式参数附件等
  • DubboCountCodec 支持多消息处理
    dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

作者:岁月人
来源链接:https://blog.csdn.net/qq_35529969/article/details/122519010

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/server/69282.html

标签:Netty Dubbo
分享给朋友:

“dubbo源码分析第九篇一消费者通信NettyClient” 的相关文章

Java 200+ 面试题补充③ Dubbo 模块 2022年07月09日 10:02:33
springboot整合netty 2022年08月08日 19:12:05
maven+springmvc+dubbo+zookeeper 2022年08月12日 18:18:27
SpringBoot集成Dubbo 2022年08月24日 20:06:14
dubbo-admin访问失败问题 2022年08月25日 10:25:42
1.Dubbo中的SPI机制 2022年08月25日 15:00:14
Dubbo快速开始与依赖(一) 2022年08月25日 17:26:54