1. 程式人生 > 程式設計 >node.js中stream流中可讀流和可寫流的實現與使用方法例項分析

node.js中stream流中可讀流和可寫流的實現與使用方法例項分析

本文例項講述了node.js中stream流中可讀流和可寫流的實現與使用方法。分享給大家供大家參考,具體如下:

node.js中的流 stream 是處理流式資料的抽象介面。node.js 提供了很多流物件,像http中的request和response,和 process.stdout 都是流的例項。

流可以是 可讀的,可寫的,或是可讀可寫的。所有流都是 events 的例項。

一、流的型別

node.js中有四種基本流型別:

1、Writable 可寫流 (例:fs.createWriteStream() )

2、Readable 可讀流 (例:fs.createReadStream() )

3、Duplex 可讀又可寫流 (例:net.Socket )

4、Transform 讀寫過程中可修改或轉換資料的 Duplex 流 (例:zlib.createDeflate() )

二、流中的資料有兩種模式

1、二進位制模式,都是 string字串 和 Buffer。

2、物件模式,流內部處理的是一系統普通物件。

三、可讀流的兩種模式

1、流動模式 ( flowing ) ,資料自動從系統底層讀取,並通過事件,儘可能快地提供給應用程式。

2、暫停模式 ( paused ),必須顯式的呼叫 read() 讀取資料。

可讀流 都開始於暫停模式,可以通過如下方法切換到流動模式:

1、新增 'data' 事件回撥。

2、呼叫 resume()。

3、呼叫 pipe()。

可讀流通過如下方法切換回暫停模式:

1、如果沒有管道目標,呼叫 pause()。

2、如果有管道目標,移除所有管道目標,呼叫 unpipe() 移除多個管道目標。

四、建立可讀流,並監聽事件

const fs = require('fs');
//建立一個檔案可讀流
let rs = fs.createReadStream('./1.txt',{
  //檔案系統標誌
  flags: 'r',//資料編碼,如果調置了該引數,則讀取的資料會自動解析
  //如果沒調置,則讀取的資料會是 Buffer
  //也可以通過 rs.setEncoding() 進行設定
  encoding: 'utf8',//檔案描述符,預設為null
  fd: null,//檔案許可權
  mode: 0o666,//檔案讀取的開始位置
  start: 0,//檔案讀取的結束位置(包括結束位置)
  end: Infinity,//讀取緩衝區的大小,預設64K
  highWaterMark: 3
});
//檔案被開啟時觸發
rs.on('open',function () {
  console.log('檔案開啟');
});
//監聽data事件,會讓當前流切換到流動模式
//當流中將資料傳給消費者後觸發
//由於我們在上面配置了 highWaterMark 為 3位元組,所以下面會列印多次。
rs.on('data',function (data) {
  console.log(data);
});
//流中沒有資料可供消費者時觸發
rs.on('end',function () {
  console.log('資料讀取完畢');
});
//讀取資料出錯時觸發
rs.on('error',function () {
  console.log('讀取錯誤');
});
//當檔案被關閉時觸發
rs.on('close',function () {
  console.log('檔案關閉');
});

注意,'open' 和 'close' 事件並不是所有流都會觸發。

當們監聽'data'事件後,系統會盡可能快的讀取出資料。但有時候,我們需要暫停一下流的讀取,操作其他事情。

這時候就需要用到 pause() 和 resume() 方法。

const fs = require('fs');
//建立一個檔案可讀流
let rs = fs.createReadStream('./1.txt',{
  highWaterMark: 3
});
rs.on('data',function (data) {
  console.log(`讀取了 ${data.length} 位元組資料 : ${data.toString()}`);
  //使流動模式的流停止觸發'data'事件,切換出流動模式,資料都會保留在內部快取中。
  rs.pause();
  //等待3秒後,再恢復觸發'data'事件,將流切換回流動模式。
  setTimeout(function () {
    rs.resume();
  },3000);
});

可讀流的 'readable' 事件,當流中有資料可供讀取時就觸發。

注意當監聽 'readable' 事件後,會導致流停止流動,需呼叫 read() 方法讀取資料。

注意 on('data'),on('readable'),pipe() 不要混合使用,會導致不明確的行為。

const fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  highWaterMark: 1
});
//當流中有資料可供讀取時就觸發
rs.on('readable',function () {
  let data;
  //迴圈讀取資料
  //引數表示要讀取的位元組數
  //如果可讀的資料不足位元組數,則返回緩衝區剩餘資料
  //如是沒有指定位元組數,則返回緩衝區中所有資料
  while (data = rs.read()) {
    console.log(`讀取到 ${data.length} 位元組資料`);
    console.log(data.toString());
  }
});

五、建立可寫流,並監聽事件

