当前位置:首页 > 服务端 > Dubbo协议服务暴露与引用以及源码分析

Dubbo协议服务暴露与引用以及源码分析

2022年09月17日 20:32:17服务端4

前言

上次小编讲到了dubbo的远程传输详解,接下来本来要写dubbo的服务治理,不过鉴于dubbo版本以及基本是可视化的一些页面操作,小编就略过了,前面在小编的Dubbo 服务化最佳实践以及用redis作为注册中心的原理讲解。其中的控制后台与监控中心有引用他人的文章感兴趣的小伙伴可以看一下,接下来小编为大家带来dubbo服务的暴露与引用代码示例,同时带大家读一下这一块的dubbo核心源码。话不多说进入正题。

Dubbo 整体框架

dubbo调用整体框架示意图:

Dubbo协议服务暴露与引用以及源码分析 _ JavaClub全栈架构师技术笔记
上面是dubbo业务调用过程的整个框架示例图。前面小编也给大家讲过。接下来看一下各个功能在dubbo框架中的模块分包情况。

模块分包

Dubbo协议服务暴露与引用以及源码分析 _ JavaClub全栈架构师技术笔记

  1. register:注册模块,包含了消费和服务端。对应上面架构为服务暴露和服务发现。
  2. cluster:集群模块。对应上面架构的调用模块,容错负载均衡等等。
  3. common:公共模块,公共调用的方法等。
  4. config:配置模块,比方说serverConfig,referenceConfig等等一些配置体系。
  5. rpc:远程调用,上面架构为rpc协议部分,包含比较多。
  6. remoting:远程通讯,编解码与网络传输
  7. container:容器模块,主要包含SpringContainer
  8. monitor:监控,通过过滤器实现,可以监控服务和客户端的调用情况。

当然大家如果去看dubbo源码分包不是这样的,因为集群放入了rpc模块中,这边小编参考了大神的一些理解而做出的简易模块图。

上面rpc和remoting就可以实现远程调用。

Dubbo架构

dubbo官方提供的整体价格图
Dubbo协议服务暴露与引用以及源码分析 _ JavaClub全栈架构师技术笔记
Dubbo官方提供的该架构图很复杂,小编第一次看到的时候头疼,并且不好理解。这边小编稍微给大家整理一下:

  • 从左往右分为两部份,左半边蓝色背景的部分代表服务消费者,右半边绿色背景的部分代表服务提供者。
  • 从上往下看又分为九层,左边九层又被分为了三大类,分别是面向用户的 business,框架核心 RPC 以及负责远程传输的 Remoting。
  • 图的右边又划分为了两类,上面两层是面向用户的API,而下面七层是面向扩展提供者的SPI。
  • 图中的线代表对象与对象之间不同的关系,紫色代表继承、黑色代表依赖、蓝色虚线代表服务注册、服务订阅的过程,也就是部署阶段,红色代表一次完整的RPC调用,也就是运行阶段。顺着红色的线,可以体验一次完整的 rpc 调用是如何进行的。

接着小编一层一层大致说一下

  1. Service层:为业务层面,消费端引用接口并且调用,服务端的话提供接口并且实现。
  2. Config层:为配置层,对于服务提供方来说,使用ServiceConfig API来代表一个要发布的服务配置对象,对于服务消费方来说,ReferenceConfig代表了一个要消费的服务的配置对象。
  3. Proxy层:扩展接口为ProxyFactory,用来对服务提供方和服务消费方的服务进行代理(上图可生成Proxy和Invoker)。,Dubbo提供的实现主要有JavassistProxyFactory(默认使用)和JdkProxyFactory。
  4. Register层:封装服务地址的注册与发现。
  5. Cluster层:封装多个提供者的路由及负载均衡,集群容错等等。
  6. Monitor:RPC调用次数和时间监控。
  7. Protocol层:协议层,封装RPC调用,上图最重要的就是dubbo协议。
  8. Exchange层:交换层,对呀response与request的一些封装。
  9. Transport层:交流层,使用netty,绑定服务建立客户端连接,发送和接受消息,顺便完成了编解码的操作。
  10. Serialize层:数据序列化。

