本文主要是介绍全网最细RocketMQ源码一:NameSrv,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、入口
NameServer的启动源码在NamesrvStartup,现在开始debug之旅
二、createNamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());// 启动时的参数信息 有commandLine 管理了。commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}// namesrv 配置。final NamesrvConfig namesrvConfig = new NamesrvConfig();// netty 服务器配置。final NettyServerConfig nettyServerConfig = new NettyServerConfig();// namesrv服务器 监听端口 修改为9876nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {// 读取 -c 选项的值String file = commandLine.getOptionValue('c');if (file != null) {// 读取 config 文件数据 到 properties 内InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);// 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 的字段,那么进行复写。MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);// 将读取的 配置文件 路径 保存 到 字段。namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}// 将启动时 命令行 设置的kv 复写到 namesrvConfig内。MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 创建日志对象。LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator 
configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);// 创建 控制器// 参数1:namesrvConfig// 参数2:网络层配置 nettyServerConfigfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;}
主要做了几件事:
- 创建了NamesrvConfig
- 创建了nettyServerConfig
- 创建了NamesrvController
NamesrvController详解:
public class NamesrvController {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);private final NamesrvConfig namesrvConfig;private final NettyServerConfig nettyServerConfig;// 调度线程池,执行定时任务,两件事:1. 检查存活的broker状态 2. 打印配置private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));// 管理kv配置。private final KVConfigManager kvConfigManager;// 管理路由信息的对象,重要。private final RouteInfoManager routeInfoManager;// 网络层封装对象,重要。private RemotingServer remotingServer;// ChannelEventListener ,用于监听channel 状态,当channel状态 发生改变时 close idle... 会向 事件队列发起事件,事件最终由 该service处理。private BrokerHousekeepingService brokerHousekeepingService;// 业务线程池,netty 线程 主要任务是 解析报文 将 报文 解析成 RemotingCommand 对象,然后 就将 该对象 交给 业务 线程池 再继续处理。private ExecutorService remotingExecutor;private Configuration configuration;private FileWatchService fileWatchService;// 参数1:namesrvConfig// 参数2:网络层配置 nettyServerConfigpublic NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log,this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");}}
start(NamesrvController)
public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化方法..boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// JVM HOOK ,平滑关闭的逻辑。 当JVM 被关闭时,主动调用 controller.shutdown() 方法,让服务器平滑关机。Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));// 启动服务器。controller.start();return controller;}
主要做了几件事:
- controller初始化
public boolean initialize() {// 加载本地kv配置this.kvConfigManager.load();// 创建网络服务器对象this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// 创建业务线程池,默认线程数 8this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));// 注册协议处理器(缺省协议处理器)this.registerProcessor();// 定时任务1:每10秒钟检查 broker 存活状态,将idle状态的 broker 移除。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// 定时任务2:每10分钟 打印一遍 kv 配置。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);}
- controller启动
会调用到NettyRemotingServer.start方法
public void start() {// 当向channel pipeline 添加 handler 时 指定了 group 时,网络事件传播到 当前handler时,事件处理 由 分配给 handler 的线程执行。this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 创建共享的 处理器 handlerprepareSharableHandlers();ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端 ServerSocketChannel类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池是 PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 服务器 绑定端口。ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();// 将服务器成功绑定的端口号 赋值给 字段 port。this.port = 
addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// housekeepingService 不为空,则创建 网络异常事件 处理器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次。// 扫描 responseTable 表,将过期的 responseFuture 移除。this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}
这篇关于全网最细RocketMQ源码一:NameSrv的文章就介绍到这儿,希望我们推荐的文章对程序员们有所帮助!