1. 程式人生 > >執行緒的幾種鎖及基本操作

執行緒的幾種鎖及基本操作

我們先來看一段程式碼:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>         //建立兩個執行緒,分別對兩個全變數進行++操作,判斷兩個變數是否相等,不相等列印

int a = 0;
int b = 0;
// 未初始化 和0初始化的成員放在bbs
pthread_mutex_t mutex;

void* route()
{
    while(1)            //初衷不會列印
    {
        a++;
        b++;
        if(a != b)
        {
            printf("a =%d, b = %d\n", a, b);
            a = 0;
            b = 0;
        }
    }
}

int main()
{
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, route, NULL);
    pthread_create(&tid2, NULL, route, NULL);
    
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    return 0;
}

段程式碼的執行結果優點出乎我們的預料:


我們預計的結構應該是不會列印的,而這裡去打印出了我們意想不到的結果。連相等的資料都列印了出來,為什麼會出現這樣的情況呢?

解釋:兩個執行緒互相搶佔CPU資源,一個執行緒對全域性變數做了++操作之後,還沒來得及比較輸出操作,另一個執行緒搶佔CPU,進行比較列印輸出。為了避免這樣的情況,就需要用到下面介紹的互斥鎖。

互斥量(鎖):用於保護關鍵的程式碼段,以確保其獨佔式的訪問。

1.定義互斥量: pthread_mutex_t mutex;
2.初始化互斥量: pthread_mutex_init(&mutex, NULL);//第二個引數不研究置NULL;          //初始化為 1 (僅做記憶)
3.上鎖     pthread_mutex_lock(&mutex);   1->0;    0   等待
4.解鎖           pthread_mutex_unlock(&mutex);   置1 返回

5.銷燬           pthread_mutex_destroy(&mutex);  

返回值:若成功返回0,若出錯返回錯誤編號。

說明: 互斥鎖,在多個執行緒對共享資源進行訪問時,在訪問共享資源前對互斥量進行加鎖,在訪問完再進行解鎖,在互斥量加鎖後其他的執行緒將阻塞,直到當前的執行緒訪問完畢並釋放鎖。如果釋放互斥鎖時有多個執行緒阻塞,所有阻塞執行緒都會變成可執行狀態,第一個變成可執行狀態的執行緒可以對互斥量加鎖。這樣就保證了每次只有一個執行緒訪問共享資源。

至此,我們好像能通過互斥鎖解決上面的問題:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int a = 0;
int b = 0;
// 未初始化 和0初始化的成員放在bbs
pthread_mutex_t mutex;

void* route()
{
    while(1)            //初衷不會列印
    {
        pthread_mutex_lock(&mutex);   
        a++;
        b++;
        if(a != b)
        {
            printf("a =%d, b = %d\n", a, b);
            a = 0;
            b = 0;
        }
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t tid1, tid2;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&tid1, NULL, route, NULL);
    pthread_create(&tid2, NULL, route, NULL);
    
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    pthread_mutex_destroy(&mutex);

    return 0;
}

現有如下場景:執行緒1和執行緒2,執行緒1執行函式A,執行緒2執行函式B,現只使用一把鎖,分別對A,B函式的執行過程加鎖和解鎖。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>           //執行緒的取消動作發生在加鎖和解鎖過程中時,當發生執行緒2取消後而沒有進行解鎖時,就會出現執行緒1將一直阻塞

pthread_mutex_t mutex;

void* odd(void* arg)
{
  int i = 1;
  for(; ; i+=2)
  {
    pthread_mutex_lock(&mutex);
    printf("%d\n", i);
    pthread_mutex_unlock(&mutex);
  }
}

void* even(void* arg)
{
  int i = 0;
  for(; ; i+=2)
  {
    pthread_mutex_lock(&mutex);
    printf("%d\n", i);
    pthread_mutex_unlock(&mutex);
  }
}


int main()
{
    pthread_t t1, t2;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&t1, NULL, even, NULL);
    pthread_create(&t2, NULL, odd, NULL);
    //pthread_create(&t3, NULL, even, NULL);
    
    sleep(3);
    pthread_cancel(t2);             //取消執行緒2,這個動作可能發生線上程2加鎖之後和解鎖之前

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_mutex_destroy(&mutex);

    return 0;
}

