当前位置:首页 > 服务端 > Netty的简单使用和理解

Netty的简单使用和理解

2022年11月08日 13:04:14服务端10

Netty 学习笔记

1.Netty 介绍和应用场景

1.1 介绍

  1. Netty 是jboss的一个开源框架
  2. Netty是一个异步的,基于事件驱动的网络应用框架
  3. 基于nio

1.2 应用场景

  1. Rpc 例如dubbo
  2. 游戏
  3. 大数据

涉及到网络通信的应用都可以使用netty

2. i/o模型

2.1 介绍

  1. bio 同步并阻塞 一个连接对应服务器一个线程 适用于连接数较少的架构 jdk1.4
  2. nio 同步非阻塞 服务器一个线程处理多个连接 适用于连接数较多连接时间短 jdk1.4
  3. aio(nio.2) 异步非阻塞 适用于连接数多且连接时间长 jdk1.7

2.2 bio

blocking i/o

2.2.1 简单demo

开发一个服务端,创建一个线程池,当客户端发送一个请求,服务端对应创建一个线程处理,当有多个客户端请求时,就会创建多个线程对应处理

这里demo的客户端用telnet模拟

public static void handler(Socket socket){
     
        try(InputStream in = socket.getInputStream();){
     
            System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());

            byte[] bytes = new byte[1024];
            while (true){
     
                int read = in.read(bytes);
                if(read!=-1){
     
                    System.out.println("输出信息: "+new String(bytes,"UTF-8"));
                }else {
     
                    break;
                }
            }
        }catch (IOException e){
     
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
     
        try(ServerSocket serverSocket = new ServerSocket(6666);) {
     
            ExecutorService executorService = Executors.newCachedThreadPool();

            System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());

            while (true){
     
                System.out.println("等待链接");
                final Socket socket = serverSocket.accept();
                System.out.println("链接到一个客户端");

                executorService.execute(new Runnable() {
     
                    @Override
                    public void run() {
     
                        System.out.println("线程信息: id "+Thread.currentThread().getId()+" name " + Thread.currentThread().getName());
                        handler(socket);
                    }
                });
            }

        } catch (IOException e) {
     
            e.printStackTrace();
        }

    }

2.3 nio

non-blocking i/o 非阻塞

2.3.1 简介

三大核心

  1. channel 通道
  2. buffer 缓冲区
  3. selector 选择器

简述操作原理: selector 选择可用的channel, channelbuffer可以相互读写,应用程序并不直接对channel进行操作,而是通过对buffer进行操作,间接操作channel

一个线程中会有多个selector,一个selector中可以注册多个channel,如果并没有数据传输,线程还可以做其他事,并不会一直等待

2.4 nio与bio 的区别

  1. nio非阻塞 bio阻塞
  2. nio用块的方式处理io bio用流的方式处理io 块的方式比流的方式要快
  3. bio基于字节流/字符流 nio基于缓冲区和通道(channel) selector监听多个通道的事件,因此用一个线程就可以处理多个通道的数据

图示: nio

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

bio

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

3. nio详解

3.1 nio模型三大组件的关系

  1. 一个线程对应一个selector
  2. 一个selecor对应多个channel
  3. 一个channel对应一个buffer
  4. 一个线程对应多个channel
  5. channel与buffer都是双向的,就是既可以读也可以写 使用flip()方法切换
  6. buffer就是一个内存块,读写内存比较快
  7. selector会根据不同事件切换不同的channel

3.2 Buffer缓冲区

3.2.1 简介

本质是一个读写数据的内存块,可以理解成一个提供了操作内存块方法容器对象(数组)

缓冲区中内置了一些机制,这些机制可以检测到缓冲区的数据变化,状态变化

channel读写的数据必须都经过Buffer

3.2.2 源码分析

常用的几个操作方法

public static void main(String[] args) {
     
        //allocate 规定intbuffer的长度
        IntBuffer buffer = IntBuffer.allocate(5);

        //capacity()获取容量
        //put()写入
        for(int i = 0;i<buffer.capacity();i++){
     
            buffer.put(i*2);
        }

        //flip()反转 由写转为读
        buffer.flip();

        //读取
        //get()每次读取后 索引向后移动一位
        for(int i = 0;i<buffer.capacity();i++){
     
            System.out.println(buffer.get());

        }
    }

3.2.2.1 定义

IntBuffer中定义了一个int数组,其他类型的buffer类似

public abstract class IntBuffer
    extends Buffer
    implements Comparable<IntBuffer>
{
     

    // These fields are declared here rather than in Heap-X-Buffer in order to
    // reduce the number of virtual method invocations needed to access these
    // values, which is especially costly when coding small buffers.
    //
    final int[] hb;                  // Non-null only for heap buffers
    final int offset;
    boolean isReadOnly;                 // Valid only for heap buffers

最顶层的Buffer类中定义了四个属性

public abstract class Buffer {
     

    /**
     * The characteristics of Spliterators that traverse and split elements
     * maintained in Buffers.
     */
    static final int SPLITERATOR_CHARACTERISTICS =
        Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;

    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1; //标记
    private int position = 0; //当前索引的位置,不能超过limit
    private int limit;//最大能读写的长度
    private int capacity;//容量 allocate定义的长度

3.2.2.2 反转

   public final Buffer flip() {
     
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

可以看到,反转之后,由读变为写,或者由写变为读

将索引归0,最大读写长度不能超过上次操作的索引

3.2 channel 通道

3.2.1 简介

  1. 通道类似于流/连接,但是流只能写入或者读取,通道可以即读取也写入
  2. 通道异步读写数据
  3. 通道可以读写数据到缓存区

3.2.2 层级关系

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

当有客户端发送请求时,服务端会创建一个ServerSocketChannel(实现类:ServerSocketChannelImpl) 再由ServerSocketChannel创建一个SocketChannel(实现类:SocketChannelImpl即真正读写数据的通道),这个SocketChannel就是与这个客户端请求所对应的

3.2.3 案例剖析

3.2.3.1 FileChannle 输出文件流

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * @author: zhangyao
 * @create:2020-08-25 14:50
 **/
public class FileChannelTest {
     
    public static void main(String[] args) {
     
        FileOutputStream fileOutputStream = null;
        try {
     
            //文件输出流
            fileOutputStream = new FileOutputStream("D:\\file01.txt");
			//文件输出流包装为FileChannel 此处FileChannel默认实现FileChannelImpl
            FileChannel fileChannel = fileOutputStream.getChannel();

            //创建对应的缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            //数据写入缓冲区
            byteBuffer.put("hello nio".getBytes());

            //反转,因为接下来需要从缓冲区读取数据写入Channel
            byteBuffer.flip();

            //从缓冲区写入Channel
            fileChannel.write(byteBuffer);



        } catch (FileNotFoundException e) {
     
            e.printStackTrace();
        } catch (IOException e) {
     
            e.printStackTrace();
        } finally {
     
            //关闭文件流
            if(fileOutputStream!=null){
     
                try {
     
                    fileOutputStream.close();
                } catch (IOException e) {
     
                    e.printStackTrace();
                }
            }
        }
    }
}

整体流程就是把数据写入缓冲区,在读取缓存区写入通道Channel,在由文件输出流输出

图示如下

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

3.2.3.2 FileChanle 输入文件流

public static void main(String[] args) {
     
    FileInputStream fileInputStream = null;
    try {
     
        fileInputStream = new FileInputStream("D:\\file01.txt");

        //获取Channel
        FileChannel channel = fileInputStream.getChannel();

        //创建byteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //从channel中读取数据写入buffer
        channel.read(byteBuffer);

        //反转  下一步需要从buffer中读取数据输出
        byteBuffer.flip();

        //输出
        byte[] array = byteBuffer.array();
        System.out.println(new String(array));

    } catch (FileNotFoundException e) {
     
        e.printStackTrace();
    } catch (IOException e) {
     
        e.printStackTrace();
    } finally {
     
        try {
     
            fileInputStream.close();
        } catch (IOException e) {
     
            e.printStackTrace();
        }
    }

}

与上面的例子刚好相反,从文件中读取数据,通过通道写入buffer缓冲区,在输出

图示

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

3.2.3.3 FileChannel 拷贝文件

其实就是上面两个例子结合,把一个文件中的数据复制到另外一个文件中

public static void main(String[] args) {
     
    FileInputStream fileInputStream = null;
    FileOutputStream fileOutputStream = null;

    try {
     
        fileInputStream = new FileInputStream("D:\\file01.txt");
        fileOutputStream = new FileOutputStream("D:\\file02.txt");
        FileChannel channel = fileInputStream.getChannel();

        FileChannel channel1 = fileOutputStream.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        while (true){
     
            //将byteBuffer复位
            byteBuffer.clear();
            int read = channel.read(byteBuffer);
            if(read==-1){
     
                break;
            }

            byteBuffer.flip();
            channel1.write(byteBuffer);
        }


    } catch (FileNotFoundException e) {
     
        e.printStackTrace();
    } catch (IOException e) {
     
        e.printStackTrace();
    } finally {
     
        try {
     
            fileInputStream.close();
            fileOutputStream.close();
        } catch (IOException e) {
     
            e.printStackTrace();
        }
    }


}

这里使用了byteBuffer.clear()方法

因为ByteBuffer缓冲区是有长度的,当读取的文件超过缓冲区的长度时,如果不对缓冲区进行清空,当进行下一次读取时,就会从上一次读取的位置开始读取,会出现死循环的情况

3.2.3.4 FileChannel 拷贝文件之TransferFrom

public static void main(String[] args) {
     
    FileInputStream fileInputStream = null;
    FileOutputStream fileOutputStream = null;

    try {
     
        fileInputStream = new FileInputStream("D:\\file01.txt");
        fileOutputStream = new FileOutputStream("D:\\file02.txt");
        FileChannel channel = fileInputStream.getChannel();

        FileChannel channel1 = fileOutputStream.getChannel();

	
        //从channel通道拷贝到 channel1通道
        channel1.transferFrom(channel, 0, channel.size());

    } catch (FileNotFoundException e) {
     
        e.printStackTrace();
    } catch (IOException e) {
     
        e.printStackTrace();
    } finally {
     
        try {
     
            fileInputStream.close();
            fileOutputStream.close();
        } catch (IOException e) {
     
            e.printStackTrace();
        }
    }


}

3.2.4 Buffer的分散和聚集

上面的例子都是使用单个buffer进行数据的读写,如果数据过大,也可用使用多个buffer(buffer数组)进行数据的读写,即用空间换时间

3.3 Selector选择器

3.3.1 基本简介

一个selector管理多个channel通道,使用异步的方式处理io

只有读写真正的发生时,才会处理数据,减小了线程的压力,不用每个请求都维护一个线程

避免了多线程之间的上下文切换导致的开销

3.3.2 selector的api

Selector:

  1. select() 阻塞
  2. select(Long timeout) 有超时时间
  3. selectNow() 非阻塞
  4. wakeup() 立即唤醒selector

3.3.2 selecor的工作流程

其实是Selector SelectionKey ServerSocketChannel SorkcetChannel的工作原理

  1. 当客户端链接时,通过ServerSockertChannel 得到SocketChannel 并且注册到 Selector上

    1. 注册源码
    public abstract SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException;
    

    这是SocketChannel注册到Selector上的方法,第一个参数为要注册的Selector对象,第二个参数为事件驱动的类型

    public abstract class SelectionKey {
           
        public static final int OP_READ = 1;
        public static final int OP_WRITE = 4;
        public static final int OP_CONNECT = 8;
        public static final int OP_ACCEPT = 16;
        private volatile Object attachment = null;
    
  2. 当注册完成后返回一个Selectionkey,这个selectionKey会和SocketChannel关联

  3. Selector通过select方法监听Channel,如果有事件发生,返回对应的selectionKey集合

    1. 源码

      public int select(long var1) throws IOException {
               
              if (var1 < 0L) {
               
                  throw new IllegalArgumentException("Negative timeout");
              } else {
               
                  return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
              }
          }
      
          public int select() throws IOException {
               
              return this.select(0L);
          }
      
          public int selectNow() throws IOException {
               
              return this.lockAndDoSelect(0L);
          }
      
      	private int lockAndDoSelect(long var1) throws IOException {
               
              synchronized(this) {
               
                  if (!this.isOpen()) {
               
                      throw new ClosedSelectorException();
                  } else {
               
                      int var10000;
                      synchronized(this.publicKeys) {
               
                          synchronized(this.publicSelectedKeys) {
               
                              var10000 = this.doSelect(var1);
                          }
                      }
      
                      return var10000;
                  }
              }
          }
      
  4. 通过得到的selectionKey可以反向获取Channel

    1. 源码

          public abstract SelectableChannel channel();
      
  5. 最后通过channel处理业务

3.3.3 案例

服务端思路:

  1. 创建serverSocketChannel绑定端口6666,把这个channel注册到Selector上,注册事件是OP_ACCEPT
  2. 循环监听,判断是否channel中是否有事件发生,如果有事件发生,判断不同的事件类型进行不同的链接,读/写操作

客户端思路

  1. 创建一个SocketChannel,连接上服务器之后,发送消息,并保持链接不关闭

3.3.3.1 server端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @author: zhangyao
 * @create:2020-08-26 16:55
 **/
public class ServerChannel {
     
    public static void main(String[] args) {
     

        try {
     
            //生成一个ServerScoketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //设置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            //serverSocket监听6666端口
            serverSocketChannel.socket().bind(new InetSocketAddress(6666));

            //创建Selector
            Selector selector = Selector.open();

            //serverSocketChannel注册到Selector
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


            //循环等待链接
            while (true){
     
                //如果没有事件发生,就继续循环
                if(selector.select(1000) == 0){
     
                    System.out.println("等待1s,无连接");
                    continue;
                }

                //如果有事件驱动,就需要遍历事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
     
                    SelectionKey key = iterator.next();
                    //如果事件是连接
                    if(key.isAcceptable()){
     
                        try {
     
                            SocketChannel channel = serverSocketChannel.accept();
                            channel.configureBlocking(false);
                            SelectionKey register = channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                            System.out.println("链接成功");
                        } catch (IOException e) {
     
                            e.printStackTrace();
                        }
                    }

                    //如果是读取数据
                    if(key.isReadable()){
     
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                        try {
     
                            int read = channel.read(byteBuffer);
                            byte[] array = byteBuffer.array();
                            System.out.println("读取数据:"+ new String(byteBuffer.array()));
                        } catch (IOException e) {
     
                            e.printStackTrace();
                        }
                    }
                    iterator.remove();


                };

            }

        } catch (IOException e) {
     
            e.printStackTrace();
        }


    }
}

3.3.3.2 客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author: zhangyao
 * @create:2020-08-26 17:24
 **/
public class ClientChannel {
     
    public static void main(String[] args) {
     

        //创建一个SocketChannel
        try {
     
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
            if(!socketChannel.connect(inetSocketAddress)){
     
                while (!socketChannel.finishConnect()){
     
                    System.out.println("服务器连接中,线程并不阻塞,可以进行其他操作");
                }
            }

            //连接成功
            socketChannel.write(ByteBuffer.wrap("hello ,server".getBytes()));

            System.in.read();




        } catch (IOException e) {
     
            e.printStackTrace();
        }
    }
}

4.Netty

4.1 简介

https://netty.io/ 官网

netty是对java nio api的封装,简化了nio程序的开发,jdk要求最低1.6

流行的网络编程通信框架,Dubbo Elasticsearch 等框架底层的网络通信框架都是 Netty

架构模型

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

版本

netty 共有 3.x 4.x 5.x三个大版本

3.x较老,5.x有重大bug,被官网废弃 现在主要使用4.x

4.2 线程模型

有以下几种线程模型

4.2.1 传统I/O阻塞模型

每一个链接都需要一个对应的线程进行处理,并且当链接建立后,如果当前链接没有数据传输时,此线程会被阻塞在read()方法

4.2.2 Reactor模式

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

原理图示如上

主要是针对了传统I/O模型一个连接会阻塞一个线程的问题进行了改进,当连接建立后都通过ServiceHandler调用线程池中的线程进行处理,这样就只用阻塞一个ServiceHandler线程,达到多路复用的目的

Reactor模式有三种实现方式

4.2.2.1单Reactor单线程

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

使用一个线程通过多路复用完成所有操作,包括读写连接

redis使用的就是这种模型 单线程

4.2.2.2单Reactor多线程

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

相对于单Reactor单线程,主线程不在进行业务处理,当有请求过来之后,具体的业务处理交与线程池中的线程处理,线程处理完成后再通过handler返回给Client

4.2.2.3 主从Reactor多线程

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

相比于单Reacotr,主从Reactor将Reactor分为MainReactor和SubReactor

MainReactor中负责分发和连接

SubReactor中负责读写

一个MainReactor可以对应多个SubReactor

4.2.3 Netty模型

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

简述Netty模型

  1. 角色

    1. BossGroup BossGroup的类型是NioEventLoopGroup,其中包含了很多NioEventLoop
    2. NioEventLoop nio事件循环,每个NioEventLoop中都有一个Selctor和一个任务队列
    3. WorkerGroup 类型是NioEventLoopGroup,与BossGroup类似,只不过功能不同,BossGroup只负责与客户端建立连接, WorkerGroup需要读写,处理业务
    4. PipeLine 管道,对Channel进行的封装,具体的业务处理是通过Pipline对Channel进行处理
  2. 具体流程

    1. 当客户端发送请求时,首先进入BossGroup,有NioEventLoop对请求进行事件轮询,如果是连接事件就进行处理
      1. 处理的步骤分为三步
      2. 轮询
      3. 注册 这里的注册指的是将生成的SocketChannel注册到workerGroup中的某个NioEventLoop中的Selector上
      4. 执行任务列表
    2. 当请求的事件是读写时,就有workerGroup对请求进行具体的业务处理
      1. 处理的步骤BossGroup类似
  3. 总结

    由此可以看出,Netty的模型与主从Reactor模型类似,都是由一个主Reactor负责连接事件,由一个从Reactor负责读写事件

4.2.4 案例demo

4.2.4.1 服务端

4.2.4.1.1 NettyServer
package netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author: zhangyao
 * @create:2020-09-03 08:55
 **/
public class NettyServer {
     

    public static void main(String[] args) {
     

        //创建BossGroup 和 WorkerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        ChannelFuture channelFuture = null;
        try {
     
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
     

                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
     
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });

        System.out.println("服务器就绪.....");


        //绑定端口
            channelFuture = serverBootstrap.bind(6668).sync();
        }catch (Exception e){
     
            e.printStackTrace();
        }finally {
     
            try {
     
                channelFuture.channel().closeFuture().sync();

                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            } catch (InterruptedException e) {
     
                e.printStackTrace();
            }
        }
    }
}

4.2.4.1.2 NettyServerHandler
package netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author: zhangyao
 * @create:2020-09-03 09:12
 **/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
     

    //读取数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     
        ByteBuf buf = (ByteBuf) msg;

        System.out.println("客户端发送消息:"+ buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ ctx.channel().remoteAddress());

    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
     
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));
    }

    //处理异常 关闭ctx
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     
        ctx.close();
    }
}

4.2.4.2 客户端

4.2.4.2.1 NettyClient
package netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author: zhangyao
 * @create:2020-09-03 09:52
 **/
public class NettyClient {
     
    public static void main(String[] args) {
     


        EventLoopGroup executors = new NioEventLoopGroup();


        Bootstrap bootstrap = new Bootstrap();

        try {
     
            bootstrap.group(executors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
     
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
     
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("客户端就绪........");

            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //关闭通道
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
     
            e.printStackTrace();
        }finally {
     
            try {
     
                executors.shutdownGracefully().sync();
            } catch (InterruptedException e) {
     
                e.printStackTrace();
            }
        }

    }
}

4.2.4.2.2 NettyClientHandler
package netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author: zhangyao
 * @create:2020-09-03 10:00
 **/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
     


    //就绪时触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
     
        System.out.println("ctx: "+ctx);

        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务端", CharsetUtil.UTF_8));

    }

    //读取信息
    //这里读取的是服务器返回的信息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     

        ByteBuf byteBuf = (ByteBuf) msg;

        System.out.println("服务端发送消息: "+ byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端地址: "+ ctx.channel().remoteAddress());

    }

    //异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     
        ctx.close();
    }
}

一个简单的TCP服务通信,客户端发送消息,服务端接收消息并返回消息给客户端

