Haskell の machines に入門してみた,というお話

はじめに

io-streams パッケージがリリースされた折にふと「conduit,pipes,io-streams 以外の streaming data を扱うライブラリには何があるんだろうか?」と疑問に思いつぶやいてみたところ, machines がある ということを教えていただきました.

気になったので調べてみた,というのが今回の内容です.

基本的な使い方に始まり,何とか attoparsec を組み込むあたりまでは辿り着きました.なお,GHC 7.4.1 を使用しています.

見出し

  • これは何?
  • 雰囲気
  • どう使うの?
    • 基本形
    • Source の作成
    • Process の作成
    • Transducer を組み込む
    • 複数入力の取り扱い
  • Parser を組み込む
  • おわりに

これは何?

今回の対象は↓これ.

リポジトリREADME によれば,

Machines are demand driven input sources like pipes or conduits, but can support multiple inputs.

だそうです.加えてトランスデューサのデータ構造も定義されています.

用意されている API はシンプルに見えるのですが,どれも汎用性の高いものばかりです.

雰囲気

  • Plan から Machine を作成
  • (<~)(~>) を使って Machine をつなげる
  • Source は入力を読まない Machine
    • 文字通りソースとして利用する
  • Process は a -> b という関数に相当する Machine
    • stream に何か処理をかけたいときはこれを利用する
  • Tee や Wye は複数入力を扱う Machine
  • Mealy や Moore はトランスデューサを表現
    • Automaton のインスタンスになっているため,Process にして連結できる
  • Unread は入力の push back を表現
    • 0.2.3.1 では使い方がわからず...
    • github から commit f03dd47 までは行ったバージョンを持ってくると unreading が定義されていて Process 化できる
  • Automaton クラスのインスタンスは Process になる
    • auto 関数を使う
    • (->)インスタンスになっているため,a -> b 型の関数は auto で Process になる

最後に run すると動きます.

どう使うの?

ドキュメントとソース (の一部) を読んでサンプルコードを作ってみました.なお,以下すべてにおいて先頭の