const fs = require('fs');
//建立一個檔案可寫流
let ws = fs.createWriteStream('./1.txt',{
  highWaterMark: 3
});
//往流中寫入資料
//引數一表示要寫入的資料
//引數二表示編碼方式
//引數三表示寫入成功的回撥
//緩衝區滿時返回false,未滿時返回true。
//由於上面我們設定的緩衝區大小為 3位元組,所以到寫入第3個時,就返回了false。
console.log(ws.write('1','utf8'));
console.log(ws.write('2','utf8'));
console.log(ws.write('3','utf8'));
console.log(ws.write('4','utf8'));
function writeData() {
  let cnt = 9;
  return function () {
    let flag = true;
    while (cnt && flag) {
      flag = ws.write(`${cnt}`);
      console.log('緩衝區中寫入的位元組數',ws.writableLength);
      cnt--;
    }
  };
}
let wd = writeData();
wd();
//當緩衝區中的資料滿的時候,應停止寫入資料,
//一旦緩衝區中的資料寫入檔案了,並清空了,則會觸發 'drain' 事件,告訴生產者可以繼續寫資料了。
ws.on('drain',function () {
  console.log('可以繼續寫資料了');
  console.log('緩衝區中寫入的位元組數',ws.writableLength);
  wd();
});
//當流或底層資源關閉時觸發
ws.on('close',function () {
  console.log('檔案被關閉');
});
//當寫入資料出錯時觸發
ws.on('error',function () {
  console.log('寫入資料錯誤');
});

寫入流的 end() 方法 和 'finish' 事件監聽

const fs = require('fs');
//建立一個檔案可寫流
let ws = fs.createWriteStream('./1.txt','utf8'));
//呼叫end()表明已經沒有資料要被寫入,在關閉流之前再寫一塊資料。
//如果傳入了回撥函式,則將作為 'finish' 事件的回撥函式
ws.end('最後一點資料','utf8');
//呼叫 end() 且緩衝區資料都已傳給底層系統時觸發
ws.on('finish',function () {
  console.log('寫入完成');
});

寫入流的 cork() 和 uncork() 方法,主要是為了解決大量小塊資料寫入時,內部緩衝可能失效,導致的效能下降。

const fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
  highWaterMark: 1
});
//呼叫 cork() 後,會強制把所有寫入的資料緩衝到記憶體中。
//不會因為寫入的資料超過了 highWaterMark 的設定而寫入到檔案中。
ws.cork();
ws.write('1');
console.log(ws.writableLength);
ws.write('2');
console.log(ws.writableLength);
ws.write('3');
console.log(ws.writableLength);
//將呼叫 cork() 後的緩衝資料都輸出到目標,也就是寫入檔案中。
ws.uncork();

注意 cork() 的呼叫次數要與 uncork() 一致。

const fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
  highWaterMark: 1
});
//呼叫一次 cork() 就應該寫一次 uncork(),兩者要一一對應。
ws.cork();
ws.write('4');
ws.write('5');
ws.cork();
ws.write('6');
process.nextTick(function () {
  //注意這裡只調用了一次 uncork()
  ws.uncork();
  //只有呼叫同樣次數的 uncork() 資料才會被輸出。
  ws.uncork();
});

六、可讀流的 pipe() 方法

pipe() 方法類似下面的程式碼,在可讀流與可寫流之前架起一座橋樑。

const fs = require('fs');
//建立一個可讀流
let rs = fs.createReadStream('./1.txt',{
  highWaterMark: 3
});
//建立一個可寫流
let ws = fs.createWriteStream('./2.txt',function (data) {
  let flag = ws.write(data);
  console.log(`往可寫流中寫入 ${data.length} 位元組資料`);
  //如果寫入緩衝區已滿,則暫停可讀流的讀取
  if (!flag) {
    rs.pause();
    console.log('暫停可讀流');
  }
});
//監控可讀流資料是否讀完
rs.on('end',function () {
  console.log('資料已讀完');
  //如果可讀流讀完了,則呼叫 end() 表示可寫流已寫入完成
  ws.end();
});
//如果可寫流緩衝區已清空,可以再次寫入,則重新開啟可讀流
ws.on('drain',function () {
  rs.resume();
  console.log('重新開啟可讀流');
});

我們用 pipe() 方法完成上面的功能。

const fs = require('fs');
//建立一個可讀流
let rs = fs.createReadStream('./1.txt',{
  highWaterMark: 3
});
let ws2 = fs.createWriteStream('./3.txt',{
  highWaterMark: 3
});
//繫結可寫流到可讀流,自動將可讀流切換到流動模式,將可讀流的所有資料推送到可寫流。
rs.pipe(ws);
//可以繫結多個可寫流
rs.pipe(ws2);

我們也可以用 unpipe() 手動的解綁可寫流。

const fs = require('fs');
//建立一個可讀流
let rs = fs.createReadStream('./1.txt',{
  highWaterMark: 3
});
rs.pipe(ws);
rs.pipe(ws2);
//解綁可寫流,如果引數沒寫,則解綁所有管道
setTimeout(function () {
  rs.unpipe(ws2);
},0);

希望本文所述對大家node.js程式設計有所幫助。