こちらの記事 で作者の方が haskell-relational-record の解説をされています.
MySQL からも使いたいなぁと思ったので,他の RDBMS driver をまねて MySQL driver を書いてみました.
haskell-relational-record-driver-mysql
少しずつブラッシュアップしていきます.
io-streams パッケージがリリースされた折にふと「conduit,pipes,io-streams 以外の streaming data を扱うライブラリには何があるんだろうか?」と疑問に思いつぶやいてみたところ, machines がある ということを教えていただきました.
気になったので調べてみた,というのが今回の内容です.
基本的な使い方に始まり,何とか attoparsec を組み込むあたりまでは辿り着きました.なお,GHC 7.4.1 を使用しています.
今回の対象は↓これ.
Machines are demand driven input sources like pipes or conduits, but can support multiple inputs.
だそうです.加えてトランスデューサのデータ構造も定義されています.
用意されている API はシンプルに見えるのですが,どれも汎用性の高いものばかりです.
(<~)
や (~>)
を使って Machine をつなげるa -> b
という関数に相当する Machine
commit f03dd47
までは行ったバージョンを持ってくると unreading が定義されていて Process 化できる(->)
がインスタンスになっているため,a -> b
型の関数は auto で Process になる最後に run
すると動きます.
ドキュメントとソース (の一部) を読んでサンプルコードを作ってみました.なお,以下すべてにおいて先頭の
{-# LANGUAGE OverloadedStrings #-} module Main where import Data.Machine
を省略しています.
最も単純な例として,リストをソースとしてそれをそのまま出力するコードです.
main :: IO () main = runT test >>= print where test = source [1..10] ~> echo -- [1,2,3,4,5,6,7,8,9,10]
source
関数は Foldable
のインスタンスから Source
を生成します.
(~>)
は Data.Machine.Process
に定義されている関数で,ProcessT
と MachineT
を連結します.
Source
/SourceT
や Process
/ProcessT
はすべて MachineT
のシノニムになっているため,これで連結できるというわけです (連結には MachineT m k o
の k
が関係するため,好き勝手に連結できるわけではない).
また,Data.Machine.Process
にはいくつかの Process
があらかじめ定義されています.上記の echo
もその一つです.
Plan
を使うことで Source
を作ることができます.ここでは Handle
から ByteString
を読み込んで Source
にしてみます.
import qualified Data.ByteString as BS import qualified System.IO as IO import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Exception.Lifted (bracket) sourceHandle :: MonadIO m => IO.Handle -> SourceT m BS.ByteString sourceHandle h = repeatedly $ do bs <- liftIO $ BS.hGetSome h 4096 if BS.null bs then stop else yield bs main :: IO () main = readAll "test.txt" >>= print where readAll fp = bracket (IO.openBinaryFile fp IO.ReadMode) IO.hClose action action h = runT $ sourceHandle h ~> echo -- ここで使ってる
sourceHandle
には,以下の内容をそのまま書き下しているだけです.
repeatedly
は Plan
を繰り返し実行する Machine
を作り出す関数です.Data.Machine.Types
に定義されており,他にも construct
や before
があります.
取り出した値を文字列化するだけの単純なものを作ってみます.
main :: IO () main = runT test >>= print where test = src ~> str src = source [1..5] str = repeatedly $ do -- 注意: auto show と等価 i <- await yield $ show i -- ["1","2","3","4","5"]
str
は auto
関数を使って auto show
と書いたものと等価です.
Automaton
クラスのインスタンスは auto
を使えば Process
に変換できます.
(->)
のインスタンスが定義されているため,a -> b
は Process
にできます.
トランスデューサを Process
として連結できます.例えば以下のような立ち上がりエッジ検出もどきは
次のように書けます.
main :: [Int] -> IO () main i = test i >>= print where test i = runT $ source i ~> auto ms ms = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (0, m1) m0 = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (1, m1) m1 = Mealy $ \a -> case a of 0 -> (0, m0) 1 -> (0, m1) -- > main [0,0,1,1,0,1,0,1,1,1,1,0] -- [0,0,1,0,0,1,0,1,0,0,0,0] -- > main [1,0,1,1,0,1,0,1,1,1,1,0] -- [0,0,1,0,0,1,0,1,0,0,0,0]
Mealy
で遷移を組んで,auto
で Process
にしているだけです.そのまんまですね.
Tee
や Wye
を使うと複数の入力を扱うことができます.
main :: IO () main = runT test >>= print where test = tee inL inR use inL :: Process Int Int inL = source [1..10] inR :: Process Int Int inR = source [1..10] ~> auto (*10) use = repeatedly $ do l <- awaits L r <- awaits R yield $ l + r -- [11,22,33,44,55,66,77,88,99,110]
test
関数の内容は単純で,以下のようなことをしているだけです.
inL: [1..10] ------------+ | use: l + r ---> 出力 | inR: [1..10] --> (*10) --+
Tee
の部分は Plan
を使って作成しています.各入力は対応するコンストラクタを awaits
関数に指定して取り出します.
Wye
を使う場合も多分同じようにします.
Plan
に attoparsec
の parsing 処理を組み込めば attoparsec-conduit
みたいなものが作れます.
なお,ここだけは github から最新の (commit f03dd47 まで入っている) コードを取得して利用しています.
最近になって unreading
という,Unread
を利用した Plan
を Process
化する関数が入ったためです.
import qualified Data.ByteString as BS import Control.Monad (unless) import qualified Data.Attoparsec.ByteString as AB import qualified Data.Attoparsec.Types as A -- ByteString や Text の差を吸収するためのクラス class ParserInput a where parse :: A.Parser a b -> a -> A.IResult a b isNull :: a -> Bool -- とりあえず ByteString だけ定義 instance ParserInput BS.ByteString where parse = AB.parse isNull = BS.null -- parser process の本体 pp :: (ParserInput i, Show o) => A.Parser i o -> Process i o pp pr = unreading $ plan (parse pr) -- (1) where plan p = await >>= runp where runp i = go $ p i go (A.Fail _ _ err) = error err -- XXX ごまかした go (A.Partial p') = plan p' go (A.Done t r) = do unless (isNull t) $ unread t -- (2) yield r plan (parse pr)
パースして残った入力は (2) で Unread a
として push back しています.(1) の unreading
で Unread
を適切に処理する Process
へと変換されます.
これで Source
をパースすることができるようになりました.
import qualified System.IO as IO import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Exception.Lifted (bracket) import Control.Applicative (empty) import qualified Data.LTSV as L -- 前回の記事で作成した LTSV パーサ main :: IO () main = readAll "test.txt" >>= print where readAll fp = bracket (IO.openBinaryFile fp IO.ReadMode) IO.hClose action action h = run $ sourceHandle h ~> pp L.recordNL sourceHandle :: (MonadIO m) => IO.Handle -> Machine m BS.ByteString sourceHandle h = repeatedly $ do bs <- liftIO $ BS.hGetSome h 10 -- わざと小さくしている if BS.null bs then empty -- 0.2.3.1 より後のバージョンでは stop が無くなっている else yield bs -- 実行結果: -- sourceHandle 自体の出力 -- ["aaa:111\tbb","b:222\naaa:","111\tbbb:22","2\tccc:333\n"] -- main の出力 -- [[("aaa","111"),("bbb","222")],[("aaa","111"),("bbb","222"),("ccc","333")]]
入力は \n
に関係なく途切れていますが,正しく処理されています.
Plan
から Machine
が作れるMachine
を連結することでより大きな Machine
が作れるSource
や Process
も Plan
を書くことで自由に定義できるTee
や Wye
で複数の入力を連結し,処理することができるMealy
や Moore
でトランデューサを定義し,Process
として連結できるProcess
として組み込んでみた (Unread
の利用例でもある)次のバージョンではコードが結構変わっているっぽいです.
少し前から LTSV が話題になっています.ログ出力データの処理がやりやすそうです.
上記サイトに載っているとおり仕様がシンプルですから,パーサの練習問題としてもってこいな気がしました.
ByteString
の扱いがうまくできていないような気もしますが,おそらく期待する動作になるはずです.
{-# LANGUAGE OverloadedStrings #-} import Data.ByteString (ByteString, pack) import Data.Attoparsec.ByteString import Data.Word (Word8) import Control.Applicative ( (<*), (<|>) ) type Field = (ByteString, ByteString) type Record = [Field] type LTSV = [Record] tab, cr, lf, colon :: Parser Word8 tab = word8 9 cr = word8 13 lf = word8 10 colon = word8 58 -- | -- LTSV format parser. -- -- >>> parseOnly ltsv "aaa:111\tbbb:222\nccc:333\tddd:444" -- Right [[("aaa","111"),("bbb","222")],[("ccc","333"),("ddd","444")]] -- ltsv :: Parser LTSV ltsv = do rs <- many' recordNL r <- record return $ if null r -- 0 or 1 を表す方法は...? then rs else rs ++ [r] recordNL :: Parser Record recordNL = record <* nl where nl = (cr >> lf) <|> lf record :: Parser Record record = sepBy field tab where field = do l <- label <* colon v <- value return (pack l, pack v) label = many1 $ satisfy $ inClass "0-9A-Za-z_.-" value = many' $ satisfy $ \w -> w `notElem` [9, 10, 13]
attoparsec-conduit パッケージと組み合わせることで,ストリーミングデータに適用可能となります.
(上記コードの recordNL
を sinkParser
に渡す感じ)
最近 Cassandra を利用する機会がありまして,今回はその絡みで調べてみたことです.
そういえば最近になって 1.2 がリリースされましたね.atomic_batch
とか気になります.
cassy は Haskell で書かれた Cassandra クライアントです.ClientOptions でも紹介されています.
以降の内容は version 0.4.0.1 に基づいています.
使い方は github の README に書いてありま...と思ったら,古いのか記述どおりの内容だと動きません (2012/12/30 時点). 基本的な使い方は,以下のように Cassandra ノードへのコネクションプールを作成し,runCas で操作を実行します.
import Database.Cassandra.Basic import Control.Monad.Trans.Class (lift) import qualified Data.Text.IO as T main :: IO () main = do pool <- createCassandraPool servers 1 3 300 ksTest column <- runCas pool $ do insert cfTest "rowkey_aaa" QUORUM [ packCol ( TUtf8 "sampleValue" , encodeCas $ TUtf8 "ほげほげ" ) ] getCol cfTest "rowkey_aaa" (TUtf8 "samplevalue") QUORUM case column of Nothing -> putStrLn "not found" Just c -> do let val = decodeCas $ colVal c :: TUtf8 T.putStrLn $ getUtf8 val servers :: [Server] servers = [("192.168.xxx.xxx", 9160)] ksTest :: KeySpace ksTest = "test_ks" cfTest :: ColumnFamily cfTest = "test_cf"
createCassandraPool の第 2,3,4 引数は resource-pool パッケージに関係しており,順にサブプール数,サブプール毎のコネクション数,未使用コネクションを維持する秒数を指定します.
insert/get/delete は CassandraException を投げるため,これらに備えておく必要があります.
しかし現時点 (version <= 0.4.0.1
) では,insert だけは cassandra-thrift の例外をそのまま投げるため,CassandraException で catch できません (次のリリースでは修正されています).
import Control.Exception.Lifted (catch) import Prelude hiding (catch) main = do ... runCas pool $ operation `catch` (\(e :: CassandraException) -> エラー処理) ...
KeySelector とか Selector を使います.
main = do pool <- createCassandraPool servers 1 3 300 ksToDoList rows <- runCas pool $ getRows ["rowkey_aaa", "rowkey_ccc"] printRow $ Map.lookup "rowkey_aaa" rows -- OK printRow $ Map.lookup "rowkey_bbb" rows -- not found printRow $ Map.lookup "rowkey_ccc" rows -- OK where getRows ks = do getMulti cfTest (Keys ks) Range { rangeStart = Just (TInt 1) , rangeEnd = Just (TInt 3) , rangeOrder = Regular , rangeLimit = 1024 } QUORUM printRow Nothing = putStrLn "not found" printRow (Just row) = forM_ row $ \c -> T.putStrLn $ getUtf8 . decodeCas . colVal $ c
少し複雑なデータ構造だと Composite Column を使いたくなりますよね.
"user_id:00001": { "name": "krdlab", "e-mail": [ "krdlab@example.net", "krdlab@example.com" ] }
例えば上記のようなデータを Composite Column で保存する場合は,以下のようにカラム名に当たる部分をタプルにします.
main = do ... runCas pool $ do cols <- exampleInsertAndGet printColumns cols where exampleInsertAndGet = do insert cfUsers "user_id:00001" QUORUM [ packCol ((TUtf8 "name" , TInt 0), "krdlab") , packCol ((TUtf8 "reading", TInt 0), encodeCas . TUtf8 $ "けーあーるでぃーらぼ") , packCol ((TUtf8 "e-mail" , TInt 0), "krdlab@example.net") , packCol ((TUtf8 "e-mail" , TInt 1), "krdlab@example.com") ] get cfUsers "user_id:00001" All QUORUM printColumns cs = lift $ forM_ cs $ \c -> do let (k, v) = unpackCol c :: ((TUtf8, TInt), Value) print k T.putStrLn $ getUtf8 . decodeCas $ v
これは以下のように Cassandra へ格納されます."!" は cassy が追加しています.
___________________________________________________________________________ "user_id:00001" -> | name:0:! | reading:0:! | e-mail:0:! | e-mail:1:! | +----------+----------------------+--------------------+--------------------+ | krdlab | けーあーるでぃーらぼ | krdlab@example.net | krdlab@example.com | ---------------------------------------------------------------------------
aeson の FromJSON/ToJSON のインスタンスとして定義しておくと,自動的に serialize/deserialize してくれます. しかし現状はどうも 1 つの column にシリアライズされたデータを押し込む方式のようですから,この点はデータ設計時に注意が必要です.
{-# LANGUAGE OverloadedStrings, DeriveGeneric #-} module Main where import Database.Cassandra.JSON -- Basic ではなくこちらを import ... import GHC.Generics (Generic) import qualified Data.Aeson as A data User = User { name :: Text , reading :: Text , emails :: [ByteString] } deriving (Show, Generic) instance A.FromJSON User instance A.ToJSON User main :: IO () main = do ... runCas pool $ do c <- exampleInsertAndGet lift $ print (c :: Maybe User) where exampleInsertAndGet = do let user = User "krdlab" "けーあーるでぃーらぼ" ["krdlab@example.com"] insertCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM user getCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM
名前の通り,リソースプールを提供するパッケージです.cassy ではコネクションを管理する目的で使用されています.
管理対象のリソース (Entry) をいくつかのサブプール (LocalPool) に分けて管理しています (Striped Pooling).
↓ sub-pools [Pool] --+-- [LocalPool] --+-- [Entry] <- 今回はここがコネクション | | | ... | +-- [Entry] ... +-- [LocalPool] --+-- [Entry] | ... +-- [Entry]
この様に分割管理しているのは,マルチスレッド下でのリソース取得における競合の発生を抑えるためのようです.takeResource では threadId の hash 値に基づいて LocalPool が選択されます.
また裏では reaper スレッドが動いており,idle 状態のリソースは createPool で指定された時間を過ぎると destroy されるようになっています.
少し気になった点としては,0.2.1.1 時点では Pool 自体の破棄方法が提供されておらず,long running なアプリで Pool を変更したい時や,そうでなくても終了処理で確保済みのリソースを破棄したい時には困るのではないか?ということです.他の方はどうしているんでしょうか...
長らく放置状態になっていた Lisys ですが,コード管理を GitHub に変更しました.コード管理が中途半端なまま放置するのは精神的によくありませんでしたので.
https://github.com/krdlab/lisys
「LAPACK のほんの一部を C# 向けにラップ + Matrix/Vector + 少量の分析手法」が含まれたライブラリです.
エントリを掘り返してみると,作成はずいぶん前ですね.
昔,Windows 向けの GUI 分析ツールを作るために作成しました.作成当時は .NET 向けの良いライブラリがなく,「じゃあ LAPACK を利用させてもらおうかな」ということでできあがったものです.
元々は CLAPACK の使い方を学ぶために C++ でコードを書いており,そいつの派生物として作成しました.
しかし今となっては Math.NET Project がありますから,そちらの利用を検討した方が良いかもしれません.DotNumerics というのもあります.これらは pure C# なので Lisys よりも利用が簡単です.
sample や test を見ていただければ.
既に Visual Studio 2012 が出ているものの,Lisys は Visual C# 2010 Express と Visual C++ 2010 Express を利用して作成しています (2012 が出る大分前から 2008 → 2010 移行を開始したものの,しばらく放置してたらいつの間にか 2012 が出ていた).
Visual C++ 2010 Express Edition は 32bit コンパイラのみであるため,混合アセンブリを生成する Lisys も 32bit 版しか動作確認をしていません *1.
久しぶりに時間を確保できたため,Resource モナドのあたりを調査してみました.結果としてはよくわからない部分*1 が残っているのですが,ごちゃごちゃを一旦整理する目的で書き出してみました.
Conduit では Resource モナドによるリソース管理が行われています.Resource モナドは例外安全にリソースを解放します.
中身が気になったのでコードを追いかけることにしました.
以下の通りです.
import System.IO import Control.Monad.Trans.Resource (allocate, release, runResourceT) import Control.Monad.Trans.Class (lift) main :: IO () main = runResourceT $ do -- runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a (rkeyO, output) <- allocate (openFile "output.txt" WriteMode) hClose (rkeyI, input) <- allocate (openFile "input.txt" ReadMode) hClose lift $ hGetContents input >>= hPutStr output release rkeyI release rkeyO
allocate でリソースの取得,ならびに release action (hClose) の登録を行い,ReleaseKey と Handle を受け取ります.使い終わったら release に ReleaseKey を指定して登録した hClose を呼び出しています.
また,明示的に release を呼び出さなくても runResourceT を抜けるタイミングでリソースは解放されます.
Conduit でもこれらの関数を使ってリソース管理が行われています.
runResourceT が取る ResourceT の定義は以下の通りです.
... import qualified Data.IORef as I ... newtype ResourceT m a = ResourceT { unResourceT :: I.IORef ReleaseMap -> m a }
この ReleaseMap にリソース解放アクション (release action,型は IO ()) が登録されます.
リソース管理用の各関数は MonadResource に定義されており,ResourceT m はこのインスタンスとして定義されています (定義の詳細は ドキュメント を参照).
前述の通り ResourceT では release action を Map で管理しており,runResourceT から抜ける際に登録済みの release action 全てを呼び出します.
runResourceT の定義は以下の通りです.
-- Control.Monad.Trans.Resource runResourceT :: MonadBaseControl IO m => ResourceT m a -> m a runResourceT (ResourceT r) = do istate <- liftBase $ I.newIORef $ ReleaseMap maxBound minBound IntMap.empty bracket_ (stateAlloc istate) (stateCleanup istate) (r istate)
runResourceT では release action を登録するための最初の ReleaseMap を用意し,bracket_ を経由してリソースアクション*2 r を実行します.stateAlloc では現コンテキストにおいてマップが利用されていることを示すために参照カウントが +1 されます.stateCleanup では参照カウントが -1 されます.
ReleaseMap は参照カウントでリソースを管理しており,カウントが 0 になると stateCleanup 内部で登録済みの全 release action を呼び出します.
なお,各 release action は 1 度しか呼ばれないようになっています*3.
リソース解放の保証と関係しますが,例外安全性については bracket_ により保証されています (control については後述).
-- Control.Monad.Trans.Resource bracket_ :: MonadBaseControl IO m => IO () -> IO () -> m a -> m a bracket_ alloc cleanup inside = control $ \run -> E.bracket_ alloc cleanup (run inside)
bracket_ 内部では E.bracket_ (Control.Exception.bracket_) を利用した例外ハンドリングが行われています.余談ですが,bracket_ の inside と戻り値の型がともに m a なので,元の E.bracket_ のように実行したい計算が IO である必要は無くなっています.
E.bracket_ の定義は以下の通りです.
-- Control.Exception bracket_ :: IO a -> IO b -> IO c -> IO c bracket_ before after thing = bracket before (const after) (const thing) bracket :: IO a -- ^ computation to run first (\"acquire resource\") -> (a -> IO b) -- ^ computation to run last (\"release resource\") -> (a -> IO c) -- ^ computation to run in-between -> IO c -- returns the value from the in-between computation bracket before after thing = mask $ \restore -> do a <- before r <- restore (thing a) `onException` after a _ <- after a return r onException :: IO a -> IO b -> IO a onException io what = io `catch` \e -> do _ <- what throwIO (e :: SomeException) throwIO :: Exception e => e -> IO a throwIO e = IO (raiseIO# (toException e))
bracket では mask により非同期例外をコントロールし,onException により例外を補足することで,after が確実に呼ばれるようになっています.restore は呼び出し元で非同期例外が Unmasked/Masked かによって mask/id のいずれかになります.
非同期例外や mask については以下の情報が参考になりました.
ところで,Control.Monad.Trans.Resource.bracket_ 内部では control 経由で Control.Exception.bracket_ が呼び出されています.
-- Control.Monad.Trans.Resource bracket_ :: MonadBaseControl IO m => IO () -> IO () -> m a -> m a bracket_ alloc cleanup inside = control $ \run -> E.bracket_ alloc cleanup (run inside)
control の定義は monad-control にあります.
-- Control.Monad.Trans.Control control :: MonadBaseControl b m => (RunInBase m b -> b (StM m a)) -> m a control f = liftBaseWith f >>= restoreM
monad-control は,モナド変換子によって積み上げられたスタックにもぐって計算したり戻ってきたりする仕組みの一般化のようですが,正直まだよくわかっていません.lifted-base は monad-control を利用して Control.Exception のより一般的な定義を提供しているようです.
で,liftBaseWith と restoreM の型は以下のようになっています.
-- Control.Monad.Trans.Control class MonadBase b m => MonadBaseControl b m | m -> b where -- | Monadic state of @m@. data StM m :: * -> * liftBaseWith :: (RunInBase m b -> b a) -> m a restoreM :: StM m a -> m a type RunInBase m b = ∀ a. m a -> b (StM m a)
これ,どういうことかわからなくてずっとうねうね考えていたんですが,IO や XxxT に対する定義を眺めた結果,
という感じかな?と思いました...が,正直まだよくわかってないです.
↓眺めていた IO に対する定義
instance MonadBaseControl IO IO where newtype StM IO a = StIO a -- liftBaseWith :: (RunInBase IO IO -> IO a) -> IO a -- ↓展開 -- liftBaseWith :: ((forall a'. IO a' -> IO (StM IO a')) -> IO a) -> IO a liftBaseWith f = f $ liftM StIO restoreM (StIO x) = return x
↓眺めていた IdentityT に対する定義
instance MonadTransControl IdentityT where newtype StT IdentityT a = StId {unStId :: a} liftWith f = IdentityT $ f $ liftM StId . runIdentityT restoreT = IdentityT . liftM unStId instance (MonadBaseControl b m) ⇒ MonadBaseControl b (IdentityT m) where newtype StM (IdentityT m) a = StMId { unStMId :: ComposeSt IdentityT m a } liftBaseWith = defaultLiftBaseWith StMId restoreM = defaultRestoreM unStMId defaultLiftBaseWith :: (MonadTransControl t, MonadBaseControl b m) ⇒ (∀ c. ComposeSt t m c → StM (t m) c) -- ^ 'StM' constructor → ((RunInBase (t m) b → b a) → t m a) defaultLiftBaseWith stM = \f → liftWith $ \run → liftBaseWith $ \runInBase → f $ liftM stM . runInBase . run defaultRestoreM :: (MonadTransControl t, MonadBaseControl b m) ⇒ (StM (t m) a → ComposeSt t m a) -- ^ 'StM' deconstructor → (StM (t m) a → t m a) defaultRestoreM unStM = restoreT . restoreM . unStM
ここはもうちょい調べます.
ライブラリのコードは勉強になりますが,型を追いかけるのは思っていたよりもしんどかったです...
忙しいときほど他のことをやりたくなるのは人の性なのか...というわけで今回も Haskell 関連の内容です.
Twitter の Streamimg API から取得した内容を Conduit で処理する,という内容です.Streaming API は Source として利用するのに最適な気がしたのです.
主な利用パッケージは以下の通りです.
(具体的な import については「リポジトリ」のところに載せたコードを参照)
Streaming API から取得できる JSON データは Aeson で処理します.以下,データ型と parser の定義です.
data Status = Status { text :: Text , createdAt :: ByteString , user :: User } data User = User { screenName :: ByteString } instance FromJSON Status where parseJSON (Object v) = Status <$> v .: "text" <*> v .: "created_at" <*> v .: "user" parseJSON _ = mzero instance FromJSON User where parseJSON (Object v) = User <$> v .: "screen_name" parseJSON _ = mzero
Twitter のステータス JSON データはもっと複雑なのですが,上記では欲しい情報だけを定義しています.parser をサクッと書けるところがイイ感じですね.
attoparsec-conduit の sinkParser を利用すると,attoparsec parser (一つ前で定義した JSON parser) の Sink を作ることができます.これを responseBody と接続することで,Streaming API から流れてくるステータスを順次処理することができます.
まずは Parser を Sink に変換します.繰り返し消費するように再帰させておきます *2.
statusParser :: (Status -> IO ()) -> (String -> IO ()) -> C.Sink ByteString (C.ResourceT IO) () statusParser hs hf = do j <- CA.sinkParser json -- TODO catch ParseError case fromJSON j of Success s@(Status {..}) -> liftIO . hs $ s -- RecordWildCards (言語拡張) を使ってます Error m -> liftIO . hf $ m statusParser hs hf
parsing に失敗すると ParseError が投げられるのですが,これをうまく処理する方法が分からず,保留としています.
(statusParser 内部で catch して Either 返すように書く方法が分からなかった)
あとは Source につなぐだけ.
userStream :: OAuth -> Credential -> IO () userStream oauth credential = do withManager $ \manager -> do ... Response {..} <- http signedReq manager responseBody C.$$ statusParser success failure
これで JSON データをひたすら処理し続けます.
Conduit の利用感覚をつかみたくて始めたのですが,終わってみるとあまり利用しない結果となってしまいました.
しかし attoparsec-conduit を知ることができたので,良かったかなといった感じです.
基本的に Hoogle と HackageDB を使いまくりました.必要に応じてコードを見ながら.何というか,これらが無いとコード書けませんね.
以下は参考にさせていただいた記事です.