{-# LANGUAGE OverloadedStrings #-}
module Main where
import Data.Machine

を省略しています.

基本形

最も単純な例として,リストをソースとしてそれをそのまま出力するコードです.

main :: IO ()
main = runT test >>= print
    where
        test = source [1..10] ~> echo

-- [1,2,3,4,5,6,7,8,9,10]

source 関数は Foldableインスタンスから Source を生成します.

(~>)Data.Machine.Process に定義されている関数で,ProcessTMachineT を連結します. Source/SourceTProcess/ProcessT はすべて MachineT のシノニムになっているため,これで連結できるというわけです (連結には MachineT m k ok が関係するため,好き勝手に連結できるわけではない).

また,Data.Machine.Process にはいくつかの Process があらかじめ定義されています.上記の echo もその一つです.

Source の作成

Plan を使うことで Source を作ることができます.ここでは Handle から ByteString を読み込んで Source にしてみます.

import qualified Data.ByteString    as BS
import qualified System.IO          as IO
import Control.Monad.IO.Class       (MonadIO, liftIO)
import Control.Exception.Lifted     (bracket)

sourceHandle :: MonadIO m => IO.Handle -> SourceT m BS.ByteString
sourceHandle h = repeatedly $ do
    bs <- liftIO $ BS.hGetSome h 4096
    if BS.null bs
        then stop
        else yield bs

main :: IO ()
main = readAll "test.txt" >>= print
    where
        readAll fp =
            bracket
                (IO.openBinaryFile fp IO.ReadMode)
                IO.hClose
                action
        action h = runT $ sourceHandle h ~> echo    -- ここで使ってる

sourceHandle には,以下の内容をそのまま書き下しているだけです.

  1. データを取り出す
  2. 空なら停止
  3. そうで無いなら yield で返す
  4. 停止するまで繰り返す (repeatedly)

repeatedlyPlan を繰り返し実行する Machine を作り出す関数です.Data.Machine.Types に定義されており,他にも constructbefore があります.

Process の作成

取り出した値を文字列化するだけの単純なものを作ってみます.

main :: IO ()
main = runT test >>= print
    where
        test = src ~> str
        src = source [1..5]
        str = repeatedly $ do  -- 注意: auto show と等価
            i <- await
            yield $ show i

-- ["1","2","3","4","5"]

strauto 関数を使って auto show と書いたものと等価です. Automaton クラスのインスタンスauto を使えば Process に変換できます. (->)インスタンスが定義されているため,a -> bProcess にできます.

Transducer を組み込む

トランスデューサを Process として連結できます.例えば以下のような立ち上がりエッジ検出もどきは

f:id:KrdLab:20130316161327p:plain

次のように書けます.

main :: [Int] -> IO ()
main i = test i >>= print
    where
        test i = runT $ source i ~> auto ms
        ms = Mealy $ \a -> case a of
            0 -> (0, m0)
            1 -> (0, m1)
        m0 = Mealy $ \a -> case a of
            0 -> (0, m0)
            1 -> (1, m1)
        m1 = Mealy $ \a -> case a of
            0 -> (0, m0)
            1 -> (0, m1)

-- > main [0,0,1,1,0,1,0,1,1,1,1,0]
-- [0,0,1,0,0,1,0,1,0,0,0,0]
-- > main [1,0,1,1,0,1,0,1,1,1,1,0]
-- [0,0,1,0,0,1,0,1,0,0,0,0]

Mealy で遷移を組んで,autoProcess にしているだけです.そのまんまですね.

複数入力の取り扱い

TeeWye を使うと複数の入力を扱うことができます.

main :: IO ()
main = runT test >>= print
    where
        test = tee inL inR use

        inL :: Process Int Int
        inL = source [1..10]
        inR :: Process Int Int
        inR = source [1..10] ~> auto (*10)

        use = repeatedly $ do
            l <- awaits L
            r <- awaits R
            yield $ l + r

-- [11,22,33,44,55,66,77,88,99,110]

test 関数の内容は単純で,以下のようなことをしているだけです.

inL: [1..10] ------------+
                         |
                         use: l + r ---> 出力
                         |
inR: [1..10] --> (*10) --+

Tee の部分は Plan を使って作成しています.各入力は対応するコンストラクタを awaits 関数に指定して取り出します. Wye を使う場合も多分同じようにします.

Parser を組み込む

Planattoparsec の parsing 処理を組み込めば attoparsec-conduit みたいなものが作れます.

なお,ここだけは github から最新の (commit f03dd47 まで入っている) コードを取得して利用しています. 最近になって unreading という,Unread を利用した PlanProcess 化する関数が入ったためです.

import qualified Data.ByteString            as BS
import Control.Monad                        (unless)
import qualified Data.Attoparsec.ByteString as AB
import qualified Data.Attoparsec.Types      as A

-- ByteString や Text の差を吸収するためのクラス
class ParserInput a where
    parse  :: A.Parser a b -> a -> A.IResult a b
    isNull :: a -> Bool

-- とりあえず ByteString だけ定義
instance ParserInput BS.ByteString where
    parse  = AB.parse
    isNull = BS.null

-- parser process の本体
pp :: (ParserInput i, Show o) => A.Parser i o -> Process i o
pp pr = unreading $ plan (parse pr)                 -- (1)
    where
        plan p = await >>= runp
            where
                runp i = go $ p i
                go (A.Fail _ _ err) = error err     -- XXX ごまかした
                go (A.Partial p')   = plan p'
                go (A.Done t r)     = do
                    unless (isNull t) $ unread t    -- (2)
                    yield r
                    plan (parse pr)

パースして残った入力は (2) で Unread a として push back しています.(1) の unreadingUnread を適切に処理する Process へと変換されます.

これで Source をパースすることができるようになりました.

import qualified System.IO      as IO
import Control.Monad.IO.Class   (MonadIO, liftIO)
import Control.Exception.Lifted (bracket)
import Control.Applicative      (empty)
import qualified Data.LTSV      as L    -- 前回の記事で作成した LTSV パーサ

main :: IO ()
main = readAll "test.txt" >>= print
    where
        readAll fp =
            bracket
                (IO.openBinaryFile fp IO.ReadMode)
                IO.hClose
                action
        action h = run $ sourceHandle h ~> pp L.recordNL

sourceHandle :: (MonadIO m) => IO.Handle -> Machine m BS.ByteString
sourceHandle h = repeatedly $ do
    bs <- liftIO $ BS.hGetSome h 10     -- わざと小さくしている
    if BS.null bs
        then empty                      -- 0.2.3.1 より後のバージョンでは stop が無くなっている
        else yield bs

-- 実行結果:
-- sourceHandle 自体の出力
-- ["aaa:111\tbb","b:222\naaa:","111\tbbb:22","2\tccc:333\n"]
-- main の出力
-- [[("aaa","111"),("bbb","222")],[("aaa","111"),("bbb","222"),("ccc","333")]]

入力は \n に関係なく途切れていますが,正しく処理されています.

おわりに

  • Plan から Machine が作れる
  • Machine を連結することでより大きな Machine が作れる
  • SourceProcessPlan を書くことで自由に定義できる
  • TeeWye で複数の入力を連結し,処理することができる
  • MealyMoore でトランデューサを定義し,Process として連結できる
  • parser を Process として組み込んでみた (Unread の利用例でもある)

次のバージョンではコードが結構変わっているっぽいです.