您的位置:首页 >资讯>快报

Reactor 模式与Tomcat中的Reactor

2023-06-23 20:04:41 来源:博客园 收藏

系列文章目录和关于我


(资料图)

参考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)
一丶什么是Reactor

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

Reactor模式是一种用于处理高并发的设计模式,也被称为事件驱动模式。在这种模式中,应用程序会将输入事件交给一个事件处理器,称为Reactor,Reactor会监听所有输入事件,并将它们分发给相应的处理程序进行处理。这种模式可以大大提高应用程序的性能和可扩展性,因为它使用了非阻塞I/O和异步处理技术,使得一个进程可以同时处理多个事件,而不会因为某个事件的处理时间过长而影响其他事件的处理。Reactor模式被广泛应用于网络编程和操作系统级别的事件驱动程序。

二丶为什么需要Reactor1.传统BIO

在传统BIO模式中有多少个客户端请求,就需要多少个对于的线程进行一对一的处理。

这种模型有如下缺点:

同步阻塞IO,读写阻塞,大量线程挂起指定线程数的时候,只能依据系统的cpu核心数,无法根据并发请求数来指定。大量线程导致上下文切换开销大,线程占用内存大。2.NIO

Java NIO 带来非阻塞IO,和IO多路复用。

得益于非阻塞IO和IO多路复用,让服务可以处理更多的并发请,不再受限于一个客户端一个线程来处理,而是一个线程可以维护多个客户端。

可以看到java 中NIO有点reactor的意思:

Selector多路复用器监听IO事件进行分发,针对连接事件,读写事件进行不同的处理。

Reactor核心是Reactor加上对应的处理器Handler,Reactor在一个单独的线程中运行,负责监听和分发事件,将接收到的事件交给不同的Handler来处理,Handler是处理程序执行I/O事件的实际操作。

高并发:Reactor模式可以在同一时间内处理大量的客户端请求,提高了系统的并发处理能力。得益于Java NIO 非阻塞IO 于 IO多路复用。可扩展性:Reactor模式可以很容易地扩展到更多的处理器,以满足更高的并发量。编码简单:Reactor模式可以使编码更加简单明了,因为它将不同的事件分离开来处理,降低了代码的复杂度。例如Netty就使用了Reactor模式,程序员只需要写如何处理事件效率高:Reactor模式采用非阻塞I/O和异步处理技术,可以使得一个进程可以同时处理多个事件,而不会因为某个事件的处理时间过长而影响其他事件的处理,从而提高了系统的效率。可移植性好:Reactor模式可以很方便地移植到不同的平台上,因为它遵循了标准的Java NIO接口,可以在不同的操作系统上实现。三丶Reactor模型于简单代码实现1.单Reactor单线程模型

这个模型诠释了Reactor模式的组成部分:

Reactor 负责分离套接字,对于触发connect的io事件交给Acceptor处理,对于IO读写事件交给Handler处理Acceptor负责创建Handler,将Handler和socketChannel进行绑定,当socketChannel读事件触发后,Reactor进行分发给对应Handler处理。
public class Reactor implements Runnable {        //多路复用器    final Selector selector;//服务端Channel    final ServerSocketChannel serverSocket;    Reactor(int port) throws IOException {        selector = Selector.open();        serverSocket = ServerSocketChannel.open();        serverSocket.socket().bind(                new InetSocketAddress(port));        serverSocket.configureBlocking(false);        // 注册io多路复用器连接事件        SelectionKey sk =                serverSocket.register(selector,                        SelectionKey.OP_ACCEPT);        // 将服务端Channel 关联一个Acceptor        sk.attach(new Acceptor());    }    @Override    public void run() {        try {            while (!Thread.interrupted()) {                selector.select();                Set selected = selector.selectedKeys();                Iterator it = selected.iterator();                while (it.hasNext())                    //分发                    dispatch(it.next());                selected.clear();            }        } catch (IOException ex) { /* ... */ }    }    void dispatch(SelectionKey k) {        // 拿到关联的acceptor 或者handler        Runnable r = (Runnable) (k.attachment());        if (r != null)            r.run();    }    //内部类 负责处理连接事件    class Acceptor implements Runnable {        public void run() {            try {                // 拿到Channel                SocketChannel c = serverSocket.accept();                if (c != null)                    // 创建handler                    new Handler(selector, c);            } catch (IOException ex) { /* ... */ }        }    }    final class Handler implements Runnable {        final SocketChannel socket;        final SelectionKey sk;        ByteBuffer input = ByteBuffer.allocate(1024);        ByteBuffer output = ByteBuffer.allocate(1024);        static final int READING = 0, SENDING = 1;        int state = READING;        //设置非阻塞        //监听可读事件        Handler(Selector sel, SocketChannel c)                throws IOException {            socket = c;            c.configureBlocking(false);            sk = socket.register(sel, 0);            sk.attach(this);            sk.interestOps(SelectionKey.OP_READ);            sel.wakeup();        }        boolean inputIsComplete() {            return false;        }        boolean outputIsComplete() {            return false;        }        void process() {        }        public void run() {            try {                //如果可读                if (state == READING) read();//如果可写                else if (state == SENDING) send();            } catch (IOException ex) { /* ... */ }        }        void read() throws IOException {            socket.read(input);            if (inputIsComplete()) {                process();                state = SENDING;                sk.interestOps(SelectionKey.OP_WRITE);            }        }        void send() throws IOException {            socket.write(output);            if (outputIsComplete()) sk.cancel();        }    }}

可以看到Reactor模式将Channel和Acceptor,Handler进行绑定依赖于SelectionKey#attach方法,通过此方法在不同的事件发生时调用SelectionKey#attachment方法,获取到对应的处理程序进行处理。

Reactor由单线程运行,通过IO多路复用Selector监听多个事件是否就绪,得益于Channel提供的非阻塞IO能力,当IO没有就绪的时候,单线程不会阻塞而是继续处理下一个。

由于其单线程的原因,无法利用计算机多核心资源,并且如果读取请求内容处理的过程存在耗时操作(比如数据库,rpc等)那么回导致下一个事件得不到快速的响应。

2.单Reactor多线程模型

引入多线程解决单线程Reactor的不足

可以看到多线程模型引入了线程池,对于就绪的可读,可写IO事件交给线程池进行处理。

主要是对单线程模型中的Handler进行改造,将处理逻辑提交到线程池中。

多线程模型涉及到共享资源的使用,不如读写Channel依赖的Buffer如何分配。

可以看到多线程模型的缺点:线程通信和同步逻辑复杂,需要处理多线程安全问题。

3.多Reactor多线程模型

在这种模型中,mainReactor负责处理连接建立事件,只需要一个线程即可。subReactor负责和建立连接的socket进行数据交互并处理业务逻辑,并且每一个subReactor可持有一个独立的Selector进行IO多路复用事件监听。

// SubReactor 池子,负责负载均衡的选择SubReactorpublic class SubReactorPool {    final static SubReactor[] subReactors;    static final AtomicInteger count = new AtomicInteger();    static {        int availableProcessors = Runtime.getRuntime().availableProcessors();        subReactors = new SubReactor[availableProcessors];        for (int i = 0; i < subReactors.length; i++) {            subReactors[i] = new SubReactor();        }    }    static class SubReactor implements Runnable{        // 业务处理线程池        final static Executor poolExecutor = Executors.newCachedThreadPool();// io多路复用        Selector selector;        SubReactor()  {            try {                selector = Selector.open();            } catch (IOException e) {                throw new RuntimeException(e);            }        }        public void registry(SocketChannel socketChannel) throws ClosedChannelException {            socketChannel.register(selector,SelectionKey.OP_READ);        }        @Override        public void run() {            while (true){                try {                    selector.select();                } catch (IOException e) {                    throw new RuntimeException(e);                }                Set selectionKeys = selector.selectedKeys();                Iterator iterator = selectionKeys.iterator();                while (iterator.hasNext()){                    SelectionKey sk = iterator.next();                    if (sk.isReadable()) {                        poolExecutor.execute(()->new Handler(sk));                    }                    // 可写,。。。。                    iterator.remove();                }            }        }    }        //选择合适的SubReactor    static SubReactor loadBalanceChoose(SocketChannel socketChannel){        int countInt = count.getAndAdd(1);        return subReactors[countInt % subReactors.length];    }}

多Reactor解决了单个Selector注册连接,读写事件,导致内核轮询的时候需要判断太多fd而效率缓慢的问题。

四丶Tomcat中Reactor

在Tomcat请求处理流程与源码浅析 - Cuzzz - 博客园 (cnblogs.com)中,说到Tomcat Connector的设计

其中

Endpoint:tomcat中没有这个接口,只有AbstractEndpoint,它负责启动线程来监听服务器端口,并且在接受到数据后交给Processor处理Processor:Processor读取到客户端请求后按照请求地址映射到具体的容器进行处理,这个过程请求映射,Processor实现请求映射依赖于Mapper对象,在容器发生注册和注销的时候,MapperListener会监听到对应的事件,从而来变更Mapper中维护的请求映射信息。ProtocolHandler:协议处理器,针对不同的IO方式(NIO,BIO等)和不同的协议(Http,AJP)具备不同的实现,ProtocolHandler包含一个Endpoint来开启端口监听,并且包含一个Processor用于按照协议读取数据并将请求交给容器处理。Acceptor:Acceptor实现了Runnable接口,可以作为一个线程启动,使用Socket API监听指定端口,用于接收用户请求。Poller:主要用于监测注册在原始 scoket 上的事件是否发生,Acceptor接受到请求后,会注册到Poller的队列中。

下图展示了Acceptor 和 Poller的协作

1.Acceptor 等待客户端连接

这一步借助ServerSocketChannel#accept方法,进行等待客户端连接,Acceptor单线程进行监听。

2.Acceptor选择Poller进行注册

这一步设置非阻塞,并且使用计数取模的方式实现多个Poller的负载均衡

然后将事件保证为PollerEvent 提交到Poller的阻塞队列中

3.Poller 轮询阻塞队列中的PollerEvent并注册到Selector上

轮询阻塞队列中的PollerEvent,并且调用run方法,run方法会把事件注册到Poller的Selector上,注意下面的注册将NioSocketWrapper作为attachment进行了绑定

4.Poller中Selector IO多路复用处理事件,并处理事件

tomcat处理事件的时候,会创建出SocketProcessor进行处理,SocketProcessor是一个Runnable,最后会提交到线程池。

关键词:

相关的文章>>

标签:
[系列文章目录和关于我](https: www cnblogs com cuzzz p 16609728 ht[ 查看全文 ]
标签:
【红黑孩子】米兰摄影师告别托纳利:“桑德罗是米兰的核心与灵魂,他是[ 查看全文 ]
标签:
手握20万元想购入一款SUV,很多人都会纠结选唐新能源还是选ID 4X,今天[ 查看全文 ]
标签:
【河南南阳:中心城区买首套新房契税补贴50%多子女家庭再补最多2万元】[ 查看全文 ]
标签:
1、早上7点-8点到南站坐车到溪口45分钟!然后坐残疾车4元到蒋氏故居!大[ 查看全文 ]
标签:
中新网南昌6月21日电(记者吴鹏泉)记者21日从井冈山大学蜘蛛生物学研究[ 查看全文 ]
标签:
龙华区2023年义务教育阶段公办学校小一、初一招生计划序号街道学校名称[ 查看全文 ]
标签:
李沧区围绕“3+2+4”现代产业集群做强园区、突破片区,打造示范区,为[ 查看全文 ]
标签:
1、大千生色,万类蒙滋,敬维座下禅理日畅,福慧时增!2、祈万佛之祥光[ 查看全文 ]
标签:
1、单击技术支持。2、选择Apple网站的技术支持选项。3、选择一个服务计[ 查看全文 ]
标签:
端午文创产品。肖为摄农历五月初五,一年一度的端午佳节又到了。为弘扬[ 查看全文 ]
标签:
各地高考成绩今天起陆续公布按照之前各地公布的高考查分时间,各地高考[ 查看全文 ]
标签:
会有许多以闪亮之名玩家还不知道以闪亮之名服连接网络失败是什么原因详[ 查看全文 ]
标签:
2023年换流阀概念上市公司是哪些?(6月23日),换流阀概念股2023年有:[ 查看全文 ]
标签:
21日上午,“寺街印象”调研共建项目签约暨“碧血丹心——寺街英烈事迹[ 查看全文 ]
标签:
中国藏族网通讯6月21日,环青海湖自行车联赛总决赛正式开放报名通道,[ 查看全文 ]
标签:
6月23日广东省教育考试院发布消息称2023年广东普通高考评卷工作已经完[ 查看全文 ]
标签:
海南省商务厅副厅长岳阳表示,今年1-5月,全省社会消费品零售总额1031 [ 查看全文 ]
标签:
湘潭在线6月21日讯(湘潭日报社全媒体记者谭涛通讯员欧阳佳)近日,岳[ 查看全文 ]
标签:
6月21日消息,来自巴克莱分析师的研报显示,第四代iPhoneSE不会在2024[ 查看全文 ]

热门标签

热门搜索:

春季养生 健康问答 资讯

资讯

更多
  • 「红黑孩子」米兰...

  • 即时看!唐新能源...

  • 奉化溪口旅游攻略...

  • 科研人员发现一种...

  • 深圳龙华区2023年...

  • 各地高考成绩今天...

图说健康

更多

体育健身

更多