被蒙在鼓里的高性能 Reactor 的经典模式、多工作线程模式、多Reactor模式

JAVA herman 561浏览
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:codedq,发送下载链接帮助你免费下载!
本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:codedq,之前的微信号好友位已满,备注:返现
饿了么大量招人,我内推!Java 方向!薪资不设上限,工作年龄不限!工作地点限魔都,可电话面试!简历,发我微信:codedq
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领

说起 Reactor,相信大部分人都很陌生。但是在实际开发中你们可能都用到了它。

首先,在一些高性能 NIO 框架中有使用,比如:netty。再比如,Redis 中也有使用。所以说 Reactor 对我们来说是熟悉而又陌生的。

我写了一大堆的 WebFlux 教程,很多人可能还不知道。但其实它的底层也是 Reactor。Reactor 是未来的一种趋势,未来已来,而你还没做好准备。

Reactor 是一种应用在服务器端的开发模式,目的是提高服务端程序的并发能力,其实就是实现了 I/O 多路复用这种 I/O 模型。为了学习它,我们来看一看传统的多线程实现餐厅点餐的方式。

传统 IO 模型

上图,可能是我们去餐馆吃饭经常遇到的场景。这个流程可以归纳如下:

  • 服务员 X 给出菜单,并等待点菜
  • 顾客 Y 查看菜单,并点菜
  • 服务员 X 把菜单交给厨师,厨师照着做菜
  • 厨师 N 做好菜,上餐桌

这个模式,看似不错,但实际上效率并不高。如果餐厅生意越来越好,那么顾客人数就会不断增加,这时服务员就有点处理不过来了。需要老板增加人数了,但是精明的老板发现,每个服务员在服务完客人后,都要去休息一下, 因此老板就说,“你们都别休息了,在旁边待命”。这样可能 10 个服务员也来得及服务 20 个顾客了。这也是“线程池”的方式,通过重用线程来减少线程的创建和销毁时间,从而提高性能。

但是随着生意的继续变好,顾客人数又增加了。老板想到仅仅靠剥削服务员的休息时间也没有办法服务这么多顾客。于是精明的老板又发现每个服务员并不是一直在干活的,大部分时间他们只是站在餐桌旁边等客人点菜。

所以,老板就又变招了。要求各个服务员,客人点菜的时候你们就别傻站着了,先去服务其它客人,有客人点好的时候喊你们再过去。

按照这个模式,运行了两天,老板发现根本就不需要那么多的服务员,于是裁了一波员,甚至可以只有一个服务员。最终可以用下图进行归纳总结。

改进的 IO 模型

上面这个图就非常的符合 Reactor 模式的核心思想:减少等待。当遇到需要等待 IO 时,先释放资源,而在 IO 完成时,再通过事件驱动 (event driven) 的方式,继续接下来的处理。从整体上减少了资源的消耗。

在 Java 中 Reactor 又分为 3 种模式:经典模式、多工作线程模式、多 Reactor 模式。下面我们先来说第一种 Reactor 经典模式。

Reactor 经典模式

Reactor 经典模式又称为单线程模式。这种模式的特点就是,当多个 client 连接时,有一个专门的 ServerSocketChannel 负责 OP_ACCEPT 事件。对于具体的 I/O 操作,分配给另外一个 ServerSocketChannel 的 OP_READ 事件。简单的实现代码如下:

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
  Set<SelectionKey> keys = selector.selectedKeys();
  Iterator<SelectionKey> iterator = keys.iterator();
  while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    if (key.isAcceptable()) {
      ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
      SocketChannel socketChannel = acceptServerSocketChannel.accept();
      socketChannel.configureBlocking(false);
      LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
      socketChannel.register(selector, SelectionKey.OP_READ);
    } else if (key.isReadable()) {
      SocketChannel socketChannel = (SocketChannel) key.channel();
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      int count = socketChannel.read(buffer);
      if (count <= 0) {
        socketChannel.close();
        key.cancel();
        continue;
      }
    }
    keys.remove(key);
  }
}

