[linux kernel-併發與同步]2.atomic工作原理
一 源由-需要解決什麼問題
我們的程式邏輯經常遇到這樣的操作序列:
- 讀一個位於memory中的變數的值到暫存器中
- 修改該變數的值(也就是修改暫存器中的值)
- 將暫存器中的數值寫回memory中的變數值
如果這個操作序列是序列化的操作(在一個thread中序列執行),那麼一切OK,然而,世界總是不能如你所願。在多CPU體系結構中,執行在兩個CPU上的兩個核心控制路徑同時並行執行上面操作序列,有可能發生下面的場景:
CPU1上的操作 | CPU2上的操作 |
---|---|
讀操作 | |
讀操作 | |
修改 | 修改 |
寫操作 | |
寫操作 |
多個CPUs和memory chip是通過匯流排互聯的,在任意時刻,只能有一個匯流排master裝置(例如CPU、DMA controller)訪問該Slave裝置(在這個場景中,slave裝置是RAM chip)。因此,來自兩個CPU上的讀memory操作被序列化執行,分別獲得了同樣的舊值。完成修改後,兩個CPU都想進行寫操作,把修改的值寫回到memory。但是,硬體arbiter的限制使得CPU的寫回必須是序列化的,因此CPU1首先獲得了訪問權,進行寫回動作,隨後,CPU2完成寫回動作。在這種情況下,CPU1的對memory的修改被CPU2的操作覆蓋了,因此執行結果是錯誤的。 不僅是多CPU,在單CPU上也會由於有多個核心控制路徑的交錯而導致上面描述的錯誤。一個具體的例子如下:
系統呼叫的控制路徑 | 中斷handler控制路徑 |
---|---|
讀操作 | |
讀操作 | |
修改 | |
寫操作 | |
修改 | |
寫操作 |
系統呼叫的控制路徑上,完成讀操作後,硬體觸發中斷,開始執行中斷handler。這種場景下,中斷handler控制路徑的寫回的操作被系統呼叫控制路徑上的寫回覆蓋了,結果也是錯誤的。
二 對策-解決方案
對於那些有多個核心控制路徑進行read-modify-write的變數,核心提供了一個特殊的型別atomic_t,具體定義如下:code:include/linux/types.h
typedef struct { int counter; } atomic_t;
從上面的定義來看,atomic_t實際上就是一個int型別的counter,不過定義這樣特殊的型別atomic_t是有其思考的:核心定義了若干atomic_xxx的介面API函式,這些函式只會接收atomic_t型別的引數。這樣可以確保atomic_xxx的介面函式只會操作atomic_t型別的資料。同樣的,如果你定義了atomic_t型別的變數(你期望用atomic_xxx的介面API函式操作它),這些變數也不會被那些普通的、非原子變數操作的API函式接受。簡單一句話就是原子操作保護的變數,只有原子操作的API才能夠訪問和修改,其他的操作方式不被允許. 具體的部分介面API函式整理如下(在ARM64檔案arch/arm64/include/asm/atomic.h定義了眾多的可以呼叫的API介面):
介面函式 | 描述 |
---|---|
static inline void atomic_add(int i, atomic_t *v) | 給一個原子變數v增加i |
static inline int atomic_add_return(int i, atomic_t *v) | 同上,只不過將變數v的最新值返回 |
static inline void atomic_sub(int i, atomic_t *v) | 給一個原子變數v減去i |
static inline int atomic_sub_return(int i, atomic_t *v) | 同上,只不過將變數v的最新值返回 |
static inline int atomic_cmpxchg(atomic_t *ptr, int old, int new) | 比較old和原子變數ptr中的值,如果相等,那麼就把new值賦給原子變數。 返回舊的原子變數ptr中的值 |
atomic_read | 獲取原子變數的值 |
atomic_set | 設定原子變數的值 |
atomic_inc(v) | 原子變數的值加一 |
atomic_inc_return(v) | 同上,只不過將變數v的最新值返回 |
atomic_dec(v) | 原子變數的值減去一 |
atomic_dec_return(v) | 同上,只不過將變數v的最新值返回 |
atomic_sub_and_test(i, v) | 給一個原子變數v減去i,並判斷變數v的最新值是否等於0 |
atomic_add_negative(i,v) | 給一個原子變數v增加i,並判斷變數v的最新值是否是負數 |
static inline int atomic_add_unless(atomic_t *v, int a, int u) | 只要原子變數v不等於u,那麼就執行原子變數v加a的操作。 如果v不等於u,返回非0值,否則返回0值 |
三 arm32的實現
arm32分架構有不同的實現,source code:arch/arm/include/asm/atomic.h:
#define ATOMIC_INIT(i) { (i) }
#ifdef __KERNEL__
/*
* On ARM, ordinary assignment (str instruction) doesn't clear the local
* strex/ldrex monitor on some implementations. The reason we can use it for
* atomic_set() is the clrex or dummy strex done on every exception return.
*/
#define atomic_read(v) READ_ONCE((v)->counter)
#define atomic_set(v,i) WRITE_ONCE(((v)->counter), (i))
#if __LINUX_ARM_ARCH__ >= 6
/*
* ARMv6 UP and SMP safe atomic ops. We use load exclusive and
* store exclusive to ensure that these are atomic. We may loop
* to ensure that the update happens.
*/
#define ATOMIC_OP(op, c_op, asm_op) \
static inline void atomic_##op(int i, atomic_t *v) \
{ \
unsigned long tmp; \
int result; \
\
prefetchw(&v->counter); \
__asm__ __volatile__("@ atomic_" #op "\n" \
"1: ldrex %0, [%3]\n" \
" " #asm_op " %0, %0, %4\n" \
" strex %1, %0, [%3]\n" \
" teq %1, #0\n" \
" bne 1b" \
: "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \
: "r" (&v->counter), "Ir" (i) \
: "cc"); \
} \
#define ATOMIC_OP_RETURN(op, c_op, asm_op) \
static inline int atomic_##op##_return_relaxed(int i, atomic_t *v) \
{ \
unsigned long tmp; \
int result; \
\
prefetchw(&v->counter); \
\
__asm__ __volatile__("@ atomic_" #op "_return\n" \
"1: ldrex %0, [%3]\n" \
" " #asm_op " %0, %0, %4\n" \
" strex %1, %0, [%3]\n" \
" teq %1, #0\n" \
" bne 1b" \
: "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \
: "r" (&v->counter), "Ir" (i) \
: "cc"); \
\
return result; \
}
#define atomic_add_return_relaxed atomic_add_return_relaxed
#define atomic_sub_return_relaxed atomic_sub_return_relaxed
static inline int atomic_cmpxchg_relaxed(atomic_t *ptr, int old, int new)
{
int oldval;
unsigned long res;
prefetchw(&ptr->counter);
do {
__asm__ __volatile__("@ atomic_cmpxchg\n"
"ldrex %1, [%3]\n"
"mov %0, #0\n"
"teq %1, %4\n"
"strexeq %0, %5, [%3]\n"
: "=&r" (res), "=&r" (oldval), "+Qo" (ptr->counter)
: "r" (&ptr->counter), "Ir" (old), "r" (new)
: "cc");
} while (res);
return oldval;
}
#define atomic_cmpxchg_relaxed atomic_cmpxchg_relaxed
static inline int __atomic_add_unless(atomic_t *v, int a, int u)
{
int oldval, newval;
unsigned long tmp;
smp_mb();
prefetchw(&v->counter);
__asm__ __volatile__ ("@ atomic_add_unless\n"
"1: ldrex %0, [%4]\n"
" teq %0, %5\n"
" beq 2f\n"
" add %1, %0, %6\n"
" strex %2, %1, [%4]\n"
" teq %2, #0\n"
" bne 1b\n"
"2:"
: "=&r" (oldval), "=&r" (newval), "=&r" (tmp), "+Qo" (v->counter)
: "r" (&v->counter), "r" (u), "r" (a)
: "cc");
if (oldval != u)
smp_mb();
return oldval;
}
#else /* ARM_ARCH_6 */
#ifdef CONFIG_SMP
#error SMP not supported on pre-ARMv6 CPUs
#endif
#define ATOMIC_OP(op, c_op, asm_op) \
static inline void atomic_##op(int i, atomic_t *v) \
{ \
unsigned long flags; \
\
raw_local_irq_save(flags); \
v->counter c_op i; \
raw_local_irq_restore(flags); \
} \
#define ATOMIC_OP_RETURN(op, c_op, asm_op) \
static inline int atomic_##op##_return(int i, atomic_t *v) \
{ \
unsigned long flags; \
int val; \
\
raw_local_irq_save(flags); \
v->counter c_op i; \
val = v->counter; \
raw_local_irq_restore(flags); \
\
return val; \
}
static inline int atomic_cmpxchg(atomic_t *v, int old, int new)
{
int ret;
unsigned long flags;
raw_local_irq_save(flags);
ret = v->counter;
if (likely(ret == old))
v->counter = new;
raw_local_irq_restore(flags);
return ret;
}
static inline int __atomic_add_unless(atomic_t *v, int a, int u)
{
int c, old;
c = atomic_read(v);
while (c != u && (old = atomic_cmpxchg((v), c, c + a)) != c)
c = old;
return c;
}
#endif /* __LINUX_ARM_ARCH__ */
我抽出來一部分分析,如下:注意#asm_op作為一個通用的操作符,可以是add,sub等等.下面分析是add的實現過程:
#define ATOMIC_INIT(i) { (i) }
#ifdef __KERNEL__
/*
* On ARM, ordinary assignment (str instruction) doesn't clear the local
* strex/ldrex monitor on some implementations. The reason we can use it for
* atomic_set() is the clrex or dummy strex done on every exception return.
*/
#define atomic_read(v) READ_ONCE((v)->counter)
#define atomic_set(v,i) WRITE_ONCE(((v)->counter), (i))
#if __LINUX_ARM_ARCH__ >= 6 -----------------------(1)
/*
* ARMv6 UP and SMP safe atomic ops. We use load exclusive and
* store exclusive to ensure that these are atomic. We may loop
* to ensure that the update happens.
*/
#define ATOMIC_OP(op, c_op, asm_op) \
static inline void atomic_##op(int i, atomic_t *v) \
{ \
unsigned long tmp; \
int result; \
\
prefetchw(&v->counter);-------------------------(2) \
__asm__ __volatile__("@ atomic_" #op "\n" \-------------(3)
"1: ldrex %0, [%3]\n" \--------------(4)
" " #asm_op " %0, %0, %4\n" \------------(5)
" strex %1, %0, [%3]\n" \------------(6)
" teq %1, #0\n" \-----------------(7)
" bne 1b" \
: "=&r" (result), "=&r" (tmp), "+Qo" (v->counter) \
: "r" (&v->counter), "Ir" (i) \
: "cc"); \
} \
#else /* ARM_ARCH_6 */
#ifdef CONFIG_SMP
#error SMP not supported on pre-ARMv6 CPUs
#endif
#define ATOMIC_OP(op, c_op, asm_op) \
static inline void atomic_##op(int i, atomic_t *v) \
{ \
unsigned long flags; \
\
raw_local_irq_save(flags); \
v->counter c_op i; \
raw_local_irq_restore(flags); \
} \
#endif /* __LINUX_ARM_ARCH__ */
對上面的1234567分別分析如下: (1). ARMv6之前的CPU並不支援SMP,之後的ARM架構都是支援SMP的(例如我們熟悉的ARMv7-A)。因此,對於ARM處理,其原子操作分成了兩個陣營,一個是支援SMP的ARMv6之後的CPU,另外一個就是ARMv6之前的,只有單核架構的CPU。對於UP,原子操作就是通過關閉CPU中斷來完成的。
(2). 這裡的程式碼和preloading cache相關。在strex指令之前將要操作的memory內容載入到cache中可以顯著提高效能。
(3). 為了完整性,我還是重複一下彙編嵌入c程式碼的語法:嵌入式彙編的語法格式是:asm(code : output operand list : input operand list : clobber list)。output operand list 和 input operand list是c程式碼和嵌入式彙編程式碼的介面,clobber list描述了彙編程式碼對暫存器的修改情況。為何要有clober list?我們的c程式碼是gcc來處理的,當遇到嵌入彙編程式碼的時候,gcc會將這些嵌入式彙編的文字送給gas進行後續處理。這樣,gcc需要了解嵌入彙編程式碼對暫存器的修改情況,否則有可能會造成大麻煩。例如:gcc對c程式碼進行處理,將某些變數值儲存在暫存器中,如果嵌入彙編修改了該暫存器的值,又沒有通知gcc的話,那麼,gcc會以為暫存器中仍然儲存了之前的變數值,因此不會重新載入該變數到暫存器,而是直接使用這個被嵌入式彙編修改的暫存器,這時候,我們唯一能做的就是靜靜的等待程式的崩潰。還好,在output operand list 和 input operand list中涉及的暫存器都不需要體現在clobber list中(gcc分配了這些暫存器,當然知道嵌入彙編程式碼會修改其內容),因此,大部分的嵌入式彙編的clobber list都是空的,或者只有一個cc,通知gcc,嵌入式彙編程式碼更新了condition code register。
對著上面的code就可以分開各段內容了。@符號標識該行是註釋。
這裡的__volatile__主要是用來防止編譯器優化的。也就是說,在編譯該c程式碼的時候,如果使用優化選項(-O)進行編譯,對於那些沒有宣告__volatile__的嵌入式彙編,編譯器有可能會對嵌入c程式碼的彙編進行優化,編譯的結果可能不是原來你撰寫的彙編程式碼,但是如果你的嵌入式彙編使用__asm__ volatile(嵌入式彙編)的語法格式,那麼也就是告訴編譯器,不要隨便動我的嵌入彙編程式碼。
(4). 我們先看ldrex和strex這兩條彙編指令的使用方法。ldr和str這兩條指令大家都是非常的熟悉了,字尾的ex表示Exclusive,是ARMv7提供的為了實現同步的彙編指令。
LDREX
是base register,儲存memory的address,LDREX指令從base register中獲取memory address,並且將memory的內容載入到
STREX ,
和LDREX指令類似,是base register,儲存memory的address,STREX指令從base register中獲取memory address,並且將
thread 1 | thread 2 | local monitor的狀態 |
---|---|---|
Open Access state | ||
LDREX | Exclusive Access state | |
LDREX | Exclusive Access state | |
Modify | Exclusive Access state | |
STREX | Open Access state | |
Modify | Open Access state | |
STREX | 在Open Access state的狀態下,執行STREX指令會導致該指令執行失敗 | |
保持Open Access state,直到下一個LDREX指令 |
開始的時候,local monitor處於Open Access state的狀態,thread 1執行LDREX 命令後,local monitor的狀態遷移到Exclusive Access state(標記本地CPU對xxx地址進行了LDREX的操作),這時候,中斷髮生了,在中斷handler中,又一次執行了LDREX ,這時候,local monitor的狀態保持不變,直到STREX指令成功執行,local monitor的狀態遷移到Open Access state的狀態(清除xxx地址上的LDREX的標記)。返回thread 1的時候,在Open Access state的狀態下,執行STREX指令會導致該指令執行失敗(沒有LDREX的標記,何來STREX),說明有其他的核心控制路徑插入了。 對於shareable memory,需要系統中所有的local monitor和global monitor共同工作,完成exclusive access,概念類似,這裡就不再贅述了。 大概的原理已經描述完畢,下面回到具體實現面。
"1: ldrex %0, [%3]\n"
其中%3就是input operand list中的"r" (&v->counter),r是限制符(constraint),用來告訴編譯器gcc,你看著辦吧,你幫我選擇一個通用暫存器儲存該運算元吧。%0對應output openrand list中的"=&r" (result),=表示該運算元是write only的,&表示該運算元是一個earlyclobber operand,具體是什麼意思呢?編譯器在處理嵌入式彙編的時候,傾向使用盡可能少的暫存器,如果output operand沒有&修飾的話,彙編指令中的input和output運算元會使用同樣一個暫存器。因此,&確保了%3和%0使用不同的暫存器。
(5). 完成步驟(4)後,%0這個output運算元已經被賦值為atomic_t變數的old value,毫無疑問,這裡的操作是要給old value加上i。這裡%4對應"Ir" (i),這裡“I”這個限制符對應ARM平臺,表示這是一個有特定限制的立即數,該數必須是0~255之間的一個整數通過rotation的操作得到的一個32bit的立即數。這是和ARM的data-processing instructions如何解析立即數有關的。每個指令32個bit,其中12個bit被用來表示立即數,其中8個bit是真正的資料,4個bit用來表示如何rotation。更詳細的內容請參考ARM文件。
(6). 這一步將修改後的new value儲存在atomic_t變數中。是否能夠正確的操作的狀態標記儲存在%1運算元中,也就是"=&r" (tmp)。
(7). 檢查memory update的操作是否正確完成,如果OK,皆大歡喜,如果發生了問題(有其他的核心路徑插入),那麼需要跳轉到lable 1那裡,從新進行一次read-modify-write的操作。
四 arm64的實現
對於arm64的source code如下(arch/arm64/include/asm/atomic_lse.h):
static inline void atomic_add(int i, atomic_t *v)
{
register int w0 asm ("w0") = i; --------------------(1)
register atomic_t *x1 asm ("x1") = v; -----------------(2)
asm volatile(ARM64_LSE_ATOMIC_INSN(__LL_SC_ATOMIC(add),
" stadd %w[i], %[v]\n") ----------------------(3)
: [i] "+r" (w0), [v] "+Q" (v->counter) -----------------(4)
: "r" (x1) ----------------------(5)
: __LL_SC_CLOBBERS); ----------------------------(6)
}
(1). 將變數i放入暫存器w0中 (2). 將atomic_t結構體變數v放入暫存器x1中 (3). 標記兩個變數相加,即彙編指令 (4). 到底哪兩個變數相加? 解析暫存器數值,並執行相加操作.即輸出運算元列表 (5). 將相加的數值存入暫存器x1中,相當於在原來v的基礎上進行v->counter + i的操作.即輸入運算元列表 (6). 改變資源列表,即告訴編輯器哪些資源被修改,需要去update
原理比較簡單.