当前位置:首页 > 服务端 > dubbo源码分析第九篇一消费者通信NettyClient

dubbo源码分析第九篇一消费者通信NettyClient

2022年11月10日 09:45:12服务端7

通信协议图示

  • dubbo使用netty4时建立三层handler: encode,decode,和nettyhandler[NettyClientHandler,NettyServerHandler]
  • dubbo通信三层exchange transport,codec分别表示交换层,传输层,编码层
  • exchange层主要是完成RpcRequest和RpcResponse的解析映射处理,同步转异步
  • transportor层主要是完成通信,比如选择netty 还是mina等
  • codec编解码以dubbo协议来说对应dubboCountCodec [netty的第一个handler]

dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

源码分析

dubboProtocol.getClients

  • 默认采用共享单个客户端,多线程模型
private ExchangeClient[] getClients(URL url) {
     
 	是否共享连接
    boolean useShareConnect = false;
    获取要创建的连接数
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    共享的客户端
    List<ReferenceCountExchangeClient> shareClients = null;
    if (connections == 0) {
     默认为零
        useShareConnect = true;
        获取共享连接数量 默认1String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
		构建共享客户端数组
        shareClients = getSharedClient(url, connections);
    }
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
     
        if (useShareConnect) {
     
            clients[i] = shareClients.get(i);

        } else {
     
            clients[i] = initClient(url);
        }
    }
    return clients;
}

DubboProtocol.getSharedClient

  • 增加client被引用的数量,client引用数量为0则可以被销毁
  • ip:port为维度构建共享client
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
     
    String key = url.getAddress();
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    if (checkClientCanUse(clients)) {
     
    	每个Invoker引用时候会增加客户端连接数 当客户端连接数<=0 可能会触发close
        batchClientRefIncr(clients);
        return clients;
    }
    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
     
        clients = referenceClientMap.get(key);
        二次检查
        if (checkClientCanUse(clients)) {
     
            batchClientRefIncr(clients);
            return clients;
        }
        至少创建一个连接 兜底逻辑
        connectNum = Math.max(connectNum, 1);
        创建客户端 放置referenceClientMap {
     ip:port,Client数组}
        if (CollectionUtils.isEmpty(clients)) {
     
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);
        } else {
     
        	检查不可用client 进行重建
            for (int i = 0; i < clients.size(); i++) {
     
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
     
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }
                referenceCountExchangeClient.incrementAndGetCount();
            }
        }
        locks.remove(key);
        return clients;
    }
}

buildReferenceCountExchangeClientList构建客户端

  • 按照连接数构建客户端
  private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
     
        List<ReferenceCountExchangeClient> clients = new ArrayList<>();
		默认共享场景下:共享连接为1for (int i = 0; i < connectNum; i++) {
     
            clients.add(buildReferenceCountExchangeClient(url));
        }

        return clients;
    }



构建客户端实现部分
 private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
     
        ExchangeClient exchangeClient = initClient(url);

        return new ReferenceCountExchangeClient(exchangeClient);
    }

构建 ExchangeClient

涉及dubbo远程通信三层 Exchange Transport codec

dubbo协议指定了codec 层 传入了Exchange层的requestHandler Transport层默认url不配置则根据spi指定为netty实现

initClient

  • 指定了codec编解码序列化层实现
  • 指定了传输层采用 netty
  • 指定了ExchangeHandlerAdapter作为exchange的核心实现
  • ExchangeHandlerAdapter通过Future加请求唯一id完成请求和响应映射
private ExchangeClient initClient(URL url) {
     

    指定transport层实现为netty
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    指定codec编解码层为DubboCountCodec
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    指定心跳的时长为1分钟
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

   ...... 删除其他代码
    ExchangeClient client;
    try {
     
        connection should be lazy 是否懒连接
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
     
            client = new LazyConnectExchangeClient(url, requestHandler);

        } else {
     
        	默认走直接创建Client
            client = Exchangers.connect(url, requestHandler);
        }

    } catch (RemotingException e) {
     
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }

    return client;
}

HeaderExchanger.connect

  • ExchangeClient构建 完成exchange层的handler增强 对dubboProtocol传入的handler进行增强形成完整的exchange层
  • 通过调用Transporters.bind完成传输层的选择,默认选择netty
    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
     
        if (url == null) {
     
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
     
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        基于spi获取Exchanger,有且只有HeaderExchanger
        return getExchanger(url).connect(url, handler);
    }
  • 完成 exchange-transport-codec三层统一暴露