4.2.4.3 案例demo源码分析

4.2.4.3.1 NioEventGroup
public NioEventLoopGroup() {
     
    this(0);
}

public NioEventLoopGroup(int nThreads) {
     
    this(nThreads, (Executor)null);
}

可以看到,如果使用无参的NioEventGroup,默认传递的是0,也可以指定线程数

一层一层找下去:

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); //NettyRuntime.availableProcessors()获取当前计算机的核数(逻辑处理器)

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

发现最后找到NioEventGroup父类的方法

如果指定了NioEventGroup的线程数,且不为0的时候,就使用指定的线程数

否则,就使用当前计算机的核数*2作为线程数

debug 看结果

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

电脑是12核,默认是24个线程

指定一个线程

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

就只有一个线程数

4.2.5 异步模型

上文中的案例Demo中的 ChannelFuture

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();

异步模型Future是相对与同步来说的

异步指的是当一个异步调用发出后,不会立刻得到结果,而是通过回调,状态来通知调用者调用的结果

Netty 中的 connect 和 bind() sync方法就是返回了一个异步的结果,之后再通过监听获取到结果

也就是 Future-Listener机制

当Future对象刚创建的时候,处于未完成的状态,可以通过返回的ChannelFuture查看操作执行的状态,也可以注册监听函数来执行完成后的操作

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

