Haskell の machines に入門してみた,というお話
はじめに
io-streams パッケージがリリースされた折にふと「conduit,pipes,io-streams 以外の streaming data を扱うライブラリには何があるんだろうか?」と疑問に思いつぶやいてみたところ, machines がある ということを教えていただきました.
気になったので調べてみた,というのが今回の内容です.
基本的な使い方に始まり,何とか attoparsec を組み込むあたりまでは辿り着きました.なお,GHC 7.4.1 を使用しています.
見出し
- これは何?
- 雰囲気
- どう使うの?
- 基本形
- Source の作成
- Process の作成
- Transducer を組み込む
- 複数入力の取り扱い
- Parser を組み込む
- おわりに
これは何?
今回の対象は↓これ.
- machines-0.2.3.1
Machines are demand driven input sources like pipes or conduits, but can support multiple inputs.
だそうです.加えてトランスデューサのデータ構造も定義されています.
用意されている API はシンプルに見えるのですが,どれも汎用性の高いものばかりです.
雰囲気
- Plan から Machine を作成
(<~)
や(~>)
を使って Machine をつなげる- Source は入力を読まない Machine
- 文字通りソースとして利用する
- Process は
a -> b
という関数に相当する Machine- stream に何か処理をかけたいときはこれを利用する
- Tee や Wye は複数入力を扱う Machine
- Mealy や Moore はトランスデューサを表現
- Automaton のインスタンスになっているため,Process にして連結できる
- Unread は入力の push back を表現
- 0.2.3.1 では使い方がわからず...
- github から
commit f03dd47
までは行ったバージョンを持ってくると unreading が定義されていて Process 化できる
- Automaton クラスのインスタンスは Process になる
- auto 関数を使う
(->)
がインスタンスになっているため,a -> b
型の関数は auto で Process になる
最後に run
すると動きます.
どう使うの?
ドキュメントとソース (の一部) を読んでサンプルコードを作ってみました.なお,以下すべてにおいて先頭の
{-# LANGUAGE OverloadedStrings #-} module Main where import Data.Machine
を省略しています.
基本形
最も単純な例として,リストをソースとしてそれをそのまま出力するコードです.
main :: IO () main = runT test >>= print where test = source [1..10] ~> echo -- [1,2,3,4,5,6,7,8,9,10]
source
関数は Foldable
のインスタンスから Source
を生成します.
(~>)
は Data.Machine.Process
に定義されている関数で,ProcessT
と MachineT
を連結します.
Source
/SourceT
や Process
/ProcessT
はすべて MachineT
のシノニムになっているため,これで連結できるというわけです (連結には MachineT m k o
の k
が関係するため,好き勝手に連結できるわけではない).
また,Data.Machine.Process
にはいくつかの Process
があらかじめ定義されています.上記の echo
もその一つです.
Source の作成
Plan
を使うことで Source
を作ることができます.ここでは Handle
から ByteString
を読み込んで Source
にしてみます.
import qualified Data.ByteString as BS import qualified System.IO as IO import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Exception.Lifted (bracket) sourceHandle :: MonadIO m => IO.Handle -> SourceT m BS.ByteString sourceHandle h = repeatedly $ do bs <- liftIO $ BS.hGetSome h 4096 if BS.null bs then stop else yield bs main :: IO () main = readAll "test.txt" >>= print where readAll fp = bracket (IO.openBinaryFile fp IO.ReadMode) IO.hClose action action h = runT $ sourceHandle h ~> echo -- ここで使ってる
sourceHandle
には,以下の内容をそのまま書き下しているだけです.
- データを取り出す
- 空なら停止
- そうで無いなら yield で返す
- 停止するまで繰り返す (repeatedly)
repeatedly
は Plan
を繰り返し実行する Machine
を作り出す関数です.Data.Machine.Types
に定義されており,他にも construct
や before
があります.
Process の作成
取り出した値を文字列化するだけの単純なものを作ってみます.
main :: IO () main = runT test >>= print where test = src ~> str src = source [1..5] str = repeatedly $ do -- 注意: auto show と等価 i <- await yield $ show i -- ["1","2","3","4","5"]
str
は auto
関数を使って auto show
と書いたものと等価です.
Automaton
クラスのインスタンスは auto
を使えば Process
に変換できます.
(->)
のインスタンスが定義されているため,a -> b
は Process
にできます.
Transducer を組み込む
トランスデューサを Process
として連結できます.例えば以下のような立ち上がりエッジ検出もどきは
次のように書けます.
main :: [Int] -> IO () main i = test i >>= print where test i = runT $ source i ~> auto ms ms = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (0, m1) m0 = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (1, m1) m1 = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (0, m1) -- > main [0,0,1,1,0,1,0,1,1,1,1,0] -- [0,0,1,0,0,1,0,1,0,0,0,0] -- > main [1,0,1,1,0,1,0,1,1,1,1,0] -- [0,0,1,0,0,1,0,1,0,0,0,0]
Mealy
で遷移を組んで,auto
で Process
にしているだけです.そのまんまですね.
複数入力の取り扱い
Tee
や Wye
を使うと複数の入力を扱うことができます.
main :: IO () main = runT test >>= print where test = tee inL inR use inL :: Process Int Int inL = source [1..10] inR :: Process Int Int inR = source [1..10] ~> auto (*10) use = repeatedly $ do l <- awaits L r <- awaits R yield $ l + r -- [11,22,33,44,55,66,77,88,99,110]
test
関数の内容は単純で,以下のようなことをしているだけです.
inL: [1..10] ------------+ | use: l + r ---> 出力 | inR: [1..10] --> (*10) --+
Tee
の部分は Plan
を使って作成しています.各入力は対応するコンストラクタを awaits
関数に指定して取り出します.
Wye
を使う場合も多分同じようにします.
Parser を組み込む
Plan
に attoparsec
の parsing 処理を組み込めば attoparsec-conduit
みたいなものが作れます.
なお,ここだけは github から最新の (commit f03dd47 まで入っている) コードを取得して利用しています.
最近になって unreading
という,Unread
を利用した Plan
を Process
化する関数が入ったためです.
import qualified Data.ByteString as BS import Control.Monad (unless) import qualified Data.Attoparsec.ByteString as AB import qualified Data.Attoparsec.Types as A -- ByteString や Text の差を吸収するためのクラス class ParserInput a where parse :: A.Parser a b -> a -> A.IResult a b isNull :: a -> Bool -- とりあえず ByteString だけ定義 instance ParserInput BS.ByteString where parse = AB.parse isNull = BS.null -- parser process の本体 pp :: (ParserInput i, Show o) => A.Parser i o -> Process i o pp pr = unreading $ plan (parse pr) -- (1) where plan p = await >>= runp where runp i = go $ p i go (A.Fail _ _ err) = error err -- XXX ごまかした go (A.Partial p') = plan p' go (A.Done t r) = do unless (isNull t) $ unread t -- (2) yield r plan (parse pr)
パースして残った入力は (2) で Unread a
として push back しています.(1) の unreading
で Unread
を適切に処理する Process
へと変換されます.
これで Source
をパースすることができるようになりました.
import qualified System.IO as IO import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Exception.Lifted (bracket) import Control.Applicative (empty) import qualified Data.LTSV as L -- 前回の記事で作成した LTSV パーサ main :: IO () main = readAll "test.txt" >>= print where readAll fp = bracket (IO.openBinaryFile fp IO.ReadMode) IO.hClose action action h = run $ sourceHandle h ~> pp L.recordNL sourceHandle :: (MonadIO m) => IO.Handle -> Machine m BS.ByteString sourceHandle h = repeatedly $ do bs <- liftIO $ BS.hGetSome h 10 -- わざと小さくしている if BS.null bs then empty -- 0.2.3.1 より後のバージョンでは stop が無くなっている else yield bs -- 実行結果: -- sourceHandle 自体の出力 -- ["aaa:111\tbb","b:222\naaa:","111\tbbb:22","2\tccc:333\n"] -- main の出力 -- [[("aaa","111"),("bbb","222")],[("aaa","111"),("bbb","222"),("ccc","333")]]
入力は \n
に関係なく途切れていますが,正しく処理されています.
おわりに
Plan
からMachine
が作れるMachine
を連結することでより大きなMachine
が作れるSource
やProcess
もPlan
を書くことで自由に定義できるTee
やWye
で複数の入力を連結し,処理することができるMealy
やMoore
でトランデューサを定義し,Process
として連結できる- parser を
Process
として組み込んでみた (Unread
の利用例でもある)
次のバージョンではコードが結構変わっているっぽいです.