聊聊PowerJob的IdGenerateService

2024-01-10 09:36

本文主要是介绍聊聊PowerJob的IdGenerateService,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下PowerJob的IdGenerateService

IdGenerateService

tech/powerjob/server/core/uid/IdGenerateService.java

@Slf4j
@Service
public class IdGenerateService {private final SnowFlakeIdGenerator snowFlakeIdGenerator;private static final int DATA_CENTER_ID = 0;public IdGenerateService(ServerInfoService serverInfoService) {long id = serverInfoService.fetchServiceInfo().getId();snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);}/*** 分配分布式唯一ID* @return 分布式唯一ID*/public long allocate() {return snowFlakeIdGenerator.nextId();}}

IdGenerateService的构造器接收ServerInfoService,然后通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId()

ServerInfoService

tech/powerjob/server/remote/server/self/ServerInfoService.java

public interface ServerInfoService {/*** fetch current server info* @return ServerInfo*/ServerInfo fetchServiceInfo();}

ServerInfoService定义了fetchServiceInfo方法,返回ServerInfo

ServerInfoServiceImpl

tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java

@Slf4j
@Service
public class ServerInfoServiceImpl implements ServerInfoService {private final ServerInfo serverInfo;private final ServerInfoRepository serverInfoRepository;private static final long MAX_SERVER_CLUSTER_SIZE = 10000;private static final String SERVER_INIT_LOCK = "server_init_lock";private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;@Autowiredpublic ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {this.serverInfo = new ServerInfo();String ip = NetUtils.getLocalHost();serverInfo.setIp(ip);serverInfo.setBornTime(System.currentTimeMillis());this.serverInfoRepository = serverInfoRepository;Stopwatch sw = Stopwatch.createStarted();while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) {log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK);CommonUtils.easySleep(100);}try {// register server then get server_idServerInfoDO server = serverInfoRepository.findByIp(ip);if (server == null) {ServerInfoDO newServerInfo = new ServerInfoDO(ip);server = serverInfoRepository.saveAndFlush(newServerInfo);} else {serverInfoRepository.updateGmtModifiedByIp(ip, new Date());}if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {serverInfo.setId(server.getId());} else {long retryServerId = retryServerId();serverInfo.setId(retryServerId);serverInfoRepository.updateIdByIp(retryServerId, ip);}} catch (Exception e) {log.error("[ServerInfoService] init server failed", e);throw e;} finally {lockService.unlock(SERVER_INIT_LOCK);}log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);}@Scheduled(fixedRate = 15000, initialDelay = 15000)public void heartbeat() {serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());}private long retryServerId() {List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll();log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size());// clean inactive server record firstif (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {// use a large time interval to prevent valid records from being deleted when the local time is inaccurateDate oneDayAgo = DateUtils.addDays(new Date(), -1);int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo);log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo);serverInfoList = serverInfoRepository.findAll();}if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size()));}Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet());for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) {if (uedServerIds.contains(i)) {continue;}log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i);return i;}throw new PowerJobException("impossible");}@Autowired(required = false)public void setBuildProperties(BuildProperties buildProperties) {if (buildProperties == null) {return;}String pomVersion = buildProperties.getVersion();if (StringUtils.isNotBlank(pomVersion)) {serverInfo.setVersion(pomVersion);}}@Overridepublic ServerInfo fetchServiceInfo() {return serverInfo;}
}

ServerInfoServiceImpl实现了ServerInfoService接口,其构造器注入lockService和serverInfoRepository,先通过lockService.tryLock抢到server_init_lock,然后serverInfoRepository.findByIp找到ServerInfoDO执行saveAndFlush或者updateGmtModifiedByIp;其fetchServiceInfo返回的是serverInfo信息;它还以fixedRate为15s调度了heartbeat,主要是更新gmtModifed

SnowFlakeIdGenerator

tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java

public class SnowFlakeIdGenerator {/*** 起始的时间戳(a special day for me)*/private final static long START_STAMP = 1555776000000L;/*** 序列号占用的位数*/private final static long SEQUENCE_BIT = 6;/*** 机器标识占用的位数*/private final static long MACHINE_BIT = 14;/*** 数据中心占用的位数*/private final static long DATA_CENTER_BIT = 2;/*** 每一部分的最大值*/private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);/*** 每一部分向左的位移*/private final static long MACHINE_LEFT = SEQUENCE_BIT;private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;/*** 数据中心*/private final long dataCenterId;/*** 机器标识*/private final long machineId;/*** 序列号*/private long sequence = 0L;/*** 上一次时间戳*/private long lastTimestamp = -1L;public SnowFlakeIdGenerator(long dataCenterId, long machineId) {if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");}if (machineId > MAX_MACHINE_NUM || machineId < 0) {throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");}this.dataCenterId = dataCenterId;this.machineId = machineId;}/*** 产生下一个ID*/public synchronized long nextId() {long currStamp = getNewStamp();if (currStamp < lastTimestamp) {return futureId();}if (currStamp == lastTimestamp) {//相同毫秒内,序列号自增sequence = (sequence + 1) & MAX_SEQUENCE;//同一毫秒的序列数已经达到最大if (sequence == 0L) {currStamp = getNextMill();}} else {//不同毫秒内,序列号置为0sequence = 0L;}lastTimestamp = currStamp;return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分| dataCenterId << DATA_CENTER_LEFT       //数据中心部分| machineId << MACHINE_LEFT             //机器标识部分| sequence;                             //序列号部分}/*** 发生时钟回拨时借用未来时间生成Id,避免运行过程中任务调度和工作流直接进入不可用状态* 注:该方式不可解决原算法中停服状态下时钟回拨导致的重复id问题*/private long futureId() {sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == 0L) {lastTimestamp = lastTimestamp + 1;}return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分| dataCenterId << DATA_CENTER_LEFT       //数据中心部分| machineId << MACHINE_LEFT             //机器标识部分| sequence;                             //序列号部分}private long getNextMill() {long mill = getNewStamp();while (mill <= lastTimestamp) {mill = getNewStamp();}return mill;}private long getNewStamp() {return System.currentTimeMillis();}
}

SnowFlakeIdGenerator的dataCenterId(最大值为3)和machineId(最大值为16383),sequence最大值为63

小结

PowerJob的IdGenerateService通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId();其InstanceInfoDO的instanceId就是idGenerateService.allocate()生成的。

这篇关于聊聊PowerJob的IdGenerateService的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590340

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH