node-mysqlとStreamで大量のデータを効率的に処理の画像
芽萌丸プログラミング部 @programming
投稿日 2019/09/12
更新日 2020/06/30 ✏

Node.js

node-mysqlとStreamで大量のデータを効率的に処理

node.jsのStreamを使えばメモリを節約することができます。Streamは大量のデータを扱う場面で特に威力を発揮します。

例えば、データベースの大量データの処理で考えてみましょう。データベースから膨大なデータを全て読み込んで何かしらの処理を行いたい場合、一度に全てを読み込むとメモリを圧迫してしまいます。たとえ一定レコード数づつページングしながら読み込んだとしても場合によっては同じでしょう。このような場面にStreamが活躍します。

というわけで今回は、StreamでDBを操作するためのサンプルコードをご紹介します。

目次

前提

  • node v8.11.3
  • mysql v5.7.27

下準備

MySQLデータベースにテスト用テーブルを作成します:

mysql> \r test
mysql> 
CREATE TABLE `hugetable` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
mysql> select * from hugetable;
+----+
| id |
+----+
|  1 |
|  2 |
|  3 |
|  4 |
|  5 |
|  6 |
|  7 |
|  8 |
|  9 |
+----+

node-mysqlモジュールのインストール:

$ npm install --save mysql

サンプル

サンプルコードその1

DBから1行づつデータを読み込み順番に処理します。

one-by-one-test.js

const stream = require('stream');
const mysql = require('mysql');

// DBコネクションプールを取得
// TODO: 環境に合わせてDB設定を変更してください
const pool = mysql.createPool({
  host: 'localhost',
  user: 'dbuser',
  password: 'dbpass',
  database: 'test',
  connectionLimit: 5,
});

// コネクションを取得
pool.getConnection((err, conn) => {
  if (err) throw err;
  // SQLを実行し、各処理をpipe接続:
  const query = conn.query('SELECT * FROM `hugetable` ORDER BY `id`');
  query
    .on('result', function(data) {
      console.log("result", data);
      conn.pause();
      console.log("long process ...");
      setTimeout(() => {
        console.log("long process resume!");
        conn.resume();
      }, 1000);
    })
    .on('end', function() { // DBからの読込終了を検知
      // readable streamが完了した時に呼ばれます:
      console.log('end: DBから全件読込完了');
      pool.end(); // コネクションを閉じる
    })
});

コードレビュー

処理の一時停止と再開
node-mysql APIの「一時停止」conn.pause()と「再開」conn.resume()を使ってデータを1行づつ順番に処理します。 この2つのAPIは必ずresultイベントハンドラ内で使います。dataイベントハンドラ内ではないことに注意!

endイベント
DBの読み込みが全件完了するとendイベント(readableストリームの終了イベント)が呼び出されます。

実行結果

$ node one-by-one-test.js

result RowDataPacket { id: 1 }
long process ...
long process resume!
result RowDataPacket { id: 2 }
long process ...
long process resume!
result RowDataPacket { id: 3 }
long process ...
long process resume!
result RowDataPacket { id: 4 }
long process ...
long process resume!
result RowDataPacket { id: 5 }
long process ...
long process resume!
result RowDataPacket { id: 6 }
long process ...
long process resume!
result RowDataPacket { id: 7 }
long process ...
long process resume!
result RowDataPacket { id: 8 }
long process ...
long process resume!
result RowDataPacket { id: 9 }
long process ...
long process resume!
end: DBから全件読込完了

サンプルコードその2

自作Stream (Transform = 変換用ストリーム) を2つ作り、node-mysqlとpipe接続するサンプルです。このサンプルは、次のように動きます:

  1. mysqlからStreamでレコード取得
  2. 1つ目のTransformで取得したレコードのidカラム(数値)を2倍し、次のStreamへ渡す
  3. 2つ目のTransformで渡されたデータをJSON文字列化し、次のStreamへ渡す
  4. 渡された文字列を標準出力

