1. 程式人生 > 實用技巧 >演算法:CLH鎖的原理及實現

演算法:CLH鎖的原理及實現

一、背景
1.1 SMP(Symmetric Multi-Processor)
對稱多處理器結構,它是相對非對稱多處理技術而言的、應用十分廣泛的並行技術。在這種架構中,一臺計算機由多個CPU組成,並共享記憶體和其他資源,所有的CPU都可以平等地訪問記憶體、I/O和外部中斷。雖然同時使用多個CPU,但是從管理的角度來看,它們的表現就像一臺單機一樣。作業系統將任務佇列對稱地分佈於多個CPU之上,從而極大地提高了整個系統的資料處理能力。但是隨著CPU數量的增加,每個CPU都要訪問相同的記憶體資源,共享資源可能會成為系統瓶頸,導致CPU資源浪費。

1.2 NUMA(Non-Uniform Memory Access)

非一致儲存訪問,將CPU分為CPU模組,每個CPU模組由多個CPU組成,並且具有獨立的本地記憶體、I/O槽口等,模組之間可以通過互聯模組相互訪問,訪問本地記憶體(本CPU模組的記憶體)的速度將遠遠高於訪問遠地記憶體(其他CPU模組的記憶體)的速度,這也是非一致儲存訪問的由來。NUMA較好地解決SMP的擴充套件問題,當CPU數量增加時,因為訪問遠地記憶體的延時遠遠超過本地記憶體,系統性能無法線性增加。

1.3 CLH、MCS命名來源

  • MCS:John Mellor-Crummey and Michael Scott
  • CLH:Craig,Landin andHagersten

二、CLH鎖
CLH是一種基於單向連結串列的高效能、公平的自旋鎖。申請加鎖的執行緒通過前驅節點的變數進行自旋。在前置節點解鎖後,當前節點會結束自旋,並進行加鎖。在SMP架構下,CLH更具有優勢。在NUMA架構下,如果當前節點與前驅節點不在同一CPU模組下,跨CPU模組會帶來額外的系統開銷,而MCS鎖更適用於NUMA架構。

鎖值:我把自旋條件定義為鎖值 locked。locked == true 表示節點的處於加鎖狀態或者等待加鎖狀態,locked == false 表示節點處於解鎖狀態。

  1. 基於執行緒當前節點的前置節點的鎖值(locked)進行自旋,locked == true 自旋,locked == false 加鎖成功。
  2. locked == true 表示節點處於加鎖狀態或者等待加鎖狀態。
  3. locked == false 表示節點處於解鎖狀態。
  4. 每個節點在解鎖時更新自己的鎖值(locked),在這一時刻,該節點的後置節點會結束自旋,並進行加鎖。

2.1 加鎖邏輯

  1. 獲取當前執行緒的鎖節點,如果為空則進行初始化。
  2. 通過同步方法獲取尾節點,並將當前節點置為尾節點,此時獲取到的尾節點為當前節點的前驅節點。
  3. 如果尾節點為空,則表示當前節點為第一個節點,加鎖成功。
  4. 如果尾節點不為空,則基於前驅節點的鎖值(locked==true)進行自旋,直到前驅節點的鎖值 locked == false。

2.2 解鎖邏輯

  1. 獲取當前執行緒的鎖節點,如果節點為空或者鎖值(locked== false)則無需解鎖,直接返回。
  2. 使用同步方法為尾節點賦空值,賦值不成功則表示當前節點不是尾節點,需要將當前節點的 locked == false 已保證解鎖該節點。如果當前節點為尾節點,則無需設定該節點的鎖值。因為該節點沒有後置節點,即使設定了,也沒有實際意義。

2.3 Java程式碼

package org.learn.lock;

import java.util.concurrent.atomic.AtomicReference;

/**
 * MCS:John Mellor-Crummey and Michael Scott
 * CLH:Craig,Landin and Hagersten
 * @author zhibo
 * @version 1.0
 * @date 2018/11/7 10:39
 */
public class CLHLock implements Lock {
    private AtomicReference<CLHNode> tail;
    private ThreadLocal<CLHNode> threadLocal;

    public CLHLock() {
        this.tail = new AtomicReference<>();
        this.threadLocal = new ThreadLocal<>();
    }

    @Override
    public void lock() {
        CLHNode curNode = threadLocal.get();
        if(curNode == null){
            curNode = new CLHNode();
            threadLocal.set(curNode);
        }

        CLHNode predNode = tail.getAndSet(curNode);
        if(predNode != null){
            while (predNode.getLocked()){

            }
        }
    }

    @Override
    public void unlock() {
        CLHNode curNode = threadLocal.get();
        threadLocal.remove();

        if(curNode == null || curNode.getLocked() == false){
            return;
        }

        if(!tail.compareAndSet(curNode, null)){
            curNode.setLocked(false);
        }
    }

    public static void main(String[] args) {
        final Lock clhLock = new CLHLock();

        for (int i = 0; i < 10; i++) {
            new Thread(new DemoTask(clhLock, i + "")).start();
        }
    }

    class CLHNode {
        private volatile boolean locked = true;

        public boolean getLocked() {
            return locked;
        }

        public void setLocked(boolean locked) {
            this.locked = locked;
        }
    }
}

package org.learn.lock;

/**
 * @author zhibo
 * @version 1.0
 * @date 2018/11/7 14:22
 */
public class DemoTask implements Runnable {
    private Lock lock;
    private String taskId;

    public DemoTask(final Lock lock, final String taskId){
        this.lock = lock;
        this.taskId = taskId;
    }

    @Override
    public void run() {
        try {
            lock.lock();
            Thread.sleep(500);
            System.out.println(String.format("Thread %s Completed", taskId));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}