cassy: Cassandra client for Haskell
最近 Cassandra を利用する機会がありまして,今回はその絡みで調べてみたことです.
そういえば最近になって 1.2 がリリースされましたね.atomic_batch
とか気になります.
はじめに
cassy は Haskell で書かれた Cassandra クライアントです.ClientOptions でも紹介されています.
以降の内容は version 0.4.0.1 に基づいています.
基本
使い方は github の README に書いてありま...と思ったら,古いのか記述どおりの内容だと動きません (2012/12/30 時点). 基本的な使い方は,以下のように Cassandra ノードへのコネクションプールを作成し,runCas で操作を実行します.
import Database.Cassandra.Basic import Control.Monad.Trans.Class (lift) import qualified Data.Text.IO as T main :: IO () main = do pool <- createCassandraPool servers 1 3 300 ksTest column <- runCas pool $ do insert cfTest "rowkey_aaa" QUORUM [ packCol ( TUtf8 "sampleValue" , encodeCas $ TUtf8 "ほげほげ" ) ] getCol cfTest "rowkey_aaa" (TUtf8 "samplevalue") QUORUM case column of Nothing -> putStrLn "not found" Just c -> do let val = decodeCas $ colVal c :: TUtf8 T.putStrLn $ getUtf8 val servers :: [Server] servers = [("192.168.xxx.xxx", 9160)] ksTest :: KeySpace ksTest = "test_ks" cfTest :: ColumnFamily cfTest = "test_cf"
createCassandraPool の第 2,3,4 引数は resource-pool パッケージに関係しており,順にサブプール数,サブプール毎のコネクション数,未使用コネクションを維持する秒数を指定します.
例外処理
insert/get/delete は CassandraException を投げるため,これらに備えておく必要があります.
しかし現時点 (version <= 0.4.0.1
) では,insert だけは cassandra-thrift の例外をそのまま投げるため,CassandraException で catch できません (次のリリースでは修正されています).
import Control.Exception.Lifted (catch) import Prelude hiding (catch) main = do ... runCas pool $ operation `catch` (\(e :: CassandraException) -> エラー処理) ...
Slice Query
KeySelector とか Selector を使います.
main = do pool <- createCassandraPool servers 1 3 300 ksToDoList rows <- runCas pool $ getRows ["rowkey_aaa", "rowkey_ccc"] printRow $ Map.lookup "rowkey_aaa" rows -- OK printRow $ Map.lookup "rowkey_bbb" rows -- not found printRow $ Map.lookup "rowkey_ccc" rows -- OK where getRows ks = do getMulti cfTest (Keys ks) Range { rangeStart = Just (TInt 1) , rangeEnd = Just (TInt 3) , rangeOrder = Regular , rangeLimit = 1024 } QUORUM printRow Nothing = putStrLn "not found" printRow (Just row) = forM_ row $ \c -> T.putStrLn $ getUtf8 . decodeCas . colVal $ c
Composite Column
少し複雑なデータ構造だと Composite Column を使いたくなりますよね.
"user_id:00001": { "name": "krdlab", "e-mail": [ "krdlab@example.net", "krdlab@example.com" ] }
例えば上記のようなデータを Composite Column で保存する場合は,以下のようにカラム名に当たる部分をタプルにします.
main = do ... runCas pool $ do cols <- exampleInsertAndGet printColumns cols where exampleInsertAndGet = do insert cfUsers "user_id:00001" QUORUM [ packCol ((TUtf8 "name" , TInt 0), "krdlab") , packCol ((TUtf8 "reading", TInt 0), encodeCas . TUtf8 $ "けーあーるでぃーらぼ") , packCol ((TUtf8 "e-mail" , TInt 0), "krdlab@example.net") , packCol ((TUtf8 "e-mail" , TInt 1), "krdlab@example.com") ] get cfUsers "user_id:00001" All QUORUM printColumns cs = lift $ forM_ cs $ \c -> do let (k, v) = unpackCol c :: ((TUtf8, TInt), Value) print k T.putStrLn $ getUtf8 . decodeCas $ v
これは以下のように Cassandra へ格納されます."!" は cassy が追加しています.
___________________________________________________________________________ "user_id:00001" -> | name:0:! | reading:0:! | e-mail:0:! | e-mail:1:! | +----------+----------------------+--------------------+--------------------+ | krdlab | けーあーるでぃーらぼ | krdlab@example.net | krdlab@example.com | ---------------------------------------------------------------------------
JSON データ
aeson の FromJSON/ToJSON のインスタンスとして定義しておくと,自動的に serialize/deserialize してくれます. しかし現状はどうも 1 つの column にシリアライズされたデータを押し込む方式のようですから,この点はデータ設計時に注意が必要です.
{-# LANGUAGE OverloadedStrings, DeriveGeneric #-} module Main where import Database.Cassandra.JSON -- Basic ではなくこちらを import ... import GHC.Generics (Generic) import qualified Data.Aeson as A data User = User { name :: Text , reading :: Text , emails :: [ByteString] } deriving (Show, Generic) instance A.FromJSON User instance A.ToJSON User main :: IO () main = do ... runCas pool $ do c <- exampleInsertAndGet lift $ print (c :: Maybe User) where exampleInsertAndGet = do let user = User "krdlab" "けーあーるでぃーらぼ" ["krdlab@example.com"] insertCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM user getCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM
寄り道: resource-pool
名前の通り,リソースプールを提供するパッケージです.cassy ではコネクションを管理する目的で使用されています.
管理対象のリソース (Entry) をいくつかのサブプール (LocalPool) に分けて管理しています (Striped Pooling).
↓ sub-pools [Pool] --+-- [LocalPool] --+-- [Entry] <- 今回はここがコネクション | | | ... | +-- [Entry] ... +-- [LocalPool] --+-- [Entry] | ... +-- [Entry]
この様に分割管理しているのは,マルチスレッド下でのリソース取得における競合の発生を抑えるためのようです.takeResource では threadId の hash 値に基づいて LocalPool が選択されます.
また裏では reaper スレッドが動いており,idle 状態のリソースは createPool で指定された時間を過ぎると destroy されるようになっています.
少し気になった点としては,0.2.1.1 時点では Pool 自体の破棄方法が提供されておらず,long running なアプリで Pool を変更したい時や,そうでなくても終了処理で確保済みのリソースを破棄したい時には困るのではないか?ということです.他の方はどうしているんでしょうか...
おわりに
- cassy を使えば Haskell から一通りの操作 (insert/get/slice-query/delete) は実行可能です.
- Cassandra 用のデータタイプ (CasType a) も定義されています.
- aeson を用いたデータの auto-serialize/deserialize は便利ですが,各フィールドがカラムに収まるような方式もありかなと思っています (persistent パッケージと似た感じするとか).