【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

06-25 1242阅读

【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

目录

1.概述

2.hello world

3.EventLoop

4.channel

4.1.同步

4.2.异步

4.3.调试

4.4.关闭

4.5.为什么要用异步

5.future

6.promise

7.pipeline

8.byteBuf

8.1.创建

8.2.内存模式和池化

8.2.1.内存模式

8.2.2.池化

8.3.组成

8.4.操作

8.4.1.读写

8.4.2.释放

8.5.零拷贝

8.5.1.slice

8.5.2.composite

8.6.工具类

9.双向通信

10.粘包半包

10.1.问题成因

10.2.解决办法

10.2.1.短连接

10.2.2.解码器

1.概述

2.定长解码器

3.行解码器

4.固定帧长的解码器

11.协议解析

11.1.Redis

11.2.Http

12.协议设计

12.1.概述

12.2.编码


1.概述

netty,说人话就是封装NIO做出来的一个JAVA高性能通信框架。在JAVA领域,有高性能网络通信需求的时候,绝大多数都会选择netty作为通信框架。

关于JAVA的通信,我猜想可能博主的另外两篇关于BIO和NIO的文章作为本文的导读会不错:

详解TCP-CSDN博客

详解JAVA Socket-CSDN博客

JAVA BIO_java的bio有哪些-CSDN博客

全网最清晰JAVA NIO,看一遍就会-CSDN博客

netty底层就是封装的NIO。如果自己使用NIO的话至少会有以下的不便:

  • 需要自己构建协议。

  • 需要自己解决TCP传输问题,如粘包、半包。

  • API过于底层,不便于使用。

    netty其实就是封装了一下NIO,使得NIO更便于使用。

    2.hello world

    依赖:

       io.netty
       netty-all
       4.1.39.Final
    

    服务器:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    ​
    public class HelloServer {
        public static void main(String[] args) {
            //ServerBootstrap,启动器,负责组装netty组件
            new ServerBootstrap()
                    //1.怎样去接收IO?
                    //事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
                    .group(new NioEventLoopGroup())
                    //2.接收成什么?
                    //服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
                    //除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    //3.做什么处理?
                    //支持用责任链模式来对收到的IO进行链式处理
                    .childHandler(new ChannelInitializer() {
                        //连接建立后才会调用初始化方法
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //指定解码方式
                            nioSocketChannel.pipeline().addLast(new StringDecoder());
                            //ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
                    //4.绑定监听端口
                    .bind(8080);
        }
    }

    客户端:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    ​
    import java.net.InetSocketAddress;
    ​
    public class HelloCleint {
        public static void main(String[] args) throws InterruptedException {
            new Bootstrap()
                    .group(new NioEventLoopGroup())
                    //用什么进行发送?
                    //可以是BIO,也可以是NIO,也可以是epoll
                    .channel(NioSocketChannel.class)
                    //处理器
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //指定编码方式
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    //连接到服务器
                    .connect(new InetSocketAddress("localhost",8080))
                    //同步通信
                    .sync()
                    //代表连接对象
                    .channel()
                    //发送数据
                    .writeAndFlush("hello world");
        }
    }

    3.EventLoop

    eventLoop,事件循环对象,是一个单线程执行器,本质上就是一条线程+一个selector,用来单线程监听处理IO事件。

    实际使用上很少直接使用EventLoop,而是使用EventLoopGroup,EventLoopGroup的构造方法中可以指定其中的EventLoop数量。

    eventLoop除了继承Netty体系类的一些标准化接口外,还继承了JDK中的ScheduledExecutorService,使得其自身具备线程池一切的能力。既然是线程池,就可以用来执行任务。

    eventLoop执行普通任务:

    EventLoopGroup group =new NioEventLoopGroup(5);
    ​
            group.next().submit(()->{
                try {
                    Thread.sleep(10000);
                    System.out.println("success!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

    eventLoop执行IO任务:

    一个EventGroupLoop其实就是一条线程,用来处理一条通信连接。

    public static void main(String[] args) {
            //ServerBootstrap,启动器,负责组装netty组件
            new ServerBootstrap()
                    //1.怎样去接收IO?
                    //事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
                    .group(new NioEventLoopGroup())
                    //2.接收成什么?
                    //服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
                    //除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    //3.做什么处理?
                    //支持用责任链模式来对收到的IO进行链式处理
                    .childHandler(new ChannelInitializer() {
                        //连接建立后才会调用初始化方法
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //指定解码方式
                            nioSocketChannel.pipeline().addLast(new StringDecoder());
                            //ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
                    //4.绑定监听端口
                    .bind(8080);
        }

    netty还给了一种更加细粒度的分层,就是让一部分EventLoop来选择IO,一部分EventLoop来处理IO,说白了就是一部分EventLoop出selector,一部分EventLoop出Thread。

    public static void main(String[] args) {
            //ServerBootstrap,启动器,负责组装netty组件
            new ServerBootstrap()
                    //boss线程只负责accept事件,worker线程只负责io读写
                    .group(new NioEventLoopGroup(),new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline().addLast(new StringDecoder());
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
                    .bind(8080);
        }

    4.channel

    channel,对NIO的channel的二次封装,内核段缓冲区的抽象。不管是服务端还是客户端,只要调用channel()方法都能获取当前工作的这条channel。channel无非要注意的点就是它的同步和异步。

    在实际应用中我们要知道在读的时候同步和异步是没有意义的,不可能在读IO的时候还区分同步读或者异步读,只可能是准备好了就读。只有写IO的时候区分同步和异步才是意义。所以在netty体系里很少会去服务端操作channel的同步和异步,一般都是在客户端操作channel的同步和异步。

    4.1.同步

    服务端:

    在服务端让建立连接的时候休眠3秒。

    public static void main(String[] args) {
            //ServerBootstrap,启动器,负责组装netty组件
            new ServerBootstrap()
                    //1.怎样去接收IO?
                    //事件组,事件组里面包含thread和selector,可以理解为netty种用来选择IO的组件
                    .group(new NioEventLoopGroup())
                    //2.接收成什么?
                    //服务器ServerSocketChannel实现,由于上面用的Nio的事件组,所选nio的
                    //除此以外,还支持BIO和特定操作系统的,如Linux的EpollServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    //3.做什么处理?
                    //支持用责任链模式来对收到的IO进行链式处理
                    .childHandler(new ChannelInitializer() {
                        //连接建立后才会调用初始化方法
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //指定解码方式
                            nioSocketChannel.pipeline().addLast(new StringDecoder());
                            //ChannelInboundHandlerAdapter接口是netty种让用户自定义handler的接口
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    Thread.sleep(3000);
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
                    //4.绑定监听端口
                    .bind(8080);
        }

    客户端:

    客户端使用channel的sync来进行同步通信,同步模式下在connect建立连接的时候,主线程会同步等待,连接建立后再向下执行。

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    ​
    import java.net.InetSocketAddress;
    ​
    public class HelloCleint {
        public static void main(String[] args) throws InterruptedException {
            new Bootstrap()
                    .group(new NioEventLoopGroup())
                    //用什么进行发送?
                    //可以是BIO,也可以是NIO,也可以是epoll
                    .channel(NioSocketChannel.class)
                    //处理器
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //指定编码方式
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    //连接到服务器
                    .connect(new InetSocketAddress("localhost",8080))
                    //同步通信
                    .sync()
                    //代表连接对象
                    .channel()
                    //发送数据
                    .writeAndFlush("hello world");
        }
    }

    4.2.异步

    channel默认处于异步通信模式,connect建立连接的时候,不会同步等待,而是会继续向下执行,由于服务器端延迟了3秒来建立连接,所以客户端发送这条“hello server”发送时,连接并未建立完成,最终效果就是丢包,服务器收不到这条数据。

    public static void main(String[] args) throws InterruptedException {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    //用什么进行发送?
                    //可以是BIO,也可以是NIO,也可以是epoll
                    .channel(NioSocketChannel.class)
                    //处理器
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            //指定编码方式
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    //连接到服务器
                    .connect(new InetSocketAddress("localhost", 8080));
            //异步
            channelFuture.channel().writeAndFlush("hello world");
        }

    当然,在异步通信上,netty支持了监听器,建立连接完成后,用事件回调的方式触发监听器。利用监听器,可以使得异步通信不丢包:

    //异步
    channelFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture channelFuture) throws Exception {
        channelFuture.channel().writeAndFlush("hello world");
      }
    });

    用监听器发送数据后,在当前业务场景下,即使服务端延迟了三秒才建立连接,但是任然能收到“hello world”这条消息。

    4.3.调试

    EmbeddedChannel是Netty中提供的一种特殊类型的Channel实现,主要用于单元测试。它允许你在测试中模拟输入事件(例如读取数据、写入数据)并检查输出事件(例如读取到的数据)。使用EmbeddedChannel可以在不启动真实的网络连接的情况下测试你的ChannelHandler逻辑。

    代码示例:

    自定义一个handler:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    ​
    public class UpperCaseHandler extends SimpleChannelInboundHandler {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) {
            String upperCaseMsg = msg.toUpperCase();
            ctx.writeAndFlush(upperCaseMsg);
        }
    }

    测试:

    import io.netty.channel.embedded.EmbeddedChannel;
    import org.junit.jupiter.api.Test;
    ​
    import static org.junit.jupiter.api.Assertions.assertEquals;
    ​
    public class UpperCaseHandlerTest {
        @Test
        public void testUpperCaseHandler() {
            // 创建EmbeddedChannel,并添加要测试的Handler
            EmbeddedChannel channel = new EmbeddedChannel(new UpperCaseHandler());
    ​
            // 写入一个字符串消息到Channel
            channel.writeInbound("hello");
    ​
            // 读取Channel的输出
            String output = channel.readOutbound();
    ​
            // 验证处理后的消息是否符合预期
            assertEquals("HELLO", output);
    ​
            // 关闭Channel
            channel.finish();
        }
    }

    4.4.关闭

    由于channel的close方法是异步的,所以在关闭资源时会存在风险。比如代码顺序为:

    • close掉channel

    • close掉其它资源

      有可能在close掉其它资源的时候,channel并没有close掉,也就可能出现,channel中还有数据没处理完,其它资源被关掉了,导致数据处理失败的问题。所以更为稳妥的方式是用同步的机制来关闭channel。netty中封装了CloseFuture来同步关闭channel。

      ChannelFuture closeFuture = channelFuture.channel().closeFuture();
      //同步关闭
      closeFuture.sync();

      要注意的是channel停止后如果EventLoopGroup还有其它线程时,程序是不会中止的,想要中止程序,必须再close掉group,EventLoopGroup提供了优雅停机的API——shutdownGracefully,会先停止接收请求,驻留的请求处理完成后,关掉group。

      4.5.为什么要用异步

      我们可以看到channel里面大量的用到了异步,对一个channel的操作,connect是一条线程,write是一条线程,close也是一条线程......

      用异步的方式来处理,不仅不会加快单个IO任务的速度,反而还会略微拉长一个IO的响应时间,但是异步能明显提高吞吐量。

      举个例子,一个病人看病,分为挂号、看病、缴费。取药,同步的方式就是一个医生走完一个病人的所有流程:

      【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

      而异步的方式就是医生分工合作,每个医生单独负责一个项目,这样一个时间段内虽然处理的任务综合是一样的,但是在峰值的吞吐量上,异步是同步的四倍:

      【netty】三万字详解!JAVA高性能通信框架,关于netty,看这一篇就够了

      5.future

      JDK的future是表示一个任务,netty的future是对JDK的future做了二次封装。

      同步:

      public static void main(String[] args) throws Exception {
              NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
              Future future = nioEventLoopGroup.submit(new Callable() {
                  public String call() throws Exception {
                      Thread.sleep(1000);
                      return "success!";
                  }
              });
              //future的get方法是同步的,同步等待线程返回返回值为止
              System.out.println(future.get());
          }

      异步:

      用监听器实现异步

      public static void main(String[] args) throws Exception {
              NioEventLoopGroup nioEventLoopGroup=new NioEventLoopGroup();
              Future future = nioEventLoopGroup.submit(new Callable() {
                  public String call() throws Exception {
                      Thread.sleep(1000);
                      return "success!";
                  }
              });
              //用监听器来实现异步
              future.addListener(new GenericFutureListener
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]