当前位置: 首页 >服务端 > Netty介绍及实战(一)

Netty介绍及实战(一)

一、介绍

Netty的主要目的是构建基于NIO的高性能协议服务器,实现网络和业务逻辑组件的分离解耦。

二、核心

1.Netty是一个非阻塞框架。与阻塞IO相比,这导致了高吞吐量。

2.管道是Java NIO的基础。它表示一个能够进行读写等IO操作的链接。

3.特点:我们在调用之后立即返回每个请求操作。操作完成后,我们可以传递一个回调给ChannelFuture。

4.处理器:管道事件处理程序器基本接口是ChannelHandler及其子类的ChannelOutboundHandlerChannelInboundHandler。值得注意的是他们是空的实现,当我们想要实现相关业务时,我们必须要自己扩展对应的handler.

5.编解码:由于Netty是基于TCP进行的网络传输,所以我们需要执行数据序列化和反序列化。为此,Netty提供了ChannelInboundHandler的解码器ByteToMessageDecoder,Netty提供了ChannelOutboundHandler的编码器MessageToByteEncoder。

我们可以使用编码器和解码器将消息从字节序列转换为Java对象,反之亦然。

三、实战

1.首先我们引入netty的依赖:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.10.Final</version></dependency>

2.我们创建客户端的代码(这里只是简单的利用main方法启动,服务端的demo会集成springboot):

package com.netty.client;import com.netty.client.echo.TestBusinessHandler;import com.netty.nettyServer.decoder.DecoderHabdler;import com.netty.nettyServer.encoder.EncoderHandler;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.logging.LoggingHandler;public class TcpClient {private String ip;private int port;public  void init() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.option(ChannelOption.SO_KEEPALIVE,true);bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));ch.pipeline().addLast("encode",new EncoderHandler());//编码器。发送消息时候用ch.pipeline().addLast("decode",new DecoderHabdler());//解码器,接收消息时候用ch.pipeline().addLast("handle",new TestBusinessHandler());}});bootstrap.remoteAddress(ip,port);ChannelFuture future = bootstrap.connect().sync();future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {group.shutdownGracefully().sync();}}public TcpClient(String ip, int port) {this.ip = ip;this.port = port;}public static void main(String[] args) throws InterruptedException {new TcpClient("127.0.0.1",20000).init();}}

2.1客户端的业务处理器

package com.netty.client.echo;import com.netty.common.TestPctProtocol;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;public class TestBusinessHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {TestPctProtocol testPctProtocolReveive = new TestPctProtocol();if(msg instanceof Object){testPctProtocolReveive = (TestPctProtocol)msg;}else{retu;}}//连接成功后发送消息测试@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {TestPctProtocol testPctProtocol = new TestPctProtocol();testPctProtocol.setHeader((short)100);String message = "哈哈!!客户端连接上了";System.out.print(message);testPctProtocol.setLength(message.getBytes(CharsetUtil.UTF_8).length);testPctProtocol.setData(message.getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(testPctProtocol);}}

3.创建服务端的代码(利用springboot容器启动)

3.1创建application(注意我们继承了SpringBootServletInitializer)