public class HeaderExchanger implements Exchanger {
     
    public static final String NAME = "header";
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
     
    	构建exchange层 封装transport层和exchange层的核心 handler
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}

Transporters.connect

  • getTransporter通过spi选择NettyTransporter
  • NettyTransporter实现netty骨架编排
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
     
   	...... 删除其他代码
    return getTransporter().connect(url, handler);
}
public class NettyTransporter implements Transporter {
     
    public static final String NAME = "netty";
    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
     
    	负责netty骨架编排
        return new NettyClient(url, listener);
    }
}

nettyclient编排

  • 增强handler
  • 根据url获取codec=>DubboCountCodec
  • 编排netty模型 netty的channelhandle统一采用三层架构[encode decode bizHandler]
MultiMessageHandler HeartbeatHandler AllChannelHandler DecodeHandler HeaderExchangeHandler ExchangeHandlerAdapter
多消息处理 心跳处理 支持多线程注入 支持编解码 request和response 映射 完成方法查找调用
  public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
     
  		netty框架编排前 handler增强成如下结构
    	MultiMessageHandler->HeartbeatHandler->AllChannelHandler
    	DecodeHandler->HeaderExchangeHandler->DubboProtocol.ExchangeHandlerAdapter
    	super(url, wrapChannelHandler(url, handler));
    }
    public AbstractEndpoint(URL url, ChannelHandler handler) {
     
    	将handler设置到父类的handler属性
        super(url, handler);
        将codec属性设置为spi名为dubbo的编解码器DubboCountCodec
        this.codec = getChannelCodec(url);
        this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
     
    根据url赋值codec到codec属性
    handler赋值到handler属性
    super(url, handler);
    netty框架 编排
    doOpen();
    打开链接
    connect();
    设置线程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(CONSUMER_SIDE, Integer.toString(url.getPort()));
}

NettyClient.doOpen完成netty框架编排

  • handler结构: NettyClientHandler-> NettyClient->NettyClient.handler
  • 编码结构: NettyCodecAdapter.InternalEncoder->DubboCountCodec->DubboCodec
  • 解码结构: NettyCodecAdapter.InternalDecoder->DubboCountCodec->DubboCodec
