node-mysqlとStreamで大量のデータを効率的に処理

node.jsのStreamを使えばメモリを節約することができます。Streamは大量のデータを扱う場面で特に威力を発揮します。

例えば、データベースの大量データの処理で考えてみましょう。データベースから膨大なデータを全て読み込んで何かしらの処理を行いたい場合、一度に全てを読み込むとメモリを圧迫してしまいます。たとえ一定レコード数づつページングしながら読み込んだとしても場合によっては同じでしょう。このような場面にStreamが活躍します。

目次

サンプル

自作Stream (Transform = 変換用ストリーム) を2つ作り、node-mysqlとpipe接続するサンプルです。このサンプルは、次のように動きます:

  1. mysqlからStreamでレコード取得
  2. 1つ目のTransformで取得したレコードのidカラム(数値)を2倍し、次のStreamへ渡す
  3. 2つ目のTransformで渡されたデータをJSON文字列化し、次のStreamへ渡す
  4. 渡された文字列を標準出力

前提

  • node v8.11.3
  • mysql v5.7.27

下準備

MySQLデータベースにテスト用テーブルを作成します:

mysql> \r test
mysql> 
CREATE TABLE `hugetable` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
mysql> select * from hugetable;
+----+
| id |
+----+
|  1 |
|  2 |
|  3 |
|  4 |
|  5 |
|  6 |
|  7 |
|  8 |
|  9 |
+----+

node-mysqlモジュールのインストール:

$ npm install --save mysql

サンプルコード

pipe-test.js

const stream = require('stream');
const mysql = require('mysql');

// DBコネクションプールを取得
// TODO: 環境に合わせてDB設定を変更してください
const pool = mysql.createPool({
  host: 'localhost',
  user: 'dbuser',
  password: 'dbpass',
  database: 'test',
  connectionLimit: 5,
});

// DBから取得したレコードのidカラム(数値)をx2するトランスフォーム
const x2Trans = stream.Transform({
  objectMode: true,
  highWaterMark: 1,
  transform: function(data, encoding, next) {
    // idカラムの値を2倍して次のstreamに渡す
    data.id *= 2;
    this.push(data);
    next();
  },
});

// データを文字列化するトランスフォーム
// このトランスフォームで渡したいデータを文字列化します。
// NOTE: このトランスフォームが次に渡す process.stdout は文字列かBufferしか受け付けません!
const stringifyTrans = stream.Transform({
  objectMode: true,
  highWaterMark: 1,
  transform: function(data, encoding, next) {
    // JSON文字列化して次のstreamに渡す
    // NOTE: 重い処理風にsetTimeoutでわざと処理を遅延させています。
    const _this = this;
    setTimeout(() => {
      _this.push(JSON.stringify(data));
      next();
    }, 100);
  },
});

// SQLを実行し、各処理をpipe接続:
const query = pool.query('SELECT * FROM `hugetable` ORDER BY `id`');
query
  .stream({ highWaterMark: 1 })
  .pipe(x2Trans) // 1) DBからデータを取得し、idカラムをx2する
  .on('finish', function() { // DBからの読込終了を検知
    console.log('DBから全件読込完了!');
    pool.end(); // コネクションを閉じる
  })
  .pipe(stringifyTrans) // 2) データをJSON文字列化
  .pipe(process.stdout); // 3) 標準出力に表示

コードレビュー

pipe接続:
pipe()でStreamな処理(x2Trans,stringifyTrans,process.stdout)を繋いでいる部分がポイントです。 (process.stdoutもまたStreamの一種(Writable Stream)です)

StreamからStreamへの値の引き渡し:
transform: function(data, encoding, next) {...}な関数内でthis.push(data)することで次のStreamへ値を引き渡すことができます。

ObjectMode:
node-mysqlのquery.stream()ObjectMode なStreamを返します。そのため、process.stdout(StringかBufferのデータしか受け付けない)にそのままデータを渡してもTypeError: Invalid data, chunk must be a string or buffer, not objectなエラーが発生します。 自作TransformではobjectMode: trueとすることで、node-mysqlから渡ってきたオブジェクトをそのまま取り扱えるようにしています。

実行結果

$ node pipe-test.js
{"id":2}{"id":4}{"id":6}{"id":8}{"id":10}{"id":12}{"id":14}DBから全件読込完了!
{"id":16}{"id":18}

各レコードのidが2倍になって標準出力されていることが分かります。

TIP: 全てのレコードJSON文字列が表示される前にDBから全件読込完了!が表示されています。これはstringifyTrans以降の処理(意図的に重い処理風にしてある)よりも先にDBのレコードを全件読込完了したからです。

参考

芽萌丸プログラミング部 @programming
プログラミング関連アカウント。Web標準技術を中心に書いていきます。フロントエンドからサーバサイドまで JavaScript だけで済ませたい人たちの集いです。