TameJS と Fiber による非同期処理の記述 (1/2)

前回からかなり時間が空いてしまいました...やっと時間取れた.今回は 2 回に分け,1 回目で TameJSを,2 回目で node-fibers を取り上げたいと思います.
ちなみに,「この方法が良い」と言うよりは「こんな方法もありますよね」というスタンスで書いています*1

見出し

  • はじめに
  • TameJS とは?
  • TameJS の利用例
  • TameJS のしていること
  • おわりに

はじめに

Node.js は非同期処理が基本であり,コールバックを多用するスタイルです.そのため,コードは簡単にコールバックのネストだらけになります*2
この "深いネスト" を解消するため,多くの場合は control flow ライブラリ が使用されます*3

TameJS では,非同期処理の記述に await/defer を持ち込み,上記の解決を試みています.

TameJS とは?

TameJS は tjs コードから JavaScript を生成する translator です.tjs コードは await/defer が追加されている点を除けば,JavaScript そのものです.
非同期処理の記述に await/defer を持ち込むことで,ノンブロッキング API を利用しているにもかかわらず,あたかも同期処理であるかのような記述を可能としています*4.これにより,ユーザの記述するコード上からコールバックによるネストを無くすことができます*5

TameJS の利用例

まずは以下のような tjs コードを記述します.

// ファイル名:read_file.tjs

var fs = require('fs');
await {
  fs.readFile('./test.txt', defer(var err, text));
}
if (err) {
  console.log('error: ' + err);
} else {
  console.log(text.toString('utf8'));
}

await ブロックで囲まれたコードが非同期に実行されます.複数の非同期 API を呼び出せば,それぞれ並行して動作します.ブロック中の API から結果が返ってくると,ブロック外のコードへと処理が進むイメージです.
defer にはコールバックが受け取るパラメータを指定します.await ブロックを抜けると,これらのパラメータが値に束縛されます.
利用可能な API については,以下の "API and Documentation" に載っています.

ちなみに,上記の tjs コードは以下の JavaScript とほぼ等価です.見た目はずいぶんと違いますね.

var fs = require('fs');
fs.readFile('./test.txt', function(err, text) {
  if (err) {
    console.log(err);
  } else {
    consloe.log(text);
  }
});

実行方法は,以下に示す 2 つのパターンがあります.


事前にコンパイルして実行
tamejs コマンドでコンパイルします."-o" オプションで出力ファイル名を指定できます.

$ tamejs read_file.tjs  // tjs -> js
$ node read_file.js     // 実行


require されたときにコンパイルして実行*6
*.tjs コードを利用したい JavaScript コード中に,以下を記載します.

// ファイル名:main.js

require('tamejs').register();
require('./read_file.tjs');

// ...

最後に,いつも通り実行します.

$ node main.js

(余談: register() 内部では,".tjs" に対してコンパイル処理を定義しています.TameJS Engine によって *.tjs から *.js に変換し,V8 コンパイルルーチンにコードを渡す,といった流れです.)

TameJS のしていること

await/defer の記述を元に,tjs コードを 継続渡し形式 (CPS)JavaScript に変換しています.CPSググるといっぱい解説が出てきます.
継続とは,端的に言ってしまうと「以降の計算処理」をまとめたものです.CPS では,渡された継続を利用してプログラムの制御を行います.

例えば,以下のような 2 つの処理があったとします.

A;
B;

これを CPS 変換*7すると,以下のようになります.

function _A(next) {
  A;
  next();
}
function _B(next) {
  B;
  next();
}
function end() {}

// 実行
_A(function() { _B(function() { end(); }); });

上記コードにおける next が継続に相当します*8.もしも A の処理が非同期であった場合は,_A に渡された next をコールバック内部で呼び出せばよいわけです.


実際に,前出の read_file.tjs をコンパイルすると,以下のようなコードが生成されます.

var tame = require('tamejs').runtime;
var __tame_fn_6 = function (__tame_k) {
    var fs = require ( 'fs' ) ;
    var __tame_fn_0 = function (__tame_k) {
        var err, text;
        var __tame_fn_1 = function (__tame_k) {
            var __tame_defers = new tame.Defers (__tame_k);
            var __tame_fn_2 = function (__tame_k) {
                fs . readFile ( './test.txt' ,
                __tame_defers.defer (
                    function () {
                        err = arguments[0];
                        text = arguments[1];
                    }
                )
                ) ;
                tame.callChain([__tame_k]);
            };
            __tame_fn_2(tame.end);
            __tame_defers._fulfill();
        };
        var __tame_fn_3 = function (__tame_k) {
            var __tame_fn_4 = function (__tame_k) {
                console . log ( 'error: ' + err ) ;
                tame.callChain([__tame_k]);
            };
            var __tame_fn_5 = function (__tame_k) {
                console . log ( text . toString ( 'utf8' ) ) ;
                tame.callChain([__tame_k]);
            };
            if (err) {
                tame.callChain([__tame_fn_4, __tame_k]);
            } else {
                tame.callChain([__tame_fn_5, __tame_k]);
            }
        };
        tame.callChain([__tame_fn_1, __tame_fn_3, __tame_k]);
    };
    tame.callChain([__tame_fn_0, __tame_k]);
};
__tame_fn_6 (tame.end);

"次の処理" を __tame_fn_* で包んで継続とし,callChain に渡しています.callChain は以下のように定義されており,次々と継続を処理していきます.

function callChain (l) {
    if (l.length) {
        var first = l.shift ();
        first (function () { callChain (l); });
    }
};


await ブロック内部において,defer 指定の変数が値に束縛されるまで待つのは,Defers#defer と Defers#_fulfill が同期部分の継続呼び出しをコントロールしているためです (上記コードにおける __tame_fn_3).
Defers 内部ではコールバック呼び出しをカウンタ管理しており,defer 呼び出しで内部カウンタを increment,非同期 API のコールバックが呼び出されたタイミングで _fulfill を呼び出して decrement しています*9.このカウンタが 0 になるタイミング → 最後に呼び出されたコールバックのタイミング,で次の継続 (同期コードの部分) が呼び出されます.



図にするとこんな感じです.黒四角の番号は __tame_fn_*, R は readFile, E と E' は tame.end, C は readFile のコールバック,青四角は callChain による継続を表しています.function を直接呼び出す場合は黒矢印,継続として呼び出す場合は青矢印で表しています.
6 から順に呼び出しが進んでいき,2 内部の tame.callChain が終わると一旦イベントループに戻ります.1 で _fulfill が呼び出されていますが,先に述べた Defers の働きにより,3 以降の継続は呼び出されません.
その後,readFile の結果としてコールバック (図中 C) が呼び出されると,Defers が内部に保持していた 3 以降の継続が呼び出されます.











おわりに

TameJS により,非同期処理を見通しの良いコードで記述することができます.外部リソースの参照・更新を頻繁に行いながら処理を進めるコードで利用すると良さそうです.
あとは IDE でこの記述がサポートされるともっと便利になりますね.
次回は Fiber を利用します.Node.js 上で coroutine が使えます.
(2/2 へ続く...)

*1:どちらもコアは,古より伝わりし手法です

*2:いやマジで

*3:有名どころは async,Step,Slide とかでしょうか,正直詳しくは分かりません...

*4:当然ですが,ノンブロッキング API はキチンと "ノンブロッキング" として動作します

*5:後で述べますが,プリコンパイル後の JavaScript はコールバックだらけです

*6:手元の環境は,最近ランタイム系をごちゃごちゃいじったせいか,うまく動きません...なぜだ

*7:JavaScript

*8:このような変換を行わず,継続を明示的に扱うオブジェクトとして取得するのが rubyscheme 等の call/cc であり,次回に紹介する Fiber の素でもあります

*9:_fulfill は defer の返すクロージャ内部で呼び出されます

node.js アプリの負荷分散構成を考える

node.js の負荷分散について考えてみました (フェイルオーバは考慮できていません).個人レベルなので 1 台のハード上に仮想マシンを 5〜6 個立ち上げて実験しています.

見出し

  • はじめに
  • cluster で負荷分散
  • 寄り道:cluster の仕組み
  • 例えばこんな全体構成
  • おわりに

はじめに

node.js は設計上,大量のコネクションを省リソース (プロセス・スレッドをバカスカ生成しない) でさばきます.おそらく想定されているのは I/O バウンドな処理であり,この場合は基本的に非同期で処理されるため,I/O 待ちで他のリクエスト処理がブロックすることはまずありません.
node.js は「サービスをつなぎ・組み合わせるためのハブ」的な位置づけが一番しっくりくるように感じます *1
ただ,

  • 大量のリクエストをさばかなければならない
  • ロジックが重くてコールバック処理に負荷がかかってしまう

等といった場合では,CPU 数に応じて負荷分散させたくなります.
また,Web アプリケーションの静的コンテンツ (js, css, image 等) の配信まで node.js ノードに任せるのは明らかに非効率ですよね.

cluster で負荷分散

