分布式ID解决方案分析,百度uid-generator,美团Leaf原理分析

为什么需要分布式ID

在一些复杂的分布式系统中,往往需要对大量的数据和消息进行唯一标识。主要考虑两点:

  1. 为了后续扩展,数据库分库分表后需要有一个唯一ID来标识一条数据或消息,自增ID并不能满足分库分表的需求
  2. 如一些金融、支付、电商、物流等产品的系统中都需要有唯一ID做标识,如订单号、用户标识、优惠券号。因此需要一个能够生成全局唯一ID的分布式ID生成系统

分布式ID特性

基本特性

  1. 全局唯一性:不能出现重复的ID号
  2. 趋势递增,由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能。
  3. 单调递增:保证下一个ID一定大于上一个ID,可以满足排序、IM增量消息、事务版本号等特殊需求。
  4. 信息安全:如果ID是连续的,很容易被恶意用户进行数据扒取与推算数据,所以要求ID尽量不规则。
  5. 可逆性:分布式id里面最好包含时间戳/业务标识且可逆分析,利于排查问题。

上述3和4特性是互斥的,但是可以进行取舍,例如基于SnowFlake算法的ID生成是单调递增的,但是根据获取的场景不是连续的。

设计要求

  1. 高性能:高可用低延时,ID要快速生成响应,避免成为业务瓶颈
  2. 高可用:一般很多关键的核心服务对分布式ID生成服务有强依赖,需要保证高可用避免影响系统正常运行。
  3. 易于接入:设计和实现上要尽可能的简单,做到拿来即用的设计。

分析几种分布式ID生成方案

简单方案[UUID/数据库自增ID/Redis incr]

UUID

  • 什么是UUID

UUID是由一组32位数的16进制数字所构成,UUID理论上的总数为16^32=2^128,若每纳秒(ns)产生1兆个UUID,要花100亿年才会将所有UUID用完。

  • UUID格式

UUID 的 16 个 8 位字节表示为 32 个十六进制(基数16)数字,显示在由连字符分隔 ‘-’ 的五个组中,“8-4-4-4-12” 总共 36 个字符(32 个字母数字字符和 4 个连接字符)

xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx 例如:123e4567-e89b-12d3-a456-426655440000

其中四位数字 M表示 UUID 版本,数字 N的一至三个最高有效位表示 UUID 变体,布局如下:

Name Length (bytes) Length (hex digits) Contents
time_low 4 8 整数:低位 32 bits 时间
time_mid 2 4 整数:中间位 16 bits 时间
time_hi_and_version 2 4 最高有效位中的 4 bits“版本”,后面是高 12 bits 的时间
clock_seq_hi_and_res clock_seq_low 2 4 最高有效位为 1-3 bits“变体”,后跟13-15 bits 时钟序列
node 6 12 48 bits 节点 ID
  • 版本
    • 版本1 UUID 是根据时间和节点 ID(通常是MAC地址)生成
    • 版本2 UUID是根据标识符(通常是组或用户ID)、时间和节点ID生成
    • 版本3 和 版本5 确定性UUID 通过散列 (hashing) 名字空间 (namespace) 标识符和名称生成
    • 版本4 UUID 使用随机性或伪随机性生成
  • 碰撞

对于版本一,二使用MAC地址生成的UUID,版本三四五产生碰撞的可能想更大,但通常可能是忽略。在 103 万亿 个 版本4 UUID中找到重复概率是十亿分之一。

  • 方案分析

使用UUID做分布式ID是可行的,有点是生成方式简单,快速,但是并不推荐使用,因为使用UUID生成的是无意义的字符串,而且非趋势递增,用作主键存储性能差。

基于数据库自增ID

基于关系型数据库如MySql的主键自增(AUTO_INCREMENT)特性。建立一张专门的ID生成的表,需要获取ID就在表中插入一条记录获取自增的ID,也可以批量插入获取。

  • 方案分析

使用数据库自增ID的方式实现简单,但是生成的是递增连续的ID的信息不安全,而且不适合作为订单号等业务的主键;并且对数据库有强依赖关系,难以扩展有宕机风险。