接下来我们讲最核心的,就是远程调用,即暴露服务,服务引用,服务调用。

服务暴露与服务引用

服务暴露和引用的简单结构图
Dubbo协议服务暴露与引用以及源码分析 _ JavaClub全栈架构师技术笔记
从上图来看,协议最重要的核心是服务暴露和引用,当然还有服务注销,获取端口号等。
协议的代码理论图
Dubbo协议服务暴露与引用以及源码分析 _ JavaClub全栈架构师技术笔记
服务暴露说明:
上面exporterMap就是协议容器之一,保存了invoker,然后invoker根据反射原理调用到服务的实现,当然exporterMap还依赖到了Exporter对象,对invoker做了一层封装然后放进容器中。
然后得创建一个netteryServer来暴露服务,同样这个远程服务RemotingServer需要放入serverMap,当然如果同一个服务实现有多个端口,则需要创建多个NettyServer,同样中间还有两个Server(protocolServer和ExchangeServer)做一些封装等操作来绑定我们的远程服务。


服务引用说明
接下来是服务的引用,这边invokers就是引用所需要的容器,他包含invoker,invoker的实现为dubboinvoker,发起远程调用,和服务暴露相同,需要NettyClient来发起远程调用,dubboProtocol是来创建dubboInvoker。dubboProtocol中的requestHandler就是netty的channelHandler,当他接收消息之后,编解码等操作完成,nettyServer就可以做后续的调用操作了。

注意:客户端invoker的实现和服务端invoker实现逻辑不一样,客户端用来调用且封装了多个过滤器,同步转异步等操作,而服务端invoker主要做了反射调用到服务真正实现的方法。

服务暴露和服务引用的流程图
Dubbo协议服务暴露与引用以及源码分析 _ JavaClub全栈架构师技术笔记

源码示例

public class DubboProcotolTest {
     
    private static final String URL_TEXT = "dubbo://127.0.0.1:20880/com.learn.code.IUserService";
    /**
     * 暴露服务
     */
    @Test
    public void exportServer() throws IOException {
     
        DubboProtocol dubboProtocol = new DubboProtocol();
        ProxyFactory proxyFactory = new JdkProxyFactory();
        UserService userService = new UserService();
        Invoker<IUserService> invoker = proxyFactory.getInvoker(userService, IUserService.class, URL.valueOf(URL_TEXT));
        dubboProtocol.export(invoker);
        ApplicationModel.getServiceRepository().registerService(IUserService.class);
        System.out.println("服务暴露成功");
        System.in.read();
    }

    @Test
    public void referServer() throws IOException {
     
        DubboProtocol dubboProtocol = new DubboProtocol();
        Invoker<IUserService> invoker = dubboProtocol.refer(IUserService.class, URL.valueOf(URL_TEXT));
        ProxyFactory proxyFactory = new JdkProxyFactory();
        IUserService proxy = proxyFactory.getProxy(invoker);
        System.out.println(proxy.getUser(1L));

    }
}

打印结果

UserDto(id=1, age=18, name=Bob, desc=当前服务:null)

暴露服务的源代码解析

相关源码:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
     
		//获取封装进去的url
        URL url = invoker.getUrl();

        // export service.
        //key为com.learn.code.IUserService:20880
        String key = serviceKey(url);
        //封装成DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        //放入exportMap
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
     
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
     
                if (logger.isWarnEnabled()) {
     
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }
		//开启服务看下一段代码片段
        openServer(url);
        optimizeSerialization(url);

        return exporter;
    }

开启服务

