cassy: Cassandra client for Haskell

最近 Cassandra を利用する機会がありまして,今回はその絡みで調べてみたことです. そういえば最近になって 1.2 がリリースされましたね.atomic_batch とか気になります.

はじめに

cassy は Haskell で書かれた Cassandra クライアントです.ClientOptions でも紹介されています.

以降の内容は version 0.4.0.1 に基づいています.

基本

使い方は github の README に書いてありま...と思ったら,古いのか記述どおりの内容だと動きません (2012/12/30 時点). 基本的な使い方は,以下のように Cassandra ノードへのコネクションプールを作成し,runCas で操作を実行します.

import Database.Cassandra.Basic
import Control.Monad.Trans.Class (lift)
import qualified Data.Text.IO as T

main :: IO ()
main = do
    pool <- createCassandraPool servers 1 3 300 ksTest
    column <- runCas pool $ do
        insert cfTest "rowkey_aaa" QUORUM
            [ packCol ( TUtf8 "sampleValue"
                      , encodeCas $ TUtf8 "ほげほげ"
                      )
            ]
        getCol cfTest "rowkey_aaa" (TUtf8 "samplevalue") QUORUM

    case column of
        Nothing -> putStrLn "not found"
        Just c  -> do
            let val = decodeCas $ colVal c :: TUtf8
            T.putStrLn $ getUtf8 val

servers :: [Server]
servers = [("192.168.xxx.xxx", 9160)]

ksTest :: KeySpace
ksTest = "test_ks"

cfTest :: ColumnFamily
cfTest = "test_cf"

createCassandraPool の第 2,3,4 引数は resource-pool パッケージに関係しており,順にサブプール数,サブプール毎のコネクション数,未使用コネクションを維持する秒数を指定します.

例外処理

insert/get/delete は CassandraException を投げるため,これらに備えておく必要があります. しかし現時点 (version <= 0.4.0.1) では,insert だけは cassandra-thrift の例外をそのまま投げるため,CassandraException で catch できません (次のリリースでは修正されています).

import Control.Exception.Lifted (catch)
import Prelude hiding (catch)

main = do
    ...
    runCas pool $
        operation `catch` (\(e :: CassandraException) -> エラー処理)
    ...

Slice Query

KeySelector とか Selector を使います.

main = do
    pool <- createCassandraPool servers 1 3 300 ksToDoList

    rows <- runCas pool $ getRows ["rowkey_aaa", "rowkey_ccc"]
    printRow $ Map.lookup "rowkey_aaa" rows     -- OK
    printRow $ Map.lookup "rowkey_bbb" rows     -- not found
    printRow $ Map.lookup "rowkey_ccc" rows     -- OK

    where
        getRows ks = do
            getMulti cfTest (Keys ks)
                Range { rangeStart = Just (TInt 1)
                      , rangeEnd   = Just (TInt 3)
                      , rangeOrder = Regular
                      , rangeLimit = 1024
                      }
                QUORUM
        printRow Nothing    = putStrLn "not found"
        printRow (Just row) = forM_ row $ \c ->
            T.putStrLn $ getUtf8 . decodeCas . colVal $ c

Composite Column

少し複雑なデータ構造だと Composite Column を使いたくなりますよね.

"user_id:00001": {
    "name":   "krdlab",
    "e-mail": [
        "krdlab@example.net",
        "krdlab@example.com"
    ]
}

例えば上記のようなデータを Composite Column で保存する場合は,以下のようにカラム名に当たる部分をタプルにします.

main = do
    ...
    runCas pool $ do
        cols <- exampleInsertAndGet
        printColumns cols
    where
        exampleInsertAndGet = do
            insert cfUsers "user_id:00001" QUORUM
                [ packCol ((TUtf8 "name"   , TInt 0), "krdlab")
                , packCol ((TUtf8 "reading", TInt 0), encodeCas . TUtf8 $ "けーあーるでぃーらぼ")
                , packCol ((TUtf8 "e-mail" , TInt 0), "krdlab@example.net")
                , packCol ((TUtf8 "e-mail" , TInt 1), "krdlab@example.com")
                ]
            get cfUsers "user_id:00001" All QUORUM

        printColumns cs = lift $ forM_ cs $ \c -> do
            let (k, v) = unpackCol c :: ((TUtf8, TInt), Value)
            print k
            T.putStrLn $ getUtf8 . decodeCas $ v

これは以下のように Cassandra へ格納されます."!" は cassy が追加しています.

                    ___________________________________________________________________________
"user_id:00001" -> | name:0:! | reading:0:!          | e-mail:0:!         | e-mail:1:!         |
                   +----------+----------------------+--------------------+--------------------+
                   | krdlab   | けーあーるでぃーらぼ | krdlab@example.net | krdlab@example.com |
                    ---------------------------------------------------------------------------

JSON データ