pipe-test.js

const stream = require('stream');
const mysql = require('mysql');

// DBコネクションプールを取得
// TODO: 環境に合わせてDB設定を変更してください
const pool = mysql.createPool({
  host: 'localhost',
  user: 'dbuser',
  password: 'dbpass',
  database: 'test',
  connectionLimit: 5,
});

// DBから取得したレコードのidカラム(数値)をx2するトランスフォーム
const x2Trans = stream.Transform({
  objectMode: true,
  highWaterMark: 1,
  transform: function(data, encoding, next) {
    // idカラムの値を2倍して次のstreamに渡す
    data.id *= 2;
    this.push(data);
    next();
  },
});

// データを文字列化するトランスフォーム
// このトランスフォームで渡したいデータを文字列化します。
// NOTE: このトランスフォームが次に渡す process.stdout は文字列かBufferしか受け付けません!
const stringifyTrans = stream.Transform({
  objectMode: true,
  highWaterMark: 1,
  transform: function(data, encoding, next) {
    // JSON文字列化して次のstreamに渡す
    // NOTE: 重い処理風にsetTimeoutでわざと処理を遅延させています。
    const _this = this;
    setTimeout(() => {
      _this.push(JSON.stringify(data));
      next();
    }, 100);
  },
});

// SQLを実行し、各処理をpipe接続:
const query = pool.query('SELECT * FROM `hugetable` ORDER BY `id`');
query
  .stream({ highWaterMark: 1 })
  .on('end', function() { // DBからの読込終了を検知
    // readable streamが完了した時に呼ばれます:
    console.log('end: DBから全件読込完了');
    pool.end(); // コネクションを閉じる
  })
  .pipe(x2Trans) // 1) DBからデータを取得し、idカラムをx2する
  .on('finish', function() {
    // writable streamが完了した時に呼ばれます:
    console.log('finish: x2Trans');
  })
  .pipe(stringifyTrans) // 2) データをJSON文字列化
  .on('finish', function() {
    // writable streamが完了した時に呼ばれます:
    console.log('finish: stringifyTrans');
  })
  .pipe(process.stdout) // 3) 標準出力に表示

コードレビュー

pipe接続:
pipe()でStreamな処理(x2Trans,stringifyTrans,process.stdout)を繋いでいる部分がポイントです。 (process.stdoutもまたStreamの一種(Writable Stream)です)

StreamからStreamへの値の引き渡し:
transform: function(data, encoding, next) {...}な関数内でthis.push(data)することで次のStreamへ値を引き渡すことができます。

ObjectMode:
node-mysqlのquery.stream()ObjectMode なStreamを返します。そのため、process.stdout(StringかBufferのデータしか受け付けない)にそのままデータを渡してもTypeError: Invalid data, chunk must be a string or buffer, not objectなエラーが発生します。 自作TransformではobjectMode: trueとすることで、node-mysqlから渡ってきたオブジェクトをそのまま取り扱えるようにしています。

finishイベントとendイベント
finishイベントはwritableストリームが完了した時に、そしてendイベントはreadableストリームが完了した時に呼び出されます。

実行結果

$ node pipe-test.js

{"id":2}{"id":4}{"id":6}{"id":8}{"id":10}{"id":12}{"id":14}end: DBから全件読込完了
finish: x2Trans
{"id":16}{"id":18}finish: stringifyTrans

各レコードのidが2倍になって標準出力されていることが分かります。

TIP: 全てのレコードJSON文字列が表示される前にDBから全件読込完了!が表示されています。これはstringifyTrans以降の処理(意図的に重い処理風にしてある)よりも先にDBのレコードを全件読込完了したからです。

参考


芽萌丸プログラミング部
芽萌丸プログラミング部 @programming
プログラミング関連アカウント。Web標準技術を中心に書いていきます。フロントエンドからサーバサイドまで JavaScript だけで済ませたい人たちの集いです。記事は主に @TanakaSoftwareLab が担当。