読者です 読者をやめる 読者になる 読者になる

Conduit で Twitter Streaming API を扱う

Haskell

忙しいときほど他のことをやりたくなるのは人の性なのか...というわけで今回も Haskell 関連の内容です.

はじめに

Twitter の Streamimg API から取得した内容を Conduit で処理する,という内容です.Streaming API は Source として利用するのに最適な気がしたのです.

環境

  • ubuntu 10.04 32bit
  • ghc 7.0.4
  • cabal-install 0.10.2 *1
    • using version 1.10.2.0 of the Cabal library

利用パッケージ

主な利用パッケージは以下の通りです.
(具体的な import については「リポジトリ」のところに載せたコードを参照)

JSON データの取り扱い

Streaming API から取得できる JSON データは Aeson で処理します.以下,データ型と parser の定義です.

data Status = Status { text :: Text
                     , createdAt :: ByteString
                     , user :: User
                     }

data User = User { screenName :: ByteString
                 }

instance FromJSON Status where
  parseJSON (Object v) = Status
                          <$> v .: "text"
                          <*> v .: "created_at"
                          <*> v .: "user"
  parseJSON _          = mzero

instance FromJSON User where
  parseJSON (Object v) = User
                          <$> v .: "screen_name"
  parseJSON _          = mzero

Twitter のステータス JSON データはもっと複雑なのですが,上記では欲しい情報だけを定義しています.parser をサクッと書けるところがイイ感じですね.

userstream を Conduit で処理

attoparsec-conduit の sinkParser を利用すると,attoparsec parser (一つ前で定義した JSON parser) の Sink を作ることができます.これを responseBody と接続することで,Streaming API から流れてくるステータスを順次処理することができます.

まずは Parser を Sink に変換します.繰り返し消費するように再帰させておきます *2

statusParser :: (Status -> IO ()) -> (String -> IO ()) -> C.Sink ByteString (C.ResourceT IO) ()
statusParser hs hf = do
  j <- CA.sinkParser json  -- TODO catch ParseError
  case fromJSON j of
    Success s@(Status {..}) -> liftIO . hs $ s  -- RecordWildCards (言語拡張) を使ってます
    Error m                 -> liftIO . hf $ m
  statusParser hs hf

parsing に失敗すると ParseError が投げられるのですが,これをうまく処理する方法が分からず,保留としています.
(statusParser 内部で catch して Either 返すように書く方法が分からなかった)

あとは Source につなぐだけ.

userStream :: OAuth -> Credential -> IO ()
userStream oauth credential = do
  withManager $ \manager -> do
    ...
    Response {..} <- http signedReq manager
    responseBody C.$$ statusParser success failure

これで JSON データをひたすら処理し続けます.

実際のコード

機能を順次追加する予定.

終わりに

Conduit の利用感覚をつかみたくて始めたのですが,終わってみるとあまり利用しない結果となってしまいました.
しかし attoparsec-conduit を知ることができたので,良かったかなといった感じです.

参考

基本的に Hoogle と HackageDB を使いまくりました.必要に応じてコードを見ながら.何というか,これらが無いとコード書けませんね.
以下は参考にさせていただいた記事です.

*1:ディレクトリ毎にパッケージをインストールできる奴があったはずだけど,何だっけ?

*2:Conduit 使った場合のプログラム終了処理ってどう書くのだろうか?