本文主要介绍 Netty(四):简化版 Netty 源码,希望能为大家解决编程问题提供一定的参考,需要的开发者可以跟着小编一起学习。
简化版Netty源码
netty学习目录
一、Netty(一) NIO例子
二、Netty(二) netty服务端
三、Netty(三) Netty客户端+服务端
四、Netty(四) 简化版Netty源码
五、Netty(五) Netty5.x服务端
六、Netty(六) Netty Http 服务器例子
七、Netty(七) Netty服务端+客户端代码
八、Netty(八) Netty多客户端连接例子
九、Netty(九) Netty会话清除
十、Netty(十) Netty自定义编码器解码器
十一、Netty(十一) Netty对象传输
package com.zqw.nio.netty.n3.pool;

import java.nio.channels.ServerSocketChannel;

/**
 * A "boss" in the selector pool: the component that is handed listening
 * server sockets.
 */
public interface Boss {

    /**
     * Adds a new server socket channel to this boss.
     *
     * @param serverChannel the server socket channel to add
     */
    void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.zqw.nio.netty.n3.pool;import com.zqw.nio.netty.n3.NioServerBoss;
import com.zqw.nio.netty.n3.NioServerWorker;import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Owns the boss and worker selector loops and hands them out in
 * round-robin order.
 */
public class NioSelectorRunnablePool {

    /** Round-robin counter used by {@link #nextBoss()}. */
    private final AtomicInteger bossIndex = new AtomicInteger();

    /** The boss selector loops. */
    private Boss[] bosses;

    /** Round-robin counter used by {@link #nextWorker()}. */
    private final AtomicInteger workerIndex = new AtomicInteger();

    /** The worker selector loops. */
    private Worker[] workeres;

    /**
     * Creates one boss and {@code availableProcessors() * 2} workers.
     *
     * @param boss   executor that runs the boss loop(s)
     * @param worker executor that runs the worker loops
     */
    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        final int workerCount = Runtime.getRuntime().availableProcessors() * 2;
        initBoss(boss, 1);
        initWorker(worker, workerCount);
    }

    /**
     * Creates the boss loops.
     *
     * @param boss  executor handed to every boss
     * @param count number of bosses to create
     */
    private void initBoss(Executor boss, int count) {
        // The field is assigned before it is filled, exactly as the loops are created one by one.
        this.bosses = new NioServerBoss[count];
        int slot = 0;
        while (slot < bosses.length) {
            final String name = "boss thread " + (slot + 1);
            bosses[slot] = new NioServerBoss(boss, name, this);
            slot++;
        }
    }

    /**
     * Creates the worker loops.
     *
     * @param worker executor handed to every worker
     * @param count  number of workers to create
     */
    private void initWorker(Executor worker, int count) {
        this.workeres = new NioServerWorker[count];
        int slot = 0;
        while (slot < workeres.length) {
            final String name = "worker thread " + (slot + 1);
            workeres[slot] = new NioServerWorker(worker, name, this);
            slot++;
        }
    }

    /**
     * Picks the next worker in round-robin order.
     *
     * @return a worker from the pool
     */
    public Worker nextWorker() {
        final int ticket = workerIndex.getAndIncrement();
        // abs() keeps the index non-negative after the counter wraps around.
        final int position = Math.abs(ticket % workeres.length);
        return workeres[position];
    }

    /**
     * Picks the next boss in round-robin order.
     *
     * @return a boss from the pool
     */
    public Boss nextBoss() {
        final int ticket = bossIndex.getAndIncrement();
        final int position = Math.abs(ticket % bosses.length);
        return bosses[position];
    }
}
package com.zqw.nio.netty.n3.pool;

import java.nio.channels.SocketChannel;

/**
 * A "worker" in the selector pool: the component that is handed client
 * socket channels.
 */
public interface Worker {

    /**
     * Adds a new client session to this worker.
     *
     * @param channel the client socket channel to add
     */
    void registerNewChannelTask(SocketChannel channel);
}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;public abstract class AbstractNioSelector implements Runnable {/*** 线程池*/private final Executor executor;/*** 选择器*/protected Selector selector;/*** 选择器wakenUp状态标记*/protected final AtomicBoolean wakenUp = new AtomicBoolean();/*** 任务队列*/private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();/*** 线程名称*/private String threadName;/*** 线程管理对象*/protected NioSelectorRunnablePool selectorRunnablePool;AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {this.executor = executor;this.threadName = threadName;this.selectorRunnablePool = selectorRunnablePool;openSelector();}/*** 获取selector并启动线程*/private void openSelector() {try {this.selector = Selector.open();} catch (IOException e) {throw new RuntimeException("Failed to create a selector.");}executor.execute(this);}@Overridepublic void run() {Thread.currentThread().setName(this.threadName);while (true) {try {wakenUp.set(false);select(selector);processTaskQueue();process(selector);} catch (Exception e) {// ignore}}}/*** 注册一个任务并激活selector* * @param task*/protected final void registerTask(Runnable task) {taskQueue.add(task);Selector selector = this.selector;if (selector != null) {if (wakenUp.compareAndSet(false, true)) {selector.wakeup();}} else {taskQueue.remove(task);}}/*** 执行队列里的任务*/private void processTaskQueue() {for (;;) {final Runnable task = taskQueue.poll();if (task == null) {break;}task.run();}}/*** 获取线程管理对象* @return*/public NioSelectorRunnablePool getSelectorRunnablePool() {return selectorRunnablePool;}/*** select抽象方法* * @param selector* @return* @throws IOException*/protected abstract int select(Selector selector) throws IOException;/*** selector的业务处理* * @param selector* @throws IOException*/protected abstract void process(Selector selector) throws IOException;}
package com.zqw.nio.netty.n3;import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;public class NioServerBoss extends AbstractNioSelector implements Boss {public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {SelectionKey key = i.next();i.remove();ServerSocketChannel server = (ServerSocketChannel) key.channel();// 新客户端SocketChannel channel = server.accept();// 设置为非阻塞channel.configureBlocking(false);// 获取一个workerWorker nextworker = getSelectorRunnablePool().nextWorker();// 注册新客户端接入任务nextworker.registerNewChannelTask(channel);System.out.println("新客户端链接");}}public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//注册serverChannel到selectorserverChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select();}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;public class NioServerWorker extends AbstractNioSelector implements Worker {public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey) ite.next();// 移除,防止重复处理ite.remove();// 得到事件发生的Socket通道SocketChannel channel = (SocketChannel) key.channel();// 数据总长度int ret = 0;boolean failure = true;ByteBuffer buffer = ByteBuffer.allocate(1024);//读取数据try {ret = channel.read(buffer);failure = false;} catch (Exception e) {// ignore}//判断是否连接已断开if (ret <= 0 || failure) {key.cancel();System.out.println("客户端断开连接");}else{System.out.println("收到数据:" + new String(buffer.array()));//回写数据ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());channel.write(outBuffer);// 将消息回送给客户端}}}/*** 加入一个新的socket客户端*/public void registerNewChannelTask(final SocketChannel channel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//将客户端注册到selector中channel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select(500);}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {this.selectorRunnablePool = selectorRunnablePool;}/*** 绑定端口* @param localAddress*/public void bind(final SocketAddress localAddress){try {// 获得一个ServerSocket通道ServerSocketChannel serverChannel = ServerSocketChannel.open();// 设置通道为非阻塞serverChannel.configureBlocking(false);// 将该通道对应的ServerSocket绑定到port端口serverChannel.socket().bind(localAddress);//获取一个boss线程Boss nextBoss = selectorRunnablePool.nextBoss();//向boss注册一个ServerSocket通道nextBoss.registerAcceptChannelTask(serverChannel);} catch (Exception e) {e.printStackTrace();}}
}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;public class Start {public static void main(String[] args) {//初始化线程NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//获取服务类ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);//绑定端口bootstrap.bind(new InetSocketAddress(10101));System.out.println("start");}}
关于 Netty(四) 简化版Netty源码的内容就介绍到这里,希望本文对各位开发者有所帮助!