private void openServer(URL url) {
     
        // find server. 获取address地址127.0.0.1:20880
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        //这边双重锁判断是否已经创建
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
     
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
     
                synchronized (this) {
     
                    server = serverMap.get(key);
                    if (server == null) {
     
                    	//创建一个server放入serverMap,创建逻辑下一个代码快
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
     
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

创建一个服务

private ProtocolServer createServer(URL url) {
     
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
     
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
     
        	//上面做一系列封装后,exchangers用来绑定的,requestHandler非常重要,主要是用来反射调用到对应的方法实现
        	//bind最终调用到的地方请看org.apache.dubbo.remoting.transport.netty4.NettyTransporter.bind
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
     
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
     
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
     
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new DubboProtocolServer(server);
    }

//NettyTransporter netty相关代码
public class NettyTransporter implements Transporter {
     

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
     
        return new NettyServer(url, handler);
    }


}

requestHandler根据传入的方法参数方法等真正调用到服务端的实现方法

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
     

        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
     

            if (!(message instanceof Invocation)) {
     
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
     
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
     
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
     
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
     
                        if (inv.getMethodName().equals(method)) {
     
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
     
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //上面代码演示则是userService.getUser
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        }

服务暴露与理论及流程图关联小结

  1. 暴露服务核心为DubboProtocol调用export方法,传入参数为Invoker,这Invoker主要是对服务的调用,这个Invoker会存储在exporterMap中,key为interface + port
  2. 接着需要去开启服务,即open一个nettyServer,当然这个同样存储在serverMap中key为ip + port号
  3. 这边比较重要的就是requestHandler,这个requestHandler的触发时机主要为客户端发送的时候
  4. 客户端会发送一个Invocation,Invocation包含了接口,接口方法名以及方法参数,这样可以通过exporterMap找到对应的Invoker,之后就可以通过反射调用服务端的方法了。

这样上面的所有暴露服务的理论与源码关联起来了

暴露引用的源代码解析

相关源码:org.apache.dubbo.rpc.protocol.AbstractProtocol#refer

@Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
     
    	//异步转同步,protocolBindingRefer最终会调到dubooProctol中去
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

DubboProtocol

@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
     
        optimizeSerialization(url);

        // create rpc invoker.
        // getClients获取连接
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }
private ExchangeClient[] getClients(URL url) {
     
        // whether to share connection
		//是否共享连接
        boolean useShareConnect = false;
		//默认为0
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
     
        	
            useShareConnect = true;

            /*
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            //获取共享连接
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
     
            if (useShareConnect) {
     
                clients[i] = shareClients.get(i);

            } else {
     
                clients[i] = initClient(url);
            }
        }

        return clients;
    }
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
     
        String key = url.getAddress();
        //是否已经存在,有直接返回
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

        if (checkClientCanUse(clients)) {
     
            batchClientRefIncr(clients);
            return clients;
        }

        locks.putIfAbsent(key, new Object());
        //双重锁检测
        synchronized (locks.get(key)) {
     
            clients = referenceClientMap.get(key);
            // dubbo check
            if (checkClientCanUse(clients)) {
     
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            connectNum = Math.max(connectNum, 1);

            // If the clients is empty, then the first initialization is
            if (CollectionUtils.isEmpty(clients)) {
     
            	//第一次初始化连接代码
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                referenceClientMap.put(key, clients);

            } else {
     
                for (int i = 0; i < clients.size(); i++) {
     
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // If there is a client in the list that is no longer available, create a new one to replace him.
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
     
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }

                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }

            /*
             * I understand that the purpose of the remove operation here is to avoid the expired url key
             * always occupying this memory space.
             */
            locks.remove(key);

            return clients;
        }
    }
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
     
        List<ReferenceCountExchangeClient> clients = new ArrayList<>();

        for (int i = 0; i < connectNum; i++) {
     
        	//最终调用到org.apache.dubbo.remoting.transport.netty4.NettyTransporter.connect
            clients.add(buildReferenceCountExchangeClient(url));
        }

        return clients;
    }
//NettyTransporter netty相关代码
public class NettyTransporter implements Transporter {
     

    public static final String NAME = "netty";

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
     
        return new NettyClient(url, handler);
    }

}

服务引用与理论及流程图关联小结
服务引用最主要的事情第一个是建立连接,这里是nettyClient这样就可以远程调用,第二个创建一个代理对象,构建invoker。

  1. 创建DubboInvoker,然后用AsyncToSyncInvoker包装返回
  2. 创建DubboInvoker的时候需要获取连接,判断是否共享连接
  3. 默认共享连接,然后去获得连接,最终就会使用NettyClient创建连接(可以设置私有连接,则会有多个连接)
  4. referenceClientMap保存连接 key 为ip + port
  5. 返回Invoker后使用代理进行方法的调用即可