一種極限情況是:執行緒2的取消發生線上程2的解鎖之前,那麼就會導致因為鎖沒解開,而執行緒1無法繼續執行。

解決這樣的問題我們可以用到下面的巨集函式:

巨集:              //註冊執行緒回撥函式,可用來防止執行緒取消後沒有解鎖的問題

void pthread_cleanup_push(void (*routine)(void *), //回撥函式
                                              void *arg);//回撥函式的引數
//回撥函式執行時機
           1.pthread_exit
           2.pthread_cancel

           3.cleanaup_pop引數不為0,當執行到cleaup_pop時,呼叫回撥函式

void pthread_cleanup_pop(int execute);

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>           //執行緒的取消動作發生在加鎖和解鎖過程中時,當發生執行緒2取消後而沒有進行解鎖時,就會出現執行緒1將一直阻塞

pthread_mutex_t mutex;

void callback(void* arg)      //在cancel中進行解鎖
{
  printf("callback\n");
  sleep(1);
  pthread_mutex_unlock(&mutex); 
}

void* odd(void* arg)
{
  int i = 1;
  for(; ; i+=2)
  {
    pthread_cleanup_push(callback, NULL);//因為呼叫了cancel函式,從而觸發了回撥函式。
    pthread_mutex_lock(&mutex);
    printf("%d\n", i);
    pthread_mutex_unlock(&mutex);
    pthread_cleanup_pop(0);
  }
}

void* even(void* arg)
{
  int i = 0;
  for(; ; i+=2)
  {
    pthread_mutex_lock(&mutex);
    printf("%d\n", i);
    pthread_mutex_unlock(&mutex);
  }
}


int main()
{
    pthread_t t1, t2;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&t1, NULL, even, NULL);
    pthread_create(&t2, NULL, odd, NULL);
    //pthread_create(&t3, NULL, even, NULL);
    
    sleep(3);
    pthread_cancel(t2);             //取消執行緒2,這個動作可能發生線上程2加鎖之後和解鎖之前
    //pthread_mutex_unlock(&mutex);   有問題,如果執行even的程式有兩個,而一個取消執行緒的函式執行時正好t3函式阻塞,就會導致t3和t1同時在執行even

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_mutex_destroy(&mutex);

    return 0;
}

注意:

1.不要銷燬一個已經加鎖的互斥量,銷燬的互斥量確保後面不會再有執行緒使用。

2.上鎖和解鎖函式要成對的使用

3.選擇合適的鎖的粒度(數量)。如果粒度太粗,就會出現很多執行緒阻塞等待相同鎖,源自併發性的改善微乎其微。如果鎖的粒度太細,那麼太多的鎖的開銷會使系統的效能受到影響,而且程式碼會變得相當複雜。

4.加鎖要加最小(範圍)鎖,減少系統負擔

使用互斥鎖一定要注意避免死鎖:《Linux高效能伺服器程式設計》  14.5.3 介紹了兩個互斥量因請求順序產生死鎖問題

          如果執行緒試圖對同一個互斥量加鎖兩次,那麼它自身就會陷入死鎖狀態,使用互斥量時,還有其他更不明顯的方式也能產生死鎖。例如,程式中使用多個互斥量時,如果允許一個執行緒一直佔有第一個互斥量,並且在試圖鎖住第二個互斥量時處於阻塞狀態,但是擁有第二個互斥量的執行緒也在試圖鎖住第一個互斥量,這時就會發生死鎖。因為兩個執行緒都在相互請求另一個執行緒擁有的資源,所以這兩個執行緒都無法向前執行,於是就產生死鎖。

          可以通過小心地控制互斥量加鎖的順序來避免死鎖的發生。例如,假設需要對兩個互斥量A和B同時加鎖,如果所有執行緒總是在對互斥量B加鎖之前鎖住互斥量A,那麼使用這兩個互斥量不會產生死鎖(當然在其他資源上仍可能出現死鎖);類似地,如果所有的執行緒總是在鎖住互斥量A之前鎖住互斥量B,那麼也不會發生死鎖。只有在一個執行緒試圖以與另一個執行緒相反的順序鎖住互斥量時,才可能出現死鎖。

         為了應對死鎖,在實際的程式設計中除除了加上同步互斥量之外,還可以通過以下三原則來避免寫出死鎖的程式碼:

1>短:寫的程式碼儘量簡潔

2>平:程式碼中沒有複雜的函式呼叫

3>快:程式碼的執行速度儘可能快

自旋鎖:  應用在實時性要求較高的場合(缺點:CPU浪費較大)

pthread_mutex_spin;

pthread_spin_lock() ; //得不到時,進入忙等待,不斷向CPU進行詢問請求

pthread_spin_unlock(); 

 pthread_spin_destroy(pthread_spinlock_t *lock);

pthread_spin_init(pthread_spinlock_t *lock, int pshared);

讀寫鎖(共享-獨佔鎖):應用場景---大量的讀操作  較少的寫操作

注意:讀讀共享, 讀寫互斥,寫優先順序高(同時到達)

1. pthread_rwlock_t rwlock;//定義

2.int pthread_rwlock_init()//初始化

3.pthread_rwlock_rdlock()//pthread_rwlock_wrlock//讀鎖/寫鎖

4.pthread_rwlock_unlock() // 解鎖

5.int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);//銷燬鎖

返回值:成功返回0,出錯返回錯誤編號

說明:不管什麼時候要增加一個作業到佇列中或者從佇列中刪除作業,都用寫鎖,

不管何時搜尋隊列,首先獲取讀模式下的鎖,允許所有的工作執行緒併發的搜尋隊列。在這樣的情況下只有執行緒

搜尋隊列的頻率遠遠高於增加或刪除作業時,使用讀寫鎖才可能改善效能。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>                //建立8個執行緒,3個寫執行緒,5個讀執行緒

pthread_rwlock_t rwlock;
int counter = 0;

void* readfunc(void* arg)
{
  int id = *(int*)arg;
  free(arg);
  while(1)
  {
    pthread_rwlock_rdlock(&rwlock);
    printf("read thread %d : %d\n", id, counter);
    pthread_rwlock_unlock(&rwlock);
    usleep(100000);
  }
}

void* writefunc(void* arg)
{
  int id = *(int*)arg;
  free(arg);
  while(1)
  {
    int t = counter;
    pthread_rwlock_wrlock(&rwlock);
    printf("write thread %d : t= %d,  %d\n", id, t, ++counter);
    pthread_rwlock_unlock(&rwlock);
    usleep(100000);
  }
}
int main()
{
    pthread_t tid[8];
    pthread_rwlock_init(&rwlock, NULL);
    int i = 0;
    for(i = 0; i < 3; i++)
    {
      int* p =(int*) malloc(sizeof(int));
      *p = i;
      pthread_create(&tid[i], NULL, writefunc, (void*)p);
    }
    for(i = 0; i < 5; i++)
    {
      int* p = (int*)malloc(sizeof(int));
      *p = i;
      pthread_create(&tid[3+i], NULL, readfunc, (void*)p);
    }

    for(i = 0; i < 8; i++)
    {
      pthread_join(tid[i], NULL);
    }

    pthread_rwlock_destroy(&rwlock);

    return 0;
}

條件變數:  如果說互斥鎖是用於同步執行緒對共享資料的訪問的化,那麼條件變數這是用於線上程之間同步共享資料的值。條件變數提供了一種執行緒間的通訊機制:當某個共享資料達到某個值的時候,喚醒等待這個共享資料的執行緒       

1.定義條件變數  pthread_cond_t cond;
2.初始化        pthread_cond_init(&cond, NULL);
3.等待條件      pthread_cond_wait(&cond, &mutex);
                                 mutex :如果沒有在互斥環境,形同虛設
                                 在互斥環境下:wait函式將mutex置1,wait返回,mutex恢復成原來的值
4.修改條件      pthread_cond_signal(&cond);
5.銷燬條件      pthread_cond_destroy(&cond);
規範寫法:
pthread_mutex_lock();
    while(條件不滿足)
    pthread_cond_wait();
