Stream API を実装する方法
Node.js の Stream API を使うことで、メモリを大量に消費するような処理(例えば巨大なファイルの読み込みと変換、そして書き込み等)を細切れにできます。これはメモリを効率的に使えるだけでなく、Node.jsのイベントループのブロッキング時間を短くし、ノンブロッキングI/Oの強みをより活かすことができるということを意味します。つまり、巨大データの大量処理等においてもシステムの全体的な処理を比較的円滑に行えるということです。
Stream API には主にReadable
(読込),Writable
(書込),Duplex
(読込+書込),Transform
(変換。これもDuplex
の一種) の4つの根底クラスが存在しますが、ここではそれぞれの細かい説明は省略します。より詳しい説明はNode.jsの本家サイトの方をご覧ください。
今回は実際に、簡単な自作クラス(Readable
派生クラスとTransform
派生クラス)を作り、Stream API の実装方法をざっくりと理解してみたいと思います。
目次
前提
- Node.jsバージョン: v8.11.3
サンプル処理フロー
今回は以下のような処理フローを Stream で実行してみます:
- テキトーなオブジェクトを生成 (
ObjectGenerator
という自作のReadable
実装) - オブジェクトを文字列化 (
Stringifier
という自作のTransform
実装) - データをgzip圧縮※ (
zlib
のTransform
実装) - データをログ出力 (
ProcessLogger
という自作のTransform
実装) - データをgunzip解凍※ (
zlib
のTransform
実装) - データを標準出力 (
process.stdout
=Writable
実装)
最初はReadable
の実装、途中はTransform
の実装、最後はWritable
の実装(process.stdout
=標準出力)です。 この後紹介するサンプルコード上では、処理を全てpipe()
で接続することにします。
※gzip圧縮&解凍も Stream で効率的に行えるということを伝えるため、圧縮したファイルを再び解凍するという無駄な処理も途中に挟んでいます。
サンプルコード
次のサンプルコード内では、ObjectGenerator
クラス(Readable
の派生)、Stringifier
(Transform
の派生)、ProcessLogger
(Transform
の派生)という3つの派生クラスを作成しています。また、zlib.createGzip()
とzlib.createGunzip()
でgzipの圧縮と解凍処理を行うTransform
派生のインスタンスも生成し利用しています:
const fs = require("fs");
const zlib = require("zlib");
const {
Readable,
// Writable,
// Duplex,
Transform,
} = require("stream");
/**
* テキトーなオブジェクトを生成するReadable
*/
class ObjectGenerator extends Readable {
constructor(max) {
super({
objectMode: true, // オブジェクトモードをON!
});
this.state = 0;
this.max = max;
}
/**
* テキトーなオブジェクトを数個生成して次のストリームへ渡す
*/
_read(size) {
// console.log("[Readable] ObjectGenerator read size:", size);
const obj = { "n": this.state };
console.log("[Readable] ObjectGenerator generates new object:", obj);
this.push(obj)
this.state += 1;
if (this.state === this.max) {
this.push(null);
}
}
}
/**
* オブジェクトをログ標準出力するTransform
*/
class ProcessLogger extends Transform {
constructor() {
super({ objectMode: true });
}
/**
* 受け取ったデータchunk(objectModeなのでオブジェクト単位)をログ(標準出力)
* ※受け取ったデータchunk自体は何も加工(transfrom)せずにそのまま次のstreamへ渡す
*/
_transform(chunk, encoding, callback) {
console.log("[Transform] ProcessLogger console output:", chunk);
this.push(chunk);
callback();
}
}
/**
* オブジェクトを文字列化するTransform
*/
class Stringifier extends Transform {
constructor() {
super({ objectMode: true });
}
/**
* 受け取ったデータchunk(objectModeなのでオブジェクト単位)を文字列化して次のstreamへ渡す
*/
_transform(chunk, encoding, callback) {
const str = JSON.stringify(chunk);
console.log("[Transform] Stringifier stringifies an object:", str);
this.push(str);
callback();
}
}
// それぞれのstreamクラスのインスタンスを生成:
const objectGenerator = new ObjectGenerator(5);
const stringifier = new Stringifier();
const processLogger = new ProcessLogger();
const gzip = zlib.createGzip();
const gunzip = zlib.createGunzip();
// それぞれのインスタンスをpipeで繋げて実行:
objectGenerator // 0. オブジェクトを生成 (Readable)
.pipe(stringifier) // 1. 生成されたオブジェクトを文字列化 (Tranform)
.pipe(gzip) // 2. gzip圧縮 (Tranform)
.pipe(processLogger) // 3. 途中経過を無駄にログ出力 (Tranform)
.pipe(gunzip) // 4. gzipを解凍 (Tranform)
.pipe(process.stdout); // 5. 標準出力 (Writable)
実行結果
先ほどのサンプルコードを実行すると以下のようにログ出力(標準出力)されます:
[Readable] ObjectGenerator generates new object: { n: 0 }
[Readable] ObjectGenerator generates new object: { n: 1 }
[Transform] Stringifier stringifies an object: {"n":0}
[Readable] ObjectGenerator generates new object: { n: 2 }
[Transform] Stringifier stringifies an object: {"n":1}
[Readable] ObjectGenerator generates new object: { n: 3 }
[Transform] Stringifier stringifies an object: {"n":2}
[Readable] ObjectGenerator generates new object: { n: 4 }
[Transform] Stringifier stringifies an object: {"n":3}
[Transform] Stringifier stringifies an object: {"n":4}
[Transform] ProcessLogger console output: <Buffer 1f 8b 08 00 00 00 00 00 00 03>
[Transform] ProcessLogger console output: <Buffer ab 56 ca 53 b2 32 a8 ad 06 51 86 10 ca 08 42 19 43 28 93 5a 00 07 5e f8 11 23 00 00 00>
{"n":0}{"n":1}{"n":2}{"n":3}{"n":4}
ログ出力を見て分かるように、最初にObjectGenerator
がテキトーなオブジェクトを生成し、次のストリームStringifier
で文字列化され、gzip圧縮された後にProcessLogger
によりログが標準出力され、再びgunzip解凍された後に最後に標準出力されています。
また、それぞれの処理は細切れに(オブジェクトモードobjectMode: true
なのでオブジェクト毎に)処理されていることも見て取れます。
以上、Stream API のイメージを掴むための自分向けのメモでした。