利用Redis实现集群或开发环境下SnowFlake自动配置机器号

2023-12-25 17:48

本文主要是介绍利用Redis实现集群或开发环境下SnowFlake自动配置机器号,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言:

系统流小说 wap.kuwx.net

SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用预先分配好的机器ID,工作区ID,机器时间可以生成全局唯一的随时间趋势递增的Long类型ID.长度在17-19位。随着时间的增长而递增,在MySQL数据库中,InnoDB存储引擎可以更快的插入递增的主键。而不像UUID那样因为写入是乱序的,InnoDB不得不频繁的做页分裂操作,耗时且容易产生碎片。

对于SnowFlake 的原理介绍,可以参考该文章:理解分布式id生成算法SnowFlake

理解了雪花的基本原理之后,我们试想:在分布式集群或者开发环境下,不同服务之间/相同服务的不同机器之间应该如何产生差异呢?有以下几种方案:

  1. 通过在 yml 文件中配置不同的参数,启动 spring 容器时通过读取该参数来实现不同服务与不同机器的workerId不同。但是这里不方便新增机器/新同事的自动化配置
  2. 向第三方应用如zookeeper、Redis中注册ID,以获得唯一的ID。
  3. 对于开发环境,可以取机器的IP后三位。因为大家在一个办公室的话IP后三位肯定是0-255之前不重复。但是这样机器ID需要8个Bit,留给数据中心的位数就只有4个了。

本方案结合了以上方案的优点,按照业务的实际情况对雪花中的数据中心和机器ID所占的位数进行调整:数据中心占4Bit,范围从0-15。机器ID占6Bit,范围从0-63
。对不同的服务在yml中配置服务名称,以服务编号作为数据中心ID。如果按照开发+测试+生产环境区分的话,可以部署5个不同的服务。application.yml 中配置如下的参数

# 分布式雪花ID不同机器ID自动化配置
snowFlake:dataCenter: 1 # 数据中心的idappName: test # 业务类型名称

而机器ID采用以下的策略实现:

  1. 获取当前机器的IP地址 localIp,模32,获得0-31的整数 machineId
  2. 向Redis中注册,使用 appName + dataCenter + machineId 作为key ,以本机IP localIp 作为 value。
  3. 注册成功后,设置键过期时间 24 h,并开启一个计时器,在 23h 后更新注册的 key
  4. 如果注册失败,可能有以下两个原因:
    1. 上次服务异常中断,没有来得及删除key。这里的解决方案是通过key获取value,如果value和localIp一致,则仍然视为注册成功
    2. IP和别人的IP模32的结果一样,导致机器ID冲突。这是就遍历 0-31 获取其中为注册的数字作为本机的机器号
  5. 如果不幸Redis连接失败,系统将从32-63之间随机获取ID,并使用 log.error() 打印醒目的提示消息这里建议IDEA + Grep Console 实现不同级别的日志不同前景色显示,方便及时获取错误信息
  6. 当服务停止前,向Redis发送请求,删除该Key的占用。

具体的代码如下:

自动配置机器ID,并在容器启动时放入SnowFlake实例对象