isSucess()是否成功

isDone()是否完成

isCancelable() 是否取消

cause() 失败原因

addListener 增加监听器

//绑定端口
            channelFuture = serverBootstrap.bind(6668).sync();
            channelFuture.addListener(new ChannelFutureListener() {
     
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
     
                    if(channelFuture.isSuccess()){
     
                        System.out.println("监听端口成功");
                    }else {
     
                        System.out.println("监听端口失败");
                    }
                }
            });

4.2.6 Netty Http服务

做一个简单的demo 浏览器(客户端)访问服务器端7001端口,返回一个字符串信息

浏览器访问是http请求,服务器也需要相应一个httpResponse

4.2.6.1 服务端

NettyHttpServer 启动类
package netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author: zhangyao
 * @create:2020-09-04 11:16
 **/
public class NettyHttpServer {
     

    public static void main(String[] args) throws InterruptedException {
     

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        serverBootstrap.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new NettyHttpInitialize());

        ChannelFuture channelFuture = serverBootstrap.bind(7001).sync();

        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();

        workerGroup.shutdownGracefully();

    }
}
NettyHttpInitialize 处理器类

对之前的ChannelInitialize(SocketChannel)进行封装

package netty.http;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 * @author: zhangyao
 * @create:2020-09-04 11:21
 **/
public class NettyHttpInitialize extends ChannelInitializer<SocketChannel> {
     


    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
     
        //得到管道
        ChannelPipeline pipeline = sc.pipeline();

        //管道中加入处理器 主要是处理Http请求的,解析请求头之类的
        pipeline.addLast("myDecoder",new HttpServerCodec());

        //加入处理器
        pipeline.addLast("myServerHandler",new NettyServerHandler());
    }
}
NettyServerHandler 具体的处理(返回http响应)
package netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import org.springframework.http.HttpStatus;

/**
 * @author: zhangyao
 * @create:2020-09-04 14:01
 **/
public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {
     
    //读信息
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
     
        System.out.println(httpObject);
        System.out.println("客户端地址+"+ channelHandlerContext.channel().remoteAddress());


        ByteBuf byteBuf = Unpooled.copiedBuffer("hello , im 服务端", CharsetUtil.UTF_8);

        //返回客户端信息
        FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK,byteBuf);

        fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");

        fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());


        channelHandlerContext.writeAndFlush(fullHttpResponse);



    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     
        ctx.close();
    }
}

出现的问题记录:

  1. 第一次绑定6668端口,浏览器访问失败,换成7001就可以了

    原因: 谷歌浏览器禁用了6665-6669以及其他一些不安全的端口

  2. 访问后第一次请求 出现异常,可以正常返回数据

    Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

    原因: NettyServerHandler中没有对异常处理方法进行重写

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
           
        ctx.close();
    }
    

    加上这部分就可以了,报错信息也报的很明显

4.2.6.2 Http服务的过滤

可以对一些不希望处理的请求进行过滤,其实就是在对http请求的处理过程中判断一下请求的uri

在上文中 NettyServerHandler类中 加入以下代码,即可拦截/favicon.ico请求

HttpRequest re  = (HttpRequest) httpObject;

String uri = re.uri();
if(uri.equals("/favicon.ico")){
     
    System.out.println("不想处理,返回");
    return;
}

4.3 Netty API梳理

基于上述的各种demo,对Netty常用的类和方法进行系统梳理

4.3.1 Bootstrap

  1. ServerBootstrap 服务端启动引导类

    BootStrap 客户端启动引导类

    1. .group() 给BootStrap设置NioEventLoopGroup,可以设置多个
    2. .channel() 设置服务使用的通道类
    3. .option() 设置通道参数
    4. .handler() 对BossGroup进行设置
    5. .childrenHandler() 对workerGroup 进行设置
    6. .bind() 服务端绑定一个端口号,监听端口
    7. connect() 客户端用于连接服务端

4.3.2 Future

​ Netty中的io操作都是异步的,也就是不能立刻返回结果,而是当完成了之后再通知调用者

  1. Future

  2. ChannelFutrue

    方法

    1. channel() 返回当前正在进行IO操作的通道
    2. sync() 转为同步,等待异步操作完毕

4.3.3 Channel

不同协议,不同阻塞类型都有对应的Channel

  1. NioSocketChannel 异步tcp协议Socket连接
  2. NioServerSocketChannel 异步tcp协议服务端连接
  3. NioDatagramChannel 异步udp连接
  4. NioSctpChannel 异步sctp客户端连接
  5. NioSctpServerChannel 异步sctp服务端连接

4.3.4 Selector

Netty 基于Nio Selector对象实现多路复用,一个selector管理多个channel

4.3.5 ChannelHandler

主要是用于对数据的处理,其中有很多封装好的方法,使用的时候继承其子类即可

实现类

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

子类很多,常用的几个

channelHandler

  1. ChannelInboundHandler
  2. ChannelOutboundHandler
  3. 适配器
    1. channelInboundHandlerAdapter
    2. channelOutboundHandlerAdapter

4.3.6 pipeline

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

结构图如上

channel中可以创建出一个ChannelPipeline, ChannelPipeline中有维护了一个由ChannelHandlerContext组成的双向链表

每个ChannelHandlerContext又对应了一个Channelhandler

常用方法:

​ addFirst(); 添加一个Handler到链表中的第一个位置

​ addLast(); 添加到链表的最后一个位置

4.3.7 channelHandlerContext

每一个channelhandlerContext包含了一个channelHandler(业务处理)

channelHandlerContext中还可以获取到对应的channel和pipeline信息

channelHandlerContext.channel();

channelHandlerCOntext.pipeline();

4.3.8 EventLoopGroup

netty一般提供两个EventLoopGroup BossEventLoopGroup 和 workerEventLoopGroup

EventLoopGroup可以指定使用的核心是多少个

4.3.9 Unplooed

Netty提供的用来操作缓冲区数据的工具类

常用方法:

​ copiedBuffer(); 返回的是Netty提供的 Bytebuf对象

4.3.10 ByteBuf

Netty的数据容器(缓冲区)

可以直接进行读/写 读写之间不需要进行flip(),原因是ByteBuf内部维护了两个索引 readIndex writeIndex

常用方法

getByte()

readByte()

writeByte()

capacity()

4.4 Netty 心跳检测机制

当客户端长时间没有读/写操作时,服务端需要检测客户端是否还处于连接状态,也就是心跳检测

Netty提供了心跳检测的处理类 IdleStateHandler

示例代码

package netty.hearbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * @author: zhangyao
 * @create:2020-09-10 10:04
 **/
public class MyServer {
     