//為什麼會使用while?
//因為pthread_cond_wait是阻塞函式,可能被訊號打斷而返回(喚醒),返回後從當前位置向下執行, 被訊號打斷而返回(喚醒),即為假喚醒,繼續阻塞
pthread_mutex_unlock();

pthread_mutex_lock();
pthread_cond_signal(); //訊號通知   ----   如果沒有執行緒在等待,訊號會被丟棄(不會儲存起來)。
pthread_mutex_unlock();
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>             //建立兩個執行緒一個wait print,一個signal sleep()

pthread_cond_t cond;
pthread_mutex_t mutex;

void* f1(void* arg)
{
  while(1)
  {
    pthread_cond_wait(&cond, &mutex);
    printf("running!\n");
  }
}
void* f2(void* arg)
{
  while(1)
  {
    sleep(1);
    pthread_cond_signal(&cond);
  }
}

int main()
{
  pthread_t tid1, tid2;
  pthread_cond_init(&cond, NULL);
  pthread_mutex_init(&mutex, NULL);

  pthread_create(&tid1, NULL, f1, NULL);
  pthread_create(&tid2, NULL, f2, NULL);

  pthread_join(tid1, NULL);
  pthread_join(tid2, NULL);

  pthread_cond_destroy(&cond);
  pthread_mutex_destroy(&mutex);
  return 0;
}

                   System V //基於核心持續性

訊號量:      POSIX    //基於檔案持續性的訊號量
1.定義訊號量: sem_t sem;
2,初始化訊號量:    sem_init(sem_t* sem,
                                                int shared,   //0表示程序內有多少個執行緒使用
                                                int val);     //訊號量初值
3.PV操作        int sem_wait(sem_t* sem);   //sem--;如果小於0,阻塞    P操作
                      int sem_post(sem_t* sem);   //sem++;                 V操作
4.銷燬            sem_destroy(sem_t* sem);

訊號量實現生產者消費者模型:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
//倉庫中裝產品編號,沒裝產品的位置,置為-1,裝了的地方置為產品的編號


#define PRO_COUNT 3
#define CON_COUNT 2
#define BUFSIZE 5


sem_t sem_full;     //標識可生產的產品個數
sem_t sem_empty;    //表示可消費的產品個數
pthread_mutex_t mutex;  //互斥量
int num = 0;        //產品編號
int buf[BUFSIZE];   //倉庫
int wr_idx;     //寫索引
int rd_idx;     //讀索引

void* pro(void* arg)
{
  int i = 0;
  int id = *(int*)arg;
  free(arg);
  while(1)
  {
      sem_wait(&sem_full);    //先判斷倉庫是否滿
      pthread_mutex_lock(&mutex); //互斥的訪問具體的倉庫的空閒位置
      printf("%d生產者開始生產%d\n", id, num);
      for(i = 0; i < BUFSIZE; i++)
      {
        printf("\tbuf[%d]=%d", i, buf[i]);
        if(i == wr_idx)
        {
          printf("<=====");
        }
        printf("\n");
      }
      buf[wr_idx] = num++;      //存放產品
      wr_idx = (wr_idx + 1) % BUFSIZE;
      printf("%d生產者結束生產\n", id);
      pthread_mutex_unlock(&mutex);
      sem_post(&sem_empty);
      sleep(rand()%3);
    }
}

void* con(void* arg)
{
  int i = 0;
    int id = *(int*)arg;
    free(arg);
    while(1)
    {
        sem_wait(&sem_empty);
        pthread_mutex_lock(&mutex);
        
        printf("%d消費者開始消費%d\n", id, num);
        for(i = 0; i < BUFSIZE; i++)
        {
          printf("buf[%d]=%d", i, buf[i]);
          if(i == rd_idx)
          {
            printf("=====>");
          }
          printf("\n");
        }
        int r = buf[rd_idx];
        buf[rd_idx] = -1;
        rd_idx = (rd_idx+1)%BUFSIZE;
        sleep(rand()%4);
        printf("%d\n消費者消費完%d\n", id, r);
        pthread_mutex_unlock(&mutex);
        sem_post(&sem_full);
        sleep(rand()%2);
    }
}

int main()
{
    pthread_t tid[PRO_COUNT+CON_COUNT];
    pthread_mutex_init(&mutex, NULL); //初始化
    sem_init(&sem_empty, 0, 0);
    sem_init(&sem_full, 0, BUFSIZE);
    srand(getpid());

    int i = 0;
    for(i = 0; i < BUFSIZE; i++)      //初始化倉庫  -1表示沒有品
        buf[i] = -1;

    for(i = 0; i < PRO_COUNT; i++)    //產生生產者
    {
        int *p = (int*)malloc(sizeof(int));
        *p = i;
        pthread_create(&tid[i], NULL, pro, p);
    }

    for(i = 0; i < CON_COUNT; i++)
    {
        int *p = (int*)malloc(sizeof(int));
        *p = i;
        pthread_create(&tid[i+CON_COUNT], NULL, con, p);
    }
    
    for(i = 0; i < PRO_COUNT + CON_COUNT; i++)
    {
        pthread_join(tid[i], NULL);
    }

    pthread_mutex_destroy(&mutex);  //銷燬
    sem_destroy(&sem_empty);
    sem_destroy(&sem_full);

    return 0;
}

拓展學習:

樂觀鎖和悲觀鎖?

樂觀鎖:

     在關係資料庫管理系統裡,樂觀併發控制(又名”樂觀鎖”,Optimistic Concurrency Control,縮寫”OCC”)是一種併發控制的方法。它假設多使用者併發的事務在處理時不會彼此互相影響,各事務能夠在不產生鎖的情況下處理各自影響的那部分資料。在提交資料更新之前,每個事務會先檢查在該事務讀取資料後,有沒有其他事務又修改了該資料。如果其他事務有更新的話,正在提交的事務會進行回滾。

樂觀併發控制的事務包括以下階段: 
1. 讀取:事務將資料讀入快取,這時系統會給事務分派一個時間戳。 
2. 校驗:事務執行完畢後,進行提交。這時同步校驗所有事務,如果事務所讀取的資料在讀取之後又被其他事務修改,則產生衝突,事務被中斷回滾。 

3. 寫入:通過校驗階段後,將更新的資料寫入資料庫。

優點和不足:

       樂觀併發控制相信事務之間的資料競爭(data race)的概率是比較小的,因此儘可能直接做下去,直到提交的時候才去鎖定,所以不會產生任何鎖和死鎖。但如果直接簡單這麼做,還是有可能會遇到不可預期的結果,例如兩個事務都讀取了資料庫的某一行,經過修改以後寫回資料庫,這時就遇到了問題。

悲觀鎖:

    在關係資料庫管理系統裡,悲觀併發控制(又名”悲觀鎖”,Pessimistic Concurrency Control,縮寫”PCC”)是一種併發控制的方法。它可以阻止一個事務以影響其他使用者的方式來修改資料。如果一個事務執行的操作讀某行資料應用了鎖,那只有當這個事務把鎖釋放,其他事務才能夠執行與該鎖衝突的操作。

優點和不足:悲觀併發控制實際上是“先取鎖再訪問”的保守策略,為資料處理的安全提供了保證。但是在效率方面,處理加鎖的機制會讓資料庫產生額外的開銷,還有增加產生死鎖的機會;另外,在只讀型事務處理中由於不會產生衝突,也沒必要使用鎖,這樣做只能增加系統負載;還有會降低了並行性,一個事務如果鎖定了某行資料,其他事務就必須等待該事務處理完才可以處理那行數

系統最多能夠建立多少個執行緒? (一般以實測為準,但根據每次開闢的棧的大小不同,測試結果也會不同)。

一個是直接在命令列檢視    cat /proc/sys/kernel/threads-max  我的電腦顯示是 7572

另一個是自己計算 使用者空間大小3G 即是3072M/8M棧空間  = 380     

第三個寫程式:   跑到32754(理論值 32768)

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>               //建立執行緒

void* foo(void* arg)
{
}

int main()
{
    int count = 0;
    pthread_t thread;

    while(1)
    {
        if(pthread_create(&thread, NULL, foo, NULL) != 0)
        return 1;
        count++;
    printf("MAX = %d\n", count);
    }

    return 0;
}