protected void doOpen() throws Throwable {
     
		封装handler
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        引导启动器
        bootstrap = new Bootstrap();
        设置线程组
        bootstrap.group(nioEventLoopGroup)
        		保持长链接
                .option(ChannelOption.SO_KEEPALIVE, true)
                 关闭小包滞连 Nagle算法
                .option(ChannelOption.TCP_NODELAY, true)
                 池化直接内存管理netty内存,参加伙伴算法分配,本文不介绍netty,后续分享netty源码
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                超时配置
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                 指定SocketChannel
                .channel(NioSocketChannel.class);

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
        设置channel初始化器 初始化handler
        bootstrap.handler(new ChannelInitializer() {
     
			构建三层handler架构模型[decoder,encoder,handler]
            @Override
            protected void initChannel(Channel ch) throws Exception {
     
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())	
                        固定空闲检测handler
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
     
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }

总结

  • 消费端通信层架设
  • netty编排的实现
  • handler采用装饰者设计模式
  • 采用netty实现时,dubbo remote三层对应netty编排如下
exchange transport codec
netty handler 选择netty mina 实现 netty 编解码handler
  • dubbo 映射为 handler 选择netty通信框架

扩展点一handler概述

handler为什么采用装饰者模式,而不采用netty的管道链式模型

  • 引用相关书籍解释: Netty一次完整的RPC调用贯穿了一系列的Handler,如果直接挂载到底层通信框架(Netty ,因为整个链路比较长,则需要触发大量链式查找和事件,不仅低效,而且浪费资源
  • 作者理解1: 相关书籍解释并未问题,但就效率来说,dubbo的场景是rpc调用,在实际业务场景中,这个调用链路最耗时的是业务IO,而非链式查找的cpu消耗;
  • 作者理解2: 假设作为dubbo框架开发,你需要完成dubbo handler对不同框架的适配,比如netty和mina,你肯定希望入侵越小越好,结合越简单越好,我们通过装饰者模式完成整个业务handler处理,只需要将最外层的handler与通信框架的处理器结合即可,比如netty的channelhandler,这样无需将多个handler编排到netty,而如果是多个handler,编排netty和mina一定是两套适配模板
  • 作者理解3: 综合2,作者认为其一个handler的暴露方式更适合模型的统一

handler作用

dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

扩展点一 dubbo协议报文

  • 前16字节为dubbo协议魔术值[(0xdabb]
  • 32至64位表示映射请求响应的REQUEST_ID
  • 消息体长度
  • 消息体invocation内容
    dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

扩展点一 codec与telnet codec

  • AbstractCodec主要提供基础能力,比如校验报文长度和查找编解码器
  • transportCodec 主要负责序列化反序列化以及自动清理流
  • telnet协议和dubbo codec协议共用一套 支持telnet命令
  • ExchangeCodec核心 负责字节流到Request/Response转换
  • DubboCodec 负责一些细节部分,比如dubbo协议的隐式参数附件等
  • DubboCountCodec 支持多消息处理
    dubbo源码分析第九篇一消费者通信NettyClient _ JavaClub全栈架构师技术笔记

作者:岁月人
来源链接:https://blog.csdn.net/qq_35529969/article/details/122519010

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/69282.html

标签: NettyDubbo
分享给朋友:

“dubbo源码分析第九篇一消费者通信NettyClient” 的相关文章

Spring Boot 中如何使用 Dubbo Activate 扩展点

Spring Boot 中如何使用 Dubbo Activate 扩展点

摘要: 原创出处 www.bysocket.com 「泥瓦匠BYSocket 」欢迎转载,保留摘要,谢谢! 『 公司的核心竞争力在于创新 – 《启示录》 』 继续上一篇:《 Springboot 整合 Dubbo/ZooKeeper 》,在...

Java高级面试必问—Dubbo面试题汇总

1、默认使用的是什么通信框架,还有别的选择吗?2、服务调用是阻塞的吗?3、一般使用什么注册中心?还有别的选择吗?4、默认使用什么序列化框架,你知道的还有哪些?5、服务提供者能实现失效踢出是什么原理?6、服务上线怎么不影响旧版本?7、如何解决服务调用链过长的问题?8、说说核心的配置...

SpringMVC、Zookeeper、Dubbo使用

SpringMVC、Zookeeper、Dubbo使用

互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,Dubbo是一个分布式服务框架,在这种情况下诞生的。现在核心业务抽取出来,作为独立的服务,使前端应用能更快速和稳定的响应。 第一:介绍Dubbo背景 ...

Spring Boot Dubbo applications.properties 配置清单

Spring Boot Dubbo applications.properties 配置清单

摘要: 原创出处 www.bysocket.com 「泥瓦匠BYSocket 」欢迎转载,保留摘要,谢谢! 『 与其纠结,不如行动学习。Innovate ,And out execute ! 』 本文提纲 一、前言 二、applications.properti...

详解SpringMVC注解方式集成Dubbo

由于最近项目需要SpringMVC集成Dubbo,本文大概记录下详细集成过程: 一、首先项目中Maven引入Jar包 <!-- dubbo相关 --> <dependency> <gr...

springcloud gateway nullpointerexception (NettyRoutingFilter)

springcloud gateway nullpointerexception (NettyRoutingFilter)

最近在做一个下载功能时,发现直接调用服务是可以下载的,但是通过gateway路由下载会报NPE异常,具体如下 java.lang.NullPointerException: null at java.util.concurrent.ConcurrentH...

搭建SpringBoot+dubbo+zookeeper+maven框架(一)

搭建SpringBoot+dubbo+zookeeper+maven框架(一)

这几天项目还没来,所以就自己试着参考网上的一些资料,搭建了一个SpringBoot+dubbo+zookeeper+maven框架,网上参考的很多资料照着他们一步一步搭建,最后很多都运行不通,很是郁闷,最后自己在总结了他们之后自己搭建了一个,项目的源码我会附在文章的最后,并且后期还会继...

Java与Netty实现高性能高并发

Java与Netty实现高性能高并发

http://blog.csdn.net/nicajonh/article/details/54985352 1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了1...

SpringCloud采用Dubbo远程调用(SpringCloud Alibaba)

SpringCloud采用Dubbo远程调用(SpringCloud Alibaba)

系统架构:      这里只演示以下组件,其他组件的使用和SpringCloud一样     application-1 :应用1,模拟应用,提供http接口服务。     service-1 :微服务1,模拟微服务,提供dubbo接口服务。     se...

Dubbo常用标签

Dubbo中常用有7个标签。 分为三个类别:公用标签,服务提供者标签,服务消费者标签 公用标签 <dubbo:application/>和  <dubbo:registry/> A、配置应用信息 <du...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。