aeson の FromJSON/ToJSON のインスタンスとして定義しておくと,自動的に serialize/deserialize してくれます. しかし現状はどうも 1 つの column にシリアライズされたデータを押し込む方式のようですから,この点はデータ設計時に注意が必要です.

{-# LANGUAGE OverloadedStrings, DeriveGeneric #-}
module Main where

import Database.Cassandra.JSON  -- Basic ではなくこちらを import
...
import GHC.Generics (Generic)
import qualified Data.Aeson as A

data User = User
    { name :: Text
    , reading :: Text
    , emails :: [ByteString]
    }
    deriving (Show, Generic)

instance A.FromJSON User
instance A.ToJSON User

main :: IO ()
main = do
    ...
    runCas pool $ do
        c <- exampleInsertAndGet
        lift $ print (c :: Maybe User)
    where
        exampleInsertAndGet = do
            let user = User "krdlab" "けーあーるでぃーらぼ" ["krdlab@example.com"]
            insertCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM user
            getCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM

寄り道: resource-pool

名前の通り,リソースプールを提供するパッケージです.cassy ではコネクションを管理する目的で使用されています.

管理対象のリソース (Entry) をいくつかのサブプール (LocalPool) に分けて管理しています (Striped Pooling).

             ↓ sub-pools
[Pool] --+-- [LocalPool] --+-- [Entry]  <- 今回はここがコネクション
         |                 |
         |                 ...
         |                 +-- [Entry]
         ...
         +-- [LocalPool] --+-- [Entry]
                           |
                           ...
                           +-- [Entry]

この様に分割管理しているのは,マルチスレッド下でのリソース取得における競合の発生を抑えるためのようです.takeResource では threadId の hash 値に基づいて LocalPool が選択されます.

また裏では reaper スレッドが動いており,idle 状態のリソースは createPool で指定された時間を過ぎると destroy されるようになっています.

少し気になった点としては,0.2.1.1 時点では Pool 自体の破棄方法が提供されておらず,long running なアプリで Pool を変更したい時や,そうでなくても終了処理で確保済みのリソースを破棄したい時には困るのではないか?ということです.他の方はどうしているんでしょうか...

おわりに

  • cassy を使えば Haskell から一通りの操作 (insert/get/slice-query/delete) は実行可能です.
  • Cassandra 用のデータタイプ (CasType a) も定義されています.
  • aeson を用いたデータの auto-serialize/deserialize は便利ですが,各フィールドがカラムに収まるような方式もありかなと思っています (persistent パッケージと似た感じするとか).

LAPACK wrapper + Matrix/Vector + Some well-known statistical techniques = Lisys

長らく放置状態になっていた Lisys ですが,コード管理を GitHub に変更しました.コード管理が中途半端なまま放置するのは精神的によくありませんでしたので.

https://github.com/krdlab/lisys

これはなに?

LAPACK のほんの一部を C# 向けにラップ + Matrix/Vector + 少量の分析手法」が含まれたライブラリです.

エントリを掘り返してみると,作成はずいぶん前ですね.

昔,Windows 向けの GUI 分析ツールを作るために作成しました.作成当時は .NET 向けの良いライブラリがなく,「じゃあ LAPACK を利用させてもらおうかな」ということでできあがったものです.
元々は CLAPACK の使い方を学ぶために C++ でコードを書いており,そいつの派生物として作成しました.


しかし今となっては Math.NET Project がありますから,そちらの利用を検討した方が良いかもしれません.DotNumerics というのもあります.これらは pure C# なので Lisys よりも利用が簡単です.

何ができるの?

sample や test を見ていただければ.

64bit 対応が...

既に Visual Studio 2012 が出ているものの,Lisys は Visual C# 2010 Express と Visual C++ 2010 Express を利用して作成しています (2012 が出る大分前から 2008 → 2010 移行を開始したものの,しばらく放置してたらいつの間にか 2012 が出ていた).

Visual C++ 2010 Express Edition は 32bit コンパイラのみであるため,混合アセンブリを生成する Lisys も 32bit 版しか動作確認をしていません *1

LAPACK の Windows サポート

ビルド済バイナリ配布や自分でビルドする方法,C++ から利用する方法等がまとめられています.素晴らしいですね.

今回は上記を参考に CLAPACK から LAPACK 利用に変更しました *2

*1:残念すぎる

*2:CLAPACK は,Windows でしかも .NET から利用するのは割としんどい

resourcet パッケージのコードを読んでみた

久しぶりに時間を確保できたため,Resource モナドのあたりを調査してみました.結果としてはよくわからない部分*1 が残っているのですが,ごちゃごちゃを一旦整理する目的で書き出してみました.

見出し

  • はじめに
  • 基本的な使い方
  • リソース解放
  • 例外安全性
  • monad-control
  • おわりに

はじめに

Conduit では Resource モナドによるリソース管理が行われています.Resource モナドは例外安全にリソースを解放します.
中身が気になったのでコードを追いかけることにしました.

基本的な使い方

以下の通りです.

import System.IO
import Control.Monad.Trans.Resource (allocate, release, runResourceT)
import Control.Monad.Trans.Class (lift)

main :: IO ()
main = runResourceT $ do  -- runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a
  (rkeyO, output) <- allocate (openFile "output.txt" WriteMode) hClose
  (rkeyI, input)  <- allocate (openFile "input.txt"  ReadMode)  hClose
  lift $ hGetContents input >>= hPutStr output
  release rkeyI
  release rkeyO

allocate でリソースの取得,ならびに release action (hClose) の登録を行い,ReleaseKey と Handle を受け取ります.使い終わったら release に ReleaseKey を指定して登録した hClose を呼び出しています.
また,明示的に release を呼び出さなくても runResourceT を抜けるタイミングでリソースは解放されます.

Conduit でもこれらの関数を使ってリソース管理が行われています.


runResourceT が取る ResourceT の定義は以下の通りです.

...
import qualified Data.IORef as I
...
newtype ResourceT m a = ResourceT { unResourceT :: I.IORef ReleaseMap -> m a }

この ReleaseMap にリソース解放アクション (release action,型は IO ()) が登録されます.
リソース管理用の各関数は MonadResource に定義されており,ResourceT m はこのインスタンスとして定義されています (定義の詳細は ドキュメント を参照).

リソース解放

前述の通り ResourceT では release action を Map で管理しており,runResourceT から抜ける際に登録済みの release action 全てを呼び出します.
runResourceT の定義は以下の通りです.

-- Control.Monad.Trans.Resource
runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a
runResourceT (ResourceT r) = do
    istate <- liftBase $ I.newIORef
        $ ReleaseMap maxBound minBound IntMap.empty
    bracket_
        (stateAlloc istate)
        (stateCleanup istate)
        (r istate)

runResourceT では release action を登録するための最初の ReleaseMap を用意し,bracket_ を経由してリソースアクション*2 r を実行します.stateAlloc では現コンテキストにおいてマップが利用されていることを示すために参照カウントが +1 されます.stateCleanup では参照カウントが -1 されます.
ReleaseMap は参照カウントでリソースを管理しており,カウントが 0 になると stateCleanup 内部で登録済みの全 release action を呼び出します.
なお,各 release action は 1 度しか呼ばれないようになっています*3

例外安全性

リソース解放の保証と関係しますが,例外安全性については bracket_ により保証されています (control については後述).

-- Control.Monad.Trans.Resource
bracket_ :: MonadBaseControl IO m => IO () -> IO () -> m a -> m a
bracket_ alloc cleanup inside =
    control $ \run -> E.bracket_ alloc cleanup (run inside)

bracket_ 内部では E.bracket_ (Control.Exception.bracket_) を利用した例外ハンドリングが行われています.余談ですが,bracket_ の inside と戻り値の型がともに m a なので,元の E.bracket_ のように実行したい計算が IO である必要は無くなっています.

E.bracket_ の定義は以下の通りです.

-- Control.Exception
bracket_ :: IO a -> IO b -> IO c -> IO c
bracket_ before after thing = bracket before (const after) (const thing)

bracket
        :: IO a         -- ^ computation to run first (\"acquire resource\")
        -> (a -> IO b)  -- ^ computation to run last (\"release resource\")
        -> (a -> IO c)  -- ^ computation to run in-between
        -> IO c         -- returns the value from the in-between computation
bracket before after thing =
  mask $ \restore -> do
    a <- before
    r <- restore (thing a) `onException` after a
    _ <- after a
    return r

onException :: IO a -> IO b -> IO a
onException io what = io `catch` \e -> do _ <- what
                                          throwIO (e :: SomeException)

throwIO :: Exception e => e -> IO a
throwIO e = IO (raiseIO# (toException e))

bracket では mask により非同期例外をコントロールし,onException により例外を補足することで,after が確実に呼ばれるようになっています.restore は呼び出し元で非同期例外が Unmasked/Masked かによって mask/id のいずれかになります.

非同期例外や mask については以下の情報が参考になりました.

monad-control

ところで,Control.Monad.Trans.Resource.bracket_ 内部では control 経由で Control.Exception.bracket_ が呼び出されています.

-- Control.Monad.Trans.Resource
bracket_ :: MonadBaseControl IO m => IO () -> IO () -> m a -> m a
bracket_ alloc cleanup inside =
    control $ \run -> E.bracket_ alloc cleanup (run inside)

control の定義は monad-control にあります.

-- Control.Monad.Trans.Control
control :: MonadBaseControl b m => (RunInBase m b -> b (StM m a)) -> m a
control f = liftBaseWith f >>= restoreM

monad-control は,モナド変換子によって積み上げられたスタックにもぐって計算したり戻ってきたりする仕組みの一般化のようですが,正直まだよくわかっていません.lifted-base は monad-control を利用して Control.Exception のより一般的な定義を提供しているようです.

で,liftBaseWith と restoreM の型は以下のようになっています.

-- Control.Monad.Trans.Control
class MonadBase b m => MonadBaseControl b m | m -> b where
    -- | Monadic state of @m@.
    data StM m :: * -> *

    liftBaseWith :: (RunInBase m b -> b a) -> m a

    restoreM :: StM m a -> m a

type RunInBase m b = ∀ a. m a -> b (StM m a)

これ,どういうことかわからなくてずっとうねうね考えていたんですが,IO や XxxT に対する定義を眺めた結果,

  • liftBaseWith は RunInBase m b でモナドスタックを最下層までたどって実行し (多層 runXxx 的な?),各層の StM でくるんで戻す
  • restoreM は StM に基づいて元のモナドスタックを再構成する

という感じかな?と思いました...が,正直まだよくわかってないです.

↓眺めていた IO に対する定義

instance MonadBaseControl IO IO where
    newtype StM IO a = StIO a

    -- liftBaseWith :: (RunInBase IO IO -> IO a) -> IO a
    --    ↓展開
    -- liftBaseWith :: ((forall a'. IO a' -> IO (StM IO a')) -> IO a) -> IO a
    liftBaseWith f = f $ liftM StIO

    restoreM (StIO x) = return x

↓眺めていた IdentityT に対する定義

instance MonadTransControl IdentityT where
    newtype StT IdentityT a = StId {unStId :: a}
    liftWith f = IdentityT $ f $ liftM StId . runIdentityT
    restoreT = IdentityT . liftM unStId

instance (MonadBaseControl b m) ⇒ MonadBaseControl b (IdentityT m) where
    newtype StM (IdentityT m) a = StMId { unStMId :: ComposeSt IdentityT m a }
    liftBaseWith = defaultLiftBaseWith StMId
    restoreM     = defaultRestoreM     unStMId

defaultLiftBaseWith :: (MonadTransControl t, MonadBaseControl b m)
                    ⇒ (∀ c. ComposeSt t m c → StM (t m) c) -- ^ 'StM' constructor
                    → ((RunInBase (t m) b  → b a) → t m a)
defaultLiftBaseWith stM = \f → liftWith $ \run →
                                 liftBaseWith $ \runInBase →
                                   f $ liftM stM . runInBase . run

defaultRestoreM :: (MonadTransControl t, MonadBaseControl b m)
                ⇒ (StM (t m) a → ComposeSt t m a)  -- ^ 'StM' deconstructor
                → (StM (t m) a → t m a)
defaultRestoreM unStM = restoreT . restoreM . unStM

ここはもうちょい調べます.

おわりに

ライブラリのコードは勉強になりますが,型を追いかけるのは思っていたよりもしんどかったです...

*1:monad-control のところ

*2:呼び方はあってるのか?

*3:release 関数を直接呼び出した場合も同様

Conduit で Twitter Streaming API を扱う

忙しいときほど他のことをやりたくなるのは人の性なのか...というわけで今回も Haskell 関連の内容です.

はじめに

Twitter の Streamimg API から取得した内容を Conduit で処理する,という内容です.Streaming API は Source として利用するのに最適な気がしたのです.

環境

  • ubuntu 10.04 32bit
  • ghc 7.0.4
  • cabal-install 0.10.2 *1
    • using version 1.10.2.0 of the Cabal library

利用パッケージ

主な利用パッケージは以下の通りです.
(具体的な import については「リポジトリ」のところに載せたコードを参照)

JSON データの取り扱い

Streaming API から取得できる JSON データは Aeson で処理します.以下,データ型と parser の定義です.

data Status = Status { text :: Text
                     , createdAt :: ByteString
                     , user :: User
                     }

data User = User { screenName :: ByteString
                 }

instance FromJSON Status where
  parseJSON (Object v) = Status
                          <$> v .: "text"
                          <*> v .: "created_at"
                          <*> v .: "user"
  parseJSON _          = mzero

instance FromJSON User where
  parseJSON (Object v) = User
                          <$> v .: "screen_name"
  parseJSON _          = mzero

Twitter のステータス JSON データはもっと複雑なのですが,上記では欲しい情報だけを定義しています.parser をサクッと書けるところがイイ感じですね.

userstream を Conduit で処理

attoparsec-conduit の sinkParser を利用すると,attoparsec parser (一つ前で定義した JSON parser) の Sink を作ることができます.これを responseBody と接続することで,Streaming API から流れてくるステータスを順次処理することができます.

まずは Parser を Sink に変換します.繰り返し消費するように再帰させておきます *2

statusParser :: (Status -> IO ()) -> (String -> IO ()) -> C.Sink ByteString (C.ResourceT IO) ()
statusParser hs hf = do
  j <- CA.sinkParser json  -- TODO catch ParseError
  case fromJSON j of
    Success s@(Status {..}) -> liftIO . hs $ s  -- RecordWildCards (言語拡張) を使ってます
    Error m                 -> liftIO . hf $ m
  statusParser hs hf

parsing に失敗すると ParseError が投げられるのですが,これをうまく処理する方法が分からず,保留としています.
(statusParser 内部で catch して Either 返すように書く方法が分からなかった)

あとは Source につなぐだけ.

userStream :: OAuth -> Credential -> IO ()
userStream oauth credential = do
  withManager $ \manager -> do
    ...
    Response {..} <- http signedReq manager
    responseBody C.$$ statusParser success failure

これで JSON データをひたすら処理し続けます.

実際のコード

機能を順次追加する予定.

終わりに

Conduit の利用感覚をつかみたくて始めたのですが,終わってみるとあまり利用しない結果となってしまいました.
しかし attoparsec-conduit を知ることができたので,良かったかなといった感じです.

参考

基本的に Hoogle と HackageDB を使いまくりました.必要に応じてコードを見ながら.何というか,これらが無いとコード書けませんね.
以下は参考にさせていただいた記事です.

*1:ディレクトリ毎にパッケージをインストールできる奴があったはずだけど,何だっけ?

*2:Conduit 使った場合のプログラム終了処理ってどう書くのだろうか?

超単純 HTTP サーバ (練習)

去年から,仕事の合間に時間を見つけてはちまちまと Haskell の勉強をしていました.
でも本を読むだけでは今一なので,ここら辺で一回まとまったものを書いてみようかと.初心者なのは承知の上でコードをさらそうかと.
あと Haskell 良いですね.まだうまく表現できませんがおもしろいです.

環境

書いたコード

仕様は「HTTP リクエストの Request-Line を解釈*2し,要求されたファイルの内容を返す」だけです.セキュリティ関連も対応していません.大変に単純です.

{- thin HTTP server implementation -}
module Main where

import Prelude hiding (catch)
import Network (listenOn, accept, sClose, Socket, withSocketsDo, PortID(..), PortNumber)
import System.IO
import System.Environment (getArgs)
import Control.Exception (catch, finally, SomeException(..))
import Control.Concurrent (forkIO)
import Control.Applicative ((*>))
import Control.Monad (forM_)

import ThinHttpParser
import Text.ParserCombinators.Parsec

main :: IO ()
main = do
  (portStr:_) <- getArgs
  runServer $ fromIntegral (read portStr :: Int)

runServer :: PortNumber -> IO ()
runServer port = withSocketsDo $ do
  lSock <- listenOn $ PortNumber port
  putStrLn $ "listening on: " ++ show port
  acceptLoop lSock `finally` (sClose lSock >> putStrLn "stopped.")

acceptLoop :: Socket -> IO ()
acceptLoop lSock = do
  (cHandle, _, _) <- accept lSock
  forkIO $ clientHandler cHandle
  acceptLoop lSock

clientHandler :: Handle -> IO ()
clientHandler handle = service handle
  `catch`   (\(SomeException e) -> putStrLn $ show e)
  `finally` hClose handle

service :: Handle -> IO ()
service handle = do
  rawReq  <- hGetContents handle
  case parse parseRequest "parse http-request" rawReq of
    Right httpReq -> do
      let path   = reqUrl httpReq
      -- putStrLn $ "request: " ++ (show $ reqMethod httpReq) ++ " " ++ path -- XXX debug
      (readFile ("./" ++ path) >>= responseOk handle (contentType $ fileExt path))
        `catch` (\(SomeException _) -> responseError handle 404)
      hFlush handle
    Left err -> do
      putStrLn $ show err
      responseError handle 400

-- 成功
responseOk :: Handle -> String -> String -> IO ()
responseOk handle ctype content = forM_ [
    "HTTP/1.1 200 OK\r\n" 
      ++ "Content-Type: " ++ ctype ++ "\r\n"
      ++ "\r\n",
    content
  ] (hPutStr handle)

-- XXX 失敗
responseError :: Handle -> Int -> IO ()
responseError handle scode = hPutStr handle $ "HTTP/1.1 " ++ show scode ++ " " ++ reasonPhrase scode ++ "\r\n\r\n"

-- helper --

fileExt :: String -> String
fileExt path = case parse parseExt "parse path" path of
  Right ext -> ext
  Left  _   -> ""

parseExt :: CharParser st String
parseExt = manyTill anyChar (char '.') *> many anyChar

-- XXX Map?
contentType :: String -> String
contentType "html" = "text/html"
contentType _      = "text/plain"

-- XXX Map?
reasonPhrase :: Int -> String
reasonPhrase 400 = "Bad Request"
reasonPhrase 404 = "Not Found"
reasonPhrase _   = error "unknown status code"
module ThinHttpParser (
    HttpRequest(..),
    Method(..),
    parseRequest
  ) where

import Control.Applicative
import Control.Monad (MonadPlus(..), ap)
import Text.ParserCombinators.Parsec hiding (many, optional, (<|>))
import Numeric (readHex)
import Control.Monad (liftM4)
import System.IO (Handle)

instance Applicative (GenParser s a) where
  pure  = return
  (<*>) = ap

instance Alternative (GenParser s a) where
  empty = mzero
  (<|>) = mplus

data Method = Get | Post deriving (Eq, Ord, Show)

data HttpRequest = HttpRequest {
    reqMethod  :: Method,
    reqUrl     :: String,
    reqHeaders :: [(String, String)],
    reqBody    :: Maybe String
  } deriving (Eq, Show)

parseRequest :: CharParser () HttpRequest
parseRequest = q "GET"  Get  (pure Nothing)
           <|> q "POST" Post (Just <$> many anyChar)
  where
    q name ctor body = liftM4 HttpRequest req url parseHeaders body
      where
        req = ctor <$ string name <* char ' '
    url = optional (char '/') *>
          manyTill notEOL (try $ char ' ') <* (try $ string "HTTP/1." <* oneOf "01")
          <* crlf

parseHeaders :: CharParser st [(String, String)]
parseHeaders = pure [] -- XXX

crlf :: CharParser st ()
crlf = () <$ string "\r\n"

notEOL :: CharParser st Char
notEOL = noneOf "\r\n"

gist はこちら → https://gist.github.com/1573839

ビルド&実行

***> ghc --make -Wall -fno-warn-unused-do-bind -o thin_http_server.exe Main.hs
[2 of 2] Compiling Main             ( Main.hs, Main.o )
Linking thin_http_server.exe ...

***>thin_http_server.exe 8888
listening on: 8888


はい見えました.

疑問とかいろいろ

細かな疑問はもっといっぱいありますが...

  1. エラー処理のやり方あってるかなぁ...?
  2. 文字の扱いが雑すぎて反省
  3. スレッド間の通信はどうやるの?(STM?)
  4. I/O 多重化は?
  5. Parser Combinator はどのパッケージが流行なんだろう?

どんどんコード書いて探っていこうかな,と.

おわりに

最初から最後まで躓きっぱなしでした...orz
今回書いたコードが Haskell っぽいコードになっているかどうか,今はまだ正しく評価できません.
未来の自分がみたとき,何て思うのかなぁ...

*1:結構古い?アップグレードしておこう...

*2:Request-URI は abs_path のみを解釈

TameJS と Fiber による非同期処理の記述 (2/2)

前回の続きで今回は Fiber の話題.ずいぶんと日が空いてしまいました.

見出し

  • はじめに
  • 環境
  • node-fibers とは?
  • Fiber の利用例
  • Fiber を使った非同期処理の記述例
  • 終わりに

はじめに

今回は fiber を導入し,非同期 API のコールバック周りを少し整理してみよう,ということが主題です.
node.js (というかそのエンジンである V8) は fiber をサポートしていませんが*1node-fibers を利用することで node.js 上に fiber を導入することができます.

環境

  • ubuntu 10.04 32bit (VM 上で稼働)
  • node 0.6.6
  • node-fibers 0.6.3

node-fibers とは?

node-fibers は node.js 上で fiber (coroutine) *2 をサポートするためのライブラリです.

node@0.5.2 以上の環境であれば,以下のように簡単に導入することができます ('Fiber' というリテラルが導入される).

$ npm install fibers

なお,Fiber の直接利用は "not recommended" であり,Future の利用を推奨しているようです (README.md の EXAMPLES 参照).
Future は並行処理ではおなじみの概念ですし,なかなか扱いやすそうです.ただ,generator のようなものは Fiber の直接利用が必要となります.

今回は実験と割り切って Fiber を直接利用しています.

Fiber の利用例

以下のコードはよくある generator です.サンプルとしても紹介しやすいですし,動作も理解しやすい使い方の一つです.

// sample.js
require('fibers');

var inc = Fiber(function() {
  var i = 0;
  while (true) {
    yield(i++);  // (1)
  }
});

var i;
while ((i = inc.run()) < 5) {  // (2)
  console.log(i);
}
$ node sample.js
0
1
2
3
4

(2) の run で Fiber が実行されます.Fiber 側で yield が呼び出されると,呼び出し元 (2) に制御が戻ります.このとき yield の引数を戻り値として返します.再度 run が呼び出されると,今度は (1) 以降の処理が再開されます.
このように処理を切り替えていくわけです.

node-fibers のコードを読んでみると,native でいろいろとやっていて (V8 との絡みとか,linux の場合 get/make/swapcontext とか) なかなか興味深いです.

Fiber を使った非同期処理の記述例

例として read input.txt → write output.txt のコードを取り上げます.
ナイーブに書くと以下の通りです.

var fs = require('fs');

var error_handler = function(e) {
  console.log(e);
};

fs.readFile('input.txt', 'UTF-8', function(err, data) {
  if (err) {
    error_handler(err);
    return;
  }
  fs.writeFile('output.txt', data, 'UTF-8', function(err) {
    if (err) {
      error_handler(err);
      return;
    } else {
      console.log('成功');
      // 処理が続く...
    }
  });
});

Fiber を使って少し工夫してみると,以下のように書くことができます.

// Fiber を再開するための汎用コールバック
var resume_cb = function(fiber) {
  return function(/*...*/) { fiber.run(Array.prototype.slice.call(arguments)); };
};

var error_handler = function(e) {
  console.log(e);
};

var main = function() {
  var fiber = Fiber.current;

  fs.readFile('input.txt', 'UTF-8', resume_cb(fiber));
  var input_res = yield();
  if (input_res[0]) {
    error_handler(input_res[0]);
    return;
  }

  fs.writeFile('output.txt', input_res[1], 'UTF-8', resume_cb(fiber));
  var output_res = yield();
  if (output_res[0]) {
    error_handler(output_res[0]);
    return;
  }

  // 処理が続く...
};

Fiber(main).run();

上記は,非同期 API を呼び出した後すぐに yield してイベントループに戻り,コールバック (resume_cb が返す function) が呼ばれたタイミングで Fiber を再開しています.
こんな使い方もできるかな,といった感じですが.

おわりに

今回は fiber を使った非同期処理の記述を取り上げました.fiber は ruby でも 1.9 から導入されていますね (最近脚光を浴びているのでしょうか?).
node.js では node-fibers を利用することで Fiber が利用可能となります.これによりコールバックネストをいくぶんか軽減することができました.

次回で node.js の control-flow あたりをまとめられたら良いですね...(できるの?)

*1:ですよね?

*2:[http://en.wikipedia.org/wiki/Fiber_(computer_science):title=fiber] は軽量スレッドと呼ばれるものです.通常のスレッドはカーネルが実行をスケジューリングしますが,fiber は明示的にスケジューリングする必要があります (協調的マルチタスクと呼ばれます.明示的に CPU リソースを明け渡さないといけないアレです.).[http://en.wikipedia.org/wiki/Coroutine:title=coroutine] は,おおざっぱに言ってしまえば処理を途中で中断/再開 (suspend/resume) できるサブルーチンです.言語レベルの機能である coroutine を実装するために fiber が利用されます.

TameJS と Fiber による非同期処理の記述 (1/2)

前回からかなり時間が空いてしまいました...やっと時間取れた.今回は 2 回に分け,1 回目で TameJSを,2 回目で node-fibers を取り上げたいと思います.
ちなみに,「この方法が良い」と言うよりは「こんな方法もありますよね」というスタンスで書いています*1

見出し

  • はじめに
  • TameJS とは?
  • TameJS の利用例
  • TameJS のしていること
  • おわりに

はじめに

Node.js は非同期処理が基本であり,コールバックを多用するスタイルです.そのため,コードは簡単にコールバックのネストだらけになります*2
この "深いネスト" を解消するため,多くの場合は control flow ライブラリ が使用されます*3

TameJS では,非同期処理の記述に await/defer を持ち込み,上記の解決を試みています.

TameJS とは?

TameJS は tjs コードから JavaScript を生成する translator です.tjs コードは await/defer が追加されている点を除けば,JavaScript そのものです.
非同期処理の記述に await/defer を持ち込むことで,ノンブロッキング API を利用しているにもかかわらず,あたかも同期処理であるかのような記述を可能としています*4.これにより,ユーザの記述するコード上からコールバックによるネストを無くすことができます*5

TameJS の利用例

まずは以下のような tjs コードを記述します.

// ファイル名:read_file.tjs

var fs = require('fs');
await {
  fs.readFile('./test.txt', defer(var err, text));
}
if (err) {
  console.log('error: ' + err);
} else {
  console.log(text.toString('utf8'));
}

await ブロックで囲まれたコードが非同期に実行されます.複数の非同期 API を呼び出せば,それぞれ並行して動作します.ブロック中の API から結果が返ってくると,ブロック外のコードへと処理が進むイメージです.
defer にはコールバックが受け取るパラメータを指定します.await ブロックを抜けると,これらのパラメータが値に束縛されます.
利用可能な API については,以下の "API and Documentation" に載っています.

ちなみに,上記の tjs コードは以下の JavaScript とほぼ等価です.見た目はずいぶんと違いますね.

var fs = require('fs');
fs.readFile('./test.txt', function(err, text) {
  if (err) {
    console.log(err);
  } else {
    consloe.log(text);
  }
});

実行方法は,以下に示す 2 つのパターンがあります.


事前にコンパイルして実行
tamejs コマンドでコンパイルします."-o" オプションで出力ファイル名を指定できます.

$ tamejs read_file.tjs  // tjs -> js
$ node read_file.js     // 実行


require されたときにコンパイルして実行*6
*.tjs コードを利用したい JavaScript コード中に,以下を記載します.

// ファイル名:main.js

require('tamejs').register();
require('./read_file.tjs');

// ...

最後に,いつも通り実行します.

$ node main.js

(余談: register() 内部では,".tjs" に対してコンパイル処理を定義しています.TameJS Engine によって *.tjs から *.js に変換し,V8 コンパイルルーチンにコードを渡す,といった流れです.)

TameJS のしていること

await/defer の記述を元に,tjs コードを 継続渡し形式 (CPS)JavaScript に変換しています.CPSググるといっぱい解説が出てきます.
継続とは,端的に言ってしまうと「以降の計算処理」をまとめたものです.CPS では,渡された継続を利用してプログラムの制御を行います.

例えば,以下のような 2 つの処理があったとします.

A;
B;

これを CPS 変換*7すると,以下のようになります.

function _A(next) {
  A;
  next();
}
function _B(next) {
  B;
  next();
}
function end() {}

// 実行
_A(function() { _B(function() { end(); }); });

上記コードにおける next が継続に相当します*8.もしも A の処理が非同期であった場合は,_A に渡された next をコールバック内部で呼び出せばよいわけです.


実際に,前出の read_file.tjs をコンパイルすると,以下のようなコードが生成されます.

var tame = require('tamejs').runtime;
var __tame_fn_6 = function (__tame_k) {
    var fs = require ( 'fs' ) ;
    var __tame_fn_0 = function (__tame_k) {
        var err, text;
        var __tame_fn_1 = function (__tame_k) {
            var __tame_defers = new tame.Defers (__tame_k);
            var __tame_fn_2 = function (__tame_k) {
                fs . readFile ( './test.txt' ,
                __tame_defers.defer (
                    function () {
                        err = arguments[0];
                        text = arguments[1];
                    }
                )
                ) ;
                tame.callChain([__tame_k]);
            };
            __tame_fn_2(tame.end);
            __tame_defers._fulfill();
        };
        var __tame_fn_3 = function (__tame_k) {
            var __tame_fn_4 = function (__tame_k) {
                console . log ( 'error: ' + err ) ;
                tame.callChain([__tame_k]);
            };
            var __tame_fn_5 = function (__tame_k) {
                console . log ( text . toString ( 'utf8' ) ) ;
                tame.callChain([__tame_k]);
            };
            if (err) {
                tame.callChain([__tame_fn_4, __tame_k]);
            } else {
                tame.callChain([__tame_fn_5, __tame_k]);
            }
        };
        tame.callChain([__tame_fn_1, __tame_fn_3, __tame_k]);
    };
    tame.callChain([__tame_fn_0, __tame_k]);
};
__tame_fn_6 (tame.end);

"次の処理" を __tame_fn_* で包んで継続とし,callChain に渡しています.callChain は以下のように定義されており,次々と継続を処理していきます.

function callChain (l) {
    if (l.length) {
        var first = l.shift ();
        first (function () { callChain (l); });
    }
};


await ブロック内部において,defer 指定の変数が値に束縛されるまで待つのは,Defers#defer と Defers#_fulfill が同期部分の継続呼び出しをコントロールしているためです (上記コードにおける __tame_fn_3).
Defers 内部ではコールバック呼び出しをカウンタ管理しており,defer 呼び出しで内部カウンタを increment,非同期 API のコールバックが呼び出されたタイミングで _fulfill を呼び出して decrement しています*9.このカウンタが 0 になるタイミング → 最後に呼び出されたコールバックのタイミング,で次の継続 (同期コードの部分) が呼び出されます.



図にするとこんな感じです.黒四角の番号は __tame_fn_*, R は readFile, E と E' は tame.end, C は readFile のコールバック,青四角は callChain による継続を表しています.function を直接呼び出す場合は黒矢印,継続として呼び出す場合は青矢印で表しています.
6 から順に呼び出しが進んでいき,2 内部の tame.callChain が終わると一旦イベントループに戻ります.1 で _fulfill が呼び出されていますが,先に述べた Defers の働きにより,3 以降の継続は呼び出されません.
その後,readFile の結果としてコールバック (図中 C) が呼び出されると,Defers が内部に保持していた 3 以降の継続が呼び出されます.











おわりに

TameJS により,非同期処理を見通しの良いコードで記述することができます.外部リソースの参照・更新を頻繁に行いながら処理を進めるコードで利用すると良さそうです.
あとは IDE でこの記述がサポートされるともっと便利になりますね.
次回は Fiber を利用します.Node.js 上で coroutine が使えます.
(2/2 へ続く...)

*1:どちらもコアは,古より伝わりし手法です

*2:いやマジで

*3:有名どころは async,Step,Slide とかでしょうか,正直詳しくは分かりません...

*4:当然ですが,ノンブロッキング API はキチンと "ノンブロッキング" として動作します

*5:後で述べますが,プリコンパイル後の JavaScript はコールバックだらけです

*6:手元の環境は,最近ランタイム系をごちゃごちゃいじったせいか,うまく動きません...なぜだ

*7:JavaScript

*8:このような変換を行わず,継続を明示的に扱うオブジェクトとして取得するのが rubyscheme 等の call/cc であり,次回に紹介する Fiber の素でもあります

*9:_fulfill は defer の返すクロージャ内部で呼び出されます