    public static void main(String[] args) throws Exception{
     

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                .childHandler(new ChannelInitializer<SocketChannel>() {
     
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
     
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                        pipeline.addLast("inleHandler",new MyHearBeatHandler());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();

        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

package netty.hearbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * @author: zhangyao
 * @create:2020-09-10 16:50
 **/
public class MyHearBeatHandler extends ChannelInboundHandlerAdapter {
     
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
     

        if(o instanceof IdleStateEvent){
     

            IdleStateEvent event = (IdleStateEvent) o;

            IdleState state = event.state();
            switch (state){
     
                case READER_IDLE:
                    System.out.println("读空闲");
                    break;
                case WRITER_IDLE:
                    System.out.println("写空闲");
                    break;
                case ALL_IDLE:
                    System.out.println("读写空闲");
                    break;
            }
        }
    }




}

客户端没有区别

4.5 Netty 之 webSocket

使用netty编写webSocket长连接

4.5.1 服务端

package netty.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * @author: zhangyao
 * @create:2020-09-11 14:43
 **/
public class MyServer {
     


    public static void main(String[] args) throws Exception{
     

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();


        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
     
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
     
                        ChannelPipeline pipeline = socketChannel.pipeline();

                        //添加http解码器
                        pipeline.addLast(new HttpServerCodec());
                        //添加块传输处理器
                        pipeline.addLast(new ChunkedWriteHandler());
                        //http分段传输,增加一个聚合处理
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        //增加websocket协议处理
                        pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));


                        //增加自定义处理业务处理器
                        pipeline.addLast(new MyServerHandler());
                    }
                });


        ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();

        channelFuture.channel().closeFuture().sync();

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

4.5.2 服务端自定义的处理器

package netty.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Date;

/**
 * @author: zhangyao
 * @create:2020-09-11 14:49
 **/
public class MyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
     


    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
     
        //当客户端关闭连接
        System.out.println("客户端关闭连接..."+ ctx.channel().id());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
     
        //当客户端连接到服务端
        System.out.println("有客户端连接到服务端 id为" + ctx.channel().id());

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
     
        //异常关闭连接
        ctx.close();
    }

    //收到消息
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
     
        //读取消息并返回相同的消息返回给客户端
        String text = textWebSocketFrame.text();
        System.out.println("服务端收到消息" + text);

        //返回给客户端

        Channel channel = channelHandlerContext.channel();
        channel.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now()+"  服务端返回消息:" + text));

    }
}

4.5.3 页面(webSocket客户端)

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试netty+webSocket</title>
</head>
<body>

    <form onsubmit="return false">
        <textarea id="sendMessage" style="height: 300px;width: 300px" placeholder="请输入要发送的消息"></textarea>
        <button onclick="send(document.getElementById('sendMessage').value)">发送消息</button>

        <textarea id="responseMessage" style="height: 300px;width: 300px" ></textarea>
        <button onclick="document.getElementById('responseMessage').value=''">清空消息</button>

    </form>
</body>


<script type="application/javascript">

    var websocket;

    if(!window.WebSocket){
       
        alert("浏览器不支持webSocket")
    }else {
       
        //进行webSocket的开启 关闭

        websocket = new WebSocket("ws://localhost:7000/hello");
        //webSocket 开启事件
        //给消息返回框加入一条数据
        websocket.onopen = function (ev) {
       
            document.getElementById("responseMessage").value = '连接到服务端';
        }

        websocket.onclose = function (ev) {
       
            document.getElementById("responseMessage").value += '\n 连接关闭';
        }

        //当服务端响应消息时触发  将服务端返回的消息回显致文本框
        websocket.onmessage = function (ev) {
       
            document.getElementById("responseMessage").value += '\n ' ;
            document.getElementById("responseMessage").value += ev.data ;
        }

    }

    //发送消息
    function send (message) {
       
        if(!window.websocket){
       
            alert("socket还未初始化完成");
            return;
        }

        if(websocket.readyState == WebSocket.OPEN){
       
            websocket.send(message);
            document.getElementById('sendMessage').value=''
        }
    }


</script>
</html>

4.6 Netty 编码解码

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

网络传输过程中的编码解码过程

codec 编解码器 包含 encoder编码器 和 decoder解码器

netty中提供了一些StringCodec ObjectCodec的编解码器,但是这些编解码器还是依赖java底层的序列化技术,java底层的序列化是比较低效的,所以需要引入新的高效的序列化技术

4.6.1 ProtoBuf

4.6.2 自定义编码解码器

package netty.inboundAndOutbound;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import netty.inboundAndOutbound.client.MyClientMessageToByteHandler;

/**
 * @author: zhangyao
 * @create:2020-09-18 14:53
 **/
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
     
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
     
        ChannelPipeline pipeline = socketChannel.pipeline();

        //添加入栈解码器
        pipeline.addLast(new ByteToMessageHandler());

        //添加出栈编码器
        pipeline.addLast(new MyClientMessageToByteHandler());

        //添加自定义处理器
        pipeline.addLast(new MyServerHandler());
    }
}

package netty.inboundAndOutbound;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * @author: zhangyao
 * @create:2020-09-18 14:54
 **/
public class ByteToMessageHandler extends ByteToMessageDecoder {
     
    //自定义实现的入栈解码器
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
     
        if(byteBuf.readableBytes()>=8){
     
            list.add(byteBuf.readLong());
        }
    }
}
package netty.inboundAndOutbound.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * @author: zhangyao
 * @create:2020-09-18 16:16
 **/
public class MyClientByteToLong extends ByteToMessageDecoder {
     
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
     
