Stream API を実装する方法

Node.js の Stream API を使うことで、メモリを大量に消費するような処理(例えば巨大なファイルの読み込みと変換、そして書き込み等)を細切れにできます。これはメモリを効率的に使えるだけでなく、Node.jsのイベントループのブロッキング時間を短くし、ノンブロッキングI/Oの強みをより活かすことができるということを意味します。つまり、巨大データの大量処理等においてもシステムの全体的な処理を比較的円滑に行えるということです。

Stream API には主に Readable(読込), Writable(書込), Duplex(読込+書込), Transform(変換。これもDuplexの一種) の4つの根底クラスが存在しますが、ここではそれぞれの細かい説明は省略します。より詳しい説明はNode.jsの本家サイトの方をご覧ください。

今回は実際に、簡単な自作クラス(Readable派生クラスとTransform派生クラス)を作り、Stream API の実装方法をざっくりと理解してみたいと思います。

目次

前提

  • Node.jsバージョン: v8.11.3

サンプル処理フロー

今回は以下のような処理フローを Stream で実行してみます:

  1. テキトーなオブジェクトを生成 (ObjectGeneratorという自作のReadable実装)
  2. オブジェクトを文字列化 (Stringifierという自作のTransform実装)
  3. データをgzip圧縮※ (zlibTransform実装)
  4. データをログ出力 (ProcessLoggerという自作のTransform実装)
  5. データをgunzip解凍※ (zlibTransform実装)
  6. データを標準出力 (process.stdout=Writable実装)

最初はReadableの実装、途中はTransformの実装、最後はWritableの実装(process.stdout=標準出力)です。 この後紹介するサンプルコード上では、処理を全てpipe()で接続することにします。

※gzip圧縮&解凍も Stream で効率的に行えるということを伝えるため、圧縮したファイルを再び解凍するという無駄な処理も途中に挟んでいます。

サンプルコード

次のサンプルコード内では、ObjectGeneratorクラス(Readableの派生)、Stringifier(Transformの派生)、ProcessLogger(Transformの派生)という3つの派生クラスを作成しています。また、zlib.createGzip()zlib.createGunzip()でgzipの圧縮と解凍処理を行うTransform派生のインスタンスも生成し利用しています:

const fs = require("fs");
const zlib = require("zlib");
const {
  Readable,
  // Writable,
  // Duplex,
  Transform,
} = require("stream");

/**
 * テキトーなオブジェクトを生成するReadable
 */
class ObjectGenerator extends Readable {
  constructor(max) {
    super({
      objectMode: true, // オブジェクトモードをON!
    });
    this.state = 0;
    this.max = max;
  }
  /**
   * テキトーなオブジェクトを数個生成して次のストリームへ渡す
   */
  _read(size) {
    // console.log("[Readable] ObjectGenerator read size:", size);
    const obj = { "n": this.state };
    console.log("[Readable] ObjectGenerator generates new object:", obj);
    this.push(obj)
    this.state += 1;
    if (this.state === this.max) {
      this.push(null);
    }
  }
}

/**
 * オブジェクトをログ標準出力するTransform
 */
class ProcessLogger extends Transform {
  constructor() {
    super({ objectMode: true });
  }
  /**
   * 受け取ったデータchunk(objectModeなのでオブジェクト単位)をログ(標準出力)
   * ※受け取ったデータchunk自体は何も加工(transfrom)せずにそのまま次のstreamへ渡す
   */
  _transform(chunk, encoding, callback) {
    console.log("[Transform] ProcessLogger console output:", chunk);
    this.push(chunk);
    callback();
  }
}

/**
 * オブジェクトを文字列化するTransform
 */
class Stringifier extends Transform {
  constructor() {
    super({ objectMode: true });
  }
  /**
   * 受け取ったデータchunk(objectModeなのでオブジェクト単位)を文字列化して次のstreamへ渡す
   */
  _transform(chunk, encoding, callback) {
    const str = JSON.stringify(chunk);
    console.log("[Transform] Stringifier stringifies an object:", str);
    this.push(str);
    callback();
  }
}

// それぞれのstreamクラスのインスタンスを生成:
const objectGenerator = new ObjectGenerator(5);
const stringifier = new Stringifier();
const processLogger = new ProcessLogger();
const gzip = zlib.createGzip();
const gunzip = zlib.createGunzip();

// それぞれのインスタンスをpipeで繋げて実行:
objectGenerator // 0. オブジェクトを生成 (Readable)
  .pipe(stringifier) // 1. 生成されたオブジェクトを文字列化 (Tranform)
  .pipe(gzip) // 2. gzip圧縮 (Tranform)
  .pipe(processLogger) // 3. 途中経過を無駄にログ出力 (Tranform)
  .pipe(gunzip) // 4. gzipを解凍 (Tranform)
  .pipe(process.stdout); // 5. 標準出力 (Writable)

実行結果

先ほどのサンプルコードを実行すると以下のようにログ出力(標準出力)されます:

[Readable] ObjectGenerator generates new object: { n: 0 }
[Readable] ObjectGenerator generates new object: { n: 1 }
[Transform] Stringifier stringifies an object: {"n":0}
[Readable] ObjectGenerator generates new object: { n: 2 }
[Transform] Stringifier stringifies an object: {"n":1}
[Readable] ObjectGenerator generates new object: { n: 3 }
[Transform] Stringifier stringifies an object: {"n":2}
[Readable] ObjectGenerator generates new object: { n: 4 }
[Transform] Stringifier stringifies an object: {"n":3}
[Transform] Stringifier stringifies an object: {"n":4}
[Transform] ProcessLogger console output: <Buffer 1f 8b 08 00 00 00 00 00 00 03>
[Transform] ProcessLogger console output: <Buffer ab 56 ca 53 b2 32 a8 ad 06 51 86 10 ca 08 42 19 43 28 93 5a 00 07 5e f8 11 23 00 00 00>
{"n":0}{"n":1}{"n":2}{"n":3}{"n":4}

ログ出力を見て分かるように、最初にObjectGeneratorがテキトーなオブジェクトを生成し、次のストリームStringifierで文字列化され、gzip圧縮された後にProcessLoggerによりログが標準出力され、再びgunzip解凍された後に最後に標準出力されています。

また、それぞれの処理は細切れに(オブジェクトモードobjectMode: trueなのでオブジェクト毎に)処理されていることも見て取れます。

以上、Stream API のイメージを掴むための自分向けのメモでした。

オススメ:Zattoyomiで時事ネタチェックの時間節約!
芽萌丸プログラミング部 @programming
プログラミング関連アカウント。Web標準技術を中心に書いていきます。フロントエンドからサーバサイドまで JavaScript だけで済ませたい人たちの集いです。記事は主に @TanakaSoftwareLab が担当。