这种单线程模式,有一个问题,不知道细心的网友有没有发现。这个 selector.select() 是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获 Channel 注册时指定的所关注的事件。所以,效率还有提升空间。

实际上,在多核 CPU 的时代,它并不能重复的利用 CPU 的多核优势,所以,这种模式用的不多。于是,基于多线程模式的 Reactor 就出现了。

Reactor 多线程模式

这种模式就是在 read 的时候,利用多线程模型。而 acceptor 还是有一个线程来负责。具体的简化代码如下所示:

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
  if(selector.selectNow() < 0) {
    continue;
  }
  Set<SelectionKey> keys = selector.selectedKeys();
  Iterator<SelectionKey> iterator = keys.iterator();
  while(iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    if (key.isAcceptable()) {
      ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
      SocketChannel socketChannel = acceptServerSocketChannel.accept();
      socketChannel.configureBlocking(false);
      SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
      readKey.attach(new Processor());
    } else if (key.isReadable()) {
      Processor processor = (Processor) key.attachment();
      processor.process(key);
    }
  }
}

具体的读请求处理在如下所示的 Processor 类中。该类中设置了一个静态的线程池处理所有请求。而 process 方法并不直接处理 I/O 请求, 而是把该 I/O 操作提交给上述线程池去处理,这样就充分利用了多线程的优势,同时将对新连接的处理和读/写操作的处理放在了不同的线程中, 读/写操作不再阻塞对新连接请求的处理。

public class Processor {
  private static final ExecutorService service = Executors.newFixedThreadPool(16);
  public void process(SelectionKey selectionKey) {
    service.submit(() -> {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      int count = socketChannel.read(buffer);
      if (count < 0) {
        socketChannel.close();
        selectionKey.cancel();
        return null;
      } else if(count == 0) {
        return null;
      }
      return null;
    });
  }
}

上面的模式看似完美,但实际上还有改进空间。

多 Reactor 模式

其实,我们可以将 Reactor 分成两部分,一部分时 mainReactor,一部分是 subReactor,这就是多 Reactor 模式。

mainReactor 负责监听并 accept 新连接,然后将建立的 socket 通过多路复用器(Acceptor)分派给 subReactor。subReactor 负责多路分离已连接的 socket,读写网络数据;业务处理功能,其交给 worker 线程池完成。通常,subReactor 个数上可与 CPU 个数等同。多 Reactor 示例代码如下所示:

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[coreNum];
for (int i = 0; i < processors.length; i++) {
  processors[i] = new Processor();
}
int index = 0;
while (selector.select() > 0) {
  Set<SelectionKey> keys = selector.selectedKeys();
  for (SelectionKey key : keys) {
    keys.remove(key);
    if (key.isAcceptable()) {
      ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
      SocketChannel socketChannel = acceptServerSocketChannel.accept();
      socketChannel.configureBlocking(false);
      Processor processor = processors[(int) ((index++) % coreNum)];
      processor.addChannel(socketChannel);
      processor.wakeup();
    }
  }
}

Processor 的相关代码如下所示:

public class Processor {
  private static final ExecutorService service =
      Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
  private Selector selector;
  public Processor() throws IOException {
    this.selector = SelectorProvider.provider().openSelector();
    start();
  }
  public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }
  public void wakeup() {
    this.selector.wakeup();
  }
  public void start() {
    service.submit(() -> {
      while (true) {
        if (selector.select(500) <= 0) {
          continue;
        }
        Set<SelectionKey> keys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          if (key.isReadable()) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count = socketChannel.read(buffer);
            if (count < 0) {
              socketChannel.close();
              key.cancel();
              continue;
            } else if (count == 0) {
              continue;
            }
          }
        }
      }
    });
  }
}

这个多 Reactor 的模式其实就是 Netty 的模式,模仿 Netty 的关键代码。

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号1:xmtxtt(5000人已满),微信号2:xttblog(5000人已满),微信号3:codedq(超3800)。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!

本文原文出处:业余草: » 被蒙在鼓里的高性能 Reactor 的经典模式、多工作线程模式、多Reactor模式