        if(byteBuf.readableBytes()>=8){
     
            list.add(byteBuf.readLong());
        }
    }
}

4.6.3 handler处理机制及出栈入栈

通过上面的编码解码进而延申到handler的处理机制

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

简单的netty出栈入栈的解释图

出栈和入栈是相对于而言的,当客户端发送消息到达服务端,对于客户端来说就是出栈,对于服务端来说就是入栈,反之亦然

4.7 Tcp沾包 拆包

4.7.1 沾包拆包介绍

tcp服务再发送消息的时候,如果发送的多个包.数据量小且包的数量比较多,Tcp就会通过算法将多个包合并为一个大的数据包发送给接受端,这样产生的问题就是接收端无法识别出完整的包,由此产生的问题就是沾包拆包

Netty的简单使用和理解 _ JavaClub全栈架构师技术笔记

如上图所示,Client端向Server端发送D1 D2两个数据包

Server端读取的时候,可能会产生四种情况

1.分两次读取 分别读取到了D1 D2两个数据包,不存在沾包拆包现象

2.一次读取,读到了D1D2两个数据包合在一起的包,出现沾包现象

3.分两次读取,第一次读取到了D1和D2的一部分数据,第二次读取到了D2的剩余部分数据,出现了拆包现象

4.分两次读取,第一次读取到了D1的一部分数据,第二次读取到 了D1的剩余部分数据和D2的所有数据,出现拆包现象

4.7.2 解决方案

思路:控制接收端读取内容的长度来解决问题

方案: 通过自定义解析+编解码器来解决拆包沾包问题

作者:zhang.yao
来源链接:https://blog.csdn.net/white_bird_shit/article/details/122311322

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/server/68534.html

标签: Netty
分享给朋友:

“Netty的简单使用和理解” 的相关文章

Java 200+ 面试题补充② Netty 模块

Java 200+ 面试题补充② Netty 模块

让我们每天都能看到自己的进步。老王带你打造最全的 Java 面试清单,认真把一件事做到最好。 本文是前文《Java 最常见的 200+ 面试题》的第二个补充模块,第一模块为:《Java 200+ 面试题补充 ThreadLocal 模块》。 1.Netty...

Java高并发网络编程(五)Netty应用

Java高并发网络编程(五)Netty应用

推送系统   一、系统设计             二、拆包和粘包 粘包、拆包表现形式 现在假设客户端向服务端连续发送了两个数据包...

netty实现http客户端请求远程http服务

netty实现http客户端请求远程http服务

    一般http请求,我们会使用httpclient来实现连接池方式的连接,根据请求的类型,封装get,post等请求,设置参数,设置请求头,调用方法,发送请求之后等待请求返回结果,根据结果解析出我们需要的数据。netty也可以实现httpclient类似...

springboot整合netty(二)

springboot整合netty(二)

目录 前言 正文 代码 1. 新建一个springboot项目,在pom文件中添加netty依赖: 2.新建netty服务 3.netty调用...

netty的设计模式

如果要阅读源码,首先就要学会基本的设计模式。 设计模式是前人总结出来的软件设计方法,有利于使代码更加简洁优雅。 了解了netty的设计模式,再去看源码,会有一种焕然大悟的感觉。 一、单例模式 单例模式是最常见的设计模式: 1、忽略反射的影响,全局只有...

java netty rpc框架

java netty rpc框架

目录 第1篇 I/O基础篇 第1章 网络通信原理 1.1 网络基础架构 1.1.1 C/S架构 1.1.2 C/S信息传输流程 1.2 TCP/IP五层模型详解 1.2.1 物理层 1.2.2 数据链路层...

netty在dubbo中的应用

netty在dubbo中的应用

netty在dubbo中的应用 dubbo的底层通信是利用netty来实现的,出于好奇是如何实现的,把发现的过程记录一下。 首先down下来dubbo的源码,里面包含一个模块dubbo-demo,包含了dubbo-demo-provider和dubbo-demo-co...

Netty通过Nginx配置 wss 协议访问(实践可行)

先写个比较简单的,后面再写一篇Vue + springboot +netty  Netty在互联网以及物联网公司用的很多,底层走的还是 websocket协议,好处很多,就不一一列了,相关的文章很多,大家可以搜下; 在实际开发应用中,基本上都是采用前后端分离...

Android使用Netty网络框架实践(客户端、服务端)

Android使用Netty网络框架实践(客户端、服务端) 使用开发工具为Android Studio 1、配置build.gradle文件 build.gradle文件的dependencies标签下添加Netty引用 depe...

【Netty】codec框架

【Netty】codec框架

一、前言   前面已经学习完了Netty框架中的主要组件,接着学习codec框架。 二、codec框架   每个网络应用程序必须定义如何将在对等体之间传输的原始字节解析并转换为目标程序的数据格式,这种转换逻辑有codec处理,其由编码器和解码器组成,每个编码器和解码器...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。