「Chainerによる実践深層学習」を読みながら Chainer に入門した話
はじめに
「Chainerによる実践深層学習」という本を読みながら実際に chainer を触ってみました.
実際に書いたコードはこちらです.
翻訳モデルの実験で使用するデータはこちらを参考に取得しました.
書籍について
上記の本では Chainer の基本的な使い方から始まって,徐々に複雑なネットワークの構築方法へと解説が進んでいきます. Chainer は名前を知っている程度の状態から読み始めましたが,問題なく読みすすめられました.
出てくるモデルは自然言語処理に特化しています. 「実践」ということで理論面の解説は多少省略されていますが,実際に作る上で必要な説明は丁寧にされています. word2vec,RNN,LSTM,翻訳モデルが紹介されており,論文は見たことがあるけど実際に作ったことがなかったということもあって,個人的には楽しみながらコードを書くことができました.
コードの記述ミスや説明文との不一致もあります*1が,読みながら自分で修正していけるレベルです.
また出版後に入った (と思われる) Chainer の更新は当然反映されていませんので,新しい機能については公式のドキュメントや Chainer の examples を見ると良いです. training と evaluation をサポートする機能があったりして便利だなと思いました (上記の本を読みながらコードを書いた後だとなおさらそう感じます).
Chainer について
Chainer はモデル定義がわかりやすくてすごく使いやすいですね.正直驚きました.これは便利だ……
以下は本の解説に従って書いたコードですが,
class MyChain(Chain): def __init__(self): super(MyChain, self).__init__( l1=L.Linear(4, 3), l2=L.Linear(3, 3), ) def __call__(self, x, y): fv = self.fwd(x) loss = F.mean_squared_error(fv, y) return loss def fwd(self, x): return self.l2(F.sigmoid(self.l1(x)))
あとは optimizer に setup して,
model = MyChain() optimizer = optimizers.SGD() optimizer.setup(model) # training loop ... model.zerograds() # 1.15 から cleargrads に変わったようです loss = model(x, y) loss.backward() optimizer.update()
すればパラメータが更新されます.更新処理を自分でゴリゴリ書かなくてもいいのか……
本を読み終わった後で Chainer Tutorial の方を見たのですが,学習や評価のフェーズをサポートする機能もあるみたいです.
あと Caffe のモデルデータをそのまま使うことができるため,既存のモデルを活用することもできます.
おわりに
この手のフレームワークを触るのはこれが初めてだった*2ので,まさかここまで便利なものだとは思っておらず驚きました.
これを機に他のフレームワークを調べつついろいろと試してみようと思います.
RabbitMQ の分散構成はどうするのが良さそうか?
はじめに
モジュール間連携のイベントバスとして RabbitMQ を使用したいのですが,クラスタを組む場合にどうするのが良いのかな?ということを調べてみました *1.
なお,以降では「CAP のうち CP 特性が欲しい」ということを念頭において調査しています.
動作確認について
RabbitMQ は 3.6.1 です.ローカルのマシンに VM を起動して動作確認しています.
https://github.com/krdlab/examples/tree/master/distributed-rabbitmq-brokers
RabbitMQ の分散構成について
分散構成に関する公式の情報は以下の通りです.
- https://www.rabbitmq.com/distributed.html
- https://www.rabbitmq.com/clustering.html
- https://www.rabbitmq.com/federation.html
- https://www.rabbitmq.com/shovel.html
Clustering
- 動作環境として LAN を想定している
- 複数のノードを単一のブローカとして構成する
- "All nodes connect to all other nodes in both directions"
- ノードには disk と RAM の 2 種類がある
- disk ノードはランタイム情報をメモリとディスクの両方に保存
- RAM ノードはメモリのみに保存
- ただし
delivery-mode = 2
(persistent) のメッセージは disk/RAM に関係なくディスクに保存される
- どのノードからも exchange や queue を利用 (publish や consume) できる
- queue を除くすべてのデータ/ランタイム情報は,クラスタを構成するすべてのノードにレプリケーションされる
- queue だけは最初に declare したノードに保存される
- publisher/consumer の接続先と queue のロケーションとの関係によってはルーティングが発生するためスループットに影響を与える
しかしこれだけだと,キューを保持したノードがダウンした場合にそのデータをロストしてしまいます. これを防ぐ仕組みとして mirrored queue があります.
Mirrored Queue
- queue 単位で master/slave が構成される
- queue に対する操作は master から slave へ伝搬される
- master と同じ順序で適用し,同じ状態を維持する
- consumer は接続先のノードに関係なく queue のメッセージデータを消費できる
- master queue が ACK を受け取ると slave queue からも削除される
- 新しくノードを追加した場合,そのノードの slave queue は空の状態
- 追加後に publish されたメッセージが蓄積される
- 強制的に同期可能だが,設定 (
ha-sync-mode
) で自動的に同期させることも可能
- master queue がダウンした場合は最古の slave が昇格する
- このとき slave が完全に同期しきっていない分のメッセージはロストする
具体的なポリシー設定については Configuring Mirroring に説明があります.
master への昇格が発生した場合,昇格の最中に publish されたメッセージはロストしません. publish は常に master とすべての slave に直接行われているため,新しく master になる slave がメッセージを受けているからです. 一方 consume は以下のような影響を受けます.
- slave queue と consumer の接続は切断される
- consumer はこれを検知して re-consume する必要あり
- 以下のいずれかに該当するメッセージは re-consume 時に再配信される
- consumer からの ACK が master queue に伝わっていなかった
- master queue のメッセージ削除が slave queue に伝わっていなかった
Federation
- WAN を想定して設計されている
- clustering とは異なり複数のブローカを「ゆるく」結合する
- virtual host とか user とかが異なっても良い
- 指定したポリシに従って結合されるため "all or nothing" ではない
- 双方向に設定しなくても良い
- federated exchange
- federated queue
- downstream から upstream を consume する
- upstream 側のキューに積まれたメッセージは消費される
- downstream 側を複数用意することで分散が可能となる
- それぞれが別々のメッセージを消費できる
- worker の job queue として使用するイメージ
- downstream から upstream を consume する
federated exchange のイメージはこんな感じ.
federated queue のイメージはこんな感じ.
federation は downstream 側に設定します.rabbitmq_federation
プラグインを有効化するのも downstream 側だけで良いみたい.
また exchange 間を双方向に upstream 設定すると,自分以外のブローカで publish されたメッセージも federated exchange を通して受け取ることができるようになります.
federation の Getting Started には exchange の設定しか載っていませんが,ポリシー設定を --apply-to queues
にすると federated queue になります.
$ sudo rabbitmqctl set_policy --apply-to queues federate-queue "^federation-queue\." '{"federation-upstream-set":"all"}'
Shovel
基本的には federation とよく似た役割 (broker から broker へメッセージを移動させる) ですが,細かく対象や付随する動作を設定できるみたいです.
https://www.rabbitmq.com/shovel-static.html
イメージはこんな感じ.
ただ今回の目的から少しズレてしまうので省略.
どれを選択するか?
今回の目的からすると適切なミラーリングを設定した clustering を選択することになります.ただ,
The network links between machines in a cluster must be reliable
とあるように,LAN のような信頼性の高いネットワーク上に組むことが推奨されています. サービスを AWS 上の Multi-AZ 構成とする場合,クラスタを構成するノードは AZ をまたいでも大丈夫なのか心配になります*2.
実際 mirrored queue にネットワーク分断が発生すると,分断後の各パーティション上にそれぞれ master queue が生じてしまいます. 目的次第ではこれでも構わないと思いますが,今回の目的に限ればこの状態を避けるために孤立したノードには停止して欲しいところです.
Clustering の Network Partitions 発生時の動作について
何も設定しないと分断後もそれぞれのパーティションが動作し続けるのですが,それ以外の動作をさせるための設定もあります.
"Automatically handling partitions" を見ると network partitions が検出された場合の動作を設定できるようです.ハンドリングの設定としては以下の 4 つがあって,それぞれ分断が発生した場合の動作は以下のようになります.
ignore
- 何もしない
pause_minority
- 少数派のパーティションに属するノードが停止する
pause_if_all_down
- 指定したリストのノードとの接続が切れると,接続できなくなった側が停止する
autoheal
- 自動的に生き残るパーティションが選択され,残りは再起動される
今回の目的に限れば,3 ノード以上でクラスタを構築する場合は pause_minority
が良さそうです.
2 ノードの場合は ignore
にしてノードへのアクセスを工夫する必要があります.
ちなみに CloudAMQP も 3 ノード構成の場合は pause_minority
にしているそうです.
cluster_patition_handling についてもう少しだけ
3 ノード (rabbit1, rabbit2, rabbit3) でクラスタを構築して各設定における動作をざっと確認します.
pause_minority の場合
rabbit1 -> rabbit3, rabbit2 -> rabbit3 の通信を遮断して rabbit3 を孤立させると,rabbit3 はその状態を検出して停止します.
=WARNING REPORT==== 13-May-2016::11:16:50 === Cluster minority/secondary status detected - awaiting recovery ... =INFO REPORT==== 13-May-2016::11:16:58 === Stopped RabbitMQ application
rabbit1 -> rabbit3 の通信のみを遮断した場合は "Partial patition detected" となりますが,やはり (この場合は rabbit3 が) 停止します.
=ERROR REPORT==== 13-May-2016::13:59:43 === Partial partition detected: * We saw DOWN from rabbit@rabbit1 * We can still see rabbit@rabbit2 which can see rabbit@rabbit1 * pause_minority mode enabled We will therefore pause until the *entire* cluster recovers =WARNING REPORT==== 13-May-2016::13:59:43 === Cluster minority/secondary status detected - awaiting recovery ... =INFO REPORT==== 13-May-2016::13:59:43 === Stopped RabbitMQ application
pause_if_all_down の場合
{pause_if_all_down, ['rabbit@rabbit1'], ignore}
と設定しておきます.
この状態で rabbit1 -> rabbit3 の通信を遮断すると,rabbit3 は以下のように停止します.
=WARNING REPORT==== 13-May-2016::12:26:17 === Cluster minority/secondary status detected - awaiting recovery =INFO REPORT==== 13-May-2016::12:26:17 === Stopping RabbitMQ =INFO REPORT==== 13-May-2016::12:26:17 === Partial partition detected: * We saw DOWN from rabbit@rabbit1 * We can still see rabbit@rabbit2 which can see rabbit@rabbit1 We are about to pause, no need for further actions ... =INFO REPORT==== 13-May-2016::12:26:17 === Stopped RabbitMQ application
次に rabbit2 -> rabbit3 の通信を遮断すると,rabbit3 では
=ERROR REPORT==== 13-May-2016::12:28:47 === Partial partition detected: * We saw DOWN from rabbit@rabbit2 * We can still see rabbit@rabbit1 which can see rabbit@rabbit2 We will therefore intentionally disconnect from rabbit@rabbit1
のようにわざと rabbit1 との接続も切って一旦孤立し,自身を停止した後は rabbit2 との通信が回復するまで ERROR REPORT が出続けます.
また {pause_if_all_down, ['rabbit@rabbit1'], autoheal}
については今のところ挙動がよくわかっていません*3.
autoheal の場合
rabbit1 -> rabbit3 の通信を遮断すると,rabbit1 で以下のような検出ログが出力されます.
=ERROR REPORT==== 13-May-2016::12:52:03 === Partial partition detected: * We saw DOWN from rabbit@rabbit3 * We can still see rabbit@rabbit2 which can see rabbit@rabbit3 We will therefore intentionally disconnect from rabbit@rabbit2
この後,各ノード間で "Autoheal request" を送りあって最終的に勝者が決定されました.
=INFO REPORT==== 13-May-2016::12:53:38 === Autoheal decision * Partitions: [[rabbit@rabbit1],[rabbit@rabbit3,rabbit@rabbit2]] * Winner: rabbit@rabbit3 * Losers: [rabbit@rabbit1]
負けたノードは再起動されます.
=WARNING REPORT==== 13-May-2016::12:53:38 === Autoheal: we were selected to restart; winner is rabbit@rabbit3 =INFO REPORT==== 13-May-2016::12:53:38 === Stopping RabbitMQ
こちらは勝ったノードのログ.
=INFO REPORT==== 13-May-2016::12:53:38 === Autoheal: I am the winner, waiting for [rabbit@rabbit1] to stop
ミラーリング設定について
mirrored queue を有効化するにはクラスタリングを構築した後で ha-mode
を設定する必要があります.ha-mode
には以下の 3 種類があります.
ノード数が多くなると all
はスループットが低下します.PerfTest を使用して計測してみると以下のような傾向がみられました.
$ ./runjava.sh com.rabbitmq.examples.PerfTest -a -h 'amqp://guest:guest@rabbit1/%2F' -u ha.perftest
N | msg/sec | N = 1 に対する比 |
---|---|---|
1 | 14877 | 1.00 |
2 | 4956 | 0.33 |
3 | 3210 | 0.22 |
今回は動作確認ということで 1 台のマシンに複数のノードを起動して計測しています.実際に判断を下すためには複数のマシンを用意して計測する必要はありますが,ノード数が多い場合は exactly
や nodes
への変更を検討した方が良さそうです.
構成の検討
以上を踏まえ,いくつかのパターンを考えてみます.
2 台構成
{"ha-mode": "all"}
cluster_patition_handling
は設定しない- ロードバランスしない
- 利用していない方のノードはバックアップノード
ha-sync-mode
やha-sync-batch-size
は適宜
普段は 1 側のみに接続し,そちらに障害が発生したら 2 側に切り替えます. 切り替わった後は 1 側を回復して,今度はこちらがバックアップになります. ロードバランスはしません. network partitions が発生したら,基本的にバックアップ側を切り捨ててリカバリ作業を行います.
3 台構成
{"ha-mode": "all"}
{cluster_patition_handling, pause_minority}
- ロードバランスする
ha-sync-mode
やha-sync-batch-size
は適宜
network partitions が発生したら,停止したパーティションのノードを切り捨ててリカバリ作業を行います.
もっと多い場合
単純にノードを追加するか,クラスタのトポロジーを変更するか,これはクラスタに担わせる仕事によって変わってくるのだと思います.
単純にノードを追加するにしても構成ノード数は奇数を保って,ha-mode
は exactly
の検討をした方が良さそうです.
単一のクラスタにする必要が無ければ,少数ノードによるクラスタを「クラスタグループ」としてこれを複数用意し,シャーディングするのもありかもしれません.
今のところこの規模のクラスタを構築する予定は無いため,必要なタイミングで検討しようと思います.
Haskell で書いた Web サービスにおける IO 部分の自動テスト
Haskell で書いた Web サービスの自動テストを考えたとき,IO の部分が問題になる場合があります. KVS や DBMS を利用する部分は CI サービス上で必要なものを起動すれば問題ないのですが,外部サービスと連携する部分は問題として残ります. またデグレするとユーザに直接影響を及ぼす部分については,IO であってもその動作を自動テスト化しておきたくなります.
こういった部分はモック化をサポートするライブラリを用いてテストすると思いますが*1,Haskell の場合はその辺どうするのだろうか?という疑問から調べて実験してみました.
なお,モック化のことを考えなければ既にある素晴らしいライブラリを利用してテストを書くことができます. この件についてはこちらの記事が大変参考になります.
先行事例
型クラスを利用したものと Free モナドを利用した事例がいくつかみつかります. 今回は型クラスによる分離を試みるため,その方針が紹介されていた記事を参考にさせてもらいました.
基本的な方針としては,IO が発生する操作を一般化した型クラスとして定義し,テストでは Reader
や State
モナドを用いてそれらを実装しています.
class Monad m => MonadXxx m where operation :: ... -> m a -- サービス instance MonadXxx IO where operation ... = ...(IO のコード)... -- テスト newtype MockXxx m a = MockXxx { unMock :: StateT MockState m a } deriving ... instance Monad m => MonadXxx (Mock m) where operation ... = ...(モック実装)... run :: Monad m => MockXxx m a -> ... -> m a run ... = ...
実験用 Web サービスのコード
実験用に小さな Web サービスを作成し,実際にモック化を試しました.Web サービスフレームワークには servant を使用しています.
この Web サービスで定義した API は以下の 3 つです.
POST /register
- ユーザ情報を新規作成し,登録完了メールを送信する
POST /login
- 最新のログイン時刻を更新し,ログイン成功メールを送信する
GET /users/:id
- ユーザ情報を取得する,キャッシュを使用する
今回は外部サービスとの連携部分をメール送信処理で代用しています.
IO 発生箇所の分離
基本方針は先行事例と同じですが,操作の内容に応じて分割定義しました. サービスはそれらをまとめる形で実装しています.
(※ この命名が慣習に従ったものなのかどうかは……自信がありません)
サービスとしての実装
まずは先の型クラスを IO
に対して実装します.
-- src/WS/Cache.hs instance MonadCache IO where get key = do ...(ここはいつも通り書く)...
またこれらをまとめた App m
を定義し,servant のハンドラにはこの制約を付けます.
-- src/WS/App.hs class (MonadCatch m, MonadMail m, MonadCache m, MonadDB m) => App m where ...(サービス固有の関数を定義したり)... register :: App m => RegForm -> m User -- こんな感じで型クラスを実装したら切り替えられるようにしておく ...
これで IO
に対する実装があれば上記の register
を servant のハンドラとして実行できるようになります.
テスト用のモックと型クラスの実装
MockApp m a
が今回のモックです.
テスト実行中の状態を保存したいため State
を使用しています.
また今回はテストに SQLite を使用するため部分的に IO
が発生します*2.
なので StateT
として IO
を lift
可能にしておきます.
-- test/Spec.hs newtype MockApp m a = MockApp { app :: StateT MockAppState m a } deriving (Functor, Applicative, Monad, MonadTrans, MonadThrow, MonadCatch)
あとはこのモックに対して型クラスを実装していきます.
このとき State
に持たせるデータを調整することで,戻り値を自由に変えるだけで無く呼び出し履歴の記録といったことも可能です.
-- test/Spec.hs instance (Functor m, MonadCatch m) => WS.MonadCache (MockApp m) where get k = do S.modify $ saveHistory "Cache.get" s <- S.get return $ decode' <$> Map.lookup k (cache s) ...
テストの実装
テストの記述には Hspec を使用しています.
テストコードのうち,モックを使用した部分は registerSpec
です*3.
実行するときは初期状態 initState
とテストしたい servant のハンドラを渡して以下のように実行します.
-- test/Spec.hs registerSpec :: Spec registerSpec = describe "POST /register" $ it "mock registration" $ do curr <- getPOSIXTime let form = WS.RegForm (pack $ "user-" ++ show curr) (pack $ "user-" ++ show curr ++ "@localhost") (res, state) <- runMock (WS.register form) initState -- モックで実行する WS.name res `shouldBe` WS.regName form emails state `shouldNotSatisfy` null (addressEmail . head . mailTo . head . emails $ state) `shouldBe` WS.regEmailAddress form history state `shouldBe` ["DB.insert", "DB.select", "Mail.sendMail"] -- 最後にメールを送信しているかチェック
サービスとしては IO
で実行される部分が,テストでは MockApp m
としてモック化した状態で実行されます.
おわりに
基本に従って実装すれば,IO をモック化した自動テストはうまくいきそうだなという感覚は得られました.
- IO が発生する箇所を限定する
- 実装の粒度とモック化のしやすさを考えながら分離面を決める
また Free
モナドを利用した方法もあるようですが,こちらの検討についてはまた今度.
おまけ: その他いろいろ
使用したフレームワークとライブラリ
servant のエラー処理
今回のようにハンドラ全体が IO になっている場合,その内部からエラーを投げたいときにどうすれば良いのか迷いました.
今回は throwM
で外までぶん投げてから catch
し,Either.left
し直しています.
transfomers
import
で Control.Monad.State
と Control.Monad.Trans.State
を間違えていることに気づかず,get
や put
の型が合わなくてしばらく悩んだりしました.
OpenID Connect 1.0 Relying Party を実装するための Haskell 用ライブラリ
タイトルの通り,OpenID Connect 1.0 のクライアントライブラリを作りました.Hackage に上げてありますので cabal install
で導入できます.
背景
Web サービスを作成するにあたって認証をどうするかは悩ましいところです.先々の展望を考慮して自前で実装することもありますが,小規模あるいは個人サービスであれば外部の認証サービスを利用することが選択肢に入るはずです.
あるサービスを実装するにあたり Google を OpenID Provider (以降 OP) として利用しようと考えたのですが,OpenID Connect 1.0 に対応したパッケージが見当たらなかった (2015 年 7 - 8 月ぐらいのことです) ため,今回のライブラリを作成しました.
現在サポートしているのは Code Flow のみです.
利用方法
手順としては以下の通りです.
discover
で OP の情報 (Provider
) を取得Provider
とクレデンシャルからOIDC
を作成- Authentication Request URL を作成 (してリダイレクト)
- OP からのコールバックを受けてトークンを要求,かつ受け取ったトークンを検証
- 検証済みトークンをサービスで利用する
実際に実行可能なコードが リポジトリ の examples にあります.
API の簡単な紹介
まずは discover
で OP の情報 (Provider
) を取得します.
discover :: IssuerLocation -> Manager -> IO Provider discover location manager = do ...
Web.OIDC.Discovery.Issuers
に具体的な Issuer Location の値を定義しています.Manager
は http-client パッケージのものです.Code Flow では TLS 必須ですから http-client-tls の設定を使って Manager
を生成する必要があります.
次に OIDC
を準備します.
newOIDC :: CPRG g => IORef g -> OIDC newOIDC ref = def { cprgRef = Ref ref }
CPRG
は crypto-random に定義されている暗号論的擬似乱数生成器の class です.暗号論的擬似乱数生成器はほぼ間違いなく使うでしょうから,引数にはそれを指定します.
指定された引数は ID Token Validation の際に id_token
(JWT) のデコード処理で使用します.デコード処理では jose-jwt を利用しています.
あとは setProvider
や setCredentials
を使用して,先に取得した Provider
や client ID/secret 等を設定します.
次に authorization endpoint にリクエストするための URL を取得します.
getAuthenticationRequestUrl :: (MonadThrow m, MonadCatch m) => OIDC -> Scope -> Maybe State -> Parameters -> m URI getAuthenticationRequestUrl oidc scope state params = do ...
第 2 引数は OpenID Connect 1.0 仕様の scope
パラメータに相当します.このパラメータに対する openid
の指定は MUST であるため,明示的に指定しなくても内部で補完しています.
第 3 引数は state
パラメータ (RECOMMENDED) に指定する値です.MUST ではないため Maybe
としています.
第 4 引数は OPTIONAL なパラメータを指定するリストです.
ユーザの許可を得てコールバックされたら,token endpoint に ID Token とアクセストークンを要求します.
requestTokens :: OIDC -> Code -> Manager -> IO Tokens requestTokens oidc code manager = do ...
Code
は authorization endpoint から渡された code
パラメータの値です.
結果として得られる Tokens
には検証済みの ID Token が含まれていますので,これをサービスで利用します.またユーザ情報を取得する場合は Tokens
に含まれるアクセストークンを利用します.
haskell-servant の利用例とちょっとだけ仕組みの調査
はじめに
Haskell には大小様々な Web フレームワークがあります.(yesod, scotty, spock, apiary, rest, 等々)
API サーバを作りたいときは scotty を利用することが多かったのですが,つい最近 haskell-servant というパッケージ群を知りました.
小さな API サーバを書きたいときに便利そうだなと思い,使いつつ軽くコードを読んでみました*1.
環境
- GHC 7.8.3 (64bit)
haskell-servant
型を用いて API 仕様を定義するタイプのフレームワークで,クライアントコードやドキュメントも生成してくれます. API の型とそれに対応するハンドラを組み合わせることで WAI アプリケーションとして動作します.
現在の形になったのはつい最近なんでしょうか? → Rethinking webservice and APIs in Haskell: servant 0.2
試作: メモアプリ用 API サーバ
以下のような API を定義しています.
type MemoAPI = "memos" :> Get [Memo] :<|> "memos" :> ReqBody ReqMemo :> Post Memo :<|> "memos" :> Capture "id" Int :> Delete
:>
が /
みたいなもので,:<|>
が API の結合だそうです.
route を構成するパスやパラメータ名もリテラルとして型に埋め込めます.
戻り値の型は ToJSON
,ReqBody
に指定する型は FromJSON
,Capture
に指定する型は FromText
をそれぞれ実装する必要があります.
例えば FromText
であれば以下のようになります.
-- 例: 数値の offset を TimeZone として解釈させたいとき import Data.Time import Data.Text.Read (decimal) instance FromText TimeZone where fromText t = case decimal t of Right (h, _) -> Just $ hoursToTimeZone h _ -> Nothing
また,ドキュメント生成時にパラメータの説明やサンプルデータが必要になるため,それぞれ ToCapture
,ToSample
のインスタンスを実装します.
instance ToCapture (Capture "id" Int) where toCapture _ = DocCapture "id" "memo id" instance ToSample Memo where toSample = Just sampleMemo instance ToSample [Memo] where toSample = Just [sampleMemo] sampleMemo :: Memo sampleMemo = Memo { id = 5 , text = "Try haskell-servant." , createdAt = ZonedTime (LocalTime (fromGregorian 2014 12 31) midday) (hoursToTimeZone 9) }
仕様変更したときに何か不足していればコンパイルエラーになるため「実装とドキュメントがずれてしまう」といったことは減らせそうです.
ただ今のところデフォルトで出力可能な形式は markdown だけのようで,出力形式を変えたい場合は自前で API -> String
を実装する必要があります.
仕組みを知る上で前提となる知識
(※ご存知の方は読み飛ばしてください)
Type operators
https://downloads.haskell.org/~ghc/7.8.3/docs/html/users_guide/data-type-extensions.html
オペレータシンボルを型コンストラクタとして扱うための拡張です.-XTypeOperators
で有効になります.
型やクラスの宣言に利用されるオペレータを除けば,プレフィックスに :
がなくても良い?みたいです.
servant では :>
や :<|>
がこれに相当します.
Type-Level Literals
https://downloads.haskell.org/~ghc/7.8.3/docs/html/users_guide/type-level-literals.html
数値や文字列を型レベルの定数として扱うことができるようです.
-XDataKinds
で有効になります (DataKinds
自体は,型を kind に,値コンストラクタを型コンストラクタに持ち上げる拡張).
以下は上記ページの例そのままですが,Label "x"
が型として機能しています.
-- 引用元: https://downloads.haskell.org/~ghc/7.8.3/docs/html/users_guide/type-level-literals.html data Label (l :: Symbol) = Get -- String は Symbol kind class Has a l b | a l -> b where from :: a -> Label l -> b data Point = Point Int Int deriving Show instance Has Point "x" Int where from (Point x _) _ = x instance Has Point "y" Int where from (Point _ y) _ = y example = from (Point 1 2) (Get :: Label "x")
もちろん Get :: Label "y"
とすれば example
は 2
を返します.
servant では API のルーティングパスを表現するところで利用されています.
Kind polymorphism
https://downloads.haskell.org/~ghc/7.8.3/docs/html/users_guide/kind-polymorphism.html
kind が多相的に扱えるようになります.何て呼ぶんでしょうか?多相カインド? -XPolyKinds
で有効になります.
kind が明示されていない部分は *
ではなく kind 変数として推論されるようになるみたいです.
data T m a = MkT (m a)
↓GHCi
*Test> :kind T T :: (* -> *) -> * -> * *Test> :set -XPolyKinds *Test> :kind T T :: (k -> *) -> k -> * -- m の kind が * -> * から k -> * (変数あり) に変わった
Proxy
base の Data.Proxy に定義された,型情報を持つだけの data です.
data Proxy t = Proxy
↓GHCi
Prelude Data.Proxy> :kind Proxy Proxy :: k -> *
kind が多相的になっているため,*
や Nat
,Symbol
といった kind に関係なく様々な型を持つことができます.
servant の :>
はいろいろな kind を持った型をとるため,Proxy
を使って定義されています.
-- 引用元: https://hackage.haskell.org/package/servant-0.2.1/docs/src/Servant-API-Sub.html#:> data (path :: k) :> a = Proxy path :> a infixr 9 :>
ちょっとだけ内部に潜る
Application の作成
serve
に渡した p :: Proxy layout
と server :: Server layout
から,HasServer
クラスの route
関数 (の実装) が RoutingApplication
を作成します.
RoutingApplication
は WAI の Application
が少し変更されたもので,servant の RoutingResult
を扱うようになっているため toApplication
関数でこれを変換して最終的な Application
にしています.
-- 引用元: https://hackage.haskell.org/package/wai-3.0.2/docs/src/Network-Wai.html#Application type Application = Request -> (Response -> IO ResponseReceived) -> IO ResponseReceived -- 引用元: https://hackage.haskell.org/package/servant-server-0.2.2/docs/src/Servant-Server.html#serve serve :: HasServer layout => Proxy layout -> Server layout -> Application serve p server = toApplication (route p server)
ルーティング処理
実際にルーティングを行っている route
関数は Visitor パターンのような実装になっています.HasServer
の instance 実装でパターンマッチしているようなイメージ?であっているでしょうか.
パスやパラメータ/戻り値の型は Proxy layout
から決定され,Server layout
もその型に従うように要求されます.
-- 引用元: https://hackage.haskell.org/package/servant-server-0.2.2/docs/src/Servant-Server-Internal.html#HasServer class HasServer layout where type Server layout :: * route :: Proxy layout -> Server layout -> RoutingApplication
API を結合する :<|>
の場合,以下のように分離された a :: Server a
,b :: Server b
をそれぞれまた route
に引き渡すような実装になっています.
a
が失敗した場合に b
が実行されるようになっていることもわかります.
-- 引用元: https://hackage.haskell.org/package/servant-server-0.2.2/docs/src/Servant-Server-Internal.html#instance%20HasServer%20(a%20:<|>%20b) instance (HasServer a, HasServer b) => HasServer (a :<|> b) where type Server (a :<|> b) = Server a :<|> Server b -- ここは type operator route Proxy (a :<|> b) request respond = -- ここは constructor route pa a request $ \ mResponse -> if isMismatch mResponse then route pb b request $ \mResponse' -> respond (mResponse <> mResponse') else respond mResponse where pa = Proxy :: Proxy a pb = Proxy :: Proxy b
途中の path をたどっていく部分は,WAI の Request
から取得したリクエストパスと,シンボルとして型に埋め込まれた文字列 path
とを比較し,一致すればさらに進み,一致しなければ失敗 (NotFound
) としてレスポンスを返していることがわかります.
-- 引用元: https://hackage.haskell.org/package/servant-server-0.2.2/docs/src/Servant-Server-Internal.html#instance%20HasServer%20(path%20:>%20sublayout) instance (KnownSymbol path, HasServer sublayout) => HasServer (path :> sublayout) where type Server (path :> sublayout) = Server sublayout route Proxy subserver request respond = case pathInfo request of (first : rest) | first == cs (symbolVal proxyPath) -- ここで文字列を取り出して比較 -> route (Proxy :: Proxy sublayout) subserver request{ pathInfo = rest } respond _ -> respond $ failWith NotFound where proxyPath = Proxy :: Proxy path
なお,symbolVal
は型レベルリテラル (Symbol) を文字列として取り出す関数です.
symbolVal (Proxy :: Proxy "hoge") == "hoge" -- True
パラメータがある場合 (例えばサンプルコードの Capture "id" Int
) は,captured
で Text
から a
型の値に変換され (ここで FromText
のインスタンスが必要になる),ハンドラの方 (route
の引数である subserver :: Server layout
) は a -> Server sublayout
型としてキャプチャしたパラメータの値に適用されています.
-- 引用元: https://hackage.haskell.org/package/servant-server-0.2.2/docs/src/Servant-Server-Internal.html#instance%20HasServer%20(Capture%20capture%20a%20:>%20sublayout) instance (KnownSymbol capture, FromText a, HasServer sublayout) => HasServer (Capture capture a :> sublayout) where type Server (Capture capture a :> sublayout) = a -> Server sublayout route Proxy subserver request respond = case pathInfo request of (first : rest) -> case captured captureProxy first of Nothing -> respond $ failWith NotFound Just v -> route (Proxy :: Proxy sublayout) (subserver v) request{ -- *ここ pathInfo = rest } respond _ -> respond $ failWith NotFound where captureProxy = Proxy :: Proxy (Capture capture a)
最終的に Proxy layout
の末尾に指定された HTTP Method と戻り値の型の部分までたどり着き,ここで WAI のレスポンスが構築されます.
-- 引用元: https://hackage.haskell.org/package/servant-server-0.2.2/docs/src/Servant-Server-Internal.html#instance%20HasServer%20(Get%20result) instance ToJSON result => HasServer (Get result) where type Server (Get result) = EitherT (Int, String) IO result route Proxy action request respond | null (pathInfo request) && requestMethod request == methodGet = do e <- runEitherT action -- ここでハンドラの実行結果を取り出す respond . succeedWith $ case e of Right output -> responseLBS ok200 [("Content-Type", "application/json")] (encode output) Left (status, message) -> responseLBS (mkStatus status (cs message)) [] (cs message) | null (pathInfo request) && requestMethod request /= methodGet = respond $ failWith WrongMethod | otherwise = respond $ failWith NotFound
EitherT
の Left
は失敗したときのステータスコードとメッセージの組,Right
は成功したときの戻り値をそれぞれ表しています.
type Server (Get result) = EitherT (Int, String) IO result
おわりに
あまり深追いはしていませんが,ざっと仕組みは追えたと思います.
ルーティングの部分を型で守りたいと思い apiary を試していましたが,servant も興味深い選択肢の一つだと思いました.
*1:加えて,読み進める過程で自身の知識不足がよく分かりましたので,いろいろ書き残そうと思いました.
Haskell の machines に入門してみた,というお話
はじめに
io-streams パッケージがリリースされた折にふと「conduit,pipes,io-streams 以外の streaming data を扱うライブラリには何があるんだろうか?」と疑問に思いつぶやいてみたところ, machines がある ということを教えていただきました.
気になったので調べてみた,というのが今回の内容です.
基本的な使い方に始まり,何とか attoparsec を組み込むあたりまでは辿り着きました.なお,GHC 7.4.1 を使用しています.
見出し
- これは何?
- 雰囲気
- どう使うの?
- 基本形
- Source の作成
- Process の作成
- Transducer を組み込む
- 複数入力の取り扱い
- Parser を組み込む
- おわりに
これは何?
今回の対象は↓これ.
- machines-0.2.3.1
Machines are demand driven input sources like pipes or conduits, but can support multiple inputs.
だそうです.加えてトランスデューサのデータ構造も定義されています.
用意されている API はシンプルに見えるのですが,どれも汎用性の高いものばかりです.
雰囲気
- Plan から Machine を作成
(<~)
や(~>)
を使って Machine をつなげる- Source は入力を読まない Machine
- 文字通りソースとして利用する
- Process は
a -> b
という関数に相当する Machine- stream に何か処理をかけたいときはこれを利用する
- Tee や Wye は複数入力を扱う Machine
- Mealy や Moore はトランスデューサを表現
- Automaton のインスタンスになっているため,Process にして連結できる
- Unread は入力の push back を表現
- 0.2.3.1 では使い方がわからず...
- github から
commit f03dd47
までは行ったバージョンを持ってくると unreading が定義されていて Process 化できる
- Automaton クラスのインスタンスは Process になる
- auto 関数を使う
(->)
がインスタンスになっているため,a -> b
型の関数は auto で Process になる
最後に run
すると動きます.
どう使うの?
ドキュメントとソース (の一部) を読んでサンプルコードを作ってみました.なお,以下すべてにおいて先頭の
{-# LANGUAGE OverloadedStrings #-} module Main where import Data.Machine
を省略しています.
基本形
最も単純な例として,リストをソースとしてそれをそのまま出力するコードです.
main :: IO () main = runT test >>= print where test = source [1..10] ~> echo -- [1,2,3,4,5,6,7,8,9,10]
source
関数は Foldable
のインスタンスから Source
を生成します.
(~>)
は Data.Machine.Process
に定義されている関数で,ProcessT
と MachineT
を連結します.
Source
/SourceT
や Process
/ProcessT
はすべて MachineT
のシノニムになっているため,これで連結できるというわけです (連結には MachineT m k o
の k
が関係するため,好き勝手に連結できるわけではない).
また,Data.Machine.Process
にはいくつかの Process
があらかじめ定義されています.上記の echo
もその一つです.
Source の作成
Plan
を使うことで Source
を作ることができます.ここでは Handle
から ByteString
を読み込んで Source
にしてみます.
import qualified Data.ByteString as BS import qualified System.IO as IO import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Exception.Lifted (bracket) sourceHandle :: MonadIO m => IO.Handle -> SourceT m BS.ByteString sourceHandle h = repeatedly $ do bs <- liftIO $ BS.hGetSome h 4096 if BS.null bs then stop else yield bs main :: IO () main = readAll "test.txt" >>= print where readAll fp = bracket (IO.openBinaryFile fp IO.ReadMode) IO.hClose action action h = runT $ sourceHandle h ~> echo -- ここで使ってる
sourceHandle
には,以下の内容をそのまま書き下しているだけです.
- データを取り出す
- 空なら停止
- そうで無いなら yield で返す
- 停止するまで繰り返す (repeatedly)
repeatedly
は Plan
を繰り返し実行する Machine
を作り出す関数です.Data.Machine.Types
に定義されており,他にも construct
や before
があります.
Process の作成
取り出した値を文字列化するだけの単純なものを作ってみます.
main :: IO () main = runT test >>= print where test = src ~> str src = source [1..5] str = repeatedly $ do -- 注意: auto show と等価 i <- await yield $ show i -- ["1","2","3","4","5"]
str
は auto
関数を使って auto show
と書いたものと等価です.
Automaton
クラスのインスタンスは auto
を使えば Process
に変換できます.
(->)
のインスタンスが定義されているため,a -> b
は Process
にできます.
Transducer を組み込む
トランスデューサを Process
として連結できます.例えば以下のような立ち上がりエッジ検出もどきは
次のように書けます.
main :: [Int] -> IO () main i = test i >>= print where test i = runT $ source i ~> auto ms ms = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (0, m1) m0 = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (1, m1) m1 = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (0, m1) -- > main [0,0,1,1,0,1,0,1,1,1,1,0] -- [0,0,1,0,0,1,0,1,0,0,0,0] -- > main [1,0,1,1,0,1,0,1,1,1,1,0] -- [0,0,1,0,0,1,0,1,0,0,0,0]
Mealy
で遷移を組んで,auto
で Process
にしているだけです.そのまんまですね.
複数入力の取り扱い
Tee
や Wye
を使うと複数の入力を扱うことができます.
main :: IO () main = runT test >>= print where test = tee inL inR use inL :: Process Int Int inL = source [1..10] inR :: Process Int Int inR = source [1..10] ~> auto (*10) use = repeatedly $ do l <- awaits L r <- awaits R yield $ l + r -- [11,22,33,44,55,66,77,88,99,110]
test
関数の内容は単純で,以下のようなことをしているだけです.
inL: [1..10] ------------+ | use: l + r ---> 出力 | inR: [1..10] --> (*10) --+
Tee
の部分は Plan
を使って作成しています.各入力は対応するコンストラクタを awaits
関数に指定して取り出します.
Wye
を使う場合も多分同じようにします.
Parser を組み込む
Plan
に attoparsec
の parsing 処理を組み込めば attoparsec-conduit
みたいなものが作れます.
なお,ここだけは github から最新の (commit f03dd47 まで入っている) コードを取得して利用しています.
最近になって unreading
という,Unread
を利用した Plan
を Process
化する関数が入ったためです.
import qualified Data.ByteString as BS import Control.Monad (unless) import qualified Data.Attoparsec.ByteString as AB import qualified Data.Attoparsec.Types as A -- ByteString や Text の差を吸収するためのクラス class ParserInput a where parse :: A.Parser a b -> a -> A.IResult a b isNull :: a -> Bool -- とりあえず ByteString だけ定義 instance ParserInput BS.ByteString where parse = AB.parse isNull = BS.null -- parser process の本体 pp :: (ParserInput i, Show o) => A.Parser i o -> Process i o pp pr = unreading $ plan (parse pr) -- (1) where plan p = await >>= runp where runp i = go $ p i go (A.Fail _ _ err) = error err -- XXX ごまかした go (A.Partial p') = plan p' go (A.Done t r) = do unless (isNull t) $ unread t -- (2) yield r plan (parse pr)
パースして残った入力は (2) で Unread a
として push back しています.(1) の unreading
で Unread
を適切に処理する Process
へと変換されます.
これで Source
をパースすることができるようになりました.
import qualified System.IO as IO import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Exception.Lifted (bracket) import Control.Applicative (empty) import qualified Data.LTSV as L -- 前回の記事で作成した LTSV パーサ main :: IO () main = readAll "test.txt" >>= print where readAll fp = bracket (IO.openBinaryFile fp IO.ReadMode) IO.hClose action action h = run $ sourceHandle h ~> pp L.recordNL sourceHandle :: (MonadIO m) => IO.Handle -> Machine m BS.ByteString sourceHandle h = repeatedly $ do bs <- liftIO $ BS.hGetSome h 10 -- わざと小さくしている if BS.null bs then empty -- 0.2.3.1 より後のバージョンでは stop が無くなっている else yield bs -- 実行結果: -- sourceHandle 自体の出力 -- ["aaa:111\tbb","b:222\naaa:","111\tbbb:22","2\tccc:333\n"] -- main の出力 -- [[("aaa","111"),("bbb","222")],[("aaa","111"),("bbb","222"),("ccc","333")]]
入力は \n
に関係なく途切れていますが,正しく処理されています.
おわりに
Plan
からMachine
が作れるMachine
を連結することでより大きなMachine
が作れるSource
やProcess
もPlan
を書くことで自由に定義できるTee
やWye
で複数の入力を連結し,処理することができるMealy
やMoore
でトランデューサを定義し,Process
として連結できる- parser を
Process
として組み込んでみた (Unread
の利用例でもある)
次のバージョンではコードが結構変わっているっぽいです.