Netty(四) 简化版Netty源码

2024-06-20 08:58
文章标签 源码 netty 简化版

本文主要是介绍Netty(四) 简化版Netty源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简化版Netty源码

netty学习目录
一、Netty(一) NIO例子
二、Netty(二) netty服务端
三、Netty(三) Netty客户端+服务端
四、Netty(四) 简化版Netty源码
五、Netty(五)Netty5.x服务端
六、Netty(六) Netty Http 服务器例子
七、Netty(七) Netty服务端+客户端代码
八、Netty(八) Netty多客户端连接例子
九、Netty(九) Netty会话清除
十、Netty(十) Netty自定义编码器解码器
十一、Netty(十一) Netty对象传输


package com.zqw.nio.netty.n3.pool;import java.nio.channels.ServerSocketChannel;public interface Boss {/*** 加入一个新的ServerSocket* @param serverChannel*/public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.zqw.nio.netty.n3.pool;import com.zqw.nio.netty.n3.NioServerBoss;
import com.zqw.nio.netty.n3.NioServerWorker;import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;public class NioSelectorRunnablePool {/*** boss线程数组*/private final AtomicInteger bossIndex = new AtomicInteger();private Boss[] bosses;/*** worker线程数组*/private final AtomicInteger workerIndex = new AtomicInteger();private Worker[] workeres;public NioSelectorRunnablePool(Executor boss, Executor worker) {initBoss(boss, 1);initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);}/*** 初始化boss线程* @param boss* @param count*/private void initBoss(Executor boss, int count) {this.bosses = new NioServerBoss[count];for (int i = 0; i < bosses.length; i++) {bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);}}/*** 初始化worker线程* @param worker* @param count*/private void initWorker(Executor worker, int count) {this.workeres = new NioServerWorker[count];for (int i = 0; i < workeres.length; i++) {workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);}}/*** 获取一个worker* @return*/public Worker nextWorker() {return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];}/*** 获取一个boss* @return*/public Boss nextBoss() {return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];}}
package com.zqw.nio.netty.n3.pool;import java.nio.channels.SocketChannel;public interface Worker {/*** 加入一个新的客户端会话* @param channel*/public void registerNewChannelTask(SocketChannel channel);}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;public abstract class AbstractNioSelector implements Runnable {/*** 线程池*/private final Executor executor;/*** 选择器*/protected Selector selector;/*** 选择器wakenUp状态标记*/protected final AtomicBoolean wakenUp = new AtomicBoolean();/*** 任务队列*/private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();/*** 线程名称*/private String threadName;/*** 线程管理对象*/protected NioSelectorRunnablePool selectorRunnablePool;AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {this.executor = executor;this.threadName = threadName;this.selectorRunnablePool = selectorRunnablePool;openSelector();}/*** 获取selector并启动线程*/private void openSelector() {try {this.selector = Selector.open();} catch (IOException e) {throw new RuntimeException("Failed to create a selector.");}executor.execute(this);}@Overridepublic void run() {Thread.currentThread().setName(this.threadName);while (true) {try {wakenUp.set(false);select(selector);processTaskQueue();process(selector);} catch (Exception e) {// ignore}}}/*** 注册一个任务并激活selector* * @param task*/protected final void registerTask(Runnable task) {taskQueue.add(task);Selector selector = this.selector;if (selector != null) {if (wakenUp.compareAndSet(false, true)) {selector.wakeup();}} else {taskQueue.remove(task);}}/*** 执行队列里的任务*/private void processTaskQueue() {for (;;) {final Runnable task = taskQueue.poll();if (task == null) {break;}task.run();}}/*** 获取线程管理对象* @return*/public NioSelectorRunnablePool getSelectorRunnablePool() {return selectorRunnablePool;}/*** select抽象方法* * @param selector* @return* @throws IOException*/protected abstract int select(Selector selector) throws IOException;/*** selector的业务处理* * @param selector* @throws IOException*/protected abstract void process(Selector selector) throws IOException;}
package com.zqw.nio.netty.n3;import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;public class NioServerBoss extends AbstractNioSelector implements Boss {public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {SelectionKey key = i.next();i.remove();ServerSocketChannel server = (ServerSocketChannel) key.channel();// 新客户端SocketChannel channel = server.accept();// 设置为非阻塞channel.configureBlocking(false);// 获取一个workerWorker nextworker = getSelectorRunnablePool().nextWorker();// 注册新客户端接入任务nextworker.registerNewChannelTask(channel);System.out.println("新客户端链接");}}public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//注册serverChannel到selectorserverChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select();}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;
import com.zqw.nio.netty.n3.pool.Worker;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;public class NioServerWorker extends AbstractNioSelector implements Worker {public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {super(executor, threadName, selectorRunnablePool);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();while (ite.hasNext()) {SelectionKey key = (SelectionKey) ite.next();// 移除,防止重复处理ite.remove();// 得到事件发生的Socket通道SocketChannel channel = (SocketChannel) key.channel();// 数据总长度int ret = 0;boolean failure = true;ByteBuffer buffer = ByteBuffer.allocate(1024);//读取数据try {ret = channel.read(buffer);failure = false;} catch (Exception e) {// ignore}//判断是否连接已断开if (ret <= 0 || failure) {key.cancel();System.out.println("客户端断开连接");}else{System.out.println("收到数据:" + new String(buffer.array()));//回写数据ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());channel.write(outBuffer);// 将消息回送给客户端}}}/*** 加入一个新的socket客户端*/public void registerNewChannelTask(final SocketChannel channel){final Selector selector = this.selector;registerTask(new Runnable() {@Overridepublic void run() {try {//将客户端注册到selector中channel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}}});}@Overrideprotected int select(Selector selector) throws IOException {return selector.select(500);}}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.Boss;
import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {this.selectorRunnablePool = selectorRunnablePool;}/*** 绑定端口* @param localAddress*/public void bind(final SocketAddress localAddress){try {// 获得一个ServerSocket通道ServerSocketChannel serverChannel = ServerSocketChannel.open();// 设置通道为非阻塞serverChannel.configureBlocking(false);// 将该通道对应的ServerSocket绑定到port端口serverChannel.socket().bind(localAddress);//获取一个boss线程Boss nextBoss = selectorRunnablePool.nextBoss();//向boss注册一个ServerSocket通道nextBoss.registerAcceptChannelTask(serverChannel);} catch (Exception e) {e.printStackTrace();}}
}
package com.zqw.nio.netty.n3;import com.zqw.nio.netty.n3.pool.NioSelectorRunnablePool;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;public class Start {public static void main(String[] args) {//初始化线程NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//获取服务类ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);//绑定端口bootstrap.bind(new InetSocketAddress(10101));System.out.println("start");}}

这篇关于Netty(四) 简化版Netty源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077677

相关文章

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

red5-server源码

red5-server源码:https://github.com/Red5/red5-server

TL-Tomcat中长连接的底层源码原理实现

长连接:浏览器告诉tomcat不要将请求关掉。  如果不是长连接,tomcat响应后会告诉浏览器把这个连接关掉。    tomcat中有一个缓冲区  如果发送大批量数据后 又不处理  那么会堆积缓冲区 后面的请求会越来越慢。