本文主要是介绍Reactor 网络模型、Java代码实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 1. 概述
- 2. Reactor 单线程模型
- 2.1 ByteBufferUtil
- 2.2 服务端代码
- 2.3 客户端
- 2.4 运行截图
- 3. Reactor多线程模型
- 3.1 服务端代码
- 3.2 运行截图
- 4. 主从 Reactor多线程模型
- 4.1 服务端代码
- 4.2 运行截图
- 参考文献
1. 概述
在 I/O 多路复用的场景下,当有数据处于就绪状态后,需要一个事件分发器(Event Dispather),它负责将读写事件分发给对应的读写事件处理器(Event Handler)。
Reactor 模型主要分为三种
- Reactor 单线程模型
- Reactor 多线程模型
- 主从 Reactor 多线程模型
Doug Lea 教授的课件 : https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
Java Socket 网络编程实例(阻塞IO、非阻塞IO、多路复用Selector、AIO)
2. Reactor 单线程模型
Reactor 单线程模型,是指所有I/O操作(监听服务端, 接受客户端连接请求;消息的读取、解码、编码、发送)都在同一个NIO线程上面完成
2.1 ByteBufferUtil
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;/*** ByteBufferUtil类提供了ByteBuffer和String之间转换的便捷方法。* 这些方法使用UTF-8编码进行转换,确保了数据的正确性和一致性。*/
public class ByteBufferUtil {/*** 从ByteBuffer中读取字符串。** @param byteBuffer 待读取的ByteBuffer,应确保其为读模式。* @return 从ByteBuffer解码得到的字符串。* @throws CharacterCodingException 如果解码过程中发生错误。*/public static String read(ByteBuffer byteBuffer) throws CharacterCodingException {// 使用UTF-8解码器将ByteBuffer中的字节解码为CharBuffer。CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);// 将CharBuffer转换为字符串并返回。return charBuffer.toString();}/*** 将字符串写入ByteBuffer。** @param string 待写入的字符串。* @return 编码后的ByteBuffer。* @throws CharacterCodingException 如果编码过程中发生错误。*/public static ByteBuffer read(String string) throws CharacterCodingException {// 使用UTF-8编码器将字符串编码为ByteBuffer。return StandardCharsets.UTF_8.encode(string);}/*** 主函数用于演示ByteBuffer和字符串之间的相互转换。** @param args 命令行参数。* @throws CharacterCodingException 如果编码或解码过程中发生错误。*/public static void main(String[] args) throws CharacterCodingException {// 将字符串"test"编码为ByteBuffer,然后从ByteBuffer解码回字符串并打印。System.out.println(ByteBufferUtil.read(ByteBufferUtil.read("test")));}}
2.2 服务端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.util.Set;public class Reactor implements Runnable {final Selector selector;final ServerSocketChannel serverSocket;/*** 初始化Reactor,打开选择器和服务器套接字通道,并注册接受操作。** @param port 服务器监听的端口号。* @throws IOException 如果打开选择器或服务器套接字通道失败。*/public Reactor(int port) throws IOException {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.configureBlocking(false);SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);sk.attach(new Acceptor());}/*** Reactor的主要运行方法,负责循环监听选择器上的事件,并分派处理。*/public void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selected = selector.selectedKeys();System.out.println("selected:" + selected.size());for (SelectionKey selectionKey : selected) {dispatch(selectionKey);}selected.clear();}} catch (IOException ex) {ex.printStackTrace();}}/*** 根据选择键分派相应的处理逻辑。** @param k 选择键。* @throws IOException 如果发生I/O错误。*/void dispatch(SelectionKey k) throws IOException {Run r = (Run) (k.attachment());if (r != null)r.run();}/*** 接受者类,负责接受新的客户端连接。*/class Acceptor implements Run { // innerpublic void run() {try {SocketChannel c = serverSocket.accept();if (c != null)new Handler(selector, c);} catch (IOException ex) {ex.printStackTrace();}}}/*** 处理者类,负责处理客户端的读写操作。*/final class Handler implements Run {final SocketChannel socket;final SelectionKey sk;ByteBuffer input = ByteBuffer.allocate(1024);ByteBuffer output = ByteBuffer.allocate(1024);/*** 初始化处理者,注册读操作兴趣。** @param sel 选择器。* @param c 客户端套接字通道。* @throws IOException 如果注册操作失败。*/Handler(Selector sel, SocketChannel c)throws IOException {socket = c;c.configureBlocking(false);sk = socket.register(sel, 0);sk.attach(this);sk.interestOps(SelectionKey.OP_READ);sel.wakeup();}/*** 检查输入缓冲区是否已完成读取。** @return 如果输入缓冲区还有剩余数据,则返回true;否则返回false。*/boolean inputIsComplete() {return input.hasRemaining();}/*** 检查输出缓冲区是否已完成写入。** @return 如果输出缓冲区没有剩余空间,则返回true;否则返回false。*/boolean outputIsComplete() {return !output.hasRemaining();}/*** 处理输入缓冲区的数据。** @throws CharacterCodingException 如果字符编码转换失败。*/void process() throws CharacterCodingException {// 否则,将缓冲区反转并打印读取的数据input.flip();String request = ByteBufferUtil.read(input);System.out.println(request);input.clear();output = ByteBufferUtil.read("你好: " + request);}/*** 执行处理逻辑,包括读取数据、处理数据和准备写操作。** @throws IOException 如果发生I/O错误。*/public void run() throws IOException {socket.read(input);if (inputIsComplete()) {process();sk.attach(new Sender());sk.interestOps(SelectionKey.OP_WRITE);sk.selector().wakeup();}}/*** 发送者类,负责将处理后的数据写回客户端。*/class Sender implements Run {public void run() throws IOException {socket.write(output);if (outputIsComplete()) {new Handler(selector, socket);}}}}/*** 接口Run定义了所有处理逻辑的运行方法。*/public interface Run {public abstract void run() throws IOException;}/*** 程序入口点,创建并启动Reactor线程。** @param args 命令行参数。* @throws IOException 如果创建Reactor失败。*/public static void main(String[] args) throws IOException {Reactor reactor = new Reactor(6666);new Thread(reactor).start();while (true) ;}
}
2.3 客户端
import org.apache.commons.lang3.StringUtils;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;public class SelectorClient {public static void main(String[] args) throws IOException, InterruptedException {// 创建Socket通道并连接到服务器SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost", 6666));// 初始化输入和输出ByteBufferByteBuffer inputBuffer = ByteBuffer.allocate(512);ByteBuffer serverOutput = ByteBuffer.allocate(512);// 循环接收用户输入并发送给服务器while (true) {// 使用Scanner获取用户输入Scanner in = new Scanner(System.in);String input = in.nextLine();System.out.println("user input: " + input);if (StringUtils.isBlank(input)) {continue;}// 清空输入缓冲区,放入用户输入,然后反转准备写入inputBuffer.clear();inputBuffer.put(input.getBytes(StandardCharsets.UTF_8));inputBuffer.flip();// 将输入数据写入Socket通道sc.write(inputBuffer);System.out.println("send to server " + input);// 循环读取服务器响应int times = 1;while (true) {// 清空服务器响应缓冲区,准备读取数据serverOutput.clear();// 从Socket通道读取数据sc.read(serverOutput);// 如果没有读取到数据,继续尝试读取if (!serverOutput.hasRemaining()) {TimeUnit.SECONDS.sleep(1);times++;System.out.println(times);if (times > 10) {break;}continue;}// 反转缓冲区,读取数据并打印serverOutput.flip();System.out.println("server response " + ByteBufferUtil.read(serverOutput));// 读取完成后退出内层循环break;}}}
}
2.4 运行截图
3. Reactor多线程模型
Reactor多线程模型 和 Reactor单线程模型最大的区别就是有一组NIO线程来处理I/O操作:
- 有一个NIO线程 Acceptor线程,监听服务端, 接受客户端连接请求
- 网络I/O操作,读写(消息的读取、解码、编码、发送)等由一个NIO线程池负责
- 一个NIO线程可以处理N条链路, 一个链路之对应一个NIO线程, 防止出现并发问题
3.1 服务端代码
服务端端代码如下,客户端同上:
import lombok.SneakyThrows;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 该类实现了使用线程池处理NIO服务器的逻辑。*/
public class ReactorWithThreadPool implements Runnable {/*** 处理器线程池,用于执行具体的处理任务。*/static ThreadPoolExecutor HANDLER_POOL = new ThreadPoolExecutor(2, 4,10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());final Selector selector;final ServerSocketChannel serverSocket;/*** 创建一个NIO服务器,监听指定端口。** @param port 服务器监听的端口号。* @throws IOException 如果打开选择器或服务器套接字失败。*/public ReactorWithThreadPool(int port) throws IOException {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.configureBlocking(false);SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);sk.attach(new Acceptor());}/*** 主循环,负责监听选择器上的事件。*/public void run() {try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> selected = selector.selectedKeys();System.out.println("selected:" + selected.size());for (SelectionKey selectionKey : selected) {if (selectionKey.isReadable()) {System.out.println("selectionKey read");}if (selectionKey.isWritable()) {System.out.println("selectionKey write");}dispatch(selectionKey);}selected.clear();}} catch (IOException ex) {ex.printStackTrace();}}/*** 分派选择键对应的处理程序。** @param k 需要处理的选择键。* @throws IOException 如果操作通道失败。*/void dispatch(SelectionKey k) throws IOException {Run r = (Run) (k.attachment());if (r != null)r.run();}/*** 接受者处理程序,负责接受新的客户端连接。*/class Acceptor implements Run { // innerpublic void run() {try {SocketChannel c = serverSocket.accept();if (c != null)new Handler(selector, c);} catch (IOException ex) {ex.printStackTrace();}}}/*** 处理客户端请求的处理程序。*/final class Handler implements Run {final SocketChannel socket;final SelectionKey sk;ByteBuffer input = ByteBuffer.allocate(1024);ByteBuffer output = ByteBuffer.allocate(1024);/*** 创建一个新的处理程序实例。** @param sel 选择器。* @param c 客户端套接字通道。* @throws IOException 如果注册选择键或配置套接字失败。*/Handler(Selector sel, SocketChannel c)throws IOException {socket = c;c.configureBlocking(false);sk = socket.register(sel, 0);sk.attach(this);sk.interestOps(SelectionKey.OP_READ);sel.wakeup();}/*** 检查输入缓冲区是否已完成读取。** @return 如果输入缓冲区还有剩余,则为true;否则为false。*/boolean inputIsComplete() {return input.hasRemaining();}/*** 检查输出缓冲区是否已完成发送。** @return 如果输出缓冲区没有剩余,则为true;否则为false。*/boolean outputIsComplete() {return !output.hasRemaining();}/*** 处理输入数据。** @throws CharacterCodingException 如果字符编码失败。*/void process() throws CharacterCodingException {// 否则,将缓冲区反转并打印读取的数据input.flip();String request = ByteBufferUtil.read(input);System.out.println(request);input.clear();output = ByteBufferUtil.read("你好: " + request);}/*** 读取客户端输入,并根据情况启动处理程序或发送器。** @throws IOException 如果读取通道失败。*/public void run() throws IOException {socket.read(input);if (inputIsComplete()) {HANDLER_POOL.execute(new Processor());}}/*** 发送器处理程序,负责向客户端发送数据。*/class Sender implements Run {public void run() throws IOException {socket.write(output);if (outputIsComplete()) {new Handler(selector, socket);}}}/*** 处理请求的处理程序,负责处理输入数据并准备输出。*/class Processor implements Runnable {@Override@SneakyThrowspublic void run() {process();sk.attach(new Sender());sk.interestOps(SelectionKey.OP_WRITE);sk.selector().wakeup();}}}/*** 处理器接口,定义了处理程序应实现的运行方法。*/public interface Run {public abstract void run() throws IOException;}/*** 程序入口点。** @param args 命令行参数。* @throws IOException 如果启动服务器失败。*/public static void main(String[] args) throws IOException {ReactorWithThreadPool reactor = new ReactorWithThreadPool(6666);new Thread(reactor).start();while (true) ;}
}
3.2 运行截图
4. 主从 Reactor多线程模型
主从 Reactor多线程模型的特点:服务端接受客户端连接,不再是一个单独的NIO线程,而是一个独立的NIO线程池。
Acceptor 接收到客户端TCP连接请求并处理完成后, 将新创建的SocketChannel 注册到 I/O线程池 (sub Reactor)。
Acceptor线程池仅负责客户端的登陆、握手、安全认证, 一旦链路建立成功, 就将链路注册到 I/O线程池 (sub Reactor), I/O线程池 (sub Reactor)负责后续的 I/O操作
4.1 服务端代码
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class MultiReactor implements Runnable {Selector selector = null;ServerSocketChannel serverSocket;static ThreadPoolExecutor REACTOR_THREAD_POOL = new ThreadPoolExecutor(2, 16,10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(20),new ThreadFactoryBuilder().setNameFormat("REACTOR_THREAD_POOL-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());/*** 构造函数,初始化多线程反应器。** @param port 服务器监听端口。* @throws IOException 如果打开selector或服务器SocketChannel时发生错误。*/public MultiReactor(int port) throws IOException {selector = Selector.open();serverSocket = ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(port));serverSocket.configureBlocking(false);SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);sk.attach(new Acceptor(serverSocket));}/*** 构造函数,使用已有的selector。** @param selector 已打开的selector。*/public MultiReactor(Selector selector) throws IOException {this.selector = selector;}/*** 主运行方法,负责监听和分发事件。*/@Overridepublic void run() {try {while (!Thread.interrupted()) {System.out.println(Thread.currentThread() + " select start");selector.select();Set<SelectionKey> selected = selector.selectedKeys();System.out.println(Thread.currentThread() + " " + "selected:" + selected.size());for (SelectionKey selectionKey : selected) {if (selectionKey.isReadable()) {System.out.println("selectionKey read");}if (selectionKey.isWritable()) {System.out.println("selectionKey write");}dispatch(selectionKey);}selected.clear();}} catch (IOException ex) {ex.printStackTrace();}}/*** 分发已选择的事件到相应的处理程序。** @param k 选择的关键。* @throws IOException 如果操作通道时发生错误。*/void dispatch(SelectionKey k) throws IOException {Run r = (Run) (k.attachment());if (r != null)r.run();}/*** Acceptor类负责接受新的客户端连接,并将它们分配给子反应器处理。*/class Acceptor implements Run {private final ServerSocketChannel listenSocketChannel;private final List<MultiReactor> subReactors = new ArrayList<>(ACCEPTOR_POOL_NUM);private static final int ACCEPTOR_POOL_NUM = 4;private final ThreadPoolExecutor ACCEPTOR_POOL = new ThreadPoolExecutor(ACCEPTOR_POOL_NUM, ACCEPTOR_POOL_NUM,10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(20),new ThreadFactoryBuilder().setNameFormat("ACCEPTOR_POOL-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());Acceptor(ServerSocketChannel listenSocketChannel) throws IOException {this.listenSocketChannel = listenSocketChannel;for (int i = 0; i < ACCEPTOR_POOL_NUM; i++) {MultiReactor subReactor = new MultiReactor(Selector.open());subReactors.add(subReactor);ACCEPTOR_POOL.execute(subReactor);}}/*** 接受新的客户端连接,并分配给子反应器处理。*/@Overridepublic void run() {try {SocketChannel clientSocketChannel = listenSocketChannel.accept();// 设置为非阻塞// 任意选择一个从Reactor,让其监听连接的客户端的READ事件Optional<MultiReactor> anySubReactor = subReactors.stream().findAny();if (anySubReactor.isPresent() && clientSocketChannel != null) {MultiReactor subReactor = anySubReactor.get();System.out.println(Thread.currentThread() + ": "+ subReactor);new Handler(subReactor.selector, clientSocketChannel);}} catch (IOException e) {e.printStackTrace();}}}/*** Handler类负责处理与客户端的通信,包括读取请求和发送响应。*/final class Handler implements Run {final SocketChannel socket;final SelectionKey sk;ByteBuffer input = ByteBuffer.allocate(1024);ByteBuffer output = ByteBuffer.allocate(1024);Handler(Selector sel, SocketChannel c)throws IOException {socket = c;c.configureBlocking(false);sel.wakeup();sk = socket.register(sel, SelectionKey.OP_READ);sk.attach(this);sk.interestOps(SelectionKey.OP_READ);}/*** 检查输入缓冲区是否已完成读取。** @return 如果输入缓冲区还有剩余,则返回true;否则返回false。*/boolean inputIsComplete() {return input.hasRemaining();}/*** 检查输出缓冲区是否已完成发送。** @return 如果输出缓冲区没有剩余,则返回true;否则返回false。*/boolean outputIsComplete() {return !output.hasRemaining();}/*** 处理输入数据,将其解码并准备生成响应。** @throws CharacterCodingException 如果字符编码发生错误。*/void process() throws CharacterCodingException {// 否则,将缓冲区反转并打印读取的数据input.flip();String request = ByteBufferUtil.read(input);System.out.println(Thread.currentThread() + ": " + request);input.clear();System.out.println(input.toString());System.out.println(output.toString());output = ByteBufferUtil.read("你好: " + request);}/*** 读取客户端请求,并根据需要启动处理过程。** @throws IOException 如果读取通道时发生错误。*/@Overridepublic void run() throws IOException {socket.read(input);if (inputIsComplete()) {REACTOR_THREAD_POOL.execute(new Processor(sk.selector()));}}/*** Sender类负责发送响应给客户端。*/class Sender implements Run {private Selector selector;public Sender(Selector selector) {this.selector = selector;}/*** 发送输出缓冲区中的数据到客户端。** @throws IOException 如果写入通道时发生错误。*/public void run() throws IOException {System.out.println("start write");socket.write(output);if (outputIsComplete()) {new Handler(this.selector, socket);}}}/*** Processor类负责处理请求并准备响应。*/class Processor implements Runnable {private Selector selector;public Processor(Selector selector) {this.selector = selector;}@Override@SneakyThrowspublic void run() {process();sk.attach(new Sender(this.selector));sk.interestOps(SelectionKey.OP_WRITE);sk.selector().wakeup();}}}/*** Run接口定义了处理事件的运行时行为。*/public interface Run {void run() throws IOException;}/*** 程序入口点。** @param args 命令行参数。* @throws IOException 如果初始化反应器时发生错误。*/public static void main(String[] args) throws IOException {MultiReactor reactor = new MultiReactor(6666);new Thread(reactor).start();while (true) ;}
}
4.2 运行截图
参考文献
- Doug Lea 教授的课件 : https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
- Netty权威指南(第2版)李林锋 / 著
- https://juejin.cn/post/7210375522512666679?searchId=20240612213218FE474007F2FADD0130AA
这篇关于Reactor 网络模型、Java代码实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!