分布式ID解决方案分析,百度uid-generator,美团Leaf原理分析
为什么需要分布式ID
在一些复杂的分布式系统中,往往需要对大量的数据和消息进行唯一标识。主要考虑两点:
- 为了后续扩展,数据库分库分表后需要有一个唯一ID来标识一条数据或消息,自增ID并不能满足分库分表的需求
- 如一些金融、支付、电商、物流等产品的系统中都需要有唯一ID做标识,如订单号、用户标识、优惠券号。因此需要一个能够生成全局唯一ID的分布式ID生成系统
分布式ID特性
基本特性
- 全局唯一性:不能出现重复的ID号
- 趋势递增,由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能。
- 单调递增:保证下一个ID一定大于上一个ID,可以满足排序、IM增量消息、事务版本号等特殊需求。
- 信息安全:如果ID是连续的,很容易被恶意用户进行数据扒取与推算数据,所以要求ID尽量不规则。
- 可逆性:分布式id里面最好包含时间戳/业务标识且可逆分析,利于排查问题。
上述3和4特性是互斥的,但是可以进行取舍,例如基于SnowFlake算法的ID生成是单调递增的,但是根据获取的场景不是连续的。
设计要求
- 高性能:高可用低延时,ID要快速生成响应,避免成为业务瓶颈
- 高可用:一般很多关键的核心服务对分布式ID生成服务有强依赖,需要保证高可用避免影响系统正常运行。
- 易于接入:设计和实现上要尽可能的简单,做到拿来即用的设计。
分析几种分布式ID生成方案
简单方案[UUID/数据库自增ID/Redis incr]
UUID
- 什么是UUID
UUID是由一组32位数的16进制数字所构成,UUID理论上的总数为16^32=2^128,若每纳秒(ns)产生1兆个UUID,要花100亿年才会将所有UUID用完。
- UUID格式
UUID 的 16 个 8 位字节表示为 32 个十六进制(基数16)数字,显示在由连字符分隔 ‘-’ 的五个组中,“8-4-4-4-12” 总共 36 个字符(32 个字母数字字符和 4 个连接字符)
xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx
例如:123e4567-e89b-12d3-a456-426655440000
其中四位数字 M
表示 UUID 版本,数字 N
的一至三个最高有效位表示 UUID 变体,布局如下:
Name | Length (bytes) | Length (hex digits) | Contents |
---|---|---|---|
time_low | 4 | 8 | 整数:低位 32 bits 时间 |
time_mid | 2 | 4 | 整数:中间位 16 bits 时间 |
time_hi_and_version | 2 | 4 | 最高有效位中的 4 bits“版本”,后面是高 12 bits 的时间 |
clock_seq_hi_and_res clock_seq_low | 2 | 4 | 最高有效位为 1-3 bits“变体”,后跟13-15 bits 时钟序列 |
node | 6 | 12 | 48 bits 节点 ID |
- 版本
- 版本1 UUID 是根据时间和节点 ID(通常是MAC地址)生成
- 版本2 UUID是根据标识符(通常是组或用户ID)、时间和节点ID生成
- 版本3 和 版本5 确定性UUID 通过散列 (hashing) 名字空间 (namespace) 标识符和名称生成
- 版本4 UUID 使用随机性或伪随机性生成
- 碰撞
对于版本一,二使用MAC地址生成的UUID,版本三四五产生碰撞的可能想更大,但通常可能是忽略。在 103 万亿 个 版本4 UUID中找到重复概率是十亿分之一。
- 方案分析
使用UUID做分布式ID是可行的,有点是生成方式简单,快速,但是并不推荐使用,因为使用UUID生成的是无意义的字符串,而且非趋势递增,用作主键存储性能差。
基于数据库自增ID
基于关系型数据库如MySql的主键自增(AUTO_INCREMENT)特性。建立一张专门的ID生成的表,需要获取ID就在表中插入一条记录获取自增的ID,也可以批量插入获取。
- 方案分析
使用数据库自增ID的方式实现简单,但是生成的是递增连续的ID的信息不安全,而且不适合作为订单号等业务的主键;并且对数据库有强依赖关系,难以扩展有宕机风险。
基于Redis的incr命令
基于Redis的incr命令实现ID的原子自增,先使用set 命令设置初始值,每次获取ID的时候使用incr命令获取自增ID。
- 方案分析
与基于数据库自增ID存在相同的问题,利用Redis内存数据库与单线程模型的特性有更高的并发性,但需要注意数据持久化的问题。基于RDB的定时快照方式重启后可能会出现ID重复问题,基于AOF的命令记录方式又存在重启恢复时间长的问题。
基于数据库集群模式
基于数据库集群的方式是对基于自增ID方式的高可用优化。对数据库做多主模式集群。
实现方式
- 设置自增ID起始值与步长
如果是双主模式集群,那么需要设置自增ID的步长为2,可以通过如下命令设置自增ID的起始值与步长。
set @@auto_increment_offset = 1; -- 起始值
set @@auto_increment_increment = 2; -- 步长
第一台MySql的起始值为1,步长为2的话第二台的起始值为2,步长为2,以此类推。
- 后续扩容问题
如果后续需要增加MySql节点,需要修改一、二两台MySQL实例的ID起始值和步长,并把第三台机器的ID起始生成位置设定在比现有最大自增ID大。
方案分析
虽然解决了基于数据库自增ID方式的高可用性问题,但是在出现性能瓶颈后需要做繁琐的扩容处理,操作不当可能出现ID重复的风险。而且也存在着数据库自增ID方式的其他问题。
基于数据库号段模式
基于号段的方式旨在通过批量的获取自增ID的方式,降低对数据库的压力,每次从数据库取出一个号段范围。
实现方式
- 表结构设计
基于号段的方式获取ID在表设计上有多种选择,但核心在于需要记录当前最大的可用ID。
CREATE TABLE id_generator (
id int(10) NOT NULL,
max_id bigint(20) NOT NULL COMMENT '当前最大的可用id',
step int(20) NOT NULL COMMENT '号段的长度',
biz_id int(20) NOT NULL COMMENT '业务id',
version int(20) NOT NULL COMMENT '版本号',
PRIMARY KEY (`id`)
)
- ID批量获取
某个业务需要批量获取ID可用通过update语句,设置biz_id所在的max_id=max_id+step,获取到号段(max_id,max_id+step],例如max_id=1000,step=2000,那么获取到的号段为(1000,3000],获取后当前最大max_id=3000。
update id_generator set max_id = max_id+step, version = version + 1 where version = version and biz_id = ***
方案分析
通过批量获取ID的思路降低了对数据库的压力与强依赖关系,依据这个思路可以作为一些特定业务如用户短ID生成的方案。
基于雪花算法的ID生成
雪花算法(Snowflake)是twitter内部使用的分布式ID生成算法,旨在依据一定的算法逻辑,生成一个Long类型的ID(64bit)。组成结构大致如下:
正数位(1bit)+时间戳(41bit)+机器ID(5bit)+ 数据中心(5bit)+ 自增ID(12bit)
具体实现方式可参考博主之前的一篇文章:https://chinalhr.github.io/post/uid-generate-snowflake/
方案分析
基于雪花算法的ID生成方式简单,趋势递增且带有可逆性,用作主键存储性能高,也支持后续数据库分库分表,算法实现方式灵活性高可自定义组合优化,高性能每秒能够产生26万个ID,并且不依赖于数据库,以服务的方式部署,稳定性更高。
但是因为强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态,而且雪花算法通过提前规划好机器标识来实现,但在分布式生产环境中是不够友好的,一般需要自动启停,增减机器,因此很多互联网大厂都对改算法进行了一定的改造。
分布式ID开源项目分析-百度uid-generator项目
UidGenerator是由百度技术部开发的,通过Java实现, 基于Snowflake算法的唯一ID生成器。UidGenerator以组件形式工作在应用项目中,支持自定义workerId位数和初始化策略, 从而适用于docker等容器化环境下实例自动重启、漂移等场景。
UidGenerator项目地址:https://github.com/baidu/uid-generator
数据结构
UidGenerator生成的ID默认结构如下:
- sign(1bit):固定1bit符号标识,即生成的UID为正数。
- delta seconds (28 bits):当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约8.7年
- worker id (22 bits): 机器id,最多可支持约420w次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。
- sequence (13 bits): 每秒下的并发序列,13 bits可支持每秒8192个并发。
项目结构
项目目录:
├── BitsAllocator.java - Bit分配器(C)
├── UidGenerator.java - UID生成器接口(I)
├── buffer
│ ├── BufferPaddingExecutor.java - 填充RingBuffer的执行器(C)
│ ├── BufferedUidProvider.java - RingBuffer中UID的生产者(C)
│ ├── RejectedPutBufferHandler.java - 拒绝Put到RingBuffer的处理器(C)
│ ├── RejectedTakeBufferHandler.java - 拒绝从RingBuffer中Take的处理器(C)
│ └── RingBuffer.java - RingBuffer数据结构实现类(C)
├── exception - 相关的自定义异常类
├── impl
│ ├── CachedUidGenerator.java - 基于RingBuffer存储的UID生成器实现类(C)
│ └── DefaultUidGenerator.java - 默认UID生成器实现类(C)
├── utils - 工具类(日期,网络,线程池,枚举)
└── worker
├── DisposableWorkerIdAssigner.java - 一次性的WorkerId分配器(C)
├── WorkerIdAssigner.java - WorkerId分配器接口(I)
├── WorkerNodeType.java - 工作节点类型,分为容器与实体两种(E)
├── dao
│ └── WorkerNodeDAO.java - WorkerNode数据库访问接口
└── entity
└── WorkerNodeEntity.java - WorkerNode 实体类
项目依赖:
主要依赖了springframework的核心包,mybatis,Apache commons工具类,logback
表结构:
CREATE TABLE WORKER_NODE(
ID BIGINT NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
HOST_NAME VARCHAR(64) NOT NULL COMMENT 'host name',
PORT VARCHAR(64) NOT NULL COMMENT 'port',
TYPE INT NOT NULL COMMENT 'node type: ACTUAL or CONTAINER',
LAUNCH_DATE DATE NOT NULL COMMENT 'launch date',
MODIFIED TIMESTAMP NOT NULL COMMENT 'modified time',
CREATED TIMESTAMP NOT NULL COMMENT 'created time',
PRIMARY KEY(ID)
) COMMENT='DB WorkerID Assigner for UID Generator',ENGINE = INNODB;
从上面的项目目录来看,UidGenerator提供了两种实现方式,分别是实时ID生成方式的DefaultUidGenerator与预生成ID生成方式的CachedUidGenerator。
DefaultUidGenerator实现方式
UidGenerator接口如下
public interface UidGenerator {
/**
* 获取UID
*/
long getUID() throws UidGenerateException;
/**
* 解析格式化输出UID
*/
String parseUID(long uid);
}
DefaultUidGenerator实现了UidGenerator与InitializingBean,具体实现流程如下:
- Bean初始化,配置的timeBits,workerBits,seqBits位数与epochStr日期,因为最多可支持约8.7年,所以最好设置epochStr值为合适的日期。
- afterPropertiesSet方法初始化BitsAllocator设置timeBits,workerBits,seqBits等位数信息,初始化workId(每次初始化往WORKER_NODE表插入一条记录,返回的主键ID就是workId,所以最多可支持约420w次机器启动)。
- 核心方法nextId获取UID,具体方式如下
protected synchronized long nextId() {
//获取当前时间,单位秒,会校验是否超过最大的时间年限,超过则抛出异常
long currentSecond = getCurrentSecond();
// 如果出现时间回拨,就抛出异常
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// 同一秒内序列号自增,
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
//超过最大序列,等待下一秒生成uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond);
}
// 不同秒内序列号从0开始
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// bits位移操作生成UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
总结:DefaultUidGenerator的实现与雪花算法实现对比改动较小,对生成ID结构的位数可自定义,对分布式环境下增减机器的WorkId分配更友好,但没有解决时间回拨问题,采用了简单的抛出异常的方式处理。
RingBuffer
RingBuffer本质上是一个数组,数组中每个项被称为slot。UidGenerator设计了两个RingBuffer,一个保存唯一ID,一个保存flag。RingBuffer的尺寸是2^n。
RingBuffer Of UID
RingBuffer Of UID是存储UID的RingBuffer 使用两个指针,Tail指针表示最新生成的UID,如果这个Tail指针追上了Cursor指针,此时RingBuffer已经满了,不允许再继续生成UID了。可以通过属性rejectedPutBufferHandler指定拒绝策略。
Cursor指针表示最后一个已经给消费的UID,如果Cursor指针追上了Tail指针,此时RingBuffer已经空了,不允许继续获取UID了。可以通过rejectedTakeBufferHandler指定拒绝策略。
RingBuffer Of Flag
RingBuffer Of Flag是存储Uid状态的RingBuffer,每个slot的值都是0或者1,0是CAN_PUT_FLAG(可以填充)的标志位,1是CAN_TAKE_FLAG(可以消费)的标识位。
配置参数
<!-- 以下为可选配置, 如未指定将采用默认值 -->
<!-- RingBuffer size扩容参数, 可提高UID生成的吞吐量. -->
<!-- 默认:3, 原bufferSize=8192, 扩容后bufferSize= 8192 << 3 = 65536 -->
<property name="boostPower" value="3"></property>
<!-- 指定何时向RingBuffer中填充UID, 取值为百分比(0, 100), 默认为50 -->
<!-- 举例: bufferSize=1024, paddingFactor=50 -> threshold=1024 * 50 / 100 = 512. -->
<!-- 当环上可用UID数量 < 512时, 将自动对RingBuffer进行填充补全 -->
<property name="paddingFactor" value="50"></property>
<!-- 另外一种RingBuffer填充时机, 在Schedule线程中, 周期性检查填充 -->
<!-- 默认:不配置此项, 即不使用Schedule线程. 如需使用, 请指定Schedule线程时间间隔, 单位:秒 -->
<property name="scheduleInterval" value="60"></property>
<!-- 拒绝策略: 当环已满, 无法继续填充时 -->
<!-- 默认无需指定, 将丢弃Put操作, 仅日志记录. 如有特殊需求, 请实现RejectedPutBufferHandler接口(支持Lambda表达式) -->
<property name="rejectedPutBufferHandler" ref="XxxxYourPutRejectPolicy"></property>
<!-- 拒绝策略: 当环已空, 无法继续获取时 -->
<!-- 默认无需指定, 将记录日志, 并抛出UidGenerateException异常. 如有特殊需求, 请实现RejectedTakeBufferHandler接口(支持Lambda表达式) -->
<property name="rejectedTakeBufferHandler" ref="XxxxYourTakeRejectPolicy"></property>
CacheLine补齐
RingBuffer的数据都是使用数组来存储的,在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题。tail和cursor变量如果直接用原生的AtomicLong类型,tail和cursor可能会缓存在同一个cacheLine中,多个线程读取该变量可能会引发CacheLine的RFO请求,反而影响性能,为了防止伪共享问题,填充了6个long类型的成员变量,加上long类型的value成员变量,刚好占满一个Cache Line(其中Java对象还有8byte的对象头),这种方式称之为CacheLine补齐。项目中使用了PaddedAtomicLong进行包装。
public class PaddedAtomicLong extends AtomicLong {
private static final long serialVersionUID = -3415778863941386253L;
/** Padded 6 long (48 bytes) */
public volatile long p1, p2, p3, p4, p5, p6 = 7L;
/**
* Constructors from {@link AtomicLong}
*/
public PaddedAtomicLong() {
super();
}
public PaddedAtomicLong(long initialValue) {
super(initialValue);
}
/**
* To prevent GC optimizations for cleaning unused padded references
*/
public long sumPaddingToPreventOptimization() {
return p1 + p2 + p3 + p4 + p5 + p6;
}
}
- RingBuffer填充时机
- 初始化预填充:初始化时预先填充满
- 即时填充:take消费,即时检查剩余可用slot量是否小于设定阈值,小于则进行填充
- 周期填充:通过Schedule线程,定时补全空闲slots。
CachedUidGenerator实现方式
CachedUidGenerator是基于RingBuffer去实现的。
CachedUidGenerator继承自DefaultUidGenerator,实现了DisposableBean。
初始化
- 调用父类DefaultUidGenerator的afterPropertiesSet方法初始化BitsAllocator与WorkId。
- 根据配置的boostPower与paddingFactor初始化RingBuffer,根据配置的scheduleInterval初始化bufferPaddingExecutor,初始化拒绝策略rejectedPutBufferHandler与rejectedTakeBufferHandler。
- 初始化填满RingBuffer中所有slot(初始化填充UID),此处调用bufferPaddingExecuto的paddingBuffer()方法。
- 开启Ringbuffer补丁线程(前提是配置了属性scheduleInterval)
RingBuffer.put(填充)
CachedUidGenerator生成UID的方式主要是通过paddingBuffer方法实现的,生成流程如下图所示,主要关注点是在满足填充新的唯一ID条件时,通过时间值递增得到新的时间值(lastSecond.incrementAndGet()),而不是System.currentTimeMillis()这种方式,解决了运行时时间回拨问题。
public void paddingBuffer() {
LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
// is still running
if (!running.compareAndSet(false, true)) {
LOGGER.info("Padding buffer is still running. {}", ringBuffer);
return;
}
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
//调用nextIdsForOneSecond方法获取UID list,注意此处的时间使用lastSecond.incrementAndGet()方法时间自增的方式获取而不是System.currentTimeMillis()获取,解决了运行时时间回拨问题
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
for (Long uid : uidList) {
//调用put方法加入RingBuffer of UID中
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}
// not running now
running.compareAndSet(true, false);
LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}
/**
* 指定秒内获取UID List,Size为max sequence(0~max sequence)
*
* @param currentSecond
* @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
*/
protected List<Long> nextIdsForOneSecond(long currentSecond) {
// Initialize result list size of (max sequence + 1)
int listSize = (int) bitsAllocator.getMaxSequence() + 1;
List<Long> uidList = new ArrayList<>(listSize);
// Allocate the first sequence of the second, the others can be calculated with the offset
long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
for (int offset = 0; offset < listSize; offset++) {
uidList.add(firstSeqUid + offset);
}
return uidList;
}
public synchronized boolean put(long uid) {
//获取Tail与Cursor指针
long currentTail = tail.get();
long currentCursor = cursor.get();
// 1. 判断Ring是否已满,满了执行Put拒绝策略
long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
if (distance == bufferSize - 1) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 2. check标志是否为CAN_PUT_FLAG
int nextTailIndex = calSlotIndex(currentTail + 1);
if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 3. 将UID插入下一个slots中
// 4. 设置标志为CAN_TAKE_FLAG
// 5. tail指针+1
slots[nextTailIndex] = uid;
flags[nextTailIndex].set(CAN_TAKE_FLAG);
tail.incrementAndGet();
return true;
}
RingBuffer.take(取值)
CachedUidGenerator获取UID通过RingBuffer.take()方法直接从RingBuffer中取出,获取流程如下图所示
public long take() {
// 获取当前Cursor指针与下一个可用Cursor指针并进行check
long currentCursor = cursor.get();
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
// 1. 判断如果达到阈值,则以异步模式触发填充
long currentTail = tail.get();
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
// 2. 当前Cursor指针等于下一个可用Cursor指针,Ring为空,执行Take拒绝策略
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}
// 3. 检查下一个slot是否为CAN_TAKE_FLAG
int nextCursorIndex = calSlotIndex(nextCursor);
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 4. 从slot中获取UID
// 5. 设置标志为CAN_PUT_FLAG.
long uid = slots[nextCursorIndex];
flags[nextCursorIndex].set(CAN_PUT_FLAG);
return uid;
}
总结
百度uid-generator项目针对snowflake算法落地进行了一些改造,主要是对workId的获取上,对分布式集群环境下面,实例自动伸缩,docker容器化的场景,通过每次重启获取新的workId进行优化。提供了DefaultUidGenerator实时生成UID与CachedUidGenerator预生成UID两种方式。CachedUidGenerator通过借用未来时间来解决雪花算法sequence存在的并发限制,而且通过时间值递增的方式解决雪花算法存在的时间回拨问题。
分布式ID开源项目分析-美团Leaf项目
Leaf是美团技术部推出的一个分布式ID生成服务,通过Java实现, 基于数据库号段模式和snowflake算法模式的UID获取。
Leaf项目地址:https://github.com/Meituan-Dianping/Leaf
项目结构
这里主要关注core模块:
├── common - 公共包
│ ├── CheckVO.java
│ ├── PropertyFactory.java - leaf.properties配置获取
│ ├── Result.java - 统一返回值
│ ├── Status.java - 状态值
│ ├── Utils.java - 工具类
│ └── ZeroIDGen.java - 生成0号ID
├── IDGen.java - ID生成接口(I)
├── segment
│ ├── dao - 号段相关Dao操作
│ │ ├── IDAllocDao.java
│ │ ├── IDAllocMapper.java
│ │ └── impl
│ │ └── IDAllocDaoImpl.java
│ ├── model - 号段相关model
│ │ ├── LeafAlloc.java
│ │ ├── SegmentBuffer.java
│ │ └── Segment.java
│ └── SegmentIDGenImpl.java - 号段相关ID生成实现
└── snowflake
├── exception
│ ├── CheckLastTimeException.java
│ ├── CheckOtherNodeException.jav
│ └── ClockGoBackException.java
├── SnowflakeIDGenImpl.java - SnowFlakeID生成实现
└── SnowflakeZookeeperHolder.java - Zookeeper相关操作类
Leaf-segment号段模式
Leaf-segment号段模式是对上文的基于数据库号段模式的一种实现与优化。 每次获取一个segmen号段的值, 减轻数据库的压力 。利用 biz_tag字段来区分 业务,隔离影响,后续可以依据 biz_tag进行分库分表。
数据库表结构
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) NOT NULL DEFAULT '' COMMENT '业务key',
`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最大id',
`step` int(11) NOT NULL COMMENT '初始步长,也是动态调整的最小步长',
`description` varchar(256) DEFAULT NULL COMMENT '业务key的描述',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间',
PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
可知,在进行UID获取之前,需要先插入数据进行biz_tag的设置与其他数据的初始化。
实现方式
Leaf-segment的实现类是SegmentIDGenImpl
- 初始化
@Override
public boolean init() {
//同步biz_tag到cache中
updateCacheFromDb();
initOK = true;
//开启定时任务定时调度updateCacheFromDb()
updateCacheFromDbAtEveryMinute();
return initOK;
}
private void updateCacheFromDb() {
//获取数据库表中所有的biz_tag
List<String> dbTags = dao.getAllTags();
if (dbTags == null || dbTags.isEmpty()) {
return;
}
//缓存的biz_tag
List<String> cacheTags = new ArrayList<String>(cache.keySet());
Set<String> insertTagsSet = new HashSet<>(dbTags);
Set<String> removeTagsSet = new HashSet<>(cacheTags);
//将db中新加的biz_tags加入cache中,并初始化SegmentBuffer
for(int i = 0; i < cacheTags.size(); i++){
String tmp = cacheTags.get(i);
if(insertTagsSet.contains(tmp)){
insertTagsSet.remove(tmp);
}
}
for (String tag : insertTagsSet) {
SegmentBuffer buffer = new SegmentBuffer();
buffer.setKey(tag);
Segment segment = buffer.getCurrent();
segment.setValue(new AtomicLong(0));
segment.setMax(0);
segment.setStep(0);
cache.put(tag, buffer);
}
//将db中被删除的biz_tag(已失效)从cache删除
for(int i = 0; i < dbTags.size(); i++){
String tmp = dbTags.get(i);
if(removeTagsSet.contains(tmp)){
removeTagsSet.remove(tmp);
}
}
for (String tag : removeTagsSet) {
cache.remove(tag);
}
}
SegmentIDGenImpl的init方法可以看出主要是对biz_tag的同步,同步到应用的缓存中并且初始化对应的Segment Buffer。并且会开启调度线程进行定时的cache刷新。
- 双Buffer
初始化cache的时候,会对每一个biz_tag初始化一个SegmentBuffer,通过其代码的实现可以看SegmentBuffer 中包含了一个号段数组,包含两个 Segment,第一个Segment已下发10%时,如果下一个Segment未更新,则另启一个更新线程去更新下一个Segment,这就是Leaf的双Buffer特性。
这样做有两个好处:
一是segment长度如果设置为服务高峰期发号QPS的600倍(10分钟),即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。
二是每次请求都会判断下个号段的状态,从而更新此号段,避免偶发网络抖动影响下个号段的更新。
public class SegmentBuffer {
private String key;// 数据库的biz_tag
private Segment[] segments; //双buffer
private volatile int currentPos; //当前的使用的segment的index
private volatile boolean nextReady; //下一个segment是否处于可切换状态
private volatile boolean initOk; //是否初始化完成
private final AtomicBoolean threadRunning; //线程是否在运行中
private final ReadWriteLock lock;
private volatile int step;// 动态调整的step
private volatile int minStep; // 最小step
private volatile long updateTimestamp;// 更新时间戳
}
public class Segment {
private AtomicLong value = new AtomicLong(0);
private volatile long max;
private volatile int step;
private SegmentBuffer buffer;
}
- UID取值
UID通过调用get方法获取,具体实现逻辑大致如下
public Result get(final String key) {
//SegmentIDGenImpl是否初始化
...
//cache中是否已经缓存了对应biz_id的数据
if (cache.containsKey(key)) {
//检查SegmentBuffer中的Segment是否初始化
if (!buffer.isInitOk()) {
//初始化Segment
updateSegmentFromDb(key, buffer.getCurrent());
buffer.setInitOk(true);
}
}
return getIdFromSegmentBuffer(cache.get(key));
}
updateSegmentFromDb方法从数据库表中读取数据更新SegmentBuffer中的Segment。在第一次初始号段与第二次双buffer的异步准备另一个号段会被调用,注意前两次调用之后后续双buffer的异步准备另一个号段会动态调整申请号段的区间大小,调整规则主要跟号段申请频率有关,具体逻辑大致如下:
public void updateSegmentFromDb(String key, Segment segment) {
StopWatch sw = new Slf4JStopWatch();
SegmentBuffer buffer = segment.getBuffer();
LeafAlloc leafAlloc;
/**
* 第一次初始号段
* 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
* 2. 设置buffer的Step(步长)与MinStep
*/
if (!buffer.isInitOk()) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
}
/**
* 第二次初始号段(双Buffer异步准备)
* 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
* 2. 设置buffer的Step(步长)与MinStep
*/
else if (buffer.getUpdateTimestamp() == 0) {
leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(leafAlloc.getStep());
buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
}
/**
* 后续双Buffer异步准备
* 动态调整step
* 1. duration < 15 分钟 : step 变为原来的2倍, 最大为 MAX_STEP
* 2. 15分钟 <= duration < 30分钟 : nothing
* 3. duration >= 30 分钟 : 缩小step, 最小为DB中配置的step
* 更新数据库中key对应记录的maxId(maxId=maxId+step)
* 设置buffer的Step(步长)与MinStep
*/
else {
//更新时间差
long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
int nextStep = buffer.getStep();
if (duration < SEGMENT_DURATION) {
if (nextStep * 2 > MAX_STEP) {
//do nothing
} else {
nextStep = nextStep * 2;
}
} else if (duration < SEGMENT_DURATION * 2) {
//do nothing with nextStep
} else {
nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
}
logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
LeafAlloc temp = new LeafAlloc();
temp.setKey(key);
temp.setStep(nextStep);
leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
buffer.setUpdateTimestamp(System.currentTimeMillis());
buffer.setStep(nextStep);
buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
}
/**
* 准备当前Segment号段,设置value初始值与max/step值为buffer的max与step
*/
long value = leafAlloc.getMaxId() - buffer.getStep();
segment.getValue().set(value);
segment.setMax(leafAlloc.getMaxId());
segment.setStep(buffer.getStep());
sw.stop("updateSegmentFromDb", key + " " + segment);
}
最后的getIdFromSegmentBuffer方法会调用segment.getValue().getAndIncrement()从segment中获取value得到当前的UID返回并进行自增,这里在获取之前会判断当前Segment已经使用超过10%,如果超过会异步准备双buffer的另一个Segment。如果当前号段已经用完,切换另一个Segment号段使用。流程图如下图所示:
Leaf-snowflake模式
Leaf-snowflake数据结构上沿用了snowflake的设计,正数位(占1比特)+ 时间戳(占41比特)+ 机器ID(占5比特)+ 机房ID(占5比特)+ 自增值(占12比特)
,主要针对workId的生成与时间回拨问题进行了优化处理。
实现方式
- 初始化WorkId生成
Leaf-snowflake依靠Zookeeper生成workId[机器ID+ 机房ID],Leaf中workId是基于ZooKeeper的顺序Id来生成的,每个应用在使用Leaf-snowflake时,启动时都会都在Zookeeper中生成一个顺序Id,相当于一台机器对应一个顺序节点,也就是一个workId。
启动步骤如下:
- 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
- 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
- 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。
弱化对Zookeeper的依赖:Leaf-snowflake除了每次会去ZooKeeper拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,而且机器需要重启,能保证服务能够正常启动,做到了对三方组件的弱依赖。一定程度上提高了SLA。
- 时间回拨问题解决
为了解决Snowflake的时间回拨问题,leaf主要从两个地方进行优化,一个是启动时对自身系统时间做校验,主要比较对象为系统节点历史记录的时间与其余节点的系统时间。启动流程如下:
服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:
- 若写过,则用自身系统时间与leaf_forever/${self}节点记录时间做比较,若小于leaf_forever/${self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
- 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
- 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
- 否则认为本机系统时间发生大步长偏移,启动失败并报警。
- 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。
二是运行时对时间回拨做了等待处理,时间偏差大小小于5ms,则等待两倍时间,还是小于则做异常处理,具体逻辑如下。
//发生了回拨,此刻时间小于上次发号时间
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
//时间偏差大小小于5ms,则等待两倍时间
wait(offset << 1);//wait
timestamp = timeGen();
if (timestamp < lastTimestamp) {
//还是小于,抛异常并上报
throwClockBackwardsEx(timestamp);
}
} catch (InterruptedException e) {
throw e;
}
} else {
//throw
throwClockBackwardsEx(timestamp);
}
}
总结
Leaf-segment是基于数据库号段模式的优化,规避网络波动:通过双Buffer的预生成方式规避了网络抖动对UID生成的影响;容灾性高:DB不可用情况下一段时间依旧可以进行发号(时间与segment步长有关)。
Leaf-snowflake是基于Snowflake算法的优化,在workId的获取上,对分布式集群环境下面,实例自动伸缩场景,通过对zookeeper的弱依赖实现WorkId的获取。针对时间回拨问题处理对比百度uid-generator比较粗暴,做了启动校验与时间等待的处理。