1. 程式人生 > >Go語言學習之sync包(臨時物件池Pool、互斥鎖Mutex、等待Cond)(the way to go)

Go語言學習之sync包(臨時物件池Pool、互斥鎖Mutex、等待Cond)(the way to go)

golang的特點就是語言層面支援併發,並且實現併發非常簡單,只需在需要併發的函式前面新增關鍵字go
但是如何處理go併發機制中不同goroutine之間的同步與通訊,golang 中提供了sync包來解決相關的問題,當然還有其他的方式比如channel,原子操作atomic等等,這裡先介紹sync包的用法.

這裡,跟大家一些學習golang的標準庫,sync。

package sync

簡介

Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.

sync 包提供了互斥鎖這類的基本的同步原語.除 Once 和 WaitGroup 之外的型別大多用於底層庫的例程。更高階的同步操作通過通道與通訊進行。

Cond

Cond用於在併發環境下routine的等待和通知
type Cond

type Cond struct {

        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在“檢查條件”或“更改條件”時 L 應該鎖定

func NewCond

func NewCond(l Locker) *Cond
  
  • 1

NewCond returns a new Cond with Locker l.
建立一個條件等待

func (*Cond) Broadcast

func (c *Cond) Broadcast()
  
  • 1

Broadcast wakes all goroutines waiting on c.

It is allowed but not required for the caller to hold c.L during the call.
Broadcast 喚醒所有等待的 Wait,建議在“更改條件”時鎖定 c.L,更改完畢再解鎖

func (*Cond) Signal

func (c *Cond) Signal()
  
  • 1

Signal wakes one goroutine waiting on c, if there is any.

It is allowed but not required for the caller to hold c.L during the call.
Signal 喚醒一個等待的 Wait,建議在“更改條件”時鎖定 c.L,更改完畢再解鎖。

func (*Cond) Wait

func (c *Cond) Wait()
  
  • 1

Wait atomically unlocks c.L and suspends execution of the calling goroutine. After later resuming execution, Wait locks c.L before returning. Unlike in other systems, Wait cannot return unless awoken by Broadcast or Signal.

Because c.L is not locked when Wait first resumes, the caller typically cannot assume that the condition is true when Wait returns. Instead, the caller should Wait in a loop:
Wait 會解鎖 c.L 並進入等待狀態,在被喚醒時,會重新鎖定 c.L

使用示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock()
    c := sync.NewCond(&m)
    go func() {
        m.Lock()
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second)
        c.Broadcast() //喚醒所有等待的 Wait
        fmt.Println("4. goroutine will release lock soon (deffered Unlock)")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second)
    fmt.Println("2. main goroutine is still lockek")
    c.Wait()
    m.Unlock()
    fmt.Println("Done")
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

輸出:
1. main goroutine is owner of lock
2. main goroutine is still lockek
3. goroutine is owner of lock
4. goroutine will release lock soon (deffered Unlock)
Done

Map

併發安全的map,關於golang中map的併發不安全之後再詳細介紹。
type Map

type Map struct {
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3

func (*Map) Delete

func (m *Map) Delete(key interface{})
  
  • 1

Delete deletes the value for a key.
刪除一個鍵值。

func (*Map) Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool)
  
  • 1

Load returns the value stored in the map for a key, or nil if no value is present. The ok result indicates whether value was found in the map.
載入方法,也就是提供一個鍵key,查詢對應的值value,如果不存在,通過ok反映。

func (*Map) LoadOrStore

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
  
  • 1

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*Map) Range

func (m *Map) Range(f func(key, value interface{}) bool)
  
  • 1

Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration.
for … range map是內建的語言特性,所以沒有辦法使用for range遍歷sync.Map, 但是可以使用它的Range方法,通過回撥的方式遍歷。

func (*Map) Store

func (m *Map) Store(key, value interface{})
  
  • 1

Store sets the value for a key.
更新或者新增一個entry

使用示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var m sync.Map
    for i := 0; i < 3; i++ {
        go func(i int) {
            for j := 0; ; j++ {
                m.Store(i, j)
            }
        }(i)
    }
    for i := 0; i < 10; i++ {
        m.Range(func(key, value interface{}) bool {
            fmt.Printf("%d: %d\t", key, value)
            return true
        })
        fmt.Println()
        time.Sleep(time.Second)
    }
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

輸出:
無,要更新go1.9

Locker Mutex

互斥鎖用來保證在任一時刻,只能有一個例程訪問某物件。Mutex 的初始值為解鎖狀態。Mutex 通常作為其它結構體的匿名欄位使用,使該結構體具有 Lock 和 Unlock 方法。

type Locker

type Locker interface {
        Lock()
        Unlock()
}
  
  • 1
  • 2
  • 3
  • 4

A Locker represents an object that can be locked and unlocked.
Locker 介面包裝了基本的 Lock 和 UnLock 方法,用於加鎖和解鎖。

type Mutex

type Mutex struct {
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3

func (*Mutex) Lock

func (m *Mutex) Lock()
  
  • 1

Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.
Lock 用於鎖住 m,如果 m 已經被加鎖,則 Lock 將被阻塞,直到 m 被解鎖。

func (*Mutex) Unlock

func (m *Mutex) Unlock()
  
  • 1

Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.
Unlock 用於解鎖 m,如果 m 未加鎖,則該操作會引發 panic。

使用示例:

package main

import (
    "fmt"
    "sync"
)

type SafeInt struct {
    sync.Mutex
    Num int
}

func main() {
    count := SafeInt{}
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        go func(i int) {
            count.Lock()
            count.Num += i
            fmt.Print(count.Num, " ")
            count.Unlock()
            done <- true
        }(i)
    }
    for i := 0; i < 10; i++ {
        <-done
    }
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

輸出(隨機):
0 9 10 12 15 19 24 30 37 45

Once

Once is an object that will perform exactly one action.

Once 的作用是多次呼叫但只執行一次,Once 只有一個方法,Once.Do(),向 Do 傳入一個函式,這個函式在第一次執行 Once.Do() 的時候會被呼叫,以後再執行 Once.Do() 將沒有任何動作,即使傳入了其它的函式,也不會被執行,如果要執行其它函式,需要重新建立一個 Once 物件。

type Once

type Once struct {
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3

func (*Once) Do

func (o *Once) Do(f func())
  
  • 1

Do calls the function f if and only if Do is being called for the first time for this instance of Once. In other words, given

使用示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {

    o := &sync.Once{}
    go myfun(o)
    go myfun(o)
    time.Sleep(time.Second * 2)
}

func myfun(o *sync.Once) {

    fmt.Println("Begin function")

    o.Do(func() {
        fmt.Println("Working...")
    })

    fmt.Println("Function end")
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

輸出:
Begin function
Working…
Function end
Begin function
Function end

Pool

Pool 用於儲存臨時物件,它將使用完畢的物件存入物件池中,在需要的時候取出來重複使用,目的是為了避免重複建立相同的物件造成 GC 負擔過重。其中存放的臨時物件隨時可能被 GC 回收掉(如果該物件不再被其它變數引用)。

從 Pool 中取出物件時,如果 Pool 中沒有物件,將返回 nil,但是如果給 Pool.New 欄位指定了一個函式的話,Pool 將使用該函式建立一個新物件返回。

Pool 可以安全的在多個例程中並行使用,但 Pool 並不適用於所有空閒物件,Pool 應該用來管理併發的例程共享的臨時物件,而不應該管理短壽命物件中的臨時物件,因為這種情況下記憶體不能很好的分配,這些短壽命物件應該自己實現空閒列表。

切記,Pool 在開始使用之後,不能再被複制!!!!

type Pool

type Pool struct {

        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() interface{}
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

func (*Pool) Get

func (p *Pool) Get() interface{}
  
  • 1

作用:從臨時物件池中取出物件

func (*Pool) Put

func (p *Pool) Put(x interface{})
  
  • 1

作用:向臨時物件池中存入物件

使用示例:

package main

import (
    "fmt"
    "sync"
)

func main() {
    p := &sync.Pool{
        New: func() interface{} {
            return 0
        },
    }

    a := p.Get().(int)
    p.Put(100)
    b := p.Get().(int)
    fmt.Println(a, b)
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

輸出:0 100

RWMutex

RWMutex 比 Mutex 多了一個“讀鎖定”和“讀寫解鎖”,可以讓多個例程同時讀取某物件。

RWMutex 可以安全的在多個例程中並行使用。

type RWMutex

type RWMutex struct {
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3

func (*RWMutex) Lock

func (rw *RWMutex) Lock()
  
  • 1

Lock locks rw for writing. If the lock is already locked for reading or writing, Lock blocks until the lock is available.
Lock 將 rw 設定為寫鎖定狀態,禁止其他例程讀取或寫入

func (*RWMutex) RLock

func (rw *RWMutex) RLock()
  
  • 1

RLock locks rw for reading.
RLock 將 rw 設定為讀鎖定狀態,禁止其他例程寫入,但可以讀取。
It should not be used for recursive read locking; a blocked Lock call excludes new readers from acquiring the lock. See the documentation on the RWMutex type.

func (*RWMutex) RLocker

func (rw *RWMutex) RLocker() Locker
  
  • 1

RLocker returns a Locker interface that implements the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.

func (*RWMutex) RUnlock

func (rw *RWMutex) RUnlock()
  
  • 1

RUnlock undoes a single RLock call; it does not affect other simultaneous readers. It is a run-time error if rw is not locked for reading on entry to RUnlock.
Runlock 解除 rw 的讀鎖定狀態,如果 rw 未被讀鎖頂,則該操作會引發 panic。

func (*RWMutex) Unlock

func (rw *RWMutex) Unlock()
  
  • 1

Unlock unlocks rw for writing. It is a run-time error if rw is not locked for writing on entry to Unlock.
Unlock 解除 rw 的寫鎖定狀態,如果 rw 未被寫鎖定,則該操作會引發 panic。

使用示例:

package main

import (
    "sync"
    "time"
)

var m *sync.RWMutex

func main() {
    m = new(sync.RWMutex)

    go read(1)
    go read(2)

    time.Sleep(2 * time.Second)
}

func read(i int) {
    println(i, "start")

    m.RLock()
    println(i, "reading......")
    time.Sleep(1 * time.Second)
    m.RUnlock()

    println(i, "end")
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

輸出:
1 start
1 reading……
2 start
2 reading……
1 end
2 end

WaitGroup

WaitGroup 用於等待一組例程的結束。主例程在建立每個子例程的時候先呼叫 Add 增加等待計數,每個子例程在結束時呼叫 Done 減少例程計數。之後,主例程通過 Wait 方法開始等待,直到計數器歸零才繼續執行。

type WaitGroup

type WaitGroup struct {
        // contains filtered or unexported fields
}
  
  • 1
  • 2
  • 3

func (*WaitGroup) Add

func (wg *WaitGroup) Add(delta int)
  
  • 1

Add adds delta, which may be negative, to the WaitGroup counter. If the counter becomes zero, all goroutines blocked on Wait are released. If the counter goes negative, Add panics.
計數器增加 delta,delta 可以是負數。

func (*WaitGroup) Done

func (wg *WaitGroup) Done()
  
  • 1

Done decrements the WaitGroup counter by one.
計數器減少 1

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait()
  
  • 1

Wait blocks until the WaitGroup counter is zero.
等待直到計數器歸零。如果計數器小於 0,則該操作會引發 panic。

使用示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
    duration := millisecs * time.Millisecond
    time.Sleep(duration)
    fmt.Println("Function in background, duration:", duration)
    wg.Done()
}

func main() {
    var wg sync.WaitGroup
    wg.Add(4)
    go dosomething(200, &wg)
    go dosomething(400, &wg)
    go dosomething(150, &wg)
    go dosomething(600, &wg)

    wg.Wait()
    fmt.Println("Done")
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

輸出:
Function in background, duration: 150ms
Function in background, duration: 200ms
Function in background, duration: 400ms
Function in background, duration: 600ms
Done

簡單應用

通過sync實現單例

package singleton

import (
    "sync"
)

type singleton struct {
}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}
  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

訪問多個url

package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
)

func main() {
    urls := []string{
        "http://blog.csdn.net/wangshubo1989/article/details/77949252",
        "http://blog.csdn.net/wangshubo1989/article/details/77933654",
        "http://blog.csdn.net/wangshubo1989/article/details/77893561",
    }
    jsonResponses := make(chan string)

    var wg sync.WaitGroup

    wg.Add(len(urls))

    for _, url := range urls {
        go func(url string) {
            defer wg.Done()
            res, err := http.Get(url)
            if err != nil {
                log.Fatal(err)
            } else {
                defer res.Body.Close()
                body, err := ioutil.ReadAll(res.Body)
                if err != nil {
                    log.Fatal(err)
                } else {
                    jsonResponses <- string(body)
                }
            }
        }(url)
    }

    go func() {
        for response := range jsonResponses {
            fmt.Println(response)
        }
    }()

    wg.Wait()
}

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

這裡寫圖片描述