基于Redis的incr命令

基于Redis的incr命令实现ID的原子自增,先使用set 命令设置初始值,每次获取ID的时候使用incr命令获取自增ID。

  • 方案分析

与基于数据库自增ID存在相同的问题,利用Redis内存数据库与单线程模型的特性有更高的并发性,但需要注意数据持久化的问题。基于RDB的定时快照方式重启后可能会出现ID重复问题,基于AOF的命令记录方式又存在重启恢复时间长的问题。

基于数据库集群模式

基于数据库集群的方式是对基于自增ID方式的高可用优化。对数据库做多主模式集群。

实现方式

  • 设置自增ID起始值与步长

如果是双主模式集群,那么需要设置自增ID的步长为2,可以通过如下命令设置自增ID的起始值与步长。

set @@auto_increment_offset = 1;     -- 起始值
set @@auto_increment_increment = 2;  -- 步长

第一台MySql的起始值为1,步长为2的话第二台的起始值为2,步长为2,以此类推。

  • 后续扩容问题

如果后续需要增加MySql节点,需要修改一、二两台MySQL实例的ID起始值和步长,并把第三台机器的ID起始生成位置设定在比现有最大自增ID大。

方案分析

虽然解决了基于数据库自增ID方式的高可用性问题,但是在出现性能瓶颈后需要做繁琐的扩容处理,操作不当可能出现ID重复的风险。而且也存在着数据库自增ID方式的其他问题。

基于数据库号段模式

基于号段的方式旨在通过批量的获取自增ID的方式,降低对数据库的压力,每次从数据库取出一个号段范围。

实现方式

  • 表结构设计

基于号段的方式获取ID在表设计上有多种选择,但核心在于需要记录当前最大的可用ID。

CREATE TABLE id_generator (
  id int(10) NOT NULL,
  max_id bigint(20) NOT NULL COMMENT '当前最大的可用id',
  step int(20) NOT NULL COMMENT '号段的长度',
  biz_id	int(20) NOT NULL COMMENT '业务id',
  version int(20) NOT NULL COMMENT '版本号',
  PRIMARY KEY (`id`)
) 
  • ID批量获取

某个业务需要批量获取ID可用通过update语句,设置biz_id所在的max_id=max_id+step,获取到号段(max_id,max_id+step],例如max_id=1000,step=2000,那么获取到的号段为(1000,3000],获取后当前最大max_id=3000。

update id_generator set max_id = max_id+step, version = version + 1 where version = version and biz_id = ***

方案分析

通过批量获取ID的思路降低了对数据库的压力与强依赖关系,依据这个思路可以作为一些特定业务如用户短ID生成的方案。

基于雪花算法的ID生成

雪花算法(Snowflake)是twitter内部使用的分布式ID生成算法,旨在依据一定的算法逻辑,生成一个Long类型的ID(64bit)。组成结构大致如下:

正数位(1bit)+时间戳(41bit)+机器ID(5bit)+ 数据中心(5bit)+ 自增ID(12bit)

具体实现方式可参考博主之前的一篇文章:https://chinalhr.github.io/post/uid-generate-snowflake/

方案分析

基于雪花算法的ID生成方式简单,趋势递增且带有可逆性,用作主键存储性能高,也支持后续数据库分库分表,算法实现方式灵活性高可自定义组合优化,高性能每秒能够产生26万个ID,并且不依赖于数据库,以服务的方式部署,稳定性更高。

但是因为强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态,而且雪花算法通过提前规划好机器标识来实现,但在分布式生产环境中是不够友好的,一般需要自动启停,增减机器,因此很多互联网大厂都对改算法进行了一定的改造。

分布式ID开源项目分析-百度uid-generator项目

UidGenerator是由百度技术部开发的,通过Java实现, 基于Snowflake算法的唯一ID生成器。UidGenerator以组件形式工作在应用项目中,支持自定义workerId位数和初始化策略, 从而适用于docker等容器化环境下实例自动重启、漂移等场景。

UidGenerator项目地址:https://github.com/baidu/uid-generator

数据结构

UidGenerator生成的ID默认结构如下:

图片

  • sign(1bit):固定1bit符号标识,即生成的UID为正数。
  • delta seconds (28 bits):当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约8.7年
  • worker id (22 bits): 机器id,最多可支持约420w次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。
  • sequence (13 bits): 每秒下的并发序列,13 bits可支持每秒8192个并发。

项目结构

项目目录

 ├── BitsAllocator.java			- Bit分配器(C)
 ├── UidGenerator.java			- UID生成器接口(I)
 ├── buffer
 │   ├── BufferPaddingExecutor.java		- 填充RingBuffer的执行器(C)
 │   ├── BufferedUidProvider.java		- RingBuffer中UID的生产者(C)
 │   ├── RejectedPutBufferHandler.java	- 拒绝Put到RingBuffer的处理器(C)
 │   ├── RejectedTakeBufferHandler.java	- 拒绝从RingBuffer中Take的处理器(C)
 │   └── RingBuffer.java			- RingBuffer数据结构实现类(C)
 ├── exception						- 相关的自定义异常类
 ├── impl
 │   ├── CachedUidGenerator.java		- 基于RingBuffer存储的UID生成器实现类(C)
 │   └── DefaultUidGenerator.java		- 默认UID生成器实现类(C)
 ├── utils 								- 工具类(日期,网络,线程池,枚举)
 └── worker								
     ├── DisposableWorkerIdAssigner.java	- 一次性的WorkerId分配器(C)
     ├── WorkerIdAssigner.java		- WorkerId分配器接口(I)
     ├── WorkerNodeType.java		- 工作节点类型,分为容器与实体两种(E)
     ├── dao
     │   └── WorkerNodeDAO.java		- WorkerNode数据库访问接口
     └── entity
         └── WorkerNodeEntity.java		- WorkerNode 实体类

项目依赖

主要依赖了springframework的核心包,mybatis,Apache commons工具类,logback

表结构

CREATE TABLE WORKER_NODE(
ID BIGINT NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
HOST_NAME VARCHAR(64) NOT NULL COMMENT 'host name',
PORT VARCHAR(64) NOT NULL COMMENT 'port',
TYPE INT NOT NULL COMMENT 'node type: ACTUAL or CONTAINER',
LAUNCH_DATE DATE NOT NULL COMMENT 'launch date',
MODIFIED TIMESTAMP NOT NULL COMMENT 'modified time',
CREATED TIMESTAMP NOT NULL COMMENT 'created time',
PRIMARY KEY(ID)
) COMMENT='DB WorkerID Assigner for UID Generator',ENGINE = INNODB;

从上面的项目目录来看,UidGenerator提供了两种实现方式,分别是实时ID生成方式的DefaultUidGenerator与预生成ID生成方式的CachedUidGenerator。

DefaultUidGenerator实现方式

UidGenerator接口如下

public interface UidGenerator {

    /**
     * 获取UID
     */
    long getUID() throws UidGenerateException;

    /**
     * 解析格式化输出UID
     */
    String parseUID(long uid);
}

DefaultUidGenerator实现了UidGenerator与InitializingBean,具体实现流程如下:

  1. Bean初始化,配置的timeBits,workerBits,seqBits位数与epochStr日期,因为最多可支持约8.7年,所以最好设置epochStr值为合适的日期。
  2. afterPropertiesSet方法初始化BitsAllocator设置timeBits,workerBits,seqBits等位数信息,初始化workId(每次初始化往WORKER_NODE表插入一条记录,返回的主键ID就是workId,所以最多可支持约420w次机器启动)。
  3. 核心方法nextId获取UID,具体方式如下
 protected synchronized long nextId() {
     	//获取当前时间,单位秒,会校验是否超过最大的时间年限,超过则抛出异常
        long currentSecond = getCurrentSecond();

        // 如果出现时间回拨,就抛出异常
        if (currentSecond < lastSecond) {
            long refusedSeconds = lastSecond - currentSecond;
            throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
        }

        // 同一秒内序列号自增,
        if (currentSecond == lastSecond) {
            sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
            //超过最大序列,等待下一秒生成uid
            if (sequence == 0) {
                currentSecond = getNextSecond(lastSecond);
            }

        // 不同秒内序列号从0开始
        } else {
            sequence = 0L;
        }

        lastSecond = currentSecond;

        // bits位移操作生成UID
        return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
    }

总结:DefaultUidGenerator的实现与雪花算法实现对比改动较小,对生成ID结构的位数可自定义,对分布式环境下增减机器的WorkId分配更友好,但没有解决时间回拨问题,采用了简单的抛出异常的方式处理。

RingBuffer

RingBuffer本质上是一个数组,数组中每个项被称为slot。UidGenerator设计了两个RingBuffer,一个保存唯一ID,一个保存flag。RingBuffer的尺寸是2^n。

图片

RingBuffer Of UID

RingBuffer Of UID是存储UID的RingBuffer 使用两个指针,Tail指针表示最新生成的UID,如果这个Tail指针追上了Cursor指针,此时RingBuffer已经满了,不允许再继续生成UID了。可以通过属性rejectedPutBufferHandler指定拒绝策略。

Cursor指针表示最后一个已经给消费的UID,如果Cursor指针追上了Tail指针,此时RingBuffer已经空了,不允许继续获取UID了。可以通过rejectedTakeBufferHandler指定拒绝策略。

RingBuffer Of Flag

RingBuffer Of Flag是存储Uid状态的RingBuffer,每个slot的值都是0或者1,0是CAN_PUT_FLAG(可以填充)的标志位,1是CAN_TAKE_FLAG(可以消费)的标识位。

配置参数

	<!-- 以下为可选配置, 如未指定将采用默认值 -->
		<!-- RingBuffer size扩容参数, 可提高UID生成的吞吐量. --> 
		<!-- 默认:3, 原bufferSize=8192, 扩容后bufferSize= 8192 << 3 = 65536 -->
		<property name="boostPower" value="3"></property> 
		
		<!-- 指定何时向RingBuffer中填充UID, 取值为百分比(0, 100), 默认为50 -->
		<!-- 举例: bufferSize=1024, paddingFactor=50 -> threshold=1024 * 50 / 100 = 512. -->
		<!-- 当环上可用UID数量 < 512时, 将自动对RingBuffer进行填充补全 -->
		<property name="paddingFactor" value="50"></property> 
		
		<!-- 另外一种RingBuffer填充时机, 在Schedule线程中, 周期性检查填充 -->
		<!-- 默认:不配置此项, 即不使用Schedule线程. 如需使用, 请指定Schedule线程时间间隔, 单位:秒 -->
		<property name="scheduleInterval" value="60"></property> 
		
		<!-- 拒绝策略: 当环已满, 无法继续填充时 -->
		<!-- 默认无需指定, 将丢弃Put操作, 仅日志记录. 如有特殊需求, 请实现RejectedPutBufferHandler接口(支持Lambda表达式) -->
		<property name="rejectedPutBufferHandler" ref="XxxxYourPutRejectPolicy"></property> 
		
		<!-- 拒绝策略: 当环已空, 无法继续获取时 -->
		<!-- 默认无需指定, 将记录日志, 并抛出UidGenerateException异常. 如有特殊需求, 请实现RejectedTakeBufferHandler接口(支持Lambda表达式) -->
		<property name="rejectedTakeBufferHandler" ref="XxxxYourTakeRejectPolicy"></property>

CacheLine补齐

RingBuffer的数据都是使用数组来存储的,在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题。tail和cursor变量如果直接用原生的AtomicLong类型,tail和cursor可能会缓存在同一个cacheLine中,多个线程读取该变量可能会引发CacheLine的RFO请求,反而影响性能,为了防止伪共享问题,填充了6个long类型的成员变量,加上long类型的value成员变量,刚好占满一个Cache Line(其中Java对象还有8byte的对象头),这种方式称之为CacheLine补齐。项目中使用了PaddedAtomicLong进行包装。

public class PaddedAtomicLong extends AtomicLong {
    private static final long serialVersionUID = -3415778863941386253L;

    /** Padded 6 long (48 bytes) */
    public volatile long p1, p2, p3, p4, p5, p6 = 7L;

    /**
     * Constructors from {@link AtomicLong}
     */
    public PaddedAtomicLong() {
        super();
    }

    public PaddedAtomicLong(long initialValue) {
        super(initialValue);
    }

    /**
     * To prevent GC optimizations for cleaning unused padded references
     */
    public long sumPaddingToPreventOptimization() {
        return p1 + p2 + p3 + p4 + p5 + p6;
    }

}

图片

  • RingBuffer填充时机
  1. 初始化预填充:初始化时预先填充满
  2. 即时填充:take消费,即时检查剩余可用slot量是否小于设定阈值,小于则进行填充
  3. 周期填充:通过Schedule线程,定时补全空闲slots。

CachedUidGenerator实现方式

CachedUidGenerator是基于RingBuffer去实现的。

CachedUidGenerator继承自DefaultUidGenerator,实现了DisposableBean。

初始化

  1. 调用父类DefaultUidGenerator的afterPropertiesSet方法初始化BitsAllocator与WorkId。
  2. 根据配置的boostPower与paddingFactor初始化RingBuffer,根据配置的scheduleInterval初始化bufferPaddingExecutor,初始化拒绝策略rejectedPutBufferHandler与rejectedTakeBufferHandler。
  3. 初始化填满RingBuffer中所有slot(初始化填充UID),此处调用bufferPaddingExecuto的paddingBuffer()方法。
  4. 开启Ringbuffer补丁线程(前提是配置了属性scheduleInterval)

RingBuffer.put(填充)

CachedUidGenerator生成UID的方式主要是通过paddingBuffer方法实现的,生成流程如下图所示,主要关注点是在满足填充新的唯一ID条件时,通过时间值递增得到新的时间值(lastSecond.incrementAndGet()),而不是System.currentTimeMillis()这种方式,解决了运行时时间回拨问题。

图片

public void paddingBuffer() {
        LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);

        // is still running
        if (!running.compareAndSet(false, true)) {
            LOGGER.info("Padding buffer is still running. {}", ringBuffer);
            return;
        }
    
        boolean isFullRingBuffer = false;
        while (!isFullRingBuffer) {
            //调用nextIdsForOneSecond方法获取UID list,注意此处的时间使用lastSecond.incrementAndGet()方法时间自增的方式获取而不是System.currentTimeMillis()获取,解决了运行时时间回拨问题
            List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
            for (Long uid : uidList) {
                //调用put方法加入RingBuffer of UID中
                isFullRingBuffer = !ringBuffer.put(uid);
                if (isFullRingBuffer) {
                    break;
                }
            }
        }

        // not running now
        running.compareAndSet(true, false);
        LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
    }
/**
     * 指定秒内获取UID List,Size为max sequence(0~max sequence)
     * 
     * @param currentSecond
     * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
     */
    protected List<Long> nextIdsForOneSecond(long currentSecond) {
        // Initialize result list size of (max sequence + 1)
        int listSize = (int) bitsAllocator.getMaxSequence() + 1;
        List<Long> uidList = new ArrayList<>(listSize);

        // Allocate the first sequence of the second, the others can be calculated with the offset
        long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
        for (int offset = 0; offset < listSize; offset++) {
            uidList.add(firstSeqUid + offset);
        }

        return uidList;
    }
public synchronized boolean put(long uid) {
    	//获取Tail与Cursor指针
        long currentTail = tail.get();
        long currentCursor = cursor.get();

        // 1. 判断Ring是否已满,满了执行Put拒绝策略
        long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
        if (distance == bufferSize - 1) {
            rejectedPutHandler.rejectPutBuffer(this, uid);
            return false;
        }

        // 2. check标志是否为CAN_PUT_FLAG
        int nextTailIndex = calSlotIndex(currentTail + 1);
        if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
            rejectedPutHandler.rejectPutBuffer(this, uid);
            return false;
        }

        // 3. 将UID插入下一个slots中
        // 4. 设置标志为CAN_TAKE_FLAG
        // 5. tail指针+1
        slots[nextTailIndex] = uid;
        flags[nextTailIndex].set(CAN_TAKE_FLAG);
        tail.incrementAndGet();
        return true;
    }

RingBuffer.take(取值)

CachedUidGenerator获取UID通过RingBuffer.take()方法直接从RingBuffer中取出,获取流程如下图所示

图片

public long take() {
        // 获取当前Cursor指针与下一个可用Cursor指针并进行check
        long currentCursor = cursor.get();
        long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
        Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");

        // 1. 判断如果达到阈值,则以异步模式触发填充
        long currentTail = tail.get();
        if (currentTail - nextCursor < paddingThreshold) {
            LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
                    nextCursor, currentTail - nextCursor);
            bufferPaddingExecutor.asyncPadding();
        }

        // 2. 当前Cursor指针等于下一个可用Cursor指针,Ring为空,执行Take拒绝策略
        if (nextCursor == currentCursor) {
            rejectedTakeHandler.rejectTakeBuffer(this);
        }

        // 3. 检查下一个slot是否为CAN_TAKE_FLAG
        int nextCursorIndex = calSlotIndex(nextCursor);
        Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

        // 4. 从slot中获取UID
        // 5. 设置标志为CAN_PUT_FLAG.
        long uid = slots[nextCursorIndex];
        flags[nextCursorIndex].set(CAN_PUT_FLAG);
        return uid;
    }

总结

百度uid-generator项目针对snowflake算法落地进行了一些改造,主要是对workId的获取上,对分布式集群环境下面,实例自动伸缩,docker容器化的场景,通过每次重启获取新的workId进行优化。提供了DefaultUidGenerator实时生成UID与CachedUidGenerator预生成UID两种方式。CachedUidGenerator通过借用未来时间来解决雪花算法sequence存在的并发限制,而且通过时间值递增的方式解决雪花算法存在的时间回拨问题。

分布式ID开源项目分析-美团Leaf项目

Leaf是美团技术部推出的一个分布式ID生成服务,通过Java实现, 基于数据库号段模式和snowflake算法模式的UID获取。

Leaf项目地址:https://github.com/Meituan-Dianping/Leaf

项目结构

这里主要关注core模块:

├── common								- 公共包
│   ├── CheckVO.java					
│   ├── PropertyFactory.java			- leaf.properties配置获取
│   ├── Result.java						- 统一返回值
│   ├── Status.java						- 状态值
│   ├── Utils.java						- 工具类
│   └── ZeroIDGen.java					- 生成0号ID	
├── IDGen.java							- ID生成接口(I)
├── segment								
│   ├── dao								- 号段相关Dao操作
│   │   ├── IDAllocDao.java
│   │   ├── IDAllocMapper.java
│   │   └── impl
│   │       └── IDAllocDaoImpl.java
│   ├── model							- 号段相关model
│   │   ├── LeafAlloc.java
│   │   ├── SegmentBuffer.java
│   │   └── Segment.java
│   └── SegmentIDGenImpl.java			- 号段相关ID生成实现
└── snowflake							
    ├── exception
    │   ├── CheckLastTimeException.java
    │   ├── CheckOtherNodeException.jav
    │   └── ClockGoBackException.java
    ├── SnowflakeIDGenImpl.java			- SnowFlakeID生成实现
    └── SnowflakeZookeeperHolder.java	- Zookeeper相关操作类

Leaf-segment号段模式

Leaf-segment号段模式是对上文的基于数据库号段模式的一种实现与优化。 每次获取一个segmen号段的值, 减轻数据库的压力 。利用 biz_tag字段来区分 业务,隔离影响,后续可以依据 biz_tag进行分库分表。

数据库表结构

CREATE TABLE `leaf_alloc` (
  `biz_tag` varchar(128) NOT NULL DEFAULT '' COMMENT '业务key',
  `max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前已经分配了的最大id',
  `step` int(11) NOT NULL COMMENT '初始步长,也是动态调整的最小步长',
  `description` varchar(256) DEFAULT NULL COMMENT '业务key的描述',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据库维护的更新时间',
  PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

可知,在进行UID获取之前,需要先插入数据进行biz_tag的设置与其他数据的初始化。

实现方式

Leaf-segment的实现类是SegmentIDGenImpl

  • 初始化
@Override
    public boolean init() {
        //同步biz_tag到cache中
        updateCacheFromDb();
        initOK = true;
        //开启定时任务定时调度updateCacheFromDb()
        updateCacheFromDbAtEveryMinute();
        return initOK;
    }

private void updateCacheFromDb() {
    		//获取数据库表中所有的biz_tag
            List<String> dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
    		//缓存的biz_tag
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
    		Set<String> insertTagsSet = new HashSet<>(dbTags);
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            //将db中新加的biz_tags加入cache中,并初始化SegmentBuffer
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp);
                }
            }
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
            }
            //将db中被删除的biz_tag(已失效)从cache删除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){
                    removeTagsSet.remove(tmp);
                }
            }
            for (String tag : removeTagsSet) {
                cache.remove(tag);
            }
      
    }

SegmentIDGenImpl的init方法可以看出主要是对biz_tag的同步,同步到应用的缓存中并且初始化对应的Segment Buffer。并且会开启调度线程进行定时的cache刷新。

  • 双Buffer

初始化cache的时候,会对每一个biz_tag初始化一个SegmentBuffer,通过其代码的实现可以看SegmentBuffer 中包含了一个号段数组,包含两个 Segment,第一个Segment已下发10%时,如果下一个Segment未更新,则另启一个更新线程去更新下一个Segment,这就是Leaf的双Buffer特性。

这样做有两个好处:

一是segment长度如果设置为服务高峰期发号QPS的600倍(10分钟),即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。

二是每次请求都会判断下个号段的状态,从而更新此号段,避免偶发网络抖动影响下个号段的更新。

public class SegmentBuffer {
    private String key;// 数据库的biz_tag
    private Segment[] segments; //双buffer
    private volatile int currentPos; //当前的使用的segment的index
    private volatile boolean nextReady; //下一个segment是否处于可切换状态
    private volatile boolean initOk; //是否初始化完成
    private final AtomicBoolean threadRunning; //线程是否在运行中
    private final ReadWriteLock lock;

    private volatile int step;// 动态调整的step
    private volatile int minStep; // 最小step
    private volatile long updateTimestamp;// 更新时间戳
}

public class Segment {
    private AtomicLong value = new AtomicLong(0);
    private volatile long max;
    private volatile int step;
    private SegmentBuffer buffer;
}

图片

  • UID取值

UID通过调用get方法获取,具体实现逻辑大致如下

    public Result get(final String key) {
        //SegmentIDGenImpl是否初始化
        ...
        //cache中是否已经缓存了对应biz_id的数据
        if (cache.containsKey(key)) {
            //检查SegmentBuffer中的Segment是否初始化
                    if (!buffer.isInitOk()) {
                            //初始化Segment
                            updateSegmentFromDb(key, buffer.getCurrent());
                            buffer.setInitOk(true);
                }
        }
            return getIdFromSegmentBuffer(cache.get(key));
}

updateSegmentFromDb方法从数据库表中读取数据更新SegmentBuffer中的Segment。在第一次初始号段与第二次双buffer的异步准备另一个号段会被调用,注意前两次调用之后后续双buffer的异步准备另一个号段会动态调整申请号段的区间大小,调整规则主要跟号段申请频率有关,具体逻辑大致如下:

    public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        /**
         * 第一次初始号段
         * 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
         * 2. 设置buffer的Step(步长)与MinStep
         */
        if (!buffer.isInitOk()) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        }
        /**
         * 第二次初始号段(双Buffer异步准备)
         * 1. 更新数据库中key对应记录的maxId(maxId=maxId+step)
         * 2. 设置buffer的Step(步长)与MinStep
         */
        else if (buffer.getUpdateTimestamp() == 0) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        }
        /**
         * 后续双Buffer异步准备
         * 动态调整step
         *      1. duration < 15 分钟 : step 变为原来的2倍, 最大为 MAX_STEP
         *      2. 15分钟 <= duration < 30分钟 : nothing
         *      3. duration >= 30 分钟 : 缩小step, 最小为DB中配置的step
         * 更新数据库中key对应记录的maxId(maxId=maxId+step)
         * 设置buffer的Step(步长)与MinStep
         */
        else {
            //更新时间差
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
        }

        /**
         * 准备当前Segment号段,设置value初始值与max/step值为buffer的max与step
         */
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }

最后的getIdFromSegmentBuffer方法会调用segment.getValue().getAndIncrement()从segment中获取value得到当前的UID返回并进行自增,这里在获取之前会判断当前Segment已经使用超过10%,如果超过会异步准备双buffer的另一个Segment。如果当前号段已经用完,切换另一个Segment号段使用。流程图如下图所示:

图片

Leaf-snowflake模式

Leaf-snowflake数据结构上沿用了snowflake的设计,正数位(占1比特)+ 时间戳(占41比特)+ 机器ID(占5比特)+ 机房ID(占5比特)+ 自增值(占12比特),主要针对workId的生成与时间回拨问题进行了优化处理。

实现方式

  • 初始化WorkId生成

Leaf-snowflake依靠Zookeeper生成workId[机器ID+ 机房ID],Leaf中workId是基于ZooKeeper的顺序Id来生成的,每个应用在使用Leaf-snowflake时,启动时都会都在Zookeeper中生成一个顺序Id,相当于一台机器对应一个顺序节点,也就是一个workId。

启动步骤如下:

  1. 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
  2. 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
  3. 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。

图片

弱化对Zookeeper的依赖:Leaf-snowflake除了每次会去ZooKeeper拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,而且机器需要重启,能保证服务能够正常启动,做到了对三方组件的弱依赖。一定程度上提高了SLA。

  • 时间回拨问题解决

为了解决Snowflake的时间回拨问题,leaf主要从两个地方进行优化,一个是启动时对自身系统时间做校验,主要比较对象为系统节点历史记录的时间与其余节点的系统时间。启动流程如下:

图片

服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:

  1. 若写过,则用自身系统时间与leaf_forever/${self}节点记录时间做比较,若小于leaf_forever/${self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
  2. 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
  3. 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
  4. 否则认为本机系统时间发生大步长偏移,启动失败并报警。
  5. 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。

二是运行时对时间回拨做了等待处理,时间偏差大小小于5ms,则等待两倍时间,还是小于则做异常处理,具体逻辑如下。

//发生了回拨,此刻时间小于上次发号时间
 if (timestamp < lastTimestamp) {
  			  
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                	//时间偏差大小小于5ms,则等待两倍时间
                    wait(offset << 1);//wait
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                       //还是小于,抛异常并上报
                        throwClockBackwardsEx(timestamp);
                      }    
                } catch (InterruptedException e) {  
                   throw  e;
                }
            } else {
                //throw
                throwClockBackwardsEx(timestamp);
            }
        }

总结

Leaf-segment是基于数据库号段模式的优化,规避网络波动:通过双Buffer的预生成方式规避了网络抖动对UID生成的影响;容灾性高:DB不可用情况下一段时间依旧可以进行发号(时间与segment步长有关)。

Leaf-snowflake是基于Snowflake算法的优化,在workId的获取上,对分布式集群环境下面,实例自动伸缩场景,通过对zookeeper的弱依赖实现WorkId的获取。针对时间回拨问题处理对比百度uid-generator比较粗暴,做了启动校验与时间等待的处理。