超単純 HTTP サーバ (練習)

去年から,仕事の合間に時間を見つけてはちまちまと Haskell の勉強をしていました.
でも本を読むだけでは今一なので,ここら辺で一回まとまったものを書いてみようかと.初心者なのは承知の上でコードをさらそうかと.
あと Haskell 良いですね.まだうまく表現できませんがおもしろいです.

環境

書いたコード

仕様は「HTTP リクエストの Request-Line を解釈*2し,要求されたファイルの内容を返す」だけです.セキュリティ関連も対応していません.大変に単純です.

{- thin HTTP server implementation -}
module Main where

import Prelude hiding (catch)
import Network (listenOn, accept, sClose, Socket, withSocketsDo, PortID(..), PortNumber)
import System.IO
import System.Environment (getArgs)
import Control.Exception (catch, finally, SomeException(..))
import Control.Concurrent (forkIO)
import Control.Applicative ((*>))
import Control.Monad (forM_)

import ThinHttpParser
import Text.ParserCombinators.Parsec

main :: IO ()
main = do
  (portStr:_) <- getArgs
  runServer $ fromIntegral (read portStr :: Int)

runServer :: PortNumber -> IO ()
runServer port = withSocketsDo $ do
  lSock <- listenOn $ PortNumber port
  putStrLn $ "listening on: " ++ show port
  acceptLoop lSock `finally` (sClose lSock >> putStrLn "stopped.")

acceptLoop :: Socket -> IO ()
acceptLoop lSock = do
  (cHandle, _, _) <- accept lSock
  forkIO $ clientHandler cHandle
  acceptLoop lSock

clientHandler :: Handle -> IO ()
clientHandler handle = service handle
  `catch`   (\(SomeException e) -> putStrLn $ show e)
  `finally` hClose handle

service :: Handle -> IO ()
service handle = do
  rawReq  <- hGetContents handle
  case parse parseRequest "parse http-request" rawReq of
    Right httpReq -> do
      let path   = reqUrl httpReq
      -- putStrLn $ "request: " ++ (show $ reqMethod httpReq) ++ " " ++ path -- XXX debug
      (readFile ("./" ++ path) >>= responseOk handle (contentType $ fileExt path))
        `catch` (\(SomeException _) -> responseError handle 404)
      hFlush handle
    Left err -> do
      putStrLn $ show err
      responseError handle 400

-- 成功
responseOk :: Handle -> String -> String -> IO ()
responseOk handle ctype content = forM_ [
    "HTTP/1.1 200 OK\r\n" 
      ++ "Content-Type: " ++ ctype ++ "\r\n"
      ++ "\r\n",
    content
  ] (hPutStr handle)

-- XXX 失敗
responseError :: Handle -> Int -> IO ()
responseError handle scode = hPutStr handle $ "HTTP/1.1 " ++ show scode ++ " " ++ reasonPhrase scode ++ "\r\n\r\n"

-- helper --

fileExt :: String -> String
fileExt path = case parse parseExt "parse path" path of
  Right ext -> ext
  Left  _   -> ""

parseExt :: CharParser st String
parseExt = manyTill anyChar (char '.') *> many anyChar

-- XXX Map?
contentType :: String -> String
contentType "html" = "text/html"
contentType _      = "text/plain"

-- XXX Map?
reasonPhrase :: Int -> String
reasonPhrase 400 = "Bad Request"
reasonPhrase 404 = "Not Found"
reasonPhrase _   = error "unknown status code"
module ThinHttpParser (
    HttpRequest(..),
    Method(..),
    parseRequest
  ) where

import Control.Applicative
import Control.Monad (MonadPlus(..), ap)
import Text.ParserCombinators.Parsec hiding (many, optional, (<|>))
import Numeric (readHex)
import Control.Monad (liftM4)
import System.IO (Handle)

instance Applicative (GenParser s a) where
  pure  = return
  (<*>) = ap

instance Alternative (GenParser s a) where
  empty = mzero
  (<|>) = mplus

data Method = Get | Post deriving (Eq, Ord, Show)

data HttpRequest = HttpRequest {
    reqMethod  :: Method,
    reqUrl     :: String,
    reqHeaders :: [(String, String)],
    reqBody    :: Maybe String
  } deriving (Eq, Show)

parseRequest :: CharParser () HttpRequest
parseRequest = q "GET"  Get  (pure Nothing)
           <|> q "POST" Post (Just <$> many anyChar)
  where
    q name ctor body = liftM4 HttpRequest req url parseHeaders body
      where
        req = ctor <$ string name <* char ' '
    url = optional (char '/') *>
          manyTill notEOL (try $ char ' ') <* (try $ string "HTTP/1." <* oneOf "01")
          <* crlf

parseHeaders :: CharParser st [(String, String)]
parseHeaders = pure [] -- XXX

crlf :: CharParser st ()
crlf = () <$ string "\r\n"

notEOL :: CharParser st Char
notEOL = noneOf "\r\n"

gist はこちら → https://gist.github.com/1573839

ビルド&実行

***> ghc --make -Wall -fno-warn-unused-do-bind -o thin_http_server.exe Main.hs
[2 of 2] Compiling Main             ( Main.hs, Main.o )
Linking thin_http_server.exe ...

***>thin_http_server.exe 8888
listening on: 8888


はい見えました.

疑問とかいろいろ

細かな疑問はもっといっぱいありますが...

  1. エラー処理のやり方あってるかなぁ...?
  2. 文字の扱いが雑すぎて反省
  3. スレッド間の通信はどうやるの?(STM?)
  4. I/O 多重化は?
  5. Parser Combinator はどのパッケージが流行なんだろう?

どんどんコード書いて探っていこうかな,と.

おわりに

最初から最後まで躓きっぱなしでした...orz
今回書いたコードが Haskell っぽいコードになっているかどうか,今はまだ正しく評価できません.
未来の自分がみたとき,何て思うのかなぁ...

*1:結構古い?アップグレードしておこう...

*2:Request-URI は abs_path のみを解釈

TameJS と Fiber による非同期処理の記述 (2/2)

前回の続きで今回は Fiber の話題.ずいぶんと日が空いてしまいました.

見出し

  • はじめに
  • 環境
  • node-fibers とは?
  • Fiber の利用例
  • Fiber を使った非同期処理の記述例
  • 終わりに

はじめに

今回は fiber を導入し,非同期 API のコールバック周りを少し整理してみよう,ということが主題です.
node.js (というかそのエンジンである V8) は fiber をサポートしていませんが*1node-fibers を利用することで node.js 上に fiber を導入することができます.

環境

  • ubuntu 10.04 32bit (VM 上で稼働)
  • node 0.6.6
  • node-fibers 0.6.3

node-fibers とは?

node-fibers は node.js 上で fiber (coroutine) *2 をサポートするためのライブラリです.

node@0.5.2 以上の環境であれば,以下のように簡単に導入することができます ('Fiber' というリテラルが導入される).

$ npm install fibers

なお,Fiber の直接利用は "not recommended" であり,Future の利用を推奨しているようです (README.md の EXAMPLES 参照).
Future は並行処理ではおなじみの概念ですし,なかなか扱いやすそうです.ただ,generator のようなものは Fiber の直接利用が必要となります.

今回は実験と割り切って Fiber を直接利用しています.

Fiber の利用例

以下のコードはよくある generator です.サンプルとしても紹介しやすいですし,動作も理解しやすい使い方の一つです.

// sample.js
require('fibers');

var inc = Fiber(function() {
  var i = 0;
  while (true) {
    yield(i++);  // (1)
  }
});

var i;
while ((i = inc.run()) < 5) {  // (2)
  console.log(i);
}
$ node sample.js
0
1
2
3
4

(2) の run で Fiber が実行されます.Fiber 側で yield が呼び出されると,呼び出し元 (2) に制御が戻ります.このとき yield の引数を戻り値として返します.再度 run が呼び出されると,今度は (1) 以降の処理が再開されます.
このように処理を切り替えていくわけです.

node-fibers のコードを読んでみると,native でいろいろとやっていて (V8 との絡みとか,linux の場合 get/make/swapcontext とか) なかなか興味深いです.

Fiber を使った非同期処理の記述例

例として read input.txt → write output.txt のコードを取り上げます.
ナイーブに書くと以下の通りです.

var fs = require('fs');

var error_handler = function(e) {
  console.log(e);
};

fs.readFile('input.txt', 'UTF-8', function(err, data) {
  if (err) {
    error_handler(err);
    return;
  }
  fs.writeFile('output.txt', data, 'UTF-8', function(err) {
    if (err) {
      error_handler(err);
      return;
    } else {
      console.log('成功');
      // 処理が続く...
    }
  });
});

Fiber を使って少し工夫してみると,以下のように書くことができます.

// Fiber を再開するための汎用コールバック
var resume_cb = function(fiber) {
  return function(/*...*/) { fiber.run(Array.prototype.slice.call(arguments)); };
};

var error_handler = function(e) {
  console.log(e);
};

var main = function() {
  var fiber = Fiber.current;

  fs.readFile('input.txt', 'UTF-8', resume_cb(fiber));
  var input_res = yield();
  if (input_res[0]) {
    error_handler(input_res[0]);
    return;
  }

  fs.writeFile('output.txt', input_res[1], 'UTF-8', resume_cb(fiber));
  var output_res = yield();
  if (output_res[0]) {
    error_handler(output_res[0]);
    return;
  }

  // 処理が続く...
};

Fiber(main).run();

上記は,非同期 API を呼び出した後すぐに yield してイベントループに戻り,コールバック (resume_cb が返す function) が呼ばれたタイミングで Fiber を再開しています.
こんな使い方もできるかな,といった感じですが.

おわりに

今回は fiber を使った非同期処理の記述を取り上げました.fiber は ruby でも 1.9 から導入されていますね (最近脚光を浴びているのでしょうか?).
node.js では node-fibers を利用することで Fiber が利用可能となります.これによりコールバックネストをいくぶんか軽減することができました.

次回で node.js の control-flow あたりをまとめられたら良いですね...(できるの?)

*1:ですよね?

*2:[http://en.wikipedia.org/wiki/Fiber_(computer_science):title=fiber] は軽量スレッドと呼ばれるものです.通常のスレッドはカーネルが実行をスケジューリングしますが,fiber は明示的にスケジューリングする必要があります (協調的マルチタスクと呼ばれます.明示的に CPU リソースを明け渡さないといけないアレです.).[http://en.wikipedia.org/wiki/Coroutine:title=coroutine] は,おおざっぱに言ってしまえば処理を途中で中断/再開 (suspend/resume) できるサブルーチンです.言語レベルの機能である coroutine を実装するために fiber が利用されます.

TameJS と Fiber による非同期処理の記述 (1/2)

前回からかなり時間が空いてしまいました...やっと時間取れた.今回は 2 回に分け,1 回目で TameJSを,2 回目で node-fibers を取り上げたいと思います.
ちなみに,「この方法が良い」と言うよりは「こんな方法もありますよね」というスタンスで書いています*1

見出し

  • はじめに
  • TameJS とは?
  • TameJS の利用例
  • TameJS のしていること
  • おわりに

はじめに

Node.js は非同期処理が基本であり,コールバックを多用するスタイルです.そのため,コードは簡単にコールバックのネストだらけになります*2
この "深いネスト" を解消するため,多くの場合は control flow ライブラリ が使用されます*3

TameJS では,非同期処理の記述に await/defer を持ち込み,上記の解決を試みています.

TameJS とは?

TameJS は tjs コードから JavaScript を生成する translator です.tjs コードは await/defer が追加されている点を除けば,JavaScript そのものです.
非同期処理の記述に await/defer を持ち込むことで,ノンブロッキング API を利用しているにもかかわらず,あたかも同期処理であるかのような記述を可能としています*4.これにより,ユーザの記述するコード上からコールバックによるネストを無くすことができます*5

TameJS の利用例

まずは以下のような tjs コードを記述します.

// ファイル名:read_file.tjs

var fs = require('fs');
await {
  fs.readFile('./test.txt', defer(var err, text));
}
if (err) {
  console.log('error: ' + err);
} else {
  console.log(text.toString('utf8'));
}

await ブロックで囲まれたコードが非同期に実行されます.複数の非同期 API を呼び出せば,それぞれ並行して動作します.ブロック中の API から結果が返ってくると,ブロック外のコードへと処理が進むイメージです.
defer にはコールバックが受け取るパラメータを指定します.await ブロックを抜けると,これらのパラメータが値に束縛されます.
利用可能な API については,以下の "API and Documentation" に載っています.

ちなみに,上記の tjs コードは以下の JavaScript とほぼ等価です.見た目はずいぶんと違いますね.

var fs = require('fs');
fs.readFile('./test.txt', function(err, text) {
  if (err) {
    console.log(err);
  } else {
    consloe.log(text);
  }
});

実行方法は,以下に示す 2 つのパターンがあります.


事前にコンパイルして実行
tamejs コマンドでコンパイルします."-o" オプションで出力ファイル名を指定できます.

$ tamejs read_file.tjs  // tjs -> js
$ node read_file.js     // 実行


require されたときにコンパイルして実行*6
*.tjs コードを利用したい JavaScript コード中に,以下を記載します.

// ファイル名:main.js

require('tamejs').register();
require('./read_file.tjs');

// ...

最後に,いつも通り実行します.

$ node main.js

(余談: register() 内部では,".tjs" に対してコンパイル処理を定義しています.TameJS Engine によって *.tjs から *.js に変換し,V8 コンパイルルーチンにコードを渡す,といった流れです.)

TameJS のしていること

await/defer の記述を元に,tjs コードを 継続渡し形式 (CPS)JavaScript に変換しています.CPSググるといっぱい解説が出てきます.
継続とは,端的に言ってしまうと「以降の計算処理」をまとめたものです.CPS では,渡された継続を利用してプログラムの制御を行います.

例えば,以下のような 2 つの処理があったとします.

A;
B;

これを CPS 変換*7すると,以下のようになります.

function _A(next) {
  A;
  next();
}
function _B(next) {
  B;
  next();
}
function end() {}

// 実行
_A(function() { _B(function() { end(); }); });

上記コードにおける next が継続に相当します*8.もしも A の処理が非同期であった場合は,_A に渡された next をコールバック内部で呼び出せばよいわけです.


実際に,前出の read_file.tjs をコンパイルすると,以下のようなコードが生成されます.

var tame = require('tamejs').runtime;
var __tame_fn_6 = function (__tame_k) {
    var fs = require ( 'fs' ) ;
    var __tame_fn_0 = function (__tame_k) {
        var err, text;
        var __tame_fn_1 = function (__tame_k) {
            var __tame_defers = new tame.Defers (__tame_k);
            var __tame_fn_2 = function (__tame_k) {
                fs . readFile ( './test.txt' ,
                __tame_defers.defer (
                    function () {
                        err = arguments[0];
                        text = arguments[1];
                    }
                )
                ) ;
                tame.callChain([__tame_k]);
            };
            __tame_fn_2(tame.end);
            __tame_defers._fulfill();
        };
        var __tame_fn_3 = function (__tame_k) {
            var __tame_fn_4 = function (__tame_k) {
                console . log ( 'error: ' + err ) ;
                tame.callChain([__tame_k]);
            };
            var __tame_fn_5 = function (__tame_k) {
                console . log ( text . toString ( 'utf8' ) ) ;
                tame.callChain([__tame_k]);
            };
            if (err) {
                tame.callChain([__tame_fn_4, __tame_k]);
            } else {
                tame.callChain([__tame_fn_5, __tame_k]);
            }
        };
        tame.callChain([__tame_fn_1, __tame_fn_3, __tame_k]);
    };
    tame.callChain([__tame_fn_0, __tame_k]);
};
__tame_fn_6 (tame.end);

"次の処理" を __tame_fn_* で包んで継続とし,callChain に渡しています.callChain は以下のように定義されており,次々と継続を処理していきます.

function callChain (l) {
    if (l.length) {
        var first = l.shift ();
        first (function () { callChain (l); });
    }
};


await ブロック内部において,defer 指定の変数が値に束縛されるまで待つのは,Defers#defer と Defers#_fulfill が同期部分の継続呼び出しをコントロールしているためです (上記コードにおける __tame_fn_3).
Defers 内部ではコールバック呼び出しをカウンタ管理しており,defer 呼び出しで内部カウンタを increment,非同期 API のコールバックが呼び出されたタイミングで _fulfill を呼び出して decrement しています*9.このカウンタが 0 になるタイミング → 最後に呼び出されたコールバックのタイミング,で次の継続 (同期コードの部分) が呼び出されます.



図にするとこんな感じです.黒四角の番号は __tame_fn_*, R は readFile, E と E' は tame.end, C は readFile のコールバック,青四角は callChain による継続を表しています.function を直接呼び出す場合は黒矢印,継続として呼び出す場合は青矢印で表しています.
6 から順に呼び出しが進んでいき,2 内部の tame.callChain が終わると一旦イベントループに戻ります.1 で _fulfill が呼び出されていますが,先に述べた Defers の働きにより,3 以降の継続は呼び出されません.
その後,readFile の結果としてコールバック (図中 C) が呼び出されると,Defers が内部に保持していた 3 以降の継続が呼び出されます.











おわりに

TameJS により,非同期処理を見通しの良いコードで記述することができます.外部リソースの参照・更新を頻繁に行いながら処理を進めるコードで利用すると良さそうです.
あとは IDE でこの記述がサポートされるともっと便利になりますね.
次回は Fiber を利用します.Node.js 上で coroutine が使えます.
(2/2 へ続く...)

*1:どちらもコアは,古より伝わりし手法です

*2:いやマジで

*3:有名どころは async,Step,Slide とかでしょうか,正直詳しくは分かりません...

*4:当然ですが,ノンブロッキング API はキチンと "ノンブロッキング" として動作します

*5:後で述べますが,プリコンパイル後の JavaScript はコールバックだらけです

*6:手元の環境は,最近ランタイム系をごちゃごちゃいじったせいか,うまく動きません...なぜだ

*7:JavaScript

*8:このような変換を行わず,継続を明示的に扱うオブジェクトとして取得するのが rubyscheme 等の call/cc であり,次回に紹介する Fiber の素でもあります

*9:_fulfill は defer の返すクロージャ内部で呼び出されます

node.js アプリの負荷分散構成を考える

node.js の負荷分散について考えてみました (フェイルオーバは考慮できていません).個人レベルなので 1 台のハード上に仮想マシンを 5〜6 個立ち上げて実験しています.

見出し

  • はじめに
  • cluster で負荷分散
  • 寄り道:cluster の仕組み
  • 例えばこんな全体構成
  • おわりに

はじめに

node.js は設計上,大量のコネクションを省リソース (プロセス・スレッドをバカスカ生成しない) でさばきます.おそらく想定されているのは I/O バウンドな処理であり,この場合は基本的に非同期で処理されるため,I/O 待ちで他のリクエスト処理がブロックすることはまずありません.
node.js は「サービスをつなぎ・組み合わせるためのハブ」的な位置づけが一番しっくりくるように感じます *1
ただ,

  • 大量のリクエストをさばかなければならない
  • ロジックが重くてコールバック処理に負荷がかかってしまう

等といった場合では,CPU 数に応じて負荷分散させたくなります.
また,Web アプリケーションの静的コンテンツ (js, css, image 等) の配信まで node.js ノードに任せるのは明らかに非効率ですよね.

cluster で負荷分散

この記事 (http://blog.asial.co.jp/807) を読んで cluster を試してみたところ,大変使いやすい.express のコミッタが開発しているためか,express との相性も良いと感じます.
おなじみ express コマンドでスケルトンを生成し,以下のように書き換えます.

// app.js
var express = require('express');
var app     = express.createServer();

app.configure(function(){
  app.set('views', __dirname + '/views');
  app.set('view engine', 'jade');
  app.use(express.logger());
  app.use(express.bodyParser());
  app.use(express.methodOverride());
  app.use(app.router);
  app.use(express.static(__dirname + '/public'));
});

app.get('/', function(req, res){
  res.render('index', {
    title: 'Express',
    worker_id: process.env.CLUSTER_WORKER  // worker の ID が取れる
  });
});

module.exports = app;
<!-- views/index.jade -->
h1= title
p Welcome to #{title}
p worker #{worker_id}
// cluster.js
var cluster = require('cluster');

cluster('./app')
  .use(cluster.debug())    // debug ログが出るので,動作が分かりやすくなる
  .use(cluster.logger())
  .use(cluster.pidfiles())
  .on('close', function() { console.log('cluster end'); })
  .listen(8001);

$ node cluster.js して F5 アタック?をかけると,worker の切り替わる様子が分かります.

Readme の Example で 'recommended usage' とあるように,cluster 関数にファイル名 './app' を指定すると,master プロセスでは require('./app') が発生せず,worker プロセスの場合にのみ require('./app') されます (docs/api).
また,use(cluster.repl(port)) することで簡単な管理コンソールを起動することができます (docs/repl).
分散してもセッションを維持したい場合は,express で connect-redis を利用するのが一番手軽です.

寄り道:cluster の仕組み

cluster は以下のような構成で複数の node.js プロセスを管理しています.


Worker プロセス数は外部から指定することも可能ですが,デフォルトでは CPU 数だけ起動します.また,プロセス間のやりとり (赤点線矢印) は JSON で独自の RPC をしています (JSON-RPC とは異なる仕様).

プロセスとインスタンスの構成は以下のようになっています.コードを読む際はこの構成を念頭に置くと,理解しやすくなります.

例えばこんな全体構成

例えばこんな全体構成.動作確認用アプリとして チャットアプリ をイメージしており,手元の環境では動作しています *2



クライアントからは HTTP リクエストと socket.io (WebSocket) による通信が発生します.

サーバ側では,リバースプロキシ (の設定をした nginx) を配置し,静的コンテンツは自身で配信,動的コンテンツについてはバックエンドの cluster にリクエストを投げます.設定としてはこんな感じです.

# /etc/nginx/conf.d/proxy.conf として以下の内容を作成します (IP は適当です).
# ↓ロードバランサ設定の場合
# upstream clusters {
#   server 192.168.11.20:8001;
#   server 192.168.11.20:8002;
# }
server {
  listen 8080;
  server_name 192.168.11.10;
  location / {
    proxy_pass http://192.168.11.20:8001;
#    proxy_pass http://clusters;
  }
  location /images/ {
    alias /var/www/public/images/;
  }
  location /stylesheets/ {
    alias /var/www/public/stylesheets/;
  }
  location /javascripts/ {
    alias /var/www/public/javascripts/;
  }
}

cluster マシンを増やす場合は nginx でロードバランシングさせるか,LVS + keepalived (←こっちはやったことない) で分散させます.

cluster 内の node.js では,express でメイン画面を,リアルタイム通信には socket.io を利用しています.もう鉄板ですね.セッション維持には redis を利用しています.

relay server は,各 node.js プロセスで発生した Socket.io のイベントを別の node.js プロセスに伝達するための中継サーバです.今回は実験ということもあって,適当にこしらえました.

メイン DB には mongoDB を利用しています (後に作成予定のアプリにとって都合が良いため).Replica Set 便利ですね.今回は sharding しないので 1 セットだけです.express からの接続には mongoose を利用しています.


参考:

おわりに

今回は node.js の負荷分散について試してみました.cluster の簡単な解説と,全体構成を示しています.
一応,proxy から DB まで一通りつないで実験していますが,Socket.io と cluster の組み合わせに問題がないのかを調査し切れていません.また,はやいところ計測を済ませてしまいたいです.

あとはテストについて調査をしたら,そろそろアプリを作り始めても良いんじゃないかな,KrdLab よ.

*1:多くの Web アプリがこれに該当すると思いますが

*2:他の人たちはどんな感じでやっているんでしょうか.是非知りたいのですが...

Mongoose のモデルに独自メソッドを追加する

久しぶりに Mongoose ネタです.Schema の API である static メソッドについて少しだけ.
今回は短め.

見出し

  • 何ができる?
  • 登録した関数内での処理
  • 実際の定義
  • 参考にした情報

何ができる?

例えば User というモデルを定義 (UserSchema) したとします.User に対しては,find/findOne といったメソッドを呼び出すことができます.
しかし,例えば DataMapper のような「first_or_create (あれば最初の 1 つを返す,無ければ作って返す)」が欲しくなった場合はどうすればよいでしょう?
上記のような場合に Schema.static メソッドを使用します.Schema.static メソッドは,モデルに対して独自の処理を追加することができます.

var UserSchema = new Schema({
  name: String,
  created_at: { type: Date, default: Date.now }
});

UserSchema.static('first_or_create', function(query, callback) {
  // ここに処理を定義
});

// Schema を登録
mongoose.model('User', UserSchema);

// DB に接続してモデルを取得
var db = mongoose.createConnection('...');
var User = db.model('User');

// first_or_create が呼び出せる
User.first_or_create({ name: 'krdlab' }, function(err, user) {
  // ...
});

登録した関数内での処理

User.first_or_create と呼び出すことから,当然 this はモデル (User) を指しています.このため,関数内の処理で 'this.find' と呼び出せば,接続先 DB が解決された状態で操作を呼び出せます.
Schema に登録したメソッドがモデルに定義されるのは,Schema からモデルを生成するとき (mongoose.model に Schema を渡したとき) です.以下のコードから Schema.static で登録した関数をモデルのプロパティに登録していることがわかります.

// lib/mongoose/index.js:149
Mongoose.prototype.model = function (name, schema, collection, skipInit) {
  // ... 省略

  if (!this.models[collection][name]) {
    model = Model.compile(name
                        , this.modelSchemas[name]
                        , collection
                        , conn
                        , this);  // ← ここ★

    if (!skipInit) model.init();

    this.models[collection][name] = model;
  }

  return this.models[collection][name];
};
// lib/mongoose/model.js:648
Model.compile = function (name, schema, collectionName, connection, base) {
  // ... 省略

  // apply methods
  for (var i in schema.methods)
    model.prototype[i] = schema.methods[i];

  // apply statics
  for (var i in schema.statics)
    model[i] = schema.statics[i];    // ← ここ★

  // ... 省略

  return model;
};

これで,mongoose.model('User') から返ってくる User に first_or_created が定義されます.

実際の定義

上記を踏まえると,以下のようになります.

var UserSchema = new Schema({
  name:       String,
  created_at: { type: Date, default: Date.now },
});

UserSchema.static('first_or_create', function(query, callback) {
  var U = this;
  U.findOne({ name: query.name }, function(err, user) {
    if (err) {
      callback(err);
    } else if (user) {
      callback(null, user);
    } else {
      var newU = new U({ name: query.name });
      newU.save(function(err) {
        if (err) {
          callback(err);
        } else {
          callback(null, newU);
        }
      });
    }
  });
});

mongoose.model('User', UserSchema);

実際には,登録する関数を function(query, defaults, callback) として,新規作成時のプロパティ値を指定できるとなお良いでしょう.

ちょっとだけ Inside node.js

またしてもスケジュールきつめのプロジェクトに放り込まれた KrdLab です.Java ジャバしてます.IDE がないとコード書くのがしんどすぎて,もううんざりです.

はじめに

何らかのプラットフォームを利用する場合,その仕組みについて知っておくことは,より適切な設計を行うという目的に対して有用な情報となるでしょう.
というわけで,今回は少しだけ node.js の内部に潜ってみようと思います.対象は git repo から clone した 0.4.x です (2011/03/29 に clone したっぽい).
なお,今回はイベント駆動に焦点を当て,V8 については割愛します.

見出し

  • 全体像
  • メイン処理
  • イベントループで処理される各種 watcher
  • tick 系のイベント処理
  • gc 系のイベント処理
  • eio 系のイベント処理
  • IOWatcher によるソケットの READ/WRITE
  • Timer による setInterval/setTimeout

全体像

node.js は,イベント駆動をベースとした JavaScript エンジンです.処理は全て非同期に行われます.サーバサイド JavaScript として人気を集めており,ネットワーク系のプログラムを主なターゲットとしています.

モジュール構成としては以下のような感じです (c-ares,http_parser は省略).

libev によってイベントループを,libeio によって非同期 I/O 処理を実現しています.各ライブラリの連携は以下のようになります.


node.js プロセスには,イベントループを実行するメインスレッドと,I/O の実処理を行うワーカスレッドが存在します.ワーカスレッドは libeio によりスレッドプールとして管理されています.
このように,イベントループと各 I/O 実処理は並行処理されますが,コールバックは全てメインスレッド上のイベントループで処理されます.つまり,ユーザの書いた JavaScript はメインスレッド上でしか実行されず,これが「シングルスレッド」と表現される由縁にもなっています.
当然コールバック処理でブロックしてしまうと,イベントループもブロックしてしまいます.コールバック実装は「ブロックしないですぐ終わる」が基本方針となります.
なお,node.js 上のイベントは,libev の event watcher と呼ばれる ev_watcher 構造体で管理されます (libev に ev_idle,ev_async といった名前で定義されている).以降に出てくる 'watcher' はこの 'event watcher' のことを指し,'イベント' もこの watcher に対して発生する (そして,events.EventEmitter が生成するイベントの元になる) ものを指します.

メイン処理

エントリポイントである main 関数からは,node.cc に定義されている node::Start 関数が呼び出されます.

// node.cc:2420
int Start(int argc, char *argv[]) {
  v8::V8::Initialize();
  v8::HandleScope handle_scope;

  argv = Init(argc, argv);

  // Create the one and only Context.
  Persistent<v8::Context> context = v8::Context::New();
  v8::Context::Scope context_scope(context);

  Handle<Object> process = SetupProcessObject(argc, argv);

  // Create all the objects, load modules, do everything.
  // so your next reading stop should be node::Load()!
  Load(process);

  // TODO Probably don't need to start this each time.
  // Avoids failing on test/simple/test-eio-race3.js though
  ev_idle_start(EV_DEFAULT_UC_ &eio_poller);

  // All our arguments are loaded. We've evaluated all of the scripts. We
  // might even have created TCP servers. Now we enter the main eventloop. If
  // there are no watchers on the loop (except for the ones that were
  // ev_unref'd) then this function exits. As long as there are active
  // watchers, it blocks.
  ev_loop(EV_DEFAULT_UC_ 0);

  EmitExit(process);

#ifndef NDEBUG
  // Clean up.
  context.Dispose();
  V8::Dispose();
#endif  // NDEBUG
  return 0;
}

Init 関数では,引数の解析や signal handler の登録,デフォルトイベントループの初期化が行われた後,event watcher を初期化しています.*1

SetupProcessObject 関数では,おなじみの global object である process を組み立てています.

Load 関数では,src/node.js に定義された function を実行して各モジュールのロード,グローバルオブジェクトの生成を行い,ユーザの JavaScript の実行を開始します*2.ここでコールバックが登録され,後に何らかのイベントが発生すれば,このあと出てくるイベントループ (ev_loop) で処理されます.

次の ev_idle_start は,watcher によるイベント監視を開始させます.*3

ev_loop はイベントループの本体です.何らかのイベントが発生して backend_poll (epoll/kqueue) に引っかかれば,登録したコールバックが呼び出されます.単純な (イベント発生を気にしない) スクリプトであればループすることなく素通りします.

EmitExit 関数では process に対して 'exit' を emit します.JavaScript 側で process.on('exit', function() {...}) としていた場合は,コールバックとして function() {...} が呼び出されます.

イベントループで処理される各種 watcher

イベントループ内で処理されるのは,ev_xxx_start で active となった watcher や,I/O で read/write 可となった watcher,タイマ watcher 等に対応するコールバックです.Init 関数で用意される主な watcher には,以下の 3 種類があります.

  1. tick 系
    • prepare_tick_watcher/check_tick_watcher/tick_spinner は,process.nextTick で積まれたキューの消化を管理するために使用される.
  2. gc
    • gc_check/gc_idle/gc_timer は,GC のタイミング制御に使用される.
  3. eio 系
    • eio_poller/eio_want_poll_notifier/eio_done_poll_notifier は,fs モジュール API による非同期 I/O 処理のコールバック制御に使用される.

上記以外に,net モジュールは IOWatcher (lib/node_watcher) が管理する ev_io watcher を,setTimeout/setInterval は Timer (lib/node_timer) が管理する ev_timer watcher を利用しています.これらは API 利用時に適宜用意されます.

tick 系のイベント処理

tick 系は以下のイベントで処理されます.

  1. EV_PREPARE
    • イベントループの先頭で発生
  2. EV_CHECK
    • イベントループ最後で各コールバックを呼び出す直前で発生

tick 系 watcher によりコールバックされる関数は node::Tick 関数です.これにより,process.nextTick で登録したコールバックは定期的に処理されます.

// node.cc:221
static void Tick(void) {
  // Avoid entering a V8 scope.
  if (!need_tick_cb) return;

  need_tick_cb = false;
  ev_idle_stop(EV_DEFAULT_UC_ &tick_spinner);

  HandleScope scope;

  if (tick_callback_sym.IsEmpty()) {
    // Lazily set the symbol
    tick_callback_sym =
      Persistent<String>::New(String::NewSymbol("_tickCallback"));
  }

  Local<Value> cb_v = process->Get(tick_callback_sym);
  if (!cb_v->IsFunction()) return;
  Local<Function> cb = Local<Function>::Cast(cb_v);

  TryCatch try_catch;

  cb->Call(process, 0, NULL);

  if (try_catch.HasCaught()) {
    FatalException(try_catch);
  }
}

内容は,node.js に定義された 'process._tickCallback' を呼び出す,というものです.need_tick_cb のチェックや,tick_spinner に対する ev_idle_stop は,process.nextTick によって積まれたキューを消化するまで,イベントループへの参照保持を保証するための処置です.
process._tickCallback は以下のコードです.

  // node.js:161
  startup.processNextTick = function() {
    var nextTickQueue = [];

    process._tickCallback = function() {
      var l = nextTickQueue.length;
      if (l === 0) return;

      try {
        for (var i = 0; i < l; i++) {
          nextTickQueue[i]();
        }
      }
      catch (e) {
        nextTickQueue.splice(0, i + 1);
        if (i + 1 < l) {
          process._needTickCallback();
        }
        throw e; // process.nextTick error, or 'error' event on first tick
      }

      nextTickQueue.splice(0, l);
    };

    process.nextTick = function(callback) {
      nextTickQueue.push(callback);
      process._needTickCallback();
    };
  };

_tickCallback では,とにかくキューに積まれたコールバックを呼び出します.例外発生時は,成功したコールバックをキューから除去し,残りを再度処理するように _needTickCallback を呼び出します.
JavaScript 側で _needTickCallback が呼び出されると,前出の need_tick_cb が true となり,また tick_spinner に対して ev_idle_start が呼び出され,イベントループの参照カウントが増加します (その後の Tick 関数呼び出しで stop が呼び出され,増加した分のカウントは元に戻る).

gc 系のイベント処理

gc_check は EV_CHECK (前出),gc_idle は EV_IDLE (いわゆるプロセスの IDLE 状態),gc_timer は EV_TIMER (timeout) に対応する watcher です.gc_timer は,Init 関数で 5 秒間隔に設定されています.
gc_check のコールバックは node::Check 関数です.

// node.cc:175
static void Check(EV_P_ ev_check *watcher, int revents) {
  ...

  tick_times[tick_time_head] = ev_now(EV_DEFAULT_UC);
  tick_time_head = (tick_time_head + 1) % RPM_SAMPLES;

  StartGCTimer();

  for (int i = 0; i < (int)(GC_WAIT_TIME/FAST_TICK); i++) {
    double d = TICK_TIME(i+1) - TICK_TIME(i+2);
    //printf("d = %f\n", d);
    // If in the last 5 ticks the difference between
    // ticks was less than 0.7 seconds, then continue.
    if (d < FAST_TICK) {
      //printf("---\n");
      return;
    }
  }

  // Otherwise start the gc!

  //fprintf(stderr, "start idle 2\n");
  ev_idle_start(EV_A_ &gc_idle);
}

ev_now でイベント発生時のタイムスタンプを取得し,後半の for 文で GC を開始するかどうかの判断に使用します.StartGCTimer 関数は,gc_timer を active にすることで GC タイマを発動させます.for 文のところでは,直近のイベント発生間隔を調べ,全てが基準値 (FAST_TICK) よりも長ければ gc_idle を active にして,EV_IDLE 発生時にコールバックで GC を実行させます.

gc_timer のコールバックは CheckStatus 関数です.

static void CheckStatus(EV_P_ ev_timer *watcher, int revents) {
  ...

  // check memory
  if (!ev_is_active(&gc_idle)) {
    HeapStatistics stats;
    V8::GetHeapStatistics(&stats);
    if (stats.total_heap_size() > 1024 * 1024 * 128) {
      // larger than 128 megs, just start the idle watcher
      ev_idle_start(EV_A_ &gc_idle);
      return;
    }
  }

  double d = ev_now(EV_DEFAULT_UC) - TICK_TIME(3);

  //printfb("timer d = %f\n", d);

  if (d  >= GC_WAIT_TIME - 1.) {
    //fprintf(stderr, "start idle\n");
    ev_idle_start(EV_A_ &gc_idle);
  }
}

gc_idle がアクティブでない (GC 起動が確約されていない) 場合,メモリ使用量をチェックして合計が 128MB を超えていると gc_idle を active にします.また,3 つ前のイベント発生時刻から 4 秒以上経過していた場合にも gc_idle を active にします.

gc_idle のコールバックは node::Idle 関数です.

static void Idle(EV_P_ ev_idle *watcher, int revents) {
  ...

  if (V8::IdleNotification()) {
    ev_idle_stop(EV_A_ watcher);
    StopGCTimer();
  }
}

内部で V8::IdleNotification 関数を呼び出し,GC を実行します.V8::IdleNotification 関数は呼び出す必要がなくなると true を返すため,watcher (gc_idle) を stop,GC タイマも stop します.

eio 系のイベント処理

eio_poller は EV_IDLE (前出),eio_want_poll_notifier と eio_done_poll_notifier は EV_ASYNC (別スレッドから非同期に通知を受け取る) に対応する watcher です.それぞれ順に,node::DoPoll 関数,node::WantPollNotifier 関数,node::DonePollNotifier 関数がコールバックとして登録されています.
非同期 I/O 処理後のコールバック (JavaScript 側で I/O 処理後に呼び出されるよう登録した function) を呼び出すには eio_poll 関数を呼び出す必要があります.eio_poll 関数は主に node::DoPoll 関数から呼び出されます.node::DoPoll 関数は EV_IDLE のタイミングで処理されるわけですが,この制御には node::EIOWantPoll,node::EIODonePoll,node::WantPollNotifier,node::DonePollNotifier 関数が連携しています.



eio に積まれた I/O 要求が処理される (request queue が 0 になる) と,node::EIOWantPoll 関数が want_poll_cb としてワーカスレッド (メインとは異なるスレッド) から呼び出されます.内部では eio_want_poll_notifier にイベント発行しており,これによってメインのイベントループで node::WantPollNotifier 関数が呼び出されます.node::WantPollNotifier 関数では,eio_poll 関数でコールバックを 1 つ処理し,まだ残っていれば eio_poller に対して EV_IDLE を start させます.これにより,IDLE 状態になる度に eio_poll 関数が呼び出され,非同期 I/O 処理のコールバックが呼び出されていきます.
eio_poller とひも付いているのは node::DoPoll 関数です.内部では eio_poll 関数を呼び出し,コールバックがなくなれば eio_poller による EV_IDLE 監視を停止します.また別ルートとして node::EIODonePoll 関数があります.eio_poll 関数内部で response queue を消化しきると,node::EIODonePoll 関数が done_poll_cb として呼び出されます.内部では eio_done_poll_notifier にイベント発行しており,これによってメインのイベントループで node::DonePollNotifier 関数が呼び出されます.あとは node::DonePollNotifier 関数の中で 1 回だけ eio_poll 関数を呼び出し,コールバックを処理しきったことが確認できれば eio_poller の EV_IDLE 監視を停止します.
以上のように非同期 I/O 処理では,メインスレッドと eio がイベント通知によって連携をとり,eio_poll 関数の呼び出しを制御しています.

なお,実際の I/O 要求発行のあたりは,src/node_file.cc の 740 行目にある Read 関数を見ると良いかもしれません.ASYNC_CALL マクロによって eio API が呼び出されています.

IOWatcher によるソケットの READ/WRITE

ソケットの read/write は IOWatcher として機能提供されています.IOWatcher は EV_READ,EV_WRITE を監視します.net モジュールは IOWatcher を利用して net.Socket や net.Server を実現しています.*4

IOWatcher が READ/WRITE のどちらを監視するのかは,IOWatcher.prototype.set により決定します.

// lib/net.js:565
socket._readWatcher.set(socket.fd, true, false);
...
// lib/net.js:572
socket._writeWatcher.set(socket.fd, false, true);
// src/net_io_watcher.cc:132
Handle<Value> IOWatcher::Set(const Arguments& args) {
  ...
  IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
  ...
  int fd = args[0]->Int32Value();
  ...
  int events = 0;

  if (args[1]->IsTrue()) events |= EV_READ;
  ...
  if (args[2]->IsTrue()) events |= EV_WRITE;

  assert(!io->watcher_.active);
  ev_io_set(&io->watcher_, fd, events);

  return Undefined();
}

(file discriptor, readable, writable) を引数とし,watcher にファイルディスクリプタと検出対象イベントを登録しています.

Timer による setInterval/setTimeout

setInterval は内部で Timer (src/node_timer.cc) を利用しています.

exports.setInterval = function(callback, repeat) {
  var timer = new Timer();

  if (arguments.length > 2) {
    var args = Array.prototype.slice.call(arguments, 2);
    timer.callback = function() {
      callback.apply(timer, args);
    };
  } else {
    timer.callback = callback;
  }

  timer.start(repeat, repeat ? repeat : 1);
  return timer;
};

Timer は ev_timer watcher を保持しており,指定された間隔 (上のコードで repeat) でコールバック (Timer::OnTimeout 関数) を呼び出します*5.Timer::OnTimeout 関数では,timer.callback に設定されたコールバック function を呼び出しています.


次に setTimeout ですが,これは setInterval とは仕組みが異なります.Timer を利用してはいますが,watcher の利用数を減らすために,同じ delay (下のコードで after) の callback をまとめて 1 つの Timer で管理しています.

exports.setTimeout = function(callback, after) {
  var timer;

  if (after <= 0) {
    // Use the slow case for after == 0
    timer = new Timer();
    timer.callback = callback;
  } else {
    timer = { _idleTimeout: after, _onTimeout: callback };
    timer._idlePrev = timer;
    timer._idleNext = timer;
  }

  ...

  if (arguments.length > 2) {
    var args = Array.prototype.slice.call(arguments, 2);
    var c = function() {
      callback.apply(timer, args);
    };

    if (timer instanceof Timer) {
      timer.callback = c;
    } else {
      timer._onTimeout = c;
    }
  }

  if (timer instanceof Timer) {
    timer.start(0, 0);
  } else {
    exports.active(timer);
  }

  return timer;
};

setTimeout の delay が 0 以下であれば Timer が使用されますが,それ以外の場合 (通常の利用はこちらの場合に該当) は 4 つのプロパティを持った timer オブジェクトを作成し,active に渡しています.

exports.active = function(item) {
  var msecs = item._idleTimeout;
  if (msecs >= 0) {
    var list = lists[msecs];
    if (item._idleNext == item) {
      insert(item, msecs);
    } else {
      item._idleStart = new Date();
      L.append(list, item);
    }
  }
};

setTimeout から流れてきた場合は必ず item._idleNext == item が成立するため,insert へと流れていきます.
insert は実際に list へ item を追加するところです.

function insert(item, msecs) {
  item._idleStart = new Date();
  item._idleTimeout = msecs;

  if (msecs < 0) return;

  var list;

  if (lists[msecs]) {
    list = lists[msecs];
  } else {
    list = new Timer();
    L.init(list);

    lists[msecs] = list;

    list.callback = function() {
      ...
      var now = new Date();

      var first;
      while (first = L.peek(list)) {
        var diff = now - first._idleStart;
        if (diff + 1 < msecs) {
          list.again(msecs - diff);
          debug(msecs + ' list wait because diff is ' + diff);
          return;
        } else {
          L.remove(first);
          assert(first !== L.peek(list));
          if (first._onTimeout) first._onTimeout();
        }
      }

      debug(msecs + ' list empty');
      assert(L.isEmpty(list));
      list.stop();
    };
  }

  if (L.isEmpty(list)) {
    // if empty (re)start the timer
    list.again(msecs);
  }

  L.append(list, item);
  assert(!L.isEmpty(list)); // list is not empty
}

指定された msecs に対して最初の要素であれば Timer を生成し,list の各要素に対して _onTimeout を呼び出す callback を設定します.最後に again (Timer::Again 関数に対応) を呼び出し,msecs 間隔で動作するよう再設定かつ Timer をスタートさせます.2 個目から後は単純に list へ要素として追加されていきます.

おわりに

今回は node.js の内部を追いかけてみました (Timer のあたりは追いかけ切れていませんが).libev によるイベント監視を駆使し,うまく連携している様子が見て取れます.また,内部を追いかけたことで node.js に対する理解が深まったため,もう少しうまくコード書けるようになるかなー,という淡い期待もありますw

そういえば,v0.6 から Windows サポート強化の一環として,IOCP 対応が入るそうです (http://nodejs.org/codeconf.pdf).そのうちそこら辺も調べてみたいと思います.

*1:ev_xxx_start の後 ev_unref を呼び出しているのは,イベントが積まれていなければすぐに ev_loop を抜けさせるための処置です.ここら辺のことは ev.pod の ev_ref/ev_unref に記載されています.なお,eio_poller だけは Start 関数内で ev_idle_start したままになっていますが,対応する node::DoPoll でコールバックを処理しきると ev_idle_stop が呼び出されるため,やはり ev_loop は終了することになります.

*2:ユーザ JavaScript 実行の流れとしては,node.js の startup → Module.runMain → Module._load → Module.prototype.load → Module.prototype._compile → V8 へ,といった感じです.

*3:しかし,コメントを読む限りではテストを通すためのものっぽいですね.

*4:ソケットプログラミングでおなじみの socket, bind, listen といった関数は src/node_net で定義されています.

*5:当然ですが best-effort です

Mongoose のコネクション周り

node.js + MongoDB はイイ感じです.というわけで Mongoose ネタの続き.

見出し

  • connect と disconnect
  • 明示的にコネクションを扱う
  • Connection とイベント

connect と disconnect

require で取得する Mongoose オブジェクトには connect と disconnect があります.

var mongoose = require('mongoose');
var Schema   = mongoose.Schema;

// スキーマ登録
var HogeSchema = new Schema({
  label: String,
  value: Number
});
mongoose.model('Hoge', HogeSchema);

// デフォルトコネクションを開く
mongoose.connect('mongodb://localhost/test');

... (なんか使う)

// signal 飛んできたら閉じる
process.on('SIGINT', function() { mongoose.disconnect(); });

mongoose の connect を呼び出すと,デフォルトコネクションが使用されます.「デフォルトコネクション」とは,mongoose がプロパティとして持つ connections の [0] 要素を指しており,connection プロパティから取得可能です.
この [0] 要素は require 時の 'new Mongoose()' で生成され,'mongoose.connect(...)' でコネクションが開かれます.また,'mongoose.model(...)' した際には内部で自動的に使用されます.

対する disconnect は,connections プロパティが保持する全てのコネクションを閉じます.

明示的にコネクションを扱う

Mongoose オブジェクトの connect/model を使うと,常に「デフォルトコネクション」が使用されます.DB ごとにコネクションを保持したい場合は,コネクションを直接扱う必要があります.以下のようにします.

var mongoose = require('mongoose');
var Schema   = mongoose.Schema;

// スキーマ登録
var HogeSchema = new Schema({
  label: String,
  value: Number
});
mongoose.model('Hoge', HogeSchema);

// コネクションと,それにひも付いたモデルを取得
var db = mongoose.createConnection('mongodb://localhost/test');
var Hoge = db.model('Hoge');

// 使う
Hoge.findOne({}, function(err, doc) {
  // ...
});

// 終わり
process.on('SIGINT', function() { mongoose.disconnect(); });

createConnection を使用すると Connection オブジェクトを取得できます.引数 (uri もしくは (host, database, port, callback) の組) を指定した場合,open 済みの Connection オブジェクトが返ってきます.

モデルは以下のような手順で定義,取得します.

  1. トップレベルの Mongoose オブジェクトにスキーマを登録 (mongoose.model)
  2. Connection を取得 (mongoose.createConnection)
  3. Connection 経由でモデルを取得 (db.model)

Connection の model を使用することで,Connection オブジェクトとモデルをひも付けます.

サンプルコードでは disconnect を呼び出して一括 close していますが,Connection にも close があるため,個別にコネクションを閉じることも可能です.

Connection とイベント

Connection は EventEmitter の派生になっており,イベントをハンドリングできます.

var db = mongoose.createConnection();  // まだ開かない

// ハンドラ登録
db.on('open' , function() { console.log('open'); });
db.on('close', function() { console.log('close'); });

// コネクションを開く
db.open('mongodb://localhost/test');

var Hoge = db.model('Hoge');
Hoge.findOne({}, function(err, doc) {
  // ... (いろいろ)
});

process.on('SIGINT', function() { mongoose.disconnect(); });

// [実行結果]
// $ node test.js
// open
// ... (findOne の結果)
// close

これ以外にもあるのかな?

終わりに

前回の内容はコネクション周りを明記していなかったので,今回独立してコネクション周りを取り上げてみました.
実はまだ '...Set' という API があるのですが,それはまた次の機会に取り上げてみたいと思います.