この記事 (http://blog.asial.co.jp/807) を読んで cluster を試してみたところ,大変使いやすい.express のコミッタが開発しているためか,express との相性も良いと感じます.
おなじみ express コマンドでスケルトンを生成し,以下のように書き換えます.

// app.js
var express = require('express');
var app     = express.createServer();

app.configure(function(){
  app.set('views', __dirname + '/views');
  app.set('view engine', 'jade');
  app.use(express.logger());
  app.use(express.bodyParser());
  app.use(express.methodOverride());
  app.use(app.router);
  app.use(express.static(__dirname + '/public'));
});

app.get('/', function(req, res){
  res.render('index', {
    title: 'Express',
    worker_id: process.env.CLUSTER_WORKER  // worker の ID が取れる
  });
});

module.exports = app;
<!-- views/index.jade -->
h1= title
p Welcome to #{title}
p worker #{worker_id}
// cluster.js
var cluster = require('cluster');

cluster('./app')
  .use(cluster.debug())    // debug ログが出るので,動作が分かりやすくなる
  .use(cluster.logger())
  .use(cluster.pidfiles())
  .on('close', function() { console.log('cluster end'); })
  .listen(8001);

$ node cluster.js して F5 アタック?をかけると,worker の切り替わる様子が分かります.

Readme の Example で 'recommended usage' とあるように,cluster 関数にファイル名 './app' を指定すると,master プロセスでは require('./app') が発生せず,worker プロセスの場合にのみ require('./app') されます (docs/api).
また,use(cluster.repl(port)) することで簡単な管理コンソールを起動することができます (docs/repl).
分散してもセッションを維持したい場合は,express で connect-redis を利用するのが一番手軽です.

寄り道:cluster の仕組み

cluster は以下のような構成で複数の node.js プロセスを管理しています.


Worker プロセス数は外部から指定することも可能ですが,デフォルトでは CPU 数だけ起動します.また,プロセス間のやりとり (赤点線矢印) は JSON で独自の RPC をしています (JSON-RPC とは異なる仕様).

プロセスとインスタンスの構成は以下のようになっています.コードを読む際はこの構成を念頭に置くと,理解しやすくなります.

例えばこんな全体構成

例えばこんな全体構成.動作確認用アプリとして チャットアプリ をイメージしており,手元の環境では動作しています *2



クライアントからは HTTP リクエストと socket.io (WebSocket) による通信が発生します.

サーバ側では,リバースプロキシ (の設定をした nginx) を配置し,静的コンテンツは自身で配信,動的コンテンツについてはバックエンドの cluster にリクエストを投げます.設定としてはこんな感じです.

# /etc/nginx/conf.d/proxy.conf として以下の内容を作成します (IP は適当です).
# ↓ロードバランサ設定の場合
# upstream clusters {
#   server 192.168.11.20:8001;
#   server 192.168.11.20:8002;
# }
server {
  listen 8080;
  server_name 192.168.11.10;
  location / {
    proxy_pass http://192.168.11.20:8001;
#    proxy_pass http://clusters;
  }
  location /images/ {
    alias /var/www/public/images/;
  }
  location /stylesheets/ {
    alias /var/www/public/stylesheets/;
  }
  location /javascripts/ {
    alias /var/www/public/javascripts/;
  }
}

cluster マシンを増やす場合は nginx でロードバランシングさせるか,LVS + keepalived (←こっちはやったことない) で分散させます.

cluster 内の node.js では,express でメイン画面を,リアルタイム通信には socket.io を利用しています.もう鉄板ですね.セッション維持には redis を利用しています.

relay server は,各 node.js プロセスで発生した Socket.io のイベントを別の node.js プロセスに伝達するための中継サーバです.今回は実験ということもあって,適当にこしらえました.

メイン DB には mongoDB を利用しています (後に作成予定のアプリにとって都合が良いため).Replica Set 便利ですね.今回は sharding しないので 1 セットだけです.express からの接続には mongoose を利用しています.


参考:

おわりに

今回は node.js の負荷分散について試してみました.cluster の簡単な解説と,全体構成を示しています.
一応,proxy から DB まで一通りつないで実験していますが,Socket.io と cluster の組み合わせに問題がないのかを調査し切れていません.また,はやいところ計測を済ませてしまいたいです.

あとはテストについて調査をしたら,そろそろアプリを作り始めても良いんじゃないかな,KrdLab よ.

*1:多くの Web アプリがこれに該当すると思いますが

*2:他の人たちはどんな感じでやっているんでしょうか.是非知りたいのですが...

Mongoose のモデルに独自メソッドを追加する

久しぶりに Mongoose ネタです.Schema の API である static メソッドについて少しだけ.
今回は短め.

見出し

  • 何ができる?
  • 登録した関数内での処理
  • 実際の定義
  • 参考にした情報

何ができる?

例えば User というモデルを定義 (UserSchema) したとします.User に対しては,find/findOne といったメソッドを呼び出すことができます.
しかし,例えば DataMapper のような「first_or_create (あれば最初の 1 つを返す,無ければ作って返す)」が欲しくなった場合はどうすればよいでしょう?
上記のような場合に Schema.static メソッドを使用します.Schema.static メソッドは,モデルに対して独自の処理を追加することができます.

var UserSchema = new Schema({
  name: String,
  created_at: { type: Date, default: Date.now }
});

UserSchema.static('first_or_create', function(query, callback) {
  // ここに処理を定義
});

// Schema を登録
mongoose.model('User', UserSchema);

// DB に接続してモデルを取得
var db = mongoose.createConnection('...');
var User = db.model('User');

// first_or_create が呼び出せる
User.first_or_create({ name: 'krdlab' }, function(err, user) {
  // ...
});

登録した関数内での処理

User.first_or_create と呼び出すことから,当然 this はモデル (User) を指しています.このため,関数内の処理で 'this.find' と呼び出せば,接続先 DB が解決された状態で操作を呼び出せます.
Schema に登録したメソッドがモデルに定義されるのは,Schema からモデルを生成するとき (mongoose.model に Schema を渡したとき) です.以下のコードから Schema.static で登録した関数をモデルのプロパティに登録していることがわかります.

// lib/mongoose/index.js:149
Mongoose.prototype.model = function (name, schema, collection, skipInit) {
  // ... 省略

  if (!this.models[collection][name]) {
    model = Model.compile(name
                        , this.modelSchemas[name]
                        , collection
                        , conn
                        , this);  // ← ここ★

    if (!skipInit) model.init();

    this.models[collection][name] = model;
  }

  return this.models[collection][name];
};
// lib/mongoose/model.js:648
Model.compile = function (name, schema, collectionName, connection, base) {
  // ... 省略

  // apply methods
  for (var i in schema.methods)
    model.prototype[i] = schema.methods[i];

  // apply statics
  for (var i in schema.statics)
    model[i] = schema.statics[i];    // ← ここ★

  // ... 省略

  return model;
};

これで,mongoose.model('User') から返ってくる User に first_or_created が定義されます.

実際の定義

上記を踏まえると,以下のようになります.

var UserSchema = new Schema({
  name:       String,
  created_at: { type: Date, default: Date.now },
});

UserSchema.static('first_or_create', function(query, callback) {
  var U = this;
  U.findOne({ name: query.name }, function(err, user) {
    if (err) {
      callback(err);
    } else if (user) {
      callback(null, user);
    } else {
      var newU = new U({ name: query.name });
      newU.save(function(err) {
        if (err) {
          callback(err);
        } else {
          callback(null, newU);
        }
      });
    }
  });
});

mongoose.model('User', UserSchema);

実際には,登録する関数を function(query, defaults, callback) として,新規作成時のプロパティ値を指定できるとなお良いでしょう.

ちょっとだけ Inside node.js

またしてもスケジュールきつめのプロジェクトに放り込まれた KrdLab です.Java ジャバしてます.IDE がないとコード書くのがしんどすぎて,もううんざりです.

はじめに

何らかのプラットフォームを利用する場合,その仕組みについて知っておくことは,より適切な設計を行うという目的に対して有用な情報となるでしょう.
というわけで,今回は少しだけ node.js の内部に潜ってみようと思います.対象は git repo から clone した 0.4.x です (2011/03/29 に clone したっぽい).
なお,今回はイベント駆動に焦点を当て,V8 については割愛します.

見出し

  • 全体像
  • メイン処理
  • イベントループで処理される各種 watcher
  • tick 系のイベント処理
  • gc 系のイベント処理
  • eio 系のイベント処理
  • IOWatcher によるソケットの READ/WRITE
  • Timer による setInterval/setTimeout

全体像

node.js は,イベント駆動をベースとした JavaScript エンジンです.処理は全て非同期に行われます.サーバサイド JavaScript として人気を集めており,ネットワーク系のプログラムを主なターゲットとしています.

モジュール構成としては以下のような感じです (c-ares,http_parser は省略).

libev によってイベントループを,libeio によって非同期 I/O 処理を実現しています.各ライブラリの連携は以下のようになります.


node.js プロセスには,イベントループを実行するメインスレッドと,I/O の実処理を行うワーカスレッドが存在します.ワーカスレッドは libeio によりスレッドプールとして管理されています.
このように,イベントループと各 I/O 実処理は並行処理されますが,コールバックは全てメインスレッド上のイベントループで処理されます.つまり,ユーザの書いた JavaScript はメインスレッド上でしか実行されず,これが「シングルスレッド」と表現される由縁にもなっています.
当然コールバック処理でブロックしてしまうと,イベントループもブロックしてしまいます.コールバック実装は「ブロックしないですぐ終わる」が基本方針となります.
なお,node.js 上のイベントは,libev の event watcher と呼ばれる ev_watcher 構造体で管理されます (libev に ev_idle,ev_async といった名前で定義されている).以降に出てくる 'watcher' はこの 'event watcher' のことを指し,'イベント' もこの watcher に対して発生する (そして,events.EventEmitter が生成するイベントの元になる) ものを指します.

メイン処理

エントリポイントである main 関数からは,node.cc に定義されている node::Start 関数が呼び出されます.

// node.cc:2420
int Start(int argc, char *argv[]) {
  v8::V8::Initialize();
  v8::HandleScope handle_scope;

  argv = Init(argc, argv);

  // Create the one and only Context.
  Persistent<v8::Context> context = v8::Context::New();
  v8::Context::Scope context_scope(context);

  Handle<Object> process = SetupProcessObject(argc, argv);

  // Create all the objects, load modules, do everything.
  // so your next reading stop should be node::Load()!
  Load(process);

  // TODO Probably don't need to start this each time.
  // Avoids failing on test/simple/test-eio-race3.js though
  ev_idle_start(EV_DEFAULT_UC_ &eio_poller);

  // All our arguments are loaded. We've evaluated all of the scripts. We
  // might even have created TCP servers. Now we enter the main eventloop. If
  // there are no watchers on the loop (except for the ones that were
  // ev_unref'd) then this function exits. As long as there are active
  // watchers, it blocks.
  ev_loop(EV_DEFAULT_UC_ 0);

  EmitExit(process);

#ifndef NDEBUG
  // Clean up.
  context.Dispose();
  V8::Dispose();
#endif  // NDEBUG
  return 0;
}

Init 関数では,引数の解析や signal handler の登録,デフォルトイベントループの初期化が行われた後,event watcher を初期化しています.*1

SetupProcessObject 関数では,おなじみの global object である process を組み立てています.

Load 関数では,src/node.js に定義された function を実行して各モジュールのロード,グローバルオブジェクトの生成を行い,ユーザの JavaScript の実行を開始します*2.ここでコールバックが登録され,後に何らかのイベントが発生すれば,このあと出てくるイベントループ (ev_loop) で処理されます.

次の ev_idle_start は,watcher によるイベント監視を開始させます.*3

ev_loop はイベントループの本体です.何らかのイベントが発生して backend_poll (epoll/kqueue) に引っかかれば,登録したコールバックが呼び出されます.単純な (イベント発生を気にしない) スクリプトであればループすることなく素通りします.

EmitExit 関数では process に対して 'exit' を emit します.JavaScript 側で process.on('exit', function() {...}) としていた場合は,コールバックとして function() {...} が呼び出されます.

イベントループで処理される各種 watcher

イベントループ内で処理されるのは,ev_xxx_start で active となった watcher や,I/O で read/write 可となった watcher,タイマ watcher 等に対応するコールバックです.Init 関数で用意される主な watcher には,以下の 3 種類があります.

  1. tick 系
    • prepare_tick_watcher/check_tick_watcher/tick_spinner は,process.nextTick で積まれたキューの消化を管理するために使用される.
  2. gc
    • gc_check/gc_idle/gc_timer は,GC のタイミング制御に使用される.
  3. eio 系
    • eio_poller/eio_want_poll_notifier/eio_done_poll_notifier は,fs モジュール API による非同期 I/O 処理のコールバック制御に使用される.

上記以外に,net モジュールは IOWatcher (lib/node_watcher) が管理する ev_io watcher を,setTimeout/setInterval は Timer (lib/node_timer) が管理する ev_timer watcher を利用しています.これらは API 利用時に適宜用意されます.

tick 系のイベント処理

tick 系は以下のイベントで処理されます.

  1. EV_PREPARE
    • イベントループの先頭で発生
  2. EV_CHECK
    • イベントループ最後で各コールバックを呼び出す直前で発生

tick 系 watcher によりコールバックされる関数は node::Tick 関数です.これにより,process.nextTick で登録したコールバックは定期的に処理されます.

// node.cc:221
static void Tick(void) {
  // Avoid entering a V8 scope.
  if (!need_tick_cb) return;

  need_tick_cb = false;
  ev_idle_stop(EV_DEFAULT_UC_ &tick_spinner);

  HandleScope scope;

  if (tick_callback_sym.IsEmpty()) {
    // Lazily set the symbol
    tick_callback_sym =
      Persistent<String>::New(String::NewSymbol("_tickCallback"));
  }

  Local<Value> cb_v = process->Get(tick_callback_sym);
  if (!cb_v->IsFunction()) return;
  Local<Function> cb = Local<Function>::Cast(cb_v);

  TryCatch try_catch;

  cb->Call(process, 0, NULL);

  if (try_catch.HasCaught()) {
    FatalException(try_catch);
  }
}

内容は,node.js に定義された 'process._tickCallback' を呼び出す,というものです.need_tick_cb のチェックや,tick_spinner に対する ev_idle_stop は,process.nextTick によって積まれたキューを消化するまで,イベントループへの参照保持を保証するための処置です.
process._tickCallback は以下のコードです.

  // node.js:161
  startup.processNextTick = function() {
    var nextTickQueue = [];

    process._tickCallback = function() {
      var l = nextTickQueue.length;
      if (l === 0) return;

      try {
        for (var i = 0; i < l; i++) {
          nextTickQueue[i]();
        }
      }
      catch (e) {
        nextTickQueue.splice(0, i + 1);
        if (i + 1 < l) {
          process._needTickCallback();
        }
        throw e; // process.nextTick error, or 'error' event on first tick
      }

      nextTickQueue.splice(0, l);
    };

    process.nextTick = function(callback) {
      nextTickQueue.push(callback);
      process._needTickCallback();
    };
  };

_tickCallback では,とにかくキューに積まれたコールバックを呼び出します.例外発生時は,成功したコールバックをキューから除去し,残りを再度処理するように _needTickCallback を呼び出します.
JavaScript 側で _needTickCallback が呼び出されると,前出の need_tick_cb が true となり,また tick_spinner に対して ev_idle_start が呼び出され,イベントループの参照カウントが増加します (その後の Tick 関数呼び出しで stop が呼び出され,増加した分のカウントは元に戻る).

gc 系のイベント処理

gc_check は EV_CHECK (前出),gc_idle は EV_IDLE (いわゆるプロセスの IDLE 状態),gc_timer は EV_TIMER (timeout) に対応する watcher です.gc_timer は,Init 関数で 5 秒間隔に設定されています.
gc_check のコールバックは node::Check 関数です.

// node.cc:175
static void Check(EV_P_ ev_check *watcher, int revents) {
  ...

  tick_times[tick_time_head] = ev_now(EV_DEFAULT_UC);
  tick_time_head = (tick_time_head + 1) % RPM_SAMPLES;

  StartGCTimer();

  for (int i = 0; i < (int)(GC_WAIT_TIME/FAST_TICK); i++) {
    double d = TICK_TIME(i+1) - TICK_TIME(i+2);
    //printf("d = %f\n", d);
    // If in the last 5 ticks the difference between
    // ticks was less than 0.7 seconds, then continue.
    if (d < FAST_TICK) {
      //printf("---\n");
      return;
    }
  }

  // Otherwise start the gc!

  //fprintf(stderr, "start idle 2\n");
  ev_idle_start(EV_A_ &gc_idle);
}

ev_now でイベント発生時のタイムスタンプを取得し,後半の for 文で GC を開始するかどうかの判断に使用します.StartGCTimer 関数は,gc_timer を active にすることで GC タイマを発動させます.for 文のところでは,直近のイベント発生間隔を調べ,全てが基準値 (FAST_TICK) よりも長ければ gc_idle を active にして,EV_IDLE 発生時にコールバックで GC を実行させます.

gc_timer のコールバックは CheckStatus 関数です.

static void CheckStatus(EV_P_ ev_timer *watcher, int revents) {
  ...

  // check memory
  if (!ev_is_active(&gc_idle)) {
    HeapStatistics stats;
    V8::GetHeapStatistics(&stats);
    if (stats.total_heap_size() > 1024 * 1024 * 128) {
      // larger than 128 megs, just start the idle watcher
      ev_idle_start(EV_A_ &gc_idle);
      return;
    }
  }

  double d = ev_now(EV_DEFAULT_UC) - TICK_TIME(3);

  //printfb("timer d = %f\n", d);

  if (d  >= GC_WAIT_TIME - 1.) {
    //fprintf(stderr, "start idle\n");
    ev_idle_start(EV_A_ &gc_idle);
  }
}

gc_idle がアクティブでない (GC 起動が確約されていない) 場合,メモリ使用量をチェックして合計が 128MB を超えていると gc_idle を active にします.また,3 つ前のイベント発生時刻から 4 秒以上経過していた場合にも gc_idle を active にします.

gc_idle のコールバックは node::Idle 関数です.

static void Idle(EV_P_ ev_idle *watcher, int revents) {
  ...

  if (V8::IdleNotification()) {
    ev_idle_stop(EV_A_ watcher);
    StopGCTimer();
  }
}

内部で V8::IdleNotification 関数を呼び出し,GC を実行します.V8::IdleNotification 関数は呼び出す必要がなくなると true を返すため,watcher (gc_idle) を stop,GC タイマも stop します.

eio 系のイベント処理

eio_poller は EV_IDLE (前出),eio_want_poll_notifier と eio_done_poll_notifier は EV_ASYNC (別スレッドから非同期に通知を受け取る) に対応する watcher です.それぞれ順に,node::DoPoll 関数,node::WantPollNotifier 関数,node::DonePollNotifier 関数がコールバックとして登録されています.
非同期 I/O 処理後のコールバック (JavaScript 側で I/O 処理後に呼び出されるよう登録した function) を呼び出すには eio_poll 関数を呼び出す必要があります.eio_poll 関数は主に node::DoPoll 関数から呼び出されます.node::DoPoll 関数は EV_IDLE のタイミングで処理されるわけですが,この制御には node::EIOWantPoll,node::EIODonePoll,node::WantPollNotifier,node::DonePollNotifier 関数が連携しています.



eio に積まれた I/O 要求が処理される (request queue が 0 になる) と,node::EIOWantPoll 関数が want_poll_cb としてワーカスレッド (メインとは異なるスレッド) から呼び出されます.内部では eio_want_poll_notifier にイベント発行しており,これによってメインのイベントループで node::WantPollNotifier 関数が呼び出されます.node::WantPollNotifier 関数では,eio_poll 関数でコールバックを 1 つ処理し,まだ残っていれば eio_poller に対して EV_IDLE を start させます.これにより,IDLE 状態になる度に eio_poll 関数が呼び出され,非同期 I/O 処理のコールバックが呼び出されていきます.
eio_poller とひも付いているのは node::DoPoll 関数です.内部では eio_poll 関数を呼び出し,コールバックがなくなれば eio_poller による EV_IDLE 監視を停止します.また別ルートとして node::EIODonePoll 関数があります.eio_poll 関数内部で response queue を消化しきると,node::EIODonePoll 関数が done_poll_cb として呼び出されます.内部では eio_done_poll_notifier にイベント発行しており,これによってメインのイベントループで node::DonePollNotifier 関数が呼び出されます.あとは node::DonePollNotifier 関数の中で 1 回だけ eio_poll 関数を呼び出し,コールバックを処理しきったことが確認できれば eio_poller の EV_IDLE 監視を停止します.
以上のように非同期 I/O 処理では,メインスレッドと eio がイベント通知によって連携をとり,eio_poll 関数の呼び出しを制御しています.

なお,実際の I/O 要求発行のあたりは,src/node_file.cc の 740 行目にある Read 関数を見ると良いかもしれません.ASYNC_CALL マクロによって eio API が呼び出されています.

IOWatcher によるソケットの READ/WRITE

ソケットの read/write は IOWatcher として機能提供されています.IOWatcher は EV_READ,EV_WRITE を監視します.net モジュールは IOWatcher を利用して net.Socket や net.Server を実現しています.*4

IOWatcher が READ/WRITE のどちらを監視するのかは,IOWatcher.prototype.set により決定します.

// lib/net.js:565
socket._readWatcher.set(socket.fd, true, false);
...
// lib/net.js:572
socket._writeWatcher.set(socket.fd, false, true);
// src/net_io_watcher.cc:132
Handle<Value> IOWatcher::Set(const Arguments& args) {
  ...
  IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
  ...
  int fd = args[0]->Int32Value();
  ...
  int events = 0;

  if (args[1]->IsTrue()) events |= EV_READ;
  ...
  if (args[2]->IsTrue()) events |= EV_WRITE;

  assert(!io->watcher_.active);
  ev_io_set(&io->watcher_, fd, events);

  return Undefined();
}

(file discriptor, readable, writable) を引数とし,watcher にファイルディスクリプタと検出対象イベントを登録しています.

Timer による setInterval/setTimeout

setInterval は内部で Timer (src/node_timer.cc) を利用しています.

exports.setInterval = function(callback, repeat) {
  var timer = new Timer();

  if (arguments.length > 2) {
    var args = Array.prototype.slice.call(arguments, 2);
    timer.callback = function() {
      callback.apply(timer, args);
    };
  } else {
    timer.callback = callback;
  }

  timer.start(repeat, repeat ? repeat : 1);
  return timer;
};

Timer は ev_timer watcher を保持しており,指定された間隔 (上のコードで repeat) でコールバック (Timer::OnTimeout 関数) を呼び出します*5.Timer::OnTimeout 関数では,timer.callback に設定されたコールバック function を呼び出しています.


次に setTimeout ですが,これは setInterval とは仕組みが異なります.Timer を利用してはいますが,watcher の利用数を減らすために,同じ delay (下のコードで after) の callback をまとめて 1 つの Timer で管理しています.

exports.setTimeout = function(callback, after) {
  var timer;

  if (after <= 0) {
    // Use the slow case for after == 0
    timer = new Timer();
    timer.callback = callback;
  } else {
    timer = { _idleTimeout: after, _onTimeout: callback };
    timer._idlePrev = timer;
    timer._idleNext = timer;
  }

  ...

  if (arguments.length > 2) {
    var args = Array.prototype.slice.call(arguments, 2);
    var c = function() {
      callback.apply(timer, args);
    };

    if (timer instanceof Timer) {
      timer.callback = c;
    } else {
      timer._onTimeout = c;
    }
  }

  if (timer instanceof Timer) {
    timer.start(0, 0);
  } else {
    exports.active(timer);
  }

  return timer;
};

setTimeout の delay が 0 以下であれば Timer が使用されますが,それ以外の場合 (通常の利用はこちらの場合に該当) は 4 つのプロパティを持った timer オブジェクトを作成し,active に渡しています.

exports.active = function(item) {
  var msecs = item._idleTimeout;
  if (msecs >= 0) {
    var list = lists[msecs];
    if (item._idleNext == item) {
      insert(item, msecs);
    } else {
      item._idleStart = new Date();
      L.append(list, item);
    }
  }
};

setTimeout から流れてきた場合は必ず item._idleNext == item が成立するため,insert へと流れていきます.
insert は実際に list へ item を追加するところです.

function insert(item, msecs) {
  item._idleStart = new Date();
  item._idleTimeout = msecs;

  if (msecs < 0) return;

  var list;

  if (lists[msecs]) {
    list = lists[msecs];
  } else {
    list = new Timer();
    L.init(list);

    lists[msecs] = list;

    list.callback = function() {
      ...
      var now = new Date();

      var first;
      while (first = L.peek(list)) {
        var diff = now - first._idleStart;
        if (diff + 1 < msecs) {
          list.again(msecs - diff);
          debug(msecs + ' list wait because diff is ' + diff);
          return;
        } else {
          L.remove(first);
          assert(first !== L.peek(list));
          if (first._onTimeout) first._onTimeout();
        }
      }

      debug(msecs + ' list empty');
      assert(L.isEmpty(list));
      list.stop();
    };
  }

  if (L.isEmpty(list)) {
    // if empty (re)start the timer
    list.again(msecs);
  }

  L.append(list, item);
  assert(!L.isEmpty(list)); // list is not empty
}

指定された msecs に対して最初の要素であれば Timer を生成し,list の各要素に対して _onTimeout を呼び出す callback を設定します.最後に again (Timer::Again 関数に対応) を呼び出し,msecs 間隔で動作するよう再設定かつ Timer をスタートさせます.2 個目から後は単純に list へ要素として追加されていきます.

おわりに

今回は node.js の内部を追いかけてみました (Timer のあたりは追いかけ切れていませんが).libev によるイベント監視を駆使し,うまく連携している様子が見て取れます.また,内部を追いかけたことで node.js に対する理解が深まったため,もう少しうまくコード書けるようになるかなー,という淡い期待もありますw

そういえば,v0.6 から Windows サポート強化の一環として,IOCP 対応が入るそうです (http://nodejs.org/codeconf.pdf).そのうちそこら辺も調べてみたいと思います.

*1:ev_xxx_start の後 ev_unref を呼び出しているのは,イベントが積まれていなければすぐに ev_loop を抜けさせるための処置です.ここら辺のことは ev.pod の ev_ref/ev_unref に記載されています.なお,eio_poller だけは Start 関数内で ev_idle_start したままになっていますが,対応する node::DoPoll でコールバックを処理しきると ev_idle_stop が呼び出されるため,やはり ev_loop は終了することになります.

*2:ユーザ JavaScript 実行の流れとしては,node.js の startup → Module.runMain → Module._load → Module.prototype.load → Module.prototype._compile → V8 へ,といった感じです.

*3:しかし,コメントを読む限りではテストを通すためのものっぽいですね.

*4:ソケットプログラミングでおなじみの socket, bind, listen といった関数は src/node_net で定義されています.

*5:当然ですが best-effort です

Mongoose のコネクション周り

node.js + MongoDB はイイ感じです.というわけで Mongoose ネタの続き.

見出し

  • connect と disconnect
  • 明示的にコネクションを扱う
  • Connection とイベント

connect と disconnect

require で取得する Mongoose オブジェクトには connect と disconnect があります.

var mongoose = require('mongoose');
var Schema   = mongoose.Schema;

// スキーマ登録
var HogeSchema = new Schema({
  label: String,
  value: Number
});
mongoose.model('Hoge', HogeSchema);

// デフォルトコネクションを開く
mongoose.connect('mongodb://localhost/test');

... (なんか使う)

// signal 飛んできたら閉じる
process.on('SIGINT', function() { mongoose.disconnect(); });

mongoose の connect を呼び出すと,デフォルトコネクションが使用されます.「デフォルトコネクション」とは,mongoose がプロパティとして持つ connections の [0] 要素を指しており,connection プロパティから取得可能です.
この [0] 要素は require 時の 'new Mongoose()' で生成され,'mongoose.connect(...)' でコネクションが開かれます.また,'mongoose.model(...)' した際には内部で自動的に使用されます.

対する disconnect は,connections プロパティが保持する全てのコネクションを閉じます.

明示的にコネクションを扱う

Mongoose オブジェクトの connect/model を使うと,常に「デフォルトコネクション」が使用されます.DB ごとにコネクションを保持したい場合は,コネクションを直接扱う必要があります.以下のようにします.

var mongoose = require('mongoose');
var Schema   = mongoose.Schema;

// スキーマ登録
var HogeSchema = new Schema({
  label: String,
  value: Number
});
mongoose.model('Hoge', HogeSchema);

// コネクションと,それにひも付いたモデルを取得
var db = mongoose.createConnection('mongodb://localhost/test');
var Hoge = db.model('Hoge');

// 使う
Hoge.findOne({}, function(err, doc) {
  // ...
});

// 終わり
process.on('SIGINT', function() { mongoose.disconnect(); });

createConnection を使用すると Connection オブジェクトを取得できます.引数 (uri もしくは (host, database, port, callback) の組) を指定した場合,open 済みの Connection オブジェクトが返ってきます.

モデルは以下のような手順で定義,取得します.

  1. トップレベルの Mongoose オブジェクトにスキーマを登録 (mongoose.model)
  2. Connection を取得 (mongoose.createConnection)
  3. Connection 経由でモデルを取得 (db.model)

Connection の model を使用することで,Connection オブジェクトとモデルをひも付けます.

サンプルコードでは disconnect を呼び出して一括 close していますが,Connection にも close があるため,個別にコネクションを閉じることも可能です.

Connection とイベント

Connection は EventEmitter の派生になっており,イベントをハンドリングできます.

var db = mongoose.createConnection();  // まだ開かない

// ハンドラ登録
db.on('open' , function() { console.log('open'); });
db.on('close', function() { console.log('close'); });

// コネクションを開く
db.open('mongodb://localhost/test');

var Hoge = db.model('Hoge');
Hoge.findOne({}, function(err, doc) {
  // ... (いろいろ)
});

process.on('SIGINT', function() { mongoose.disconnect(); });

// [実行結果]
// $ node test.js
// open
// ... (findOne の結果)
// close

これ以外にもあるのかな?

終わりに

前回の内容はコネクション周りを明記していなかったので,今回独立してコネクション周りを取り上げてみました.
実はまだ '...Set' という API があるのですが,それはまた次の機会に取り上げてみたいと思います.

node.js から MongoDB にアクセス (Mongoose の紹介)

node.js から MongoDB にアクセススためのライブラリに Mongoose があります.今回はこれを紹介しようと思います.O/R Mapper っぽく使えるように設計されており,既存の O/R Mapper を使ったことがある人にとっては,比較的わかりやすい仕様です.

見出し

  • Mongoose とは?
  • インストール
  • 何はともあれ使い方を
  • Schema 定義について
  • ドキュメント生成 (保存)
  • ドキュメント読み取り
  • ドキュメント更新
  • ドキュメント削除
  • Embedded Document
  • 終わりに

Mongoose とは?

node.js 向けに開発された MongoDB アクセスライブラリです.

Mongoose is a MongoDB object modeling tool designed to work in an asychronous environment.
Mongoose とは,非同期環境において動作するよう設計された,MongoDB オブジェクト用モデリングツールである.[引用]

MongoDB に格納するモデルを定義し,O/R Mapper の様なノリで操作できるようになります.なお,あくまで "ノリ" であって,node.js で動く以上はイベント駆動なんだということは留意しておくべきです.

インストール

$ npm install mongoose

パッケージ管理があると楽だ.

何はともあれ使い方を

MongoDB のホストが localhost,データベースが sample_db であったとします.

var mongoose = require('mongoose');

// 定義フェーズ
var Schema   = mongoose.Schema;

var UserSchema = new Schema({
  name:  String,
  point: Number
});
mongoose.model('User', UserSchema);

// 使用フェーズ
mongoose.connect('mongodb://localhost/sample_db');

var User = mongoose.model('User');
var user = new User();
user.name  = 'KrdLab';
user.point = 777;
user.save(function(err) {
  if (err) { console.log(err); }
});

// ※注意:イベント駆動

User.find({}, function(err, docs) {
  for (var i=0, size=docs.length; i<size; ++i) {
    console.log(docs[i].doc.name);
  }
});

(個人的に,JavaScript において 'new' を使うオブジェクト生成は嫌いなのですが...今回はこらえて流儀に従います.)

上記のように User スキーマを定義 → save の流れでドキュメントが格納されます.なお,MongoDB の各ドキュメントに割り振られる '_id' は,そのまま 'user._id' で取得可能です (Schema.ObjectId オブジェクトとして扱われる).

find には (query, fields, options, callback) の 4 引数があります.query は必須,残りは省略可能です.ただし,callback を省略すると FindQuery オブジェクトが返ってきます.このオブジェクトはクエリのコンテキストであり,DataMapper (PofEAA のパターン名ではなく,ruby 向けの DataMapper) でいうところの Scopes and Chaining に相当する操作を可能とします (where や sort,skip,limit 等を積み重ねて,exec すると実際にクエリが発行される).
他にも,findOne,findById 等ありますが,詳しくは API の Model を参照してください.

上記サンプルでは save のあとすぐに find していますが,これは必ずしもうまくいきません (クエリが submit されても,どちらが先に終わるかわからないため).実際には save のハンドラ内で登録されるクロージャの中で find をすることになります.この辺は Reactor Pattern やらイベント駆動やらではおなじみです.

補足ですが,connect の呼び出しは使用直前まで遅らせることができます.

Mongoose buffers all the commands until it's connected to the database.
Mongoose は,データベースへの接続が確立されるまで,全てのコマンドをバッファする.[引用]

Schema 定義について

Validation をかけたり,default 値を設定できます (Model Definition).ここら辺は DataMapper の "property :hoge ..." の感じと同じです.

var User = new Schema({
  first_name: { type: String, required: true },
  last_name:  { type: String, required: true },
  point:      { type: Number, min: 0, default: 0 },
  created:    { type: Date, default: Date.now },
  updated:    { type: Date, default: Date.now }
});

Validation は save のタイミングでかかります.上記の例では,'first_name','last_name' は値を指定しないとエラー,'point' は 0 未満を指定するとエラー,といった感じです.
default 値は new のタイミングでオブジェクトに積まれます.上記の例では,'point' は 0,'created','updated' は現在の日時が格納されます.

MongoDB はスキーマレスが特徴の一つですが,要件として Validation をかけたい場合もあるでしょう.また,default 値指定は便利です.

ドキュメント生成 (保存)

先ほど出てきた通り.

User = mongoose.model('User');
var user = new User();
user.name  = 'KrdLab';
user.point = 777;

// user = new User({ name: 'KrdLab', point: 777 }); も可能

user.save(function(err) {
  if (err) { console.log(err); }
});

ドキュメント読み取り

まずは all 指定.

User.find({}, function(err, docs) {
  for (var i=0, size=docs.length; i<size; ++i) {
    // docs[i].doc
  }
});

次に query 指定.
指定形式は Querying とほぼ同じようです.

// SELECT * FROM users WHERE name='KrdLab' に相当
User.find({ name: 'KrdLab' }, function(err, docs) {
  // マッチしたドキュメントが docs[i].doc で取れる
});

そして,query + フィールド指定.

// SELECT point FROM users WHERE name='KrdLab' に相当
User.find({ name: 'KrdLab' }, ['point'], function(err, docs) {
  // マッチしたドキュメントが docs[i].doc で取れ,フィールドは point のみ
  // (default 値を指定したフィールドは,fields 引数に指定していなくても「default 値」が取れることに注意)
});

最後に,query + フィールド + オプション指定.
オプションには,limit,skip,sort 等が指定できます.指定形式は Advanced Queries とほぼ同じようです.

// SELECT point FROM users WHERE name='KrdLab' LIMIT 2 に相当
User.find({ name: 'KrdLab' }, ['point'], { limit: 2 }, function(err, docs) {
  // 略 (とにかく docs[i].doc で取れる)
});

ドキュメント更新

第 2 引数には Modifier Operations を指定します.

// UPDATE users SET point=999 WHERE name='KrdLab' に相当
User.update({ name: 'KrdLab' }, { $set: {point: 999} },
            { upsert: false, multi: true }, function(err) {
  // ...
});

補足:MongoDB は Atomic Operations にある通り,単一ドキュメントに対しのみ Atomic な操作を保証しています.lock や transaction はサポートされていません (v1.2.2).理由,並びに代替方法については前述のリンク先に記載されています.

ドキュメント削除

第 1 引数にクエリを指定します.'{}' 指定だと...全部消えます.

// DELETE FROM users WHERE name='KrdLab' に相当
User.remove({ name: 'KrdLab' }, function(err) {
  // ...
});

Embedded Document

MongoDB にはドキュメントの中にドキュメントを埋め込む embedded document という概念があります (Schema Design).Mongoose でも当然サポートされており,Schema 定義は以下のようになります.

// 公式のサンプルを若干修正
var CommentSchema = new Schema({
  title:  String,
  body:   String,
  date:   Date
});

var BlogPostSchema = new Schema({
  author: ObjectId,
  title:  String,
  body:   String,
  date:   Date,
  comments: [CommentSchema],
  metadata: {
    like: Number,
  }
});
mongoose.model('BlogPost', BlogPostSchema);

'comments' のように外部に Schema を定義して埋め込む方法と,'metadata' のようにスキーマ定義ごと埋め込む方法の 2 つがあります.なお,'metadata' のような埋め込みを行った場合は,BlogPost のコンストラクタで値を設定する必要があり,その後は get しかできないようです (要確認).
既存の BlogPost に対して comments を更新する場合は以下のように行います.

// 投稿済み BlogPost の _id を 'post_id' とする
BlogPost.findOne({ _id: post_id }, function(err, post) {
  if (!err) {
    var comment = {};
    comment.title = 'タイトル';
    comment.body  = '内容';
    comment.date  = Date.now();
    post.comments.push(comment);  // 追加
    post.save(function(err) {
      // ...
    });
  } else {
    console.log('error findOne: ' + err);
  }
});

新しいコメントを追加する場合,Comment を使わずに Object を生成し,push する必要があります (Comment を使って push すると RangeError が発生する).
save を使った場合は,追加した埋め込みドキュメント (comment) に '_id' が割り当てられます.しかし,以下のように update を使った場合は,単純に Object がそのまま追加されるのみであり,'_id' は割り当てられないようです.

var comment = {};
// ... set fields ...
BlogPost.update({ _id: post_id }, { $push: { comments: comment } },
                { upsert: false, multi: false }, function(err) {
  // ...
});

save 以外使うなってことか...

また,公式にて embedded document の削除は post.comments[0].remove() のように 'remove' 呼べよと紹介されていますが,呼び出しても消えません (v1.1.2).なぜ?

終わりに

Mongoose の I/F は,既存の O/R Mapper を使ったことのある人,または MongoDB のシェルをさわったことのある人が,比較的容易に使えるような感じで設計されています.
そしてやはり node.js と MongoDB は相性が良いですね.データの受け渡しが全て JSON で済んでしまうというのは楽です.
しばらくあれこれ使ってみようと思います.

お久しぶりです

今,大災害の発生により大変なときであり,ここ数日のニュースは見てるとしゃれになりません.
ひとまず今自分にできることを済ませた上で,できる限り平常運転で行こうと思います.何卒ご了承くださいますようお願い申し上げます.
...

大学時代の研究室仲間である id:veveve さんとサンボさんの無事は確認.よかった.
...

なお 1 〜 2 月は残業 + 休日出勤により,週一更新は守れませんでした...orz