node-mysqlとStreamで大量のデータを効率的に処理
node.jsのStreamを使えばメモリを節約することができます。Streamは大量のデータを扱う場面で特に威力を発揮します。
例えば、データベースの大量データの処理で考えてみましょう。データベースから膨大なデータを全て読み込んで何かしらの処理を行いたい場合、一度に全てを読み込むとメモリを圧迫してしまいます。たとえ一定レコード数づつページングしながら読み込んだとしても場合によっては同じでしょう。このような場面にStreamが活躍します。
というわけで今回は、StreamでDBを操作するためのサンプルコードをご紹介します。
目次
前提
- node v8.11.3
- mysql v5.7.27
下準備
MySQLデータベースにテスト用テーブルを作成します:
mysql> \r test
mysql>
CREATE TABLE `hugetable` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
INSERT INTO `hugetable`() VALUES();
mysql> select * from hugetable;
+----+
| id |
+----+
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
+----+
node-mysqlモジュールのインストール:
$ npm install --save mysql
サンプル
サンプルコードその1
DBから1行づつデータを読み込み順番に処理します。
one-by-one-test.js
const stream = require('stream');
const mysql = require('mysql');
// DBコネクションプールを取得
// TODO: 環境に合わせてDB設定を変更してください
const pool = mysql.createPool({
host: 'localhost',
user: 'dbuser',
password: 'dbpass',
database: 'test',
connectionLimit: 5,
});
// コネクションを取得
pool.getConnection((err, conn) => {
if (err) throw err;
// SQLを実行し、各処理をpipe接続:
const query = conn.query('SELECT * FROM `hugetable` ORDER BY `id`');
query
.on('result', function(data) {
console.log("result", data);
conn.pause();
console.log("long process ...");
setTimeout(() => {
console.log("long process resume!");
conn.resume();
}, 1000);
})
.on('end', function() { // DBからの読込終了を検知
// readable streamが完了した時に呼ばれます:
console.log('end: DBから全件読込完了');
pool.end(); // コネクションを閉じる
})
});
コードレビュー
処理の一時停止と再開
node-mysql APIの「一時停止」conn.pause()
と「再開」conn.resume()
を使ってデータを1行づつ順番に処理します。 この2つのAPIは必ずresult
イベントハンドラ内で使います。※data
イベントハンドラ内ではないことに注意!
end
イベント
DBの読み込みが全件完了するとend
イベント(readableストリームの終了イベント)が呼び出されます。
実行結果
$ node one-by-one-test.js
result RowDataPacket { id: 1 }
long process ...
long process resume!
result RowDataPacket { id: 2 }
long process ...
long process resume!
result RowDataPacket { id: 3 }
long process ...
long process resume!
result RowDataPacket { id: 4 }
long process ...
long process resume!
result RowDataPacket { id: 5 }
long process ...
long process resume!
result RowDataPacket { id: 6 }
long process ...
long process resume!
result RowDataPacket { id: 7 }
long process ...
long process resume!
result RowDataPacket { id: 8 }
long process ...
long process resume!
result RowDataPacket { id: 9 }
long process ...
long process resume!
end: DBから全件読込完了
サンプルコードその2
自作Stream (Transform = 変換用ストリーム) を2つ作り、node-mysqlとpipe接続するサンプルです。このサンプルは、次のように動きます:
- mysqlからStreamでレコード取得
- 1つ目のTransformで取得したレコードのidカラム(数値)を2倍し、次のStreamへ渡す
- 2つ目のTransformで渡されたデータをJSON文字列化し、次のStreamへ渡す
- 渡された文字列を標準出力
pipe-test.js
const stream = require('stream');
const mysql = require('mysql');
// DBコネクションプールを取得
// TODO: 環境に合わせてDB設定を変更してください
const pool = mysql.createPool({
host: 'localhost',
user: 'dbuser',
password: 'dbpass',
database: 'test',
connectionLimit: 5,
});
// DBから取得したレコードのidカラム(数値)をx2するトランスフォーム
const x2Trans = stream.Transform({
objectMode: true,
highWaterMark: 1,
transform: function(data, encoding, next) {
// idカラムの値を2倍して次のstreamに渡す
data.id *= 2;
this.push(data);
next();
},
});
// データを文字列化するトランスフォーム
// このトランスフォームで渡したいデータを文字列化します。
// NOTE: このトランスフォームが次に渡す process.stdout は文字列かBufferしか受け付けません!
const stringifyTrans = stream.Transform({
objectMode: true,
highWaterMark: 1,
transform: function(data, encoding, next) {
// JSON文字列化して次のstreamに渡す
// NOTE: 重い処理風にsetTimeoutでわざと処理を遅延させています。
const _this = this;
setTimeout(() => {
_this.push(JSON.stringify(data));
next();
}, 100);
},
});
// SQLを実行し、各処理をpipe接続:
const query = pool.query('SELECT * FROM `hugetable` ORDER BY `id`');
query
.stream({ highWaterMark: 1 })
.on('end', function() { // DBからの読込終了を検知
// readable streamが完了した時に呼ばれます:
console.log('end: DBから全件読込完了');
pool.end(); // コネクションを閉じる
})
.pipe(x2Trans) // 1) DBからデータを取得し、idカラムをx2する
.on('finish', function() {
// writable streamが完了した時に呼ばれます:
console.log('finish: x2Trans');
})
.pipe(stringifyTrans) // 2) データをJSON文字列化
.on('finish', function() {
// writable streamが完了した時に呼ばれます:
console.log('finish: stringifyTrans');
})
.pipe(process.stdout) // 3) 標準出力に表示
コードレビュー
pipe接続:pipe()
でStreamな処理(x2Trans
,stringifyTrans
,process.stdout
)を繋いでいる部分がポイントです。 (process.stdout
もまたStreamの一種(Writable Stream)です)
StreamからStreamへの値の引き渡し:transform: function(data, encoding, next) {...}
な関数内でthis.push(data)
することで次のStreamへ値を引き渡すことができます。
ObjectMode:
node-mysqlのquery.stream()
はObjectModeなStreamを返します。そのため、process.stdout
(StringかBufferのデータしか受け付けない)にそのままデータを渡してもTypeError: Invalid data, chunk must be a string or buffer, not object
なエラーが発生します。 自作TransformではobjectMode: true
とすることで、node-mysqlから渡ってきたオブジェクトをそのまま取り扱えるようにしています。
finish
イベントとend
イベントfinish
イベントはwritableストリームが完了した時に、そしてend
イベントはreadableストリームが完了した時に呼び出されます。
実行結果
$ node pipe-test.js
{"id":2}{"id":4}{"id":6}{"id":8}{"id":10}{"id":12}{"id":14}end: DBから全件読込完了
finish: x2Trans
{"id":16}{"id":18}finish: stringifyTrans
各レコードのidが2倍になって標準出力されていることが分かります。
TIP:全てのレコードJSON文字列が表示される前にDBから全件読込完了!
が表示されています。これはstringifyTrans
以降の処理(意図的に重い処理風にしてある)よりも先にDBのレコードを全件読込完了したからです。