1. 程式人生 > >[linux kernel-併發與同步]2.atomic工作原理

[linux kernel-併發與同步]2.atomic工作原理

一 源由-需要解決什麼問題

我們的程式邏輯經常遇到這樣的操作序列:

  • 讀一個位於memory中的變數的值到暫存器中
  • 修改該變數的值(也就是修改暫存器中的值)
  • 將暫存器中的數值寫回memory中的變數值

如果這個操作序列是序列化的操作(在一個thread中序列執行),那麼一切OK,然而,世界總是不能如你所願。在多CPU體系結構中,執行在兩個CPU上的兩個核心控制路徑同時並行執行上面操作序列,有可能發生下面的場景:

CPU1上的操作 CPU2上的操作
讀操作
讀操作
修改 修改
寫操作
寫操作

多個CPUs和memory chip是通過匯流排互聯的,在任意時刻,只能有一個匯流排master裝置(例如CPU、DMA controller)訪問該Slave裝置(在這個場景中,slave裝置是RAM chip)。因此,來自兩個CPU上的讀memory操作被序列化執行,分別獲得了同樣的舊值。完成修改後,兩個CPU都想進行寫操作,把修改的值寫回到memory。但是,硬體arbiter的限制使得CPU的寫回必須是序列化的,因此CPU1首先獲得了訪問權,進行寫回動作,隨後,CPU2完成寫回動作。在這種情況下,CPU1的對memory的修改被CPU2的操作覆蓋了,因此執行結果是錯誤的。 不僅是多CPU,在單CPU上也會由於有多個核心控制路徑的交錯而導致上面描述的錯誤。一個具體的例子如下:

系統呼叫的控制路徑 中斷handler控制路徑
讀操作
讀操作
修改
寫操作
修改
寫操作

系統呼叫的控制路徑上,完成讀操作後,硬體觸發中斷,開始執行中斷handler。這種場景下,中斷handler控制路徑的寫回的操作被系統呼叫控制路徑上的寫回覆蓋了,結果也是錯誤的。

二 對策-解決方案

對於那些有多個核心控制路徑進行read-modify-write的變數,核心提供了一個特殊的型別atomic_t,具體定義如下:code:include/linux/types.h

typedef struct {  
    int counter;  
} atomic_t;  

從上面的定義來看,atomic_t實際上就是一個int型別的counter,不過定義這樣特殊的型別atomic_t是有其思考的:核心定義了若干atomic_xxx的介面API函式,這些函式只會接收atomic_t型別的引數。這樣可以確保atomic_xxx的介面函式只會操作atomic_t型別的資料。同樣的,如果你定義了atomic_t型別的變數(你期望用atomic_xxx的介面API函式操作它),這些變數也不會被那些普通的、非原子變數操作的API函式接受。簡單一句話就是原子操作保護的變數,只有原子操作的API才能夠訪問和修改,其他的操作方式不被允許. 具體的部分介面API函式整理如下(在ARM64檔案arch/arm64/include/asm/atomic.h定義了眾多的可以呼叫的API介面):

介面函式 描述
static inline void atomic_add(int i, atomic_t *v) 給一個原子變數v增加i
static inline int atomic_add_return(int i, atomic_t *v) 同上,只不過將變數v的最新值返回
static inline void atomic_sub(int i, atomic_t *v) 給一個原子變數v減去i
static inline int atomic_sub_return(int i, atomic_t *v) 同上,只不過將變數v的最新值返回
static inline int atomic_cmpxchg(atomic_t *ptr, int old, int new) 比較old和原子變數ptr中的值,如果相等,那麼就把new值賦給原子變數。 返回舊的原子變數ptr中的值
atomic_read 獲取原子變數的值
atomic_set 設定原子變數的值
atomic_inc(v) 原子變數的值加一
atomic_inc_return(v) 同上,只不過將變數v的最新值返回
atomic_dec(v) 原子變數的值減去一
atomic_dec_return(v) 同上,只不過將變數v的最新值返回
atomic_sub_and_test(i, v) 給一個原子變數v減去i,並判斷變數v的最新值是否等於0
atomic_add_negative(i,v) 給一個原子變數v增加i,並判斷變數v的最新值是否是負數
static inline int atomic_add_unless(atomic_t *v, int a, int u) 只要原子變數v不等於u,那麼就執行原子變數v加a的操作。 如果v不等於u,返回非0值,否則返回0值

三 arm32的實現

arm32分架構有不同的實現,source code:arch/arm/include/asm/atomic.h:

#define ATOMIC_INIT(i)  { (i) }  
  
#ifdef __KERNEL__  
  
/* 
 * On ARM, ordinary assignment (str instruction) doesn't clear the local 
 * strex/ldrex monitor on some implementations. The reason we can use it for 
 * atomic_set() is the clrex or dummy strex done on every exception return. 
 */  
#define atomic_read(v)  READ_ONCE((v)->counter)  
#define atomic_set(v,i) WRITE_ONCE(((v)->counter), (i))  
  
#if __LINUX_ARM_ARCH__ >= 6  
  
/* 
 * ARMv6 UP and SMP safe atomic ops.  We use load exclusive and 
 * store exclusive to ensure that these are atomic.  We may loop 
 * to ensure that the update happens. 
 */  
  
#define ATOMIC_OP(op, c_op, asm_op)                 \  
static inline void atomic_##op(int i, atomic_t *v)          \  
{                                   \  
    unsigned long tmp;                      \  
    int result;                         \  
                                    \  
    prefetchw(&v->counter);                      \  
    __asm__ __volatile__("@ atomic_" #op "\n"           \  
"1: ldrex   %0, [%3]\n"                     \  
"   " #asm_op " %0, %0, %4\n"                   \  
"   strex   %1, %0, [%3]\n"                     \  
"   teq %1, #0\n"                       \  
"   bne 1b"                         \  
    : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)        \  
    : "r" (&v->counter), "Ir" (i)                    \  
    : "cc");                            \  
}                                   \  
  
#define ATOMIC_OP_RETURN(op, c_op, asm_op)              \  
static inline int atomic_##op##_return_relaxed(int i, atomic_t *v)  \  
{                                   \  
    unsigned long tmp;                      \  
    int result;                         \  
                                    \  
    prefetchw(&v->counter);                      \  
                                    \  
    __asm__ __volatile__("@ atomic_" #op "_return\n"        \  
"1: ldrex   %0, [%3]\n"                     \  
"   " #asm_op " %0, %0, %4\n"                   \  
"   strex   %1, %0, [%3]\n"                     \  
"   teq %1, #0\n"                       \  
"   bne 1b"                         \  
    : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)        \  
    : "r" (&v->counter), "Ir" (i)                    \  
    : "cc");                            \  
                                    \  
    return result;                          \  
}  
  
#define atomic_add_return_relaxed   atomic_add_return_relaxed  
#define atomic_sub_return_relaxed   atomic_sub_return_relaxed  
  
static inline int atomic_cmpxchg_relaxed(atomic_t *ptr, int old, int new)  
{  
    int oldval;  
    unsigned long res;  
  
    prefetchw(&ptr->counter);  
  
    do {  
        __asm__ __volatile__("@ atomic_cmpxchg\n"  
        "ldrex  %1, [%3]\n"  
        "mov    %0, #0\n"  
        "teq    %1, %4\n"  
        "strexeq %0, %5, [%3]\n"  
            : "=&r" (res), "=&r" (oldval), "+Qo" (ptr->counter)  
            : "r" (&ptr->counter), "Ir" (old), "r" (new)  
            : "cc");  
    } while (res);  
  
    return oldval;  
}  
#define atomic_cmpxchg_relaxed      atomic_cmpxchg_relaxed  
  
static inline int __atomic_add_unless(atomic_t *v, int a, int u)  
{  
    int oldval, newval;  
    unsigned long tmp;  
  
    smp_mb();  
    prefetchw(&v->counter);  
  
    __asm__ __volatile__ ("@ atomic_add_unless\n"  
"1: ldrex   %0, [%4]\n"  
"   teq %0, %5\n"  
"   beq 2f\n"  
"   add %1, %0, %6\n"  
"   strex   %2, %1, [%4]\n"  
"   teq %2, #0\n"  
"   bne 1b\n"  
"2:"  
    : "=&r" (oldval), "=&r" (newval), "=&r" (tmp), "+Qo" (v->counter)  
    : "r" (&v->counter), "r" (u), "r" (a)  
    : "cc");  
  
    if (oldval != u)  
        smp_mb();  
  
    return oldval;  
}  
  
#else /* ARM_ARCH_6 */  
  
#ifdef CONFIG_SMP  
#error SMP not supported on pre-ARMv6 CPUs  
#endif  
  
#define ATOMIC_OP(op, c_op, asm_op)                 \  
static inline void atomic_##op(int i, atomic_t *v)          \  
{                                   \  
    unsigned long flags;                        \  
                                    \  
    raw_local_irq_save(flags);                  \  
    v->counter c_op i;                       \  
    raw_local_irq_restore(flags);                   \  
}                                   \  
  
#define ATOMIC_OP_RETURN(op, c_op, asm_op)              \  
static inline int atomic_##op##_return(int i, atomic_t *v)      \  
{                                   \  
    unsigned long flags;                        \  
    int val;                            \  
                                    \  
    raw_local_irq_save(flags);                  \  
    v->counter c_op i;                       \  
    val = v->counter;                        \  
    raw_local_irq_restore(flags);                   \  
                                    \  
    return val;                         \  
}  
  
static inline int atomic_cmpxchg(atomic_t *v, int old, int new)  
{  
    int ret;  
    unsigned long flags;  
  
    raw_local_irq_save(flags);  
    ret = v->counter;  
    if (likely(ret == old))  
        v->counter = new;  
    raw_local_irq_restore(flags);  
  
    return ret;  
}  
  
static inline int __atomic_add_unless(atomic_t *v, int a, int u)  
{  
    int c, old;  
  
    c = atomic_read(v);  
    while (c != u && (old = atomic_cmpxchg((v), c, c + a)) != c)  
        c = old;  
    return c;  
}  
  
#endif /* __LINUX_ARM_ARCH__ */  

我抽出來一部分分析,如下:注意#asm_op作為一個通用的操作符,可以是add,sub等等.下面分析是add的實現過程:

#define ATOMIC_INIT(i)  { (i) }  
  
#ifdef __KERNEL__  
  
/* 
 * On ARM, ordinary assignment (str instruction) doesn't clear the local 
 * strex/ldrex monitor on some implementations. The reason we can use it for 
 * atomic_set() is the clrex or dummy strex done on every exception return. 
 */  
#define atomic_read(v)  READ_ONCE((v)->counter)  
#define atomic_set(v,i) WRITE_ONCE(((v)->counter), (i))  
  
#if __LINUX_ARM_ARCH__ >= 6  -----------------------(1) 
  
/* 
 * ARMv6 UP and SMP safe atomic ops.  We use load exclusive and 
 * store exclusive to ensure that these are atomic.  We may loop 
 * to ensure that the update happens. 
 */  
  
#define ATOMIC_OP(op, c_op, asm_op)                 \  
static inline void atomic_##op(int i, atomic_t *v)          \  
{                                   \  
    unsigned long tmp;                      \  
    int result;                         \  
                                    \  
    prefetchw(&v->counter);-------------------------(2)                       \  
    __asm__ __volatile__("@ atomic_" #op "\n" \-------------(3) 
"1: ldrex   %0, [%3]\n"                     \--------------(4)   
"   " #asm_op " %0, %0, %4\n"                   \------------(5)  
"   strex   %1, %0, [%3]\n"                     \------------(6)  
"   teq %1, #0\n"                       \-----------------(7)   
"   bne 1b"                         \  
    : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)        \  
    : "r" (&v->counter), "Ir" (i)                    \  
    : "cc");                            \  
}                                   \  
  
#else /* ARM_ARCH_6 */  
  
#ifdef CONFIG_SMP  
#error SMP not supported on pre-ARMv6 CPUs  
#endif  
  
#define ATOMIC_OP(op, c_op, asm_op)                 \  
static inline void atomic_##op(int i, atomic_t *v)          \  
{                                   \  
    unsigned long flags;                        \  
                                    \  
    raw_local_irq_save(flags);                  \  
    v->counter c_op i;                       \  
    raw_local_irq_restore(flags);                   \  
}                                   \  
  
#endif /* __LINUX_ARM_ARCH__ */  

對上面的1234567分別分析如下: (1). ARMv6之前的CPU並不支援SMP,之後的ARM架構都是支援SMP的(例如我們熟悉的ARMv7-A)。因此,對於ARM處理,其原子操作分成了兩個陣營,一個是支援SMP的ARMv6之後的CPU,另外一個就是ARMv6之前的,只有單核架構的CPU。對於UP,原子操作就是通過關閉CPU中斷來完成的。

(2). 這裡的程式碼和preloading cache相關。在strex指令之前將要操作的memory內容載入到cache中可以顯著提高效能。

(3). 為了完整性,我還是重複一下彙編嵌入c程式碼的語法:嵌入式彙編的語法格式是:asm(code : output operand list : input operand list : clobber list)。output operand list 和 input operand list是c程式碼和嵌入式彙編程式碼的介面,clobber list描述了彙編程式碼對暫存器的修改情況。為何要有clober list?我們的c程式碼是gcc來處理的,當遇到嵌入彙編程式碼的時候,gcc會將這些嵌入式彙編的文字送給gas進行後續處理。這樣,gcc需要了解嵌入彙編程式碼對暫存器的修改情況,否則有可能會造成大麻煩。例如:gcc對c程式碼進行處理,將某些變數值儲存在暫存器中,如果嵌入彙編修改了該暫存器的值,又沒有通知gcc的話,那麼,gcc會以為暫存器中仍然儲存了之前的變數值,因此不會重新載入該變數到暫存器,而是直接使用這個被嵌入式彙編修改的暫存器,這時候,我們唯一能做的就是靜靜的等待程式的崩潰。還好,在output operand list 和 input operand list中涉及的暫存器都不需要體現在clobber list中(gcc分配了這些暫存器,當然知道嵌入彙編程式碼會修改其內容),因此,大部分的嵌入式彙編的clobber list都是空的,或者只有一個cc,通知gcc,嵌入式彙編程式碼更新了condition code register。

對著上面的code就可以分開各段內容了。@符號標識該行是註釋。

這裡的__volatile__主要是用來防止編譯器優化的。也就是說,在編譯該c程式碼的時候,如果使用優化選項(-O)進行編譯,對於那些沒有宣告__volatile__的嵌入式彙編,編譯器有可能會對嵌入c程式碼的彙編進行優化,編譯的結果可能不是原來你撰寫的彙編程式碼,但是如果你的嵌入式彙編使用__asm__ volatile(嵌入式彙編)的語法格式,那麼也就是告訴編譯器,不要隨便動我的嵌入彙編程式碼。

(4). 我們先看ldrex和strex這兩條彙編指令的使用方法。ldr和str這兩條指令大家都是非常的熟悉了,字尾的ex表示Exclusive,是ARMv7提供的為了實現同步的彙編指令。

LDREX , []

是base register,儲存memory的address,LDREX指令從base register中獲取memory address,並且將memory的內容載入到(destination register)中。這些操作和ldr的操作是一樣的,那麼如何體現exclusive呢?其實,在執行這條指令的時候,還放出兩條“狗”來負責觀察特定地址的訪問(就是儲存在[]中的地址了),這兩條狗一條叫做local monitor,一條叫做global monitor。

STREX , , []

和LDREX指令類似,是base register,儲存memory的address,STREX指令從base register中獲取memory address,並且將 (source register)中的內容載入到該memory中。這裡的儲存了memeory 更新成功或者失敗的結果,0表示memory更新成功,1表示失敗。STREX指令是否能成功執行是和local monitor和global monitor的狀態相關的。對於Non-shareable memory(該memory不是多個CPU之間共享的,只會被一個CPU訪問),只需要放出該CPU的local monitor這條狗就OK了,下面的表格可以描述這種情況

thread 1 thread 2 local monitor的狀態
Open Access state
LDREX Exclusive Access state
LDREX Exclusive Access state
Modify Exclusive Access state
STREX Open Access state
Modify Open Access state
STREX 在Open Access state的狀態下,執行STREX指令會導致該指令執行失敗
保持Open Access state,直到下一個LDREX指令

開始的時候,local monitor處於Open Access state的狀態,thread 1執行LDREX 命令後,local monitor的狀態遷移到Exclusive Access state(標記本地CPU對xxx地址進行了LDREX的操作),這時候,中斷髮生了,在中斷handler中,又一次執行了LDREX ,這時候,local monitor的狀態保持不變,直到STREX指令成功執行,local monitor的狀態遷移到Open Access state的狀態(清除xxx地址上的LDREX的標記)。返回thread 1的時候,在Open Access state的狀態下,執行STREX指令會導致該指令執行失敗(沒有LDREX的標記,何來STREX),說明有其他的核心控制路徑插入了。 對於shareable memory,需要系統中所有的local monitor和global monitor共同工作,完成exclusive access,概念類似,這裡就不再贅述了。 大概的原理已經描述完畢,下面回到具體實現面。

"1:    ldrex    %0, [%3]\n"  

其中%3就是input operand list中的"r" (&v->counter),r是限制符(constraint),用來告訴編譯器gcc,你看著辦吧,你幫我選擇一個通用暫存器儲存該運算元吧。%0對應output openrand list中的"=&r" (result),=表示該運算元是write only的,&表示該運算元是一個earlyclobber operand,具體是什麼意思呢?編譯器在處理嵌入式彙編的時候,傾向使用盡可能少的暫存器,如果output operand沒有&修飾的話,彙編指令中的input和output運算元會使用同樣一個暫存器。因此,&確保了%3和%0使用不同的暫存器。

(5). 完成步驟(4)後,%0這個output運算元已經被賦值為atomic_t變數的old value,毫無疑問,這裡的操作是要給old value加上i。這裡%4對應"Ir" (i),這裡“I”這個限制符對應ARM平臺,表示這是一個有特定限制的立即數,該數必須是0~255之間的一個整數通過rotation的操作得到的一個32bit的立即數。這是和ARM的data-processing instructions如何解析立即數有關的。每個指令32個bit,其中12個bit被用來表示立即數,其中8個bit是真正的資料,4個bit用來表示如何rotation。更詳細的內容請參考ARM文件。

(6). 這一步將修改後的new value儲存在atomic_t變數中。是否能夠正確的操作的狀態標記儲存在%1運算元中,也就是"=&r" (tmp)。

(7). 檢查memory update的操作是否正確完成,如果OK,皆大歡喜,如果發生了問題(有其他的核心路徑插入),那麼需要跳轉到lable 1那裡,從新進行一次read-modify-write的操作。

四 arm64的實現

對於arm64的source code如下(arch/arm64/include/asm/atomic_lse.h):

static inline void atomic_add(int i, atomic_t *v)  
{  
    register int w0 asm ("w0") = i; --------------------(1) 
    register atomic_t *x1 asm ("x1") = v; -----------------(2) 
  
    asm volatile(ARM64_LSE_ATOMIC_INSN(__LL_SC_ATOMIC(add),  
    "   stadd   %w[i], %[v]\n")   ----------------------(3)
    : [i] "+r" (w0), [v] "+Q" (v->counter) -----------------(4) 
    : "r" (x1)  				   ----------------------(5)
    : __LL_SC_CLOBBERS);  ----------------------------(6)
}  

(1). 將變數i放入暫存器w0中 (2). 將atomic_t結構體變數v放入暫存器x1中 (3). 標記兩個變數相加,即彙編指令 (4). 到底哪兩個變數相加? 解析暫存器數值,並執行相加操作.即輸出運算元列表 (5). 將相加的數值存入暫存器x1中,相當於在原來v的基礎上進行v->counter + i的操作.即輸入運算元列表 (6). 改變資源列表,即告訴編輯器哪些資源被修改,需要去update

原理比較簡單.