这样上面的所有服务引用的理论与源码关联起来了

共享连接:当客户端有多个服务调用的时候,比方有Aservice和Bservice调用远程服务只有一个,则dubbo默认情况下只有一条连接,这样Aservice和Bservice共用一条连接,这个共享连接为长连接。当远程服务提供有多个,则会创建另一条共享连接。因为共享连接的原因,所以dubbo要求数据传输尽可能小一点,并且希望快进快出。
当然可以设置私有连接,则客户端调用的时候使用connections,如@DubboReference(connections =2 ),这样就会有多个私有连接,到同一个服务提供者。

总结

小编认为源码阅读以及源码对应的理论还是需要多实践,即多打断点才能整理清楚。希望大家和小编一样有所收获,这边感谢源码阅读网,鲁班大叔的相关讲解。

作者:木兮君
来源链接:https://blog.csdn.net/a1032722788/article/details/115094271

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/42895.html

标签: Dubbo
分享给朋友:

“Dubbo协议服务暴露与引用以及源码分析” 的相关文章

SpringBoot整合Dubbo与zookeeper纯注解版

SpringBoot整合Dubbo与zookeeper纯注解版

一、Dubbo和zk的作用 上回讲到,Dubbo作为一款优秀的RPC框架,封装了dubbo-provider(提供者)和dubbo-consumer(消费者),而provider和consumer之间需要通过注册中心来作为可发现的服务目录。而zookeeper(此处简称zk)提供了服务接口注...

Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例

Springboot 整合 Dubbo/ZooKeeper 详解 SOA 案例

摘要: 原创出处:www.bysocket.com 泥瓦匠BYSocket 希望转载,保留摘要,谢谢!...

dubbo+spring_maven 遇到的问题 Error creating bean with name '***': Instantiation of bean failed;

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cpUserAgent': Instantia...

dubbo监控报错Error creating bean with name 'uriBrokerService'

dubbo监控报错Error creating bean with name 'uriBrokerService'

在jdk1.8下面会出现此错误   解决方法: 1、更换服务器jdk版本。(会影响其他项目环境) 2、修改dubbo-admin tomcat默认jdk版本。 3、修改dubbo-admin项目依赖(dependency)从新打包。 进入h...

dubbo 2.5.4-SNAPSHOT dubbo-admin 报错

问题描述:RROR context.ContextLoader - Context initialization failed org.springframework.beans.factory.BeanCreationException: Error creating bean wi...

SpringBoot与Dubbo整合的三种方式

SpringBoot与Dubbo整合的三种方式

1. 使用默认application.properties和注解的方式 导入dubbo-starter,在application.properties配置属性,使用@Service注解来暴露服务,使用@Reference来引用服务。具体可参考 Dubbo整合Spring...

SpringMVC、Zookeeper、Dubbo使用

SpringMVC、Zookeeper、Dubbo使用

互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,Dubbo是一个分布式服务框架,在这种情况下诞生的。现在核心业务抽取出来,作为独立的服务,使前端应用能更快速和稳定的响应。 第一:介绍Dubbo背景 ...

详解SpringMVC注解方式集成Dubbo

由于最近项目需要SpringMVC集成Dubbo,本文大概记录下详细集成过程: 一、首先项目中Maven引入Jar包 <!-- dubbo相关 --> <dependency> <gr...

maven+springmvc+dubbo+zookeeper

maven+springmvc+dubbo+zookeeper

springmvc+dubbo+zookeeper整合     为什么要用dubbo?   还是让官方来解释吧: http://d...

Springboot整合dubbo(一):搭建一个springboot + dubbo的微服务

Springboot整合dubbo(一):搭建一个springboot + dubbo的微服务

现在越来越多的公司开发项目中使用了springboot作为搭建服务的框架,因为springboot集成了一套完整项目所需要的基本的开发工具的jar包,无需再像之前开发一个spring项目中需要引入各种配置,只需要引入简单的几个配置就能达到项目的启动,大大减少了开发周期,使开发越来...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。