1. 程式人生 > >nodejs中使用worker_threads來建立新的執行緒

nodejs中使用worker_threads來建立新的執行緒

[toc] nodejs中使用worker_threads來建立新的執行緒 # 簡介 之前的文章中提到了,nodejs中有兩種執行緒,一種是event loop用來相應使用者的請求和處理各種callback。另一種就是worker pool用來處理各種耗時操作。 nodejs的官網提到了一個能夠使用nodejs本地woker pool的lib叫做webworker-threads。 可惜的是webworker-threads的最後一次更新還是在2年前,而在最新的nodejs 12中,根本無法使用。 而webworker-threads的作者則推薦了一個新的lib叫做web-worker。 web-worker是構建於nodejs的worker_threads之上的,本文將會詳細講解worker_threads和web-worker的使用。 # worker_threads worker_threads模組的原始碼源自lib/worker_threads.js,它指的是工作執行緒,可以開啟一個新的執行緒來並行執行javascript程式。 worker_threads主要用來處理CPU密集型操作,而不是IO操作,因為nodejs本身的非同步IO已經非常強大了。 worker_threads中主要有5個屬性,3個class和3個主要的方法。接下來我們將會一一講解。 ## isMainThread isMainThread用來判斷程式碼是否在主執行緒中執行,我們看一個使用的例子: ~~~js const { Worker, isMainThread } = require('worker_threads'); if (isMainThread) { console.log('在主執行緒中'); new Worker(__filename); } else { console.log('在工作執行緒中'); console.log(isMainThread); // 列印 'false'。 } ~~~ 上面的例子中,我們從worker_threads模組中引入了Worker和isMainThread,Worker就是工作執行緒的主類,我們將會在後面詳細講解,這裡我們使用Worker建立了一個工作執行緒。 ## MessageChannel MessageChannel代表的是一個非同步雙向通訊channel。MessageChannel中沒有方法,主要通過MessageChannel來連線兩端的MessagePort。 ~~~js class MessageChannel { readonly port1: MessagePort; readonly port2: MessagePort; } ~~~ 當我們使用new MessageChannel()的時候,會自動建立兩個MessagePort。 ~~~js const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => console.log('received', message)); port2.postMessage({ foo: 'bar' }); // Prints: received { foo: 'bar' } from the `port1.on('message')` listener ~~~ 通過MessageChannel,我們可以進行MessagePort間的通訊。 ## parentPort和MessagePort parentPort是一個MessagePort型別,parentPort主要用於worker執行緒和主執行緒進行訊息互動。 通過parentPort.postMessage()傳送的訊息在主執行緒中將可以通過worker.on('message')接收。 主執行緒中通過worker.postMessage()傳送的訊息將可以在工作執行緒中通過parentPort.on('message')接收。 我們看一下MessagePort的定義: ~~~js class MessagePort extends EventEmitter { close(): void; postMessage(value: any, transferList?: Array): void; ref(): void; unref(): void; start(): void; addListener(event: "close", listener: () => void): this; addListener(event: "message", listener: (value: any) => void): this; addListener(event: string | symbol, listener: (...args: any[]) => void): this; emit(event: "close"): boolean; emit(event: "message", value: any): boolean; emit(event: string | symbol, ...args: any[]): boolean; on(event: "close", listener: () => void): this; on(event: "message", listener: (value: any) => void): this; on(event: string | symbol, listener: (...args: any[]) => void): this; once(event: "close", listener: () => void): this; once(event: "message", listener: (value: any) => void): this; once(event: string | symbol, listener: (...args: any[]) => void): this; prependListener(event: "close", listener: () => void): this; prependListener(event: "message", listener: (value: any) => void): this; prependListener(event: string | symbol, listener: (...args: any[]) => void): this; prependOnceListener(event: "close", listener: () => void): this; prependOnceListener(event: "message", listener: (value: any) => void): this; prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this; removeListener(event: "close", listener: () => void): this; removeListener(event: "message", listener: (value: any) => void): this; removeListener(event: string | symbol, listener: (...args: any[]) => void): this; off(event: "close", listener: () => void): this; off(event: "message", listener: (value: any) => void): this; off(event: string | symbol, listener: (...args: any[]) => void): this; } ~~~ MessagePort繼承自EventEmitter,它表示的是非同步雙向通訊channel的一端。這個channel就叫做MessageChannel,MessagePort通過MessageChannel來進行通訊。 我們可以通過MessagePort來傳輸結構體資料,記憶體區域或者其他的MessagePorts。 從原始碼中,我們可以看到MessagePort中有兩個事件,close和message。 close事件將會在channel的中任何一端斷開連線的時候觸發,而message事件將會在port.postMessage時候觸發,下面我們看一個例子: ~~~js const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); // Prints: // foobar // closed! port2.on('message', (message) => console.log(message)); port2.on('close', () => console.log('closed!')); port1.postMessage('foobar'); port1.close(); ~~~ port.on('message')實際上為message事件添加了一個listener,port還提供了addListener方法來手動新增listener。 port.on('message')會自動觸發port.start()方法,表示啟動一個port。 當port有listener存在的時候,這表示port存在一個ref,當存在ref的時候,程式是不會結束的。我們可以通過呼叫port.unref方法來取消這個ref。 接下來我們看一下怎麼通過port來傳輸訊息: ~~~js port.postMessage(value[, transferList]) ~~~ postMessage可以接受兩個引數,第一個引數是value,這是一個JavaScript物件。第二個引數是transferList。 先看一個傳遞一個引數的情況: ~~~js const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => console.log(message)); const circularData = {}; circularData.foo = circularData; // Prints: { foo: [Circular] } port2.postMessage(circularData); ~~~ 通常來說postMessage傳送的物件都是value的拷貝,但是如果你指定了transferList,那麼在transferList中的物件將會被transfer到channel的接受端,並且不再存在於傳送端,就好像把物件傳送出去一樣。 transferList是一個list,list中的物件可以是ArrayBuffer, MessagePort 和 FileHandle。 如果value中包含SharedArrayBuffer物件,那麼該物件不能被包含在transferList中。 看一個包含兩個引數的例子: ~~~js const { MessageChannel } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => console.log(message)); const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]); // post uint8Array的拷貝: port2.postMessage(uint8Array); port2.postMessage(uint8Array, [ uint8Array.buffer ]); //port2.postMessage(uint8Array); ~~~ 上面的例子將輸出: ~~~js Uint8Array(4) [ 1, 2, 3, 4 ] Uint8Array(4) [ 1, 2, 3, 4 ] ~~~ 第一個postMessage是拷貝,第二個postMessage是transfer Uint8Array底層的buffer。 如果我們再次呼叫port2.postMessage(uint8Array),我們會得到下面的錯誤: ~~~ DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned. ~~~ buffer是TypedArray的底層儲存結構,如果buffer被transfer,那麼之前的TypedArray將會變得不可用。 ## markAsUntransferable 要想避免這個問題,我們可以呼叫markAsUntransferable將buffer標記為不可transferable. 我們看一個markAsUntransferable的例子: ~~~js const { MessageChannel, markAsUntransferable } = require('worker_threads'); const pooledBuffer = new ArrayBuffer(8); const typedArray1 = new Uint8Array(pooledBuffer); const typedArray2 = new Float64Array(pooledBuffer); markAsUntransferable(pooledBuffer); const { port1 } = new MessageChannel(); port1.postMessage(typedArray1, [ typedArray1.buffer ]); console.log(typedArray1); console.log(typedArray2); ~~~ ## SHARE_ENV SHARE_ENV是傳遞給worker建構函式的一個env變數,通過設定這個變數,我們可以在主執行緒與工作執行緒進行共享環境變數的讀寫。 ~~~js const { Worker, SHARE_ENV } = require('worker_threads'); new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV }) .on('exit', () => { console.log(process.env.SET_IN_WORKER); // Prints 'foo'. }); ~~~ ## workerData 除了postMessage(),還可以通過在主執行緒中傳遞workerData給worker的建構函式,從而將主執行緒中的資料傳遞給worker: ~~~js const { Worker, isMainThread, workerData } = require('worker_threads'); if (isMainThread) { const worker = new Worker(__filename, { workerData: 'Hello, world!' }); } else { console.log(workerData); // Prints 'Hello, world!'. } ~~~ ## worker類 先看一下worker的定義: ~~~js class Worker extends EventEmitter { readonly stdin: Writable | null; readonly stdout: Readable; readonly stderr: Readable; readonly threadId: number; readonly resourceLimits?: ResourceLimits; constructor(filename: string | URL, options?: WorkerOptions); postMessage(value: any, transferList?: Array): void; ref(): void; unref(): void; terminate(): Promise; getHeapSnapshot(): Promise; addListener(event: "error", listener: (err: Error) => void): this; addListener(event: "exit", listener: (exitCode: number) => void): this; addListener(event: "message", listener: (value: any) => void): this; addListener(event: "online", listener: () => void): this; addListener(event: string | symbol, listener: (...args: any[]) => void): this; ... } ~~~ worker繼承自EventEmitter,並且包含了4個重要的事件:error,exit,message和online。 worker表示的是一個獨立的 JavaScript 執行執行緒,我們可以通過傳遞filename或者URL來構造worker。 每一個worker都有一對內建的MessagePort,在worker建立的時候就會相互關聯。worker使用這對內建的MessagePort來和父執行緒進行通訊。 通過parentPort.postMessage()傳送的訊息在主執行緒中將可以通過worker.on('message')接收。 主執行緒中通過worker.postMessage()傳送的訊息將可以在工作執行緒中通過parentPort.on('message')接收。 當然,你也可以顯式的建立MessageChannel 物件,然後將MessagePort作為訊息傳遞給其他執行緒,我們看一個例子: ~~~js const assert = require('assert'); const { Worker, MessageChannel, MessagePort, isMainThread, parentPort } = require('worker_threads'); if (isMainThread) { const worker = new Worker(__filename); const subChannel = new MessageChannel(); worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]); subChannel.port2.on('message', (value) => { console.log('接收到:', value); }); } else { parentPort.once('message', (value) => { assert(value.hereIsYourPort instanceof MessagePort); value.hereIsYourPort.postMessage('工作執行緒正在傳送此訊息'); value.hereIsYourPort.close(); }); } ~~~ 上面的例子中,我們藉助了worker和parentPort本身的訊息傳遞功能,傳遞了一個顯式的MessageChannel中的MessagePort。 然後又通過該MessagePort來進行訊息的分發。 ## receiveMessageOnPort 除了port的on('message')方法之外,我們還可以使用receiveMessageOnPort來手動接收訊息: ~~~js const { MessageChannel, receiveMessageOnPort } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.postMessage({ hello: 'world' }); console.log(receiveMessageOnPort(port2)); // Prints: { message: { hello: 'world' } } console.log(receiveMessageOnPort(port2)); // Prints: undefined ~~~ ## moveMessagePortToContext 先了解一下nodejs中的Context的概念,我們可以從vm中建立context,它是一個隔離的上下文環境,從而保證不同執行環境的安全性,我們看一個context的例子: ~~~js const vm = require('vm'); const x = 1; const context = { x: 2 }; vm.createContext(context); // 上下文隔離化物件。 const code = 'x += 40; var y = 17;'; // `x` and `y` 是上下文中的全域性變數。 // 最初,x 的值為 2,因為這是 context.x 的值。 vm.runInContext(code, context); console.log(context.x); // 42 console.log(context.y); // 17 console.log(x); // 1; y 沒有定義。 ~~~ 在worker中,我們可以將一個MessagePort move到其他的context中。 ~~~js worker.moveMessagePortToContext(port, contextifiedSandbox) ~~~ 這個方法接收兩個引數,第一個引數就是要move的MessagePort,第二個引數就是vm.createContext()建立的context物件。 # worker_threads的執行緒池 上面我們提到了使用單個的worker thread,但是現在程式中一個執行緒往往是不夠的,我們需要建立一個執行緒池來維護worker thread物件。 nodejs提供了AsyncResource類,來作為對非同步資源的擴充套件。 AsyncResource類是async_hooks模組中的。 下面我們看下怎麼使用AsyncResource類來建立worker的執行緒池。 假設我們有一個task,使用來執行兩個數相加,指令碼名字叫做task_processor.js: ~~~js const { parentPort } = require('worker_threads'); parentPort.on('message', (task) => { parentPort.postMessage(task.a + task.b); }); ~~~ 下面是worker pool的實現: ~~~js const { AsyncResource } = require('async_hooks'); const { EventEmitter } = require('events'); const path = require('path'); const { Worker } = require('worker_threads'); const kTaskInfo = Symbol('kTaskInfo'); const kWorkerFreedEvent = Symbol('kWorkerFreedEvent'); class WorkerPoolTaskInfo extends AsyncResource { constructor(callback) { super('WorkerPoolTaskInfo'); this.callback = callback; } done(err, result) { this.runInAsyncScope(this.callback, null, err, result); this.emitDestroy(); // `TaskInfo`s are used only once. } } class WorkerPool extends EventEmitter { constructor(numThreads) { super(); this.numThreads = numThreads; this.workers = []; this.freeWorkers = []; for (let i = 0; i < numThreads; i++) this.addNewWorker(); } addNewWorker() { const worker = new Worker(path.resolve(__dirname, 'task_processor.js')); worker.on('message', (result) => { // In case of success: Call the callback that was passed to `runTask`, // remove the `TaskInfo` associated with the Worker, and mark it as free // again. worker[kTaskInfo].done(null, result); worker[kTaskInfo] = null; this.freeWorkers.push(worker); this.emit(kWorkerFreedEvent); }); worker.on('error', (err) => { // In case of an uncaught exception: Call the callback that was passed to // `runTask` with the error. if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null); else this.emit('error', err); // Remove the worker from the list and start a new Worker to replace the // current one. this.workers.splice(this.workers.indexOf(worker), 1); this.addNewWorker(); }); this.workers.push(worker); this.freeWorkers.push(worker); this.emit(kWorkerFreedEvent); } runTask(task, callback) { if (this.freeWorkers.length === 0) { // No free threads, wait until a worker thread becomes free. this.once(kWorkerFreedEvent, () => this.runTask(task, callback)); return; } const worker = this.freeWorkers.pop(); worker[kTaskInfo] = new WorkerPoolTaskInfo(callback); worker.postMessage(task); } close() { for (const worker of this.workers) worker.terminate(); } } module.exports = WorkerPool; ~~~ 我們給worker建立了一個新的kTaskInfo屬性,並且將非同步的callback封裝到WorkerPoolTaskInfo中,賦值給worker.kTaskInfo. 接下來我們就可以使用workerPool了: ~~~js const WorkerPool = require('./worker_pool.js'); const os = require('os'); const pool = new WorkerPool(os.cpus().length); let finished = 0; for (let i = 0; i < 10; i++) { pool.runTask({ a: 42, b: 100 }, (err, result) => { console.log(i, err, result); if (++finished === 10) pool.close(); }); } ~~~ > 本文作者:flydean程式那些事 > > 本文連結:[http://www.flydean.com/nodejs-worker-thread/](http://www.flydean.com/nodejs-worker-thread/) > > 本文來源:flydean的部落格 > > 歡迎關注我的公眾號:「程式那些事」最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來