当前位置:首页 > 服务端 > netty在dubbo中的应用

netty在dubbo中的应用

2022年09月16日 13:24:20服务端4

netty在dubbo中的应用

dubbo的底层通信是利用netty来实现的,出于好奇是如何实现的,把发现的过程记录一下。

首先down下来dubbo的源码,里面包含一个模块dubbo-demo,包含了dubbo-demo-provider和dubbo-demo-consumer,provider提供了一个DemoServiceImpl,我们需要启动它,启动时报错,看报错原因是连接不上注册中心,启动完zookeeper并修改了provider.xml中注册中心的配置,再次启动,ok!同上启动consumer,ok!

 

要想知道dubbo是如何应用netty的,首先要知道dubbo在哪里用到了,从dubbo的官网找到dubbo的框架图,再结合官网的介绍,可以大致推测出在Exchange和Transport这层用到的netty

http://dubbo.apache.org/docs/zh-cn/dev/sources/images/dubbo-framework.jpg

在Exchange这一层中找到ExchangeClient,点进abstract父类Client中,查看Client的继承类,找到了NettyClient,在NettyClient的方法中有doOpen和doConnect方法。隐隐觉得netty的初始化就是在这里,分别在这两个方法首行打上断点,然后debug provider,在启动的过程中居然没有进来。突然想到这是Client,应该是consumer启动的时候初始化才对。debug consumer,启动时果然进来了。

 

先看一下这个doOpen方法

 

 

@Override
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("handler", nettyClientHandler);
        }
    });
}

从doOpen这个方法中可以看到在consumer初始化时会去使用netty去创建channel,从调用栈中追溯信息

 


可以看到在获取bean的时候的大致流程,dubbo重写了springFactoryBean,所以从ApplicationContext中获取bean的时候,调用的是ReferenceBean.getObject。然后init ReferenceConfig,创建了代理。中间有一段过程暂时不去理会,以后慢慢研究,大致应该是创建代理时需要从注册中心获取到provider的信息。在拉取到provider的信息后,需要open channel,这个时候就到了上面的NettyClient.doOpen这里。

 

当open了channel,这样一个service的代理就创建出来了。从架构图中可以看到,在真正调用这个service的时候,是调用的DubboInvoker,DubboInvoker再调用client.request。找到DubboInvoker,可以看到这个doInvoker方法

 

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            // For compatibility
            FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
            RpcContext.getContext().setFuture(futureAdapter);

            Result result;
            if (isAsyncFuture) {
                // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
                result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
            } else {
                result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
            }
            return result;
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

 

入参的invocation中携带了本次调用的信息,如接口名,方法名,参数等等,在set了attachment后,需要选择一个client,在这里打上断点,


可以看到,currentClient中的client是使用的netty,所以currentClient.send(inv, isSent);是使用了netty来send的。找到NettyClient,没有send方法,找到父类AbstractClient,找到send方法如下

 

@Override
public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

 

 

居然还有个TODO? 在这里send了消息。当然还会对message做serialize处理才真正发送出去。

 

provider使用netty

 

从架构图中的transport层可以找到相关信息,Transport.bind()这里创建了server,找到Transport,然后找到Transport的子类NettyTransport

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

可以看到这里的bind创建了NettyServer,戳进去

@Override
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

在NettyServer里有个doOpen(),这里看到了netty的熟悉的代码,和我们自己创建一个Netty Server的代码基本没有什么太大的差别,在这里打个断点,追踪一个栈信息


原来dubbo的ServiceBean implement了ApplicationListener,在spring容器启动的时候,调用了ServiceBean的onApplicationEvent,在这里最终创建了NettyServer。启动后就可以以netty来监听端口并接收消息。接收到消息后再调用相应的DubboHandler,解析消息,最终找到需要的Service,再通过反射来invoker。

作者:宗叶青
来源链接:https://blog.csdn.net/zongyeqing/article/details/83242557

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/42052.html

标签: Dubbo
分享给朋友:

“netty在dubbo中的应用” 的相关文章

Dubbo原理浅析

Dubbo原理浅析

一、Dubbo是什么 百度百科(没有什么比度娘知道的更多):Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接...

Java 200+ 面试题补充③ Dubbo 模块

Java 200+ 面试题补充③ Dubbo 模块

昨天在我的 Java 面试粉丝群里,有一个只有一年开发经验的小伙伴只用了三天时间,就找到了一个年薪 20 万的工作,真是替他感到开心。 他的经历告诉我们:除了加强自我实战经验之外,还要努力积累自己的理论知识。 人生没有白走的路,也没有白吃的苦。你学的某一种知识...

Java高级面试必问—Dubbo面试题汇总

1、默认使用的是什么通信框架,还有别的选择吗?2、服务调用是阻塞的吗?3、一般使用什么注册中心?还有别的选择吗?4、默认使用什么序列化框架,你知道的还有哪些?5、服务提供者能实现失效踢出是什么原理?6、服务上线怎么不影响旧版本?7、如何解决服务调用链过长的问题?8、说说核心的配置...

Spring Boot Dubbo applications.properties 配置清单

Spring Boot Dubbo applications.properties 配置清单

摘要: 原创出处 www.bysocket.com 「泥瓦匠BYSocket 」欢迎转载,保留摘要,谢谢! 『 与其纠结,不如行动学习。Innovate ,And out execute ! 』 本文提纲 一、前言 二、applications.properti...

Springboot整合dubbo(一):搭建一个springboot + dubbo的微服务

Springboot整合dubbo(一):搭建一个springboot + dubbo的微服务

现在越来越多的公司开发项目中使用了springboot作为搭建服务的框架,因为springboot集成了一套完整项目所需要的基本的开发工具的jar包,无需再像之前开发一个spring项目中需要引入各种配置,只需要引入简单的几个配置就能达到项目的启动,大大减少了开发周期,使开发越来...

史上最全 40 道 Dubbo 面试题及答案,看完碾压面试官!

史上最全 40 道 Dubbo 面试题及答案,看完碾压面试官!

想往高处走,怎么能不懂 Dubbo? Dubbo是国内最出名的分布式服务框架,也是 Java 程序员必备的必会的框架之一。Dubbo 更是中高级面试过程中经常会问的技术,无论你是否用过,你都必须熟悉。 下面我为大家准备了一些 Dubbo 常见的的面...

搭建SpringBoot+dubbo+zookeeper+maven框架(一)

搭建SpringBoot+dubbo+zookeeper+maven框架(一)

这几天项目还没来,所以就自己试着参考网上的一些资料,搭建了一个SpringBoot+dubbo+zookeeper+maven框架,网上参考的很多资料照着他们一步一步搭建,最后很多都运行不通,很是郁闷,最后自己在总结了他们之后自己搭建了一个,项目的源码我会附在文章的最后,并且后期还会继...

Dubbo整合Nacos

Dubbo整合Nacos

Dubbo项目将Nacos作为其注册中心和配置中心 Nacos提供了四个主要功能 服务发现和服务运行状况检查(服务治理):Nacos使服务易于注册自己并通过DNS或HTTP接口发现其他服务。Nacos还提供服务的实时运行状况检查,以防...

SpringCloud系列之集成Dubbo应用篇

SpringCloud系列之集成Dubbo应用篇

目录 前言 项目版本 项目说明 集成Dubbo 2.6.x 新项目模块 老项目模块 集成Dubbo 2....

SpringCloud与Dubbo区别

为什么放弃Dubbo 使用SpringCloud?   相同点:SpringCloud 和Dubbo可以实现RPC远程调用框架,可以实现服务治理。   不同点:  SpringCloud是一套目前比较网站微服务框架了,整合了分...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。