package cn.keats.util;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;@Configuration
@Slf4j
public class MachineIdConfig {@Resourceprivate JedisPool jedisPool;@Value("${snowFlake.dataCenter}")private Integer dataCenterId;@Value("${snowFlake.appName}")private String APP_NAME;/*** 机器id*/public static Integer machineId;/*** 本地ip地址*/private static String localIp;/*** 获取ip地址** @return* @throws UnknownHostException*/private String getIPAddress() throws UnknownHostException {InetAddress address = InetAddress.getLocalHost();return address.getHostAddress();}/*** hash机器IP初始化一个机器ID*/@Beanpublic SnowFlake initMachineId() throws Exception {localIp = getIPAddress(); // 192.168.0.233Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233//machineId = ip_.hashCode() % 32;// 0-31// 创建一个机器IDcreateMachineId();log.info("初始化 machine_id :{}", machineId);return new SnowFlake(machineId, dataCenterId);}/*** 容器销毁前清除注册记录*/@PreDestroypublic void destroyMachineId() {try (Jedis jedis = jedisPool.getResource()) {jedis.del(APP_NAME + dataCenterId + machineId);}}/*** 主方法:首先获取机器 IP 并 % 32 得到 0-31* 使用 业务名 + 组名 + IP 作为 Redis 的 key,机器IP作为 value,存储到Redis中** @return*/public Integer createMachineId() {try {// 向redis注册,并设置超时时间log.info("注册一个机器ID到Redis " + machineId + " IP:" + localIp);Boolean flag = registerMachine(machineId, localIp);// 注册成功if (flag) {// 启动一个线程更新超时时间updateExpTimeThread();// 返回机器Idlog.info("Redis中端口没有冲突 " + machineId + " IP:" + localIp);return machineId;}// 注册失败,可能原因 Hash%32 的结果冲突if (!checkIfCanRegister()) {// 如果 0-31 已经用完,使用 32-64之间随机的IDgetRandomMachineId();createMachineId();} else {// 如果存在剩余的IDlog.warn("Redis中端口冲突了,使用 0-31 之间未占用的Id " + machineId + " IP:" + localIp);createMachineId();}} catch (Exception e) {// 获取 32 - 63 之间的随机Id// 返回机器Idlog.error("Redis连接异常,不能正确注册雪花机器号 " + machineId + " IP:" + localIp, e);log.warn("使用临时方案,获取 32 - 63 之间的随机数作为机器号,请及时检查Redis连接");getRandomMachineId();return machineId;}return machineId;}/*** 检查是否被注册满了** @return*/private Boolean checkIfCanRegister() {// 判断0~31这个区间段的机器IP是否被占满try (Jedis jedis = jedisPool.getResource()) {Boolean flag = true;for (int i = 0; i < 32; i++) {flag = jedis.exists(APP_NAME + dataCenterId + i);// 如果不存在。设置机器Id为这个不存在的数字if (!flag) {machineId = i;break;}}return !flag;}}/*** 1.更新超時時間* 注意,更新前检查是否存在机器ip占用情况*/private void updateExpTimeThread() {// 开启一个线程执行定时任务:// 每23小时更新一次超时时间new Timer(localIp).schedule(new TimerTask() {@Overridepublic void run() {// 检查缓存中的ip与本机ip是否一致, 一致则更新时间,不一致则重新获取一个机器idBoolean b = checkIsLocalIp(String.valueOf(machineId));if (b) {log.info("IP一致,更新超时时间 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());try (Jedis jedis = jedisPool.getResource()) {jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );}} else {// IP冲突log.info("重新生成机器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());// 重新生成机器ID,并且更改雪花中的机器IDgetRandomMachineId();// 重新生成并注册机器idcreateMachineId();// 更改雪花中的机器IDSnowFlake.setWorkerId(machineId);// 结束当前任务log.info("Timer->thread->name:{}", Thread.currentThread().getName());this.cancel();}}}, 10 * 1000, 1000 * 60 * 60 * 23);}/*** 获取32-63随机数*/public void getRandomMachineId() {machineId = (int) (Math.random() * 31) + 31;}/*** 检查Redis中对应Key的Value是否是本机IP** @param mechineId* @return*/private Boolean checkIsLocalIp(String mechineId) {try (Jedis jedis = jedisPool.getResource()) {String ip = jedis.get(APP_NAME + dataCenterId + mechineId);log.info("checkIsLocalIp->ip:{}", ip);return localIp.equals(ip);}}/*** 1.注册机器* 2.设置超时时间** @param machineId 取值为0~31* @return*/private Boolean registerMachine(Integer machineId, String localIp) throws Exception {// try with resources 写法,出异常会释放括号内的资源 Java7特性try (Jedis jedis = jedisPool.getResource()) {// key 业务号 + 数据中心ID + 机器ID value 机器IPLong result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);if(result == 1){// 过期时间 1 天jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);return true;} else {// 如果Key存在,判断Value和当前IP是否一致,一致则返回TrueString value = jedis.get(APP_NAME + dataCenterId + machineId);if(localIp.equals(value)){// IP一致,注册机器ID成功jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);return true;}return false;}}}
}

雪花ID:

import org.springframework.context.annotation.Configuration;/*** 功能:分布式ID生成工具类**/
@Configuration
public class SnowFlake {/*** 开始时间截 (2019-09-08) 服务一旦运行过之后不能修改。会导致ID生成重复*/private final long twepoch = 1567872000000L;/*** 机器Id所占的位数 0 - 64*/private final long workerIdBits = 6L;/*** 工作组Id所占的位数 0 - 16*/private final long dataCenterIdBits = 4L;/*** 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)*/private final long maxWorkerId = -1L ^ (-1L << workerIdBits);/*** 支持的最大数据标识id,结果是15*/private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);/*** 序列在id中占的位数*/private final long sequenceBits = 12L;/*** 机器ID向左移12位*/private final long workerIdShift = sequenceBits;/*** 数据标识id向左移17位(12+5)*/private final long datacenterIdShift = sequenceBits + workerIdBits;/*** 时间截向左移22位(5+5+12)*/private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;/*** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)*/private final long sequenceMask = -1L ^ (-1L << sequenceBits);/*** 工作机器ID(0~63)*/private static long workerId;/*** 数据中心ID(0~16)*/private long datacenterId;/*** 毫秒内序列(0~4095)*/private long sequence = 0L;/*** 上次生成ID的时间截*/private long lastTimestamp = -1L;//==============================Constructors=====================================/*** 构造函数** @param workerId     工作ID (0~63)* @param datacenterId 数据中心ID (0~15)*/public SnowFlake(long workerId, long datacenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("机器ID必须小于 %d 且大于 0", maxWorkerId));}if (datacenterId > maxDatacenterId || datacenterId < 0) {throw new IllegalArgumentException(String.format("工作组ID必须小于 %d 且大于 0", maxDatacenterId));}this.workerId = workerId;this.datacenterId = datacenterId;}/*** 构造函数**/public SnowFlake() {this.workerId = 0;this.datacenterId = 0;}/*** 获得下一个ID (该方法是线程安全的)** @return SnowFlakeId*/public synchronized long nextId() {long timestamp = timeGen();//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}// 如果是同一时间生成的,则进行毫秒内序列if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;// 毫秒内序列溢出if (sequence == 0) {// 阻塞到下一个毫秒,获得新的时间戳timestamp = tilNextMillis(lastTimestamp);}}//时间戳改变,毫秒内序列重置else {sequence = 0L;}// 上次生成ID的时间截lastTimestamp = timestamp;// 移位并通过或运算拼到一起组成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift) //| (datacenterId << datacenterIdShift) //| (workerId << workerIdShift) //| sequence;}/*** 阻塞到下一个毫秒,直到获得新的时间戳** @param lastTimestamp 上次生成ID的时间截* @return 当前时间戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 返回以毫秒为单位的当前时间** @return 当前时间(毫秒)*/protected long timeGen() {return System.currentTimeMillis();}public long getWorkerId() {return workerId;}public static void setWorkerId(long workerId) {SnowFlake.workerId = workerId;}public long getDatacenterId() {return datacenterId;}public void setDatacenterId(long datacenterId) {this.datacenterId = datacenterId;}
}

Redis 配置

public class RedisConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port:6379}")private Integer port;@Value("${spring.redis.password:-1}")private String password;@Beanpublic JedisPool jedisPool() {// 1.设置连接池的配置对象JedisPoolConfig config = new JedisPoolConfig();// 设置池中最大连接数config.setMaxTotal(50);// 设置空闲时池中保有的最大连接数config.setMaxIdle(10);config.setMaxWaitMillis(3000L);config.setTestOnBorrow(true);log.info(password);// 2.设置连接池对象if("-1".equals(password)){log.info("Redis不通过密码连接");return new JedisPool(config, host, port,0);} else {log.info("Redis通过密码连接" + password);return new JedisPool(config, host, port,0, password);}}
}

使用方法

  1. 项目中引入 Redis 、 Jedis 依赖
  2. 复制上面两个类到项目until包下
  3. application.yml 配置服务名称,机器序号,Redis账号,密码
  4. 配置Jedis,使得项目启动时池中有Redis连接对象
  5. 启动项目
  6. 在需要生成ID的类中注入
    @Autowiredprivate SnowFlake snowFlake;// 生产IDsnowFlake.nextId(); 方法生产ID

这篇关于利用Redis实现集群或开发环境下SnowFlake自动配置机器号的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/536385

相关文章

windos server2022的配置故障转移服务的图文教程

《windosserver2022的配置故障转移服务的图文教程》本文主要介绍了windosserver2022的配置故障转移服务的图文教程,以确保服务和应用程序的连续性和可用性,文中通过图文介绍的非... 目录准备环境:步骤故障转移群集是 Windows Server 2022 中提供的一种功能,用于在多个

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

基于Python开发电脑定时关机工具

《基于Python开发电脑定时关机工具》这篇文章主要为大家详细介绍了如何基于Python开发一个电脑定时关机工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 简介2. 运行效果3. 相关源码1. 简介这个程序就像一个“忠实的管家”,帮你按时关掉电脑,而且全程不需要你多做

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand

python实现pdf转word和excel的示例代码

《python实现pdf转word和excel的示例代码》本文主要介绍了python实现pdf转word和excel的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、引言二、python编程1,PDF转Word2,PDF转Excel三、前端页面效果展示总结一

Python xmltodict实现简化XML数据处理

《Pythonxmltodict实现简化XML数据处理》Python社区为提供了xmltodict库,它专为简化XML与Python数据结构的转换而设计,本文主要来为大家介绍一下如何使用xmltod... 目录一、引言二、XMLtodict介绍设计理念适用场景三、功能参数与属性1、parse函数2、unpa

C#实现获得某个枚举的所有名称

《C#实现获得某个枚举的所有名称》这篇文章主要为大家详细介绍了C#如何实现获得某个枚举的所有名称,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以参考一下... C#中获得某个枚举的所有名称using System;using System.Collections.Generic;usi

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英