Netty(四) 简化版Netty源码

2024-06-20 08:58
文章标签 netty 简化版 源码

本文主要是介绍Netty(四) 简化版Netty源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简化版Netty源码

netty学习目录
一、Netty(一) NIO例子
二、Netty(二) netty服务端
三、Netty(三) Netty客户端+服务端
四、Netty(四) 简化版Netty源码
五、Netty(五)Netty5.x服务端
六、Netty(六) Netty Http 服务器例子
七、Netty(七) Netty服务端+客户端代码
八、Netty(八) Netty多客户端连接例子
九、Netty(九) Netty会话清除
十、Netty(十) Netty自定义编码器解码器
十一、Netty(十一) Netty对象传输


package com.zqw.nio.netty.n3.pool;import java.nio.channels.ServerSocketChannel;public interface Boss {/*** 加入一个新的ServerSocket* @param serverChannel*/public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.zqw.nio.netty.n3.pool;import com.zqw.nio.netty.n3.NioServerBoss;
import com.zqw.nio.netty.n3.NioServerWorker;import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;public class NioSelectorRunnablePool {/*** boss线程数组*/private final AtomicInteger bossIndex = new AtomicInteger();private Boss[] bosses;/*** worker线程数组*/private final AtomicInteger workerIndex = new AtomicInteger();private Worker[] workeres;public NioSelectorRunnablePool(Executor boss, Executor worker) {initBoss(boss, 1);initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);}/*** 初始化boss线程* @param boss* @param count*/private void initBoss(Executor boss, int count) {this.bosses = new NioServerBoss[count];for (int i = 0; i < bosses.length; i++) {bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);}}/*** 初始化worker线程* @param worker* @param count*/private void initWorker(Executor worker, int count) {this.workeres = new NioServerWorker[count];for (int i = 0; i < workeres.length; i++) {workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);}}/*** 获取一个worker* @return*/public Worker nextWorker() {return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];}/*** 获取一个boss* @return*/public Boss nextBoss() {return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];}}
package com.zqw.nio.netty.n3.pool;import java.nio.channels.SocketChannel;public interface Worker {/*** 加入一个新的客户端会话* @param channel*/public void registerNewChannelTask(SocketChannel channel);}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;public abstract class AbstractNioSelector implements Runnable {/*** 线程池*/private final Executor executor;/*** 选择器*/protected Selector selector;/*** 选择器wakenUp状态标记*/protected final AtomicBoolean wakenUp = new AtomicBoolean();/*** 任务队列*/private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();/*** 线程名称*/private String threadName;/*** 线程管理对象*/protected NioSelectorRunnablePool selectorRunnablePool;AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {this.executor = executor;this.threadName = threadName;this.selectorRunnablePool = selectorRunnablePool;openSelector();}/*** 获取selector并启动线程*/private void openSelector() {try {this.selector = Selector.open();} catch (IOException e) {throw new RuntimeException("Failed to create a selector.");}executor.execute(this);}@Overridepublic void run() {Thread.currentThread().setName(this.threadName);while (true) {try {wakenUp.set(false);select(selector);processTaskQueue();process(selector);} catch (Exception e) {// ignore}}}/*** 注册一个任务并激活selector* * @param task*/protected final void registerTask(Runnable task) {taskQueue.add(task);Selector selector = this.selector;if (selector != null) {if (wakenUp.compareAndSet(false, true)) {selector.wakeup();}} else {taskQueue.remove(task);}}/*** 执行队列里的任务*/private void processTaskQueue() {for (;;) {final Runnable task = taskQueue.poll();if (task == null) {break;}task.run();}}/*** 获取线程管理对象* @return*/public NioSelectorRunnablePool getSelectorRunnablePool() {return selectorRunnablePool;}/*** select抽象方法* * @param selector* @return* @throws IOException*/protected abstract int select(Selector selector) throws IOException;/*** selector的业务处理* * @param selector* @throws IOException*/protected abstract void process(Selector selector) throws IOException;}
package com.zqw.nio.netty.n3;import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;public class NioServerBoss extends AbstractNioSelector implements Boss {public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {SelectionKey key = i.next();i.remove();ServerSocketChannel server = (ServerSocketChannel) key.channel();// 新客户端SocketChannel channel = server.accept();// 设置为非阻塞channel.configureBlocking(false);// 获取一个workerWorker nextworker = getSelectorRunnablePool().nextWorker();// 注册新客户端接入任务nextworker.registerNewChannelTask(channel);System.out.println("新客户端链接");}}public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//注册serverChannel到selectorserverChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select();}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;public class NioServerWorker extends AbstractNioSelector implements Worker {public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey) ite.next();// 移除,防止重复处理ite.remove();// 得到事件发生的Socket通道SocketChannel channel = (SocketChannel) key.channel();// 数据总长度int ret = 0;boolean failure = true;ByteBuffer buffer = ByteBuffer.allocate(1024);//读取数据try {ret = channel.read(buffer);failure = false;} catch (Exception e) {// ignore}//判断是否连接已断开if (ret <= 0 || failure) {key.cancel();System.out.println("客户端断开连接");}else{System.out.println("收到数据:" + new String(buffer.array()));//回写数据ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());channel.write(outBuffer);// 将消息回送给客户端}}}/*** 加入一个新的socket客户端*/public void registerNewChannelTask(final SocketChannel channel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//将客户端注册到selector中channel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select(500);}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {this.selectorRunnablePool = selectorRunnablePool;}/*** 绑定端口* @param localAddress*/public void bind(final SocketAddress localAddress){try {// 获得一个ServerSocket通道ServerSocketChannel serverChannel = ServerSocketChannel.open();// 设置通道为非阻塞serverChannel.configureBlocking(false);// 将该通道对应的ServerSocket绑定到port端口serverChannel.socket().bind(localAddress);//获取一个boss线程Boss nextBoss = selectorRunnablePool.nextBoss();//向boss注册一个ServerSocket通道nextBoss.registerAcceptChannelTask(serverChannel);} catch (Exception e) {e.printStackTrace();}}
}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;public class Start {public static void main(String[] args) {//初始化线程NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//获取服务类ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);//绑定端口bootstrap.bind(new InetSocketAddress(10101));System.out.println("start");}}

这篇关于Netty(四) 简化版Netty源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077677

相关文章

SpringBoot集成Netty,Handler中@Autowired注解为空

最近建了个技术交流群,然后好多小伙伴都问关于Netty的问题,尤其今天的问题最特殊,功能大概是要在Netty接收消息时把数据写入数据库,那个小伙伴用的是 Spring Boot + MyBatis + Netty,所以就碰到了Handler中@Autowired注解为空的问题 参考了一些大神的博文,Spring Boot非controller使用@Autowired注解注入为null的问题,得到

springboot家政服务管理平台 LW +PPT+源码+讲解

3系统的可行性研究及需求分析 3.1可行性研究 3.1.1技术可行性分析 经过大学四年的学习,已经掌握了JAVA、Mysql数据库等方面的编程技巧和方法,对于这些技术该有的软硬件配置也是齐全的,能够满足开发的需要。 本家政服务管理平台采用的是Mysql作为数据库,可以绝对地保证用户数据的安全;可以与Mysql数据库进行无缝连接。 所以,家政服务管理平台在技术上是可以实施的。 3.1

高仿精仿愤怒的小鸟android版游戏源码

这是一款很完美的高仿精仿愤怒的小鸟android版游戏源码,大家可以研究一下吧、 为了报复偷走鸟蛋的肥猪们,鸟儿以自己的身体为武器,仿佛炮弹一样去攻击肥猪们的堡垒。游戏是十分卡通的2D画面,看着愤怒的红色小鸟,奋不顾身的往绿色的肥猪的堡垒砸去,那种奇妙的感觉还真是令人感到很欢乐。而游戏的配乐同样充满了欢乐的感觉,轻松的节奏,欢快的风格。 源码下载

基于Java医院药品交易系统详细设计和实现(源码+LW+调试文档+讲解等)

💗博主介绍:✌全网粉丝10W+,CSDN作者、博客专家、全栈领域优质创作者,博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌💗 🌟文末获取源码+数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题,项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多的人  Java精品实战案例《600套》 2023-2025年最值得选择的Java毕业设计选题大全:1000个热

美容美发店营销版微信小程序源码

打造线上生意新篇章 一、引言:微信小程序,开启美容美发行业新纪元 在数字化时代,微信小程序以其便捷、高效的特点,成为了美容美发行业营销的新宠。本文将带您深入了解美容美发营销微信小程序,探讨其独特优势及如何助力商家实现业务增长。 二、微信小程序:美容美发行业的得力助手 拓宽客源渠道:微信小程序基于微信社交平台,轻松实现线上线下融合,帮助商家快速吸引潜在客户,拓宽客源渠道。 提升用户体验:

风水研究会官网源码系统-可展示自己的领域内容-商品售卖等

一款用于展示风水行业,周易测算行业,玄学行业的系统,并支持售卖自己的商品。 整洁大气,非常漂亮,前端内容均可通过后台修改。 大致功能: 支持前端内容通过后端自定义支持开启关闭会员功能,会员等级设置支持对接官方支付支持添加商品类支持添加虚拟下载类支持自定义其他类型字段支持生成虚拟激活卡支持采集其他站点文章支持对接收益广告支持文章评论支持积分功能支持推广功能更多功能,搭建完成自行体验吧! 原文

HTML5文旅文化旅游网站模板源码

文章目录 1.设计来源文旅宣传1.1 登录界面演示1.2 注册界面演示1.3 首页界面演示1.4 文旅之行界面演示1.5 文旅之行文章内容界面演示1.6 关于我们界面演示1.7 文旅博客界面演示1.8 文旅博客文章内容界面演示1.9 联系我们界面演示 2.效果和源码2.1 动态效果2.2 源代码2.3 源码目录 源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh

mediasoup 源码分析 (八)分析PlainTransport

mediasoup 源码分析 (六)分析PlainTransport 一、接收裸RTP流二、mediasoup 中udp建立过程 tips 一、接收裸RTP流 PlainTransport 可以接收裸RTP流,也可以接收AES加密的RTP流。源码中提供了一个通过ffmpeg发送裸RTP流到mediasoup的脚本,具体地址为:mediasoup-demo/broadcaste

Netty ByteBuf 释放详解:内存管理与最佳实践

Netty ByteBuf 释放详解:内存管理与最佳实践 在Netty中(学习netty请参考:🔗深入浅出Netty:高性能网络应用框架的原理与实践),管理ByteBuf的内存是至关重要的(学习ByteBuf请参考:🔗Netty ByteBuf 详解:高性能数据缓冲区的全面介绍)。未能正确释放ByteBuf可能会导致内存泄漏,进而影响应用的性能和稳定性。本文将详细介绍如何正确地释放ByteB

Java并发编程—阻塞队列源码分析

在前面几篇文章中,我们讨论了同步容器(Hashtable、Vector),也讨论了并发容器(ConcurrentHashMap、CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便。今天我们来讨论另外一类容器:阻塞队列。   在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了D