package com.netty;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.builder.SpringApplicationBuilder;import org.springframework.boot.web.support.SpringBootServletInitializer;/** * @author kedacom * @date 2018-11-16 */@SpringBootApplication( scanBasePackages = {"com.netty.**"})public class Application extends SpringBootServletInitializer {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Overrideprotected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {// 注意这里要指向原先用main方法执行的Application启动类retu builder.sources(Application.class);}}

3.2创建服务端代码

package com.netty.nettyServer;import com.netty.nettyServer.businessHandler.BusinessHandler;import com.netty.nettyServer.decoder.DecoderHabdler;import com.netty.nettyServer.encoder.EncoderHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.logging.LoggingHandler;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import lombok.exte.log4j.Log4j;import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;import java.nio.ByteOrder;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;@Component@Log4jpublic class TcpServer {@AutowiredBusinessHandler testBusinessHandler;@Value("${netty.tcp.listener.port:20000}")private  int port=20000;private static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();NioEventLoopGroup boss = new NioEventLoopGroup();//主线程组NioEventLoopGroup work = new NioEventLoopGroup();//工作线程组private Channel serverChannel;public  void init(){log.info("正在启动tcp服务器……");try {ServerBootstrap bootstrap = new ServerBootstrap();//引导对象bootstrap.group(boss,work);//配置工作线程组bootstrap.channel(NioServerSocketChannel.class);//配置为NIO的socket通道bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {//绑定通道参数//在官方提供的示例中,Length是0x000C,高位在前,低位在后 但是报文给的是低位在前 高位在后//解决粘包问题  释义:读第72个字节后面的4个字节 来截取报文的长度ch.pipeline().addLast("chaiBao",new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN,10*1024,72,4,0,0,true));ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程ch.pipeline().addLast("encode",new EncoderHandler());//编码器。发送消息时候用ch.pipeline().addLast("decode",new DecoderHabdler());//解码器,接收消息时候用ch.pipeline().addLast("handler",testBusinessHandler);//业务处理类,最终的消息会在这个handler中进行业务处理}});bootstrap.option(ChannelOption.SO_BACKLOG,1024);//缓冲区bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);//ChannelOption对象设置TCP套接字的参数,非必须步骤ChannelFuture future = bootstrap.bind(port).sync();//使用了Future来启动线程,并绑定了端口log.info("启动tcp服务器启动成功,正在监听端口:"+port);future.channel().closeFuture().sync();//以异步的方式关闭端口serverChannel =  bootstrap.bind(port).sync().channel().closeFuture().sync().channel();}catch (InterruptedException e) {log.info("启动出现异常:"+e);}finally {work.shutdownGracefully();boss.shutdownGracefully();//出现异常后,关闭线程组log.info("tcp服务器已经关闭");}}public static void main(String[] args) {new TcpServer().init();}public static Map<String, Channel> getMap() {retu map;}public static void setMap(Map<String, Channel> map) {TcpServer.map = map;}@PreDestroypublic void destory() throws InterruptedException {boss.shutdownGracefully().sync();work.shutdownGracefully().sync();log.info("关闭tomcat的时候同时--》关闭Netty-------------------------------------------");}}

3.3 将服务端绑定到spring的生命周期(这一步十分的重要)

package com.netty.nettyServer;import lombok.SneakyThrows;import lombok.exte.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.core.annotation.Order;import org.springframework.scheduling.annotation.Async;import org.springframework.stereotype.Component;/** * @description: */@Component@Slf4j@Order(1)public class BaseDataInit implements CommandLineRunner {@AutowiredTcpServer tcpServer;@SneakyThrows@Override@Asyncpublic void run(String... strings){//注意 一定要等到bean加载完之后再加载netty  否则netty里面使用的时候会无响应log.info("开始启动netty服务端");tcpServer.init();}}

3.4 编写业务处理器

package com.netty.nettyServer.businessHandler;import com.netty.common.TestPctProtocol;import com.netty.nettyServer.TcpServer;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import org.springframework.stereotype.Component;@Component@io.netty.channel.ChannelHandler.Sharablepublic class BusinessHandler extends ChannelInboundHandlerAdapter {/** todo 根据业务注入需要的bean* */@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);System.out.print("客户端"+getRemoteAddress(ctx)+" 接入连接");//往channel map中添加channel信息TcpServer.getMap().put(getIPString(ctx), ctx.channel());}public static String getIPString(ChannelHandlerContext ctx){String ipString = "";String socketString = ctx.channel().remoteAddress().toString();int colonAt = socketString.indexOf(":");ipString = socketString.substring(1, colonAt);retu ipString;}public static String getRemoteAddress(ChannelHandlerContext ctx){String socketString = "";socketString = ctx.channel().remoteAddress().toString();retu socketString;}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {//删除Channel Map中的失效ClientTcpServer.getMap().remove(getIPString(ctx));ctx.close();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {TestPctProtocol testPctProtocolReveive = new TestPctProtocol();if(msg instanceof Object){testPctProtocolReveive = (TestPctProtocol)msg;}else{retu;}testPctProtocolReveive.setHeader((short)100);String message = "我接收到了"+getIPString(ctx)+"发过来的消息";testPctProtocolReveive.setLength(message.length());this.constructHeader((short)100,message,ctx);}public void constructHeader(short command,String message,ChannelHandlerContext ctx){TestPctProtocol testPctProtocol = new TestPctProtocol();testPctProtocol.setHeader((short)261);byte[] message1 = message.getBytes(CharsetUtil.UTF_8);testPctProtocol.setLength(message.length());testPctProtocol.setData(message1);ctx.writeAndFlush(testPctProtocol);}}

3.5 编解码器

package com.netty.nettyServer.encoder;import com.netty.common.TestPctProtocol;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;import org.apache.log4j.Logger;public class EncoderHandler extends MessageToByteEncoder {private Logger logger = Logger.getLogger(this.getClass());protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {if (msg instanceof TestPctProtocol){TestPctProtocol protocol = (TestPctProtocol) msg;out.writeShortLE(protocol.getHeader());out.writeIntLE(protocol.getLength());out.writeBytes(protocol.getData());logger.debug("数据编码成功:"+out);}else {logger.info("不支持的数据协议:"+msg.getClass()+"\t期待的数据协议类是:"+ TestPctProtocol.class);}}}
package com.netty.nettyServer.decoder;import com.netty.common.TestPctProtocol;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import org.apache.log4j.Logger;import java.util.List;public class DecoderHabdler extends ByteToMessageDecoder {private Logger logger = Logger.getLogger(this.getClass());protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//标记读写位置in.markReaderIndex();TestPctProtocol readTestPctProtocol = new TestPctProtocol();readTestPctProtocol.setHeader(in.readShortLE());readTestPctProtocol.setLength(in.readIntLE());//解决tcp传输报文过长 自动拆包问题if(readTestPctProtocol.getLength()>in.readableBytes()){logger.debug(String.format("数据长度不够,数据协议len长度为:%1$d,数据包实际可读内容为:%2$d正在等待处理拆包……",readTestPctProtocol.getLength(),in.readableBytes()));in.resetReaderIndex();}byte[] dataByte = new byte[readTestPctProtocol.getLength()];in.readBytes(dataByte);readTestPctProtocol.setData(dataByte);out.add(readTestPctProtocol);// 回收已读字节in.discardReadBytes();}}

以上就是netty集成到springboot的全部代码,github地址为:GitHub - 15568886496/netty-springboot: springboot集成netty 依赖注入

下载即可使用,非常方便。欢迎大家一起探讨留言,如有错误,烦请指出,感谢 fighting fighting。

Netty介绍及实战(一) _ JavaClub全栈架构师技术笔记

 

 

 

作者:小飞机爱旅游
来源链接:https://blog.csdn.net/lc15568886496/article/details/124212967

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。





本文链接:https://www.javaclub.cn/server/117283.html

标签:Netty
分享给朋友:

“Netty介绍及实战(一)” 的相关文章