本文主要是介绍聊聊PowerJob的IdGenerateService,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
序
本文主要研究一下PowerJob的IdGenerateService
IdGenerateService
tech/powerjob/server/core/uid/IdGenerateService.java
@Slf4j
@Service
public class IdGenerateService {private final SnowFlakeIdGenerator snowFlakeIdGenerator;private static final int DATA_CENTER_ID = 0;public IdGenerateService(ServerInfoService serverInfoService) {long id = serverInfoService.fetchServiceInfo().getId();snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);}/*** 分配分布式唯一ID* @return 分布式唯一ID*/public long allocate() {return snowFlakeIdGenerator.nextId();}}
IdGenerateService的构造器接收ServerInfoService,然后通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId()
ServerInfoService
tech/powerjob/server/remote/server/self/ServerInfoService.java
public interface ServerInfoService {/*** fetch current server info* @return ServerInfo*/ServerInfo fetchServiceInfo();}
ServerInfoService定义了fetchServiceInfo方法,返回ServerInfo
ServerInfoServiceImpl
tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java
@Slf4j
@Service
public class ServerInfoServiceImpl implements ServerInfoService {private final ServerInfo serverInfo;private final ServerInfoRepository serverInfoRepository;private static final long MAX_SERVER_CLUSTER_SIZE = 10000;private static final String SERVER_INIT_LOCK = "server_init_lock";private static final int SERVER_INIT_LOCK_MAX_TIME = 15000;@Autowiredpublic ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) {this.serverInfo = new ServerInfo();String ip = NetUtils.getLocalHost();serverInfo.setIp(ip);serverInfo.setBornTime(System.currentTimeMillis());this.serverInfoRepository = serverInfoRepository;Stopwatch sw = Stopwatch.createStarted();while (!lockService.tryLock(SERVER_INIT_LOCK, SERVER_INIT_LOCK_MAX_TIME)) {log.info("[ServerInfoService] waiting for lock: {}", SERVER_INIT_LOCK);CommonUtils.easySleep(100);}try {// register server then get server_idServerInfoDO server = serverInfoRepository.findByIp(ip);if (server == null) {ServerInfoDO newServerInfo = new ServerInfoDO(ip);server = serverInfoRepository.saveAndFlush(newServerInfo);} else {serverInfoRepository.updateGmtModifiedByIp(ip, new Date());}if (server.getId() < MAX_SERVER_CLUSTER_SIZE) {serverInfo.setId(server.getId());} else {long retryServerId = retryServerId();serverInfo.setId(retryServerId);serverInfoRepository.updateIdByIp(retryServerId, ip);}} catch (Exception e) {log.error("[ServerInfoService] init server failed", e);throw e;} finally {lockService.unlock(SERVER_INIT_LOCK);}log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw);}@Scheduled(fixedRate = 15000, initialDelay = 15000)public void heartbeat() {serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date());}private long retryServerId() {List<ServerInfoDO> serverInfoList = serverInfoRepository.findAll();log.info("[ServerInfoService] current server record num in database: {}", serverInfoList.size());// clean inactive server record firstif (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {// use a large time interval to prevent valid records from being deleted when the local time is inaccurateDate oneDayAgo = DateUtils.addDays(new Date(), -1);int delNum =serverInfoRepository.deleteByGmtModifiedBefore(oneDayAgo);log.warn("[ServerInfoService] delete invalid {} server info record before {}", delNum, oneDayAgo);serverInfoList = serverInfoRepository.findAll();}if (serverInfoList.size() > MAX_SERVER_CLUSTER_SIZE) {throw new PowerJobException(String.format("The powerjob-server cluster cannot accommodate %d machines, please rebuild another cluster", serverInfoList.size()));}Set<Long> uedServerIds = serverInfoList.stream().map(ServerInfoDO::getId).collect(Collectors.toSet());for (long i = 1; i <= MAX_SERVER_CLUSTER_SIZE; i++) {if (uedServerIds.contains(i)) {continue;}log.info("[ServerInfoService] ID[{}] is not used yet, try as new server id", i);return i;}throw new PowerJobException("impossible");}@Autowired(required = false)public void setBuildProperties(BuildProperties buildProperties) {if (buildProperties == null) {return;}String pomVersion = buildProperties.getVersion();if (StringUtils.isNotBlank(pomVersion)) {serverInfo.setVersion(pomVersion);}}@Overridepublic ServerInfo fetchServiceInfo() {return serverInfo;}
}
ServerInfoServiceImpl实现了ServerInfoService接口,其构造器注入lockService和serverInfoRepository,先通过lockService.tryLock抢到
server_init_lock
,然后serverInfoRepository.findByIp找到ServerInfoDO执行saveAndFlush或者updateGmtModifiedByIp;其fetchServiceInfo返回的是serverInfo信息;它还以fixedRate为15s调度了heartbeat,主要是更新gmtModifed
SnowFlakeIdGenerator
tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java
public class SnowFlakeIdGenerator {/*** 起始的时间戳(a special day for me)*/private final static long START_STAMP = 1555776000000L;/*** 序列号占用的位数*/private final static long SEQUENCE_BIT = 6;/*** 机器标识占用的位数*/private final static long MACHINE_BIT = 14;/*** 数据中心占用的位数*/private final static long DATA_CENTER_BIT = 2;/*** 每一部分的最大值*/private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);/*** 每一部分向左的位移*/private final static long MACHINE_LEFT = SEQUENCE_BIT;private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;/*** 数据中心*/private final long dataCenterId;/*** 机器标识*/private final long machineId;/*** 序列号*/private long sequence = 0L;/*** 上一次时间戳*/private long lastTimestamp = -1L;public SnowFlakeIdGenerator(long dataCenterId, long machineId) {if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {throw new IllegalArgumentException("dataCenterId can't be greater than MAX_DATA_CENTER_NUM or less than 0");}if (machineId > MAX_MACHINE_NUM || machineId < 0) {throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");}this.dataCenterId = dataCenterId;this.machineId = machineId;}/*** 产生下一个ID*/public synchronized long nextId() {long currStamp = getNewStamp();if (currStamp < lastTimestamp) {return futureId();}if (currStamp == lastTimestamp) {//相同毫秒内,序列号自增sequence = (sequence + 1) & MAX_SEQUENCE;//同一毫秒的序列数已经达到最大if (sequence == 0L) {currStamp = getNextMill();}} else {//不同毫秒内,序列号置为0sequence = 0L;}lastTimestamp = currStamp;return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分| dataCenterId << DATA_CENTER_LEFT //数据中心部分| machineId << MACHINE_LEFT //机器标识部分| sequence; //序列号部分}/*** 发生时钟回拨时借用未来时间生成Id,避免运行过程中任务调度和工作流直接进入不可用状态* 注:该方式不可解决原算法中停服状态下时钟回拨导致的重复id问题*/private long futureId() {sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == 0L) {lastTimestamp = lastTimestamp + 1;}return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分| dataCenterId << DATA_CENTER_LEFT //数据中心部分| machineId << MACHINE_LEFT //机器标识部分| sequence; //序列号部分}private long getNextMill() {long mill = getNewStamp();while (mill <= lastTimestamp) {mill = getNewStamp();}return mill;}private long getNewStamp() {return System.currentTimeMillis();}
}
SnowFlakeIdGenerator的dataCenterId(
最大值为3
)和machineId(最大值为16383
),sequence最大值为63
小结
PowerJob的IdGenerateService通过serverInfoService.fetchServiceInfo().getId()获取machineId,最后创建SnowFlakeIdGenerator,其DATA_CENTER_ID为0;其allocate返回的是snowFlakeIdGenerator.nextId();其InstanceInfoDO的instanceId就是idGenerateService.allocate()生成的。
这篇关于聊聊PowerJob的IdGenerateService的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!