1. 程式人生 > >詳解Twitter開源分散式自增ID演算法snowflake,附演算驗證過程

詳解Twitter開源分散式自增ID演算法snowflake,附演算驗證過程

@ToString
@Slf4j
public class SnowflakeIdFactory {

    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    private long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;



    public SnowflakeIdFactory(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    public synchronized long nextId() {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            //伺服器時鐘被調整了,ID生成器停止服務.
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }

        lastTimestamp = timestamp;
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public static void testProductIdByMoreThread(int dataCenterId, int workerId, int n) throws InterruptedException {
        List<Thread> tlist = new ArrayList<>();
        Set<Long> setAll = new HashSet<>();
        CountDownLatch cdLatch = new CountDownLatch(10);
        long start = System.currentTimeMillis();
        int threadNo = dataCenterId;
        Map<String,SnowflakeIdFactory> idFactories = new HashMap<>();
        for(int i=0;i<10;i++){
            //用執行緒名稱做map key.
            idFactories.put("snowflake"+i,new SnowflakeIdFactory(workerId, threadNo++));
        }
        for(int i=0;i<10;i++){
            Thread temp =new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<Long> setId = new HashSet<>();
                    SnowflakeIdFactory idWorker = idFactories.get(Thread.currentThread().getName());
                    for(int j=0;j<n;j++){
                        setId.add(idWorker.nextId());
                    }
                    synchronized (setAll){
                        setAll.addAll(setId);
                        log.info("{}生產了{}個id,併成功加入到setAll中.",Thread.currentThread().getName(),n);
                    }
                    cdLatch.countDown();
                }
            },"snowflake"+i);
            tlist.add(temp);
        }
        for(int j=0;j<10;j++){
            tlist.get(j).start();
        }
        cdLatch.await();

        long end1 = System.currentTimeMillis() - start;

        log.info("共耗時:{}毫秒,預期應該生產{}個id, 實際合併總計生成ID個數:{}",end1,10*n,setAll.size());

    }

    public static void testProductId(int dataCenterId, int workerId, int n){
        SnowflakeIdFactory idWorker = new SnowflakeIdFactory(workerId, dataCenterId);
        SnowflakeIdFactory idWorker2 = new SnowflakeIdFactory(workerId+1, dataCenterId);
        Set<Long> setOne = new HashSet<>();
        Set<Long> setTow = new HashSet<>();
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            setOne.add(idWorker.nextId());//加入set
        }
        long end1 = System.currentTimeMillis() - start;
        log.info("第一批ID預計生成{}個,實際生成{}個<<<<*>>>>共耗時:{}",n,setOne.size(),end1);

        for (int i = 0; i < n; i++) {
            setTow.add(idWorker2.nextId());//加入set
        }
        long end2 = System.currentTimeMillis() - start;
        log.info("第二批ID預計生成{}個,實際生成{}個<<<<*>>>>共耗時:{}",n,setTow.size(),end2);

        setOne.addAll(setTow);
        log.info("合併總計生成ID個數:{}",setOne.size());

    }

    public static void testPerSecondProductIdNums(){
        SnowflakeIdFactory idWorker = new SnowflakeIdFactory(1, 2);
        long start = System.currentTimeMillis();
        int count = 0;
        for (int i = 0; System.currentTimeMillis()-start<1000; i++,count=i) {
            /**  測試方法一: 此用法純粹的生產ID,每秒生產ID個數為300w+ */
            idWorker.nextId();
            /**  測試方法二: 在log中列印,同時獲取ID,此用法生產ID的能力受限於log.error()的吞吐能力.
             * 每秒徘徊在10萬左右. */
            //log.error("{}",idWorker.nextId());
        }
        long end = System.currentTimeMillis()-start;
        System.out.println(end);
        System.out.println(count);
    }

    public static void main(String[] args) {
        /** case1: 測試每秒生產id個數?
         *   結論: 每秒生產id個數300w+ */
        //testPerSecondProductIdNums();

        /** case2: 單執行緒-測試多個生產者同時生產N個id,驗證id是否有重複?
         *   結論: 驗證通過,沒有重複. */
        //testProductId(1,2,10000);//驗證通過!
        //testProductId(1,2,20000);//驗證通過!

        /** case3: 多執行緒-測試多個生產者同時生產N個id, 全部id在全域性範圍內是否會重複?
         *   結論: 驗證通過,沒有重複. */
        try {
            testProductIdByMoreThread(1,2,100000);//單機測試此場景,效能損失至少折半!
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
測試用例:
/** case1: 測試每秒生產id個數?
 *   結論: 每秒生產id個數300w+ */
//testPerSecondProductIdNums();
/** case2: 單執行緒-測試多個生產者同時生產N個id,驗證id是否有重複?
 *   結論: 驗證通過,沒有重複. */
//testProductId(1,2,10000);//驗證通過!
//testProductId(1,2,20000);//驗證通過!
/** case3: 多執行緒-測試多個生產者同時生產N個id, 全部id在全域性範圍內是否會重複?
 *   結論: 驗證通過,沒有重複. */
try {
    testProductIdByMoreThread
(1,2,100000);//單機測試此場景,效能損失至少折半! } catch (InterruptedException e) { e.printStackTrace(); }