1. 程式人生 > >golang中job佇列實現方法

golang中job佇列實現方法

THE “NO-JOB-QUEUE” JOB QUEUE

如果僅僅只需要執行一個非同步操作,而不需要job queue,那麼可以用下面程式碼

go process(job)

但是如果我們需要控制同時工作的job的數量或者對生產者的生產進行限制,就需要使用job queue.

THE SIMPLEST JOB QUEUE

下面是一個簡單的job queue實現,worker從job queue中獲取一個job來處理。

func worker(jobChan <-chan Job) {
    for job := range jobChan {
        process(job)
    }
}

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100) // start the worker go worker(jobChan) // enqueue a job jobChan <- job

上面程式碼,建立一個Job物件的channel用來存放和傳遞job,該channel在建立時設定容量為100.然後啟動一個worker goroutine從channel中抽取一個job進行處理,worker一次處理一個job。通過<- job往channel中加入新的job。channerl中資料的in和out是執行緒安全的,開發人員無需擔心互斥。

PRODUCER THROTTLING

jobChan建立時擁有100的容量,那麼如果這個channel已經有了100個job,再執行

// enqueue a job
jobChan <- job

操作時,這個操作就會阻塞。這個模式可以幫助我們限制生產者生產資料的數量,避免生產的資料過多。
如果channerl滿了之後,我們並不希望當前生產者阻塞而是要返回一個錯誤訊息給上層的呼叫者,我們可以使用下面的方法。

ENQUEUEING WITHOUT BLOCKING

實現非阻塞的生產者模式,我們可以使用select

// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been // possible without blocking. Job is not enqueued in the latter case. func TryEnqueue(job Job, jobChan <-chan Job) bool { select { case jobChan <- job: return true default: return false } }

當通道jobchan已經滿的時候,jobChan <- job: 阻塞,程式跳到default中執行。

STOPPING THE WORKER

如果job已經完了,如果優雅的告知worker停止,而不是阻塞的等待呢?我們可以使用close去關閉channel。

close(jobChan)

然後woker的程式碼就可以變為

for job := range jobChan {...}

在channerl關閉之前進入到jobChan的job會被woker讀取出來進行處理,最後這個迴圈會自動退出。
(在golang中,讀取已經關閉的channel是合法的,不過返回的第二結果是false)

WAITING FOR THE WORKER

使用close函式,是主動讓worker去停止,但是如果想要等待woker處理完,我們就要使用sync.WaitGroup

// use a WaitGroup 
var wg sync.WaitGroup

func worker(jobChan <-chan Job) {
    defer wg.Done()

    for job := range jobChan {
        process(job)
    }
}

// increment the WaitGroup before starting the worker
wg.Add(1)
go worker(jobChan)

// to stop the worker, first close the job channel
close(jobChan)

// then wait using the WaitGroup
wg.Wait()

wg.Add(1)讓WaitGroup增加1,wg.Done()讓WaitGroup減1,wg.Wait()會一直阻塞除非變為0 。

WAITING WITH A TIMEOUT

如果不想要WaitGroup一直等,而是有個超時時間,我們可以用select實現

// WaitTimeout does a Wait on a sync.WaitGroup object but with a specified
// timeout. Returns true if the wait completed without timing out, false
// otherwise.
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})
    go func() {
        wg.Wait()
        close(ch)
    }()
    select {
    case <-ch:
            return true
    case <-time.After(timeout):
            return false
    }
}

// now use the WaitTimeout instead of wg.Wait()
WaitTimeout(&wg, 5 * time.Second)

如果WaitGroup先返回,那麼close(ch)執行後,case<- ch:有效就會執行,否則當timeout到達後,case <-time.After(timeout):
就會執行。

CANCELLING WORKERS

如果想要worker立刻停止當前工作,而不是之前那樣worker還會處理剩下的job,我們可以利用context

// create a context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())

// start the goroutine passing it the context
go worker(ctx, jobChan)

func worker(ctx context.Context, jobChan <-chan Job) {
    for {
        select {
        case <-ctx.Done():
            return

        case job := <-jobChan:
            process(job)
        }
    }
}

// Invoke cancel when the worker needs to be stopped. This *does not* wait
// for the worker to exit.
cancel()

首先

ctx, cancel := context.WithCancel(context.Background())

建立一個context物件以及相關的cancel,當cancel被呼叫後,ctx.Done()變為可讀,worker就會返回
這個方法有一個小問題,就是如果某時刻jobChan中有job,同時cancel也被呼叫了,那麼<-ctx.Done():job := <-jobChan:
同時都不阻塞了,那麼select就會隨機選一個,這個作為開發者就無法決定了。

CANCELLING WORKERS WITHOUT CONTEXT

// create a cancel channel
cancelChan := make(chan struct{})

// start the goroutine passing it the cancel channel 
go worker(jobChan, cancelChan)

func worker(jobChan <-chan Job, cancelChan <-chan struct{}) {
    for {
        select {
        case <-cancelChan:
            return

        case job := <-jobChan:
            process(job)
        }
    }
}

// to cancel the worker, close the cancel channel
close(cancelChan)

A POOL OF WORKERS

使用多個worker可以提高程式的併發行,最簡單的做法如下

for i:=0; i<workerCount; i++ {
    go worker(jobChan)
}

然後多個worker會嘗試從同一個channel中獲取job,這個操作是安全的,作為開發者可以放心。
要等待這些worker完成工作,仍然可以用wait group如下:

for i:=0; i<workerCount; i++ {
    wg.Add(1)
    go worker(jobChan)
}

// wait for all workers to exit
wg.Wait()

如果要cancel這些worker,可以再單獨使用一個channel,用來給這些worker發通知,類似CANCELLING WORKERS WITHOUT CONTEXT那樣


// create cancel channel
cancelChan := make(chan struct{})

// pass the channel to the workers, let them wait on it
for i:=0; i<workerCount; i++ {
    go worker(jobChan, cancelChan)
}

// close the channel to signal the workers
close(cancelChan)