在SpringBoot中,如何使用Netty實現遠程調用方法總結

Netty

Netty是一個NIO客戶端服務器框架:

  • 它可快速輕松地開發網絡應用程序,例如協議服務器和客戶端。
  • 它極大地簡化和簡化瞭網絡編程,例如TCP和UDP套接字服務器。

NIO是一種非阻塞IO ,它具有以下的特點

  • 單線程可以連接多個客戶端。
  • 選擇器可以實現單線程管理多個Channel,新建的通道都要向選擇器註冊。
  • 一個SelectionKey鍵表示瞭一個特定的通道對象和一個特定的選擇器對象之間的註冊關系。
  • selector進行select()操作可能會產生阻塞,但是可以設置阻塞時間,並且可以用wakeup()喚醒selector,所以NIO是非阻塞IO。

Netty模型selector模式

它相對普通NIO的在性能上有瞭提升,采用瞭:

  • NIO采用多線程的方式可以同時使用多個selector
  • 通過綁定多個端口的方式,使得一個selector可以同時註冊多個ServerSocketServer
  • 單個線程下隻能有一個selector,用來實現Channel的匹配及復用

image

半包問題

TCP/IP在發送消息的時候,可能會拆包,這就導致接收端無法知道什麼時候收到的數據是一個完整的數據。在傳統的BIO中在讀取不到數據時會發生阻塞,但是NIO不會。為瞭解決NIO的半包問題,Netty在Selector模型的基礎上,提出瞭reactor模式,從而解決客戶端請求在服務端不完整的問題。

netty模型reactor模式

在selector的基礎上解決瞭半包問題。

image

上圖,簡單地可以描述為”boss接活,讓work幹”:manReactor用來接收請求(會與客戶端進行握手驗證),而subReactor用來處理請求(不與客戶端直接連接)。

SpringBoot使用Netty實現遠程調用

maven依賴

<!--lombok-->
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <version>1.18.2</version>
  <optional>true</optional>
</dependency>

<!--netty-->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.17.Final</version>
</dependency>

服務端部分

NettyServer.java:服務啟動監聽器

@Slf4j
public class NettyServer {
    public void start() {
        InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
        //new 一個主線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //new 一個工作線程組
        EventLoopGroup workGroup = new NioEventLoopGroup(200);
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                //設置隊列大小
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 兩小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        //綁定端口,開始接收進來的連接
        try {
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            log.info("服務器啟動開始監聽端口: {}", socketAddress.getPort());
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("服務器開啟失敗", e);
        } finally {
            //關閉主線程組
            bossGroup.shutdownGracefully();
            //關閉工作線程組
            workGroup.shutdownGracefully();
        }
    }
}

ServerChannelInitializer.java:netty服務初始化器

/**
* netty服務初始化器
**/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //添加編解碼
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}

NettyServerHandler.java:netty服務端處理器

/**
* netty服務端處理器
**/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 客戶端連接會觸發
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active......");
    }

    /**
     * 客戶端發消息會觸發
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服務器收到消息: {}", msg.toString());
        ctx.write("你也好哦");
        ctx.flush();
    }

    /**
     * 發生異常觸發
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

RpcServerApp.java:SpringBoot啟動類

/**
* 啟動類
*
*/
@Slf4j
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class RpcServerApp extends SpringBootServletInitializer {
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(RpcServerApp.class);
    }

    /**
     * 項目的啟動方法
     *
     * @param args
     */
    public static void main(String[] args) {
        SpringApplication.run(RpcServerApp.class, args);
        //開啟Netty服務
        NettyServer nettyServer =new  NettyServer ();
        nettyServer.start();
        log.info("======服務已經啟動========");
    }
}

客戶端部分

NettyClientUtil.java:NettyClient工具類

/**
* Netty客戶端
**/
@Slf4j
public class NettyClientUtil {

    public static ResponseResult helloNetty(String msg) {
        NettyClientHandler nettyClientHandler = new NettyClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                //該參數的作用就是禁止使用Nagle算法,使用於小數據即時傳輸
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast("decoder", new StringDecoder());
                        socketChannel.pipeline().addLast("encoder", new StringEncoder());
                        socketChannel.pipeline().addLast(nettyClientHandler);
                    }
                });
        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
            log.info("客戶端發送成功....");
            //發送消息
            future.channel().writeAndFlush(msg);
            // 等待連接被關閉
            future.channel().closeFuture().sync();
            return nettyClientHandler.getResponseResult();
        } catch (Exception e) {
            log.error("客戶端Netty失敗", e);
            throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
        } finally {
            //以一種優雅的方式進行線程退出
            group.shutdownGracefully();
        }
    }
}

NettyClientHandler.java:客戶端處理器

/**
* 客戶端處理器
**/
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    private ResponseResult responseResult;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客戶端Active .....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客戶端收到消息: {}", msg.toString());
        this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

驗證

測試接口

@RestController
@Slf4j
public class UserController {

    @PostMapping("/helloNetty")
    @MethodLogPrint
    public ResponseResult helloNetty(@RequestParam String msg) {
        return NettyClientUtil.helloNetty(msg);
    }
}

訪問測試接口

image

服務端打印信息

image

客戶端打印信息

image

到此這篇關於在SpringBoot中,如何使用Netty實現遠程調用方法總結的文章就介紹到這瞭,更多相關Netty實現遠程調用內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: