最近 Cassandra を利用する機会がありまして,今回はその絡みで調べてみたことです.
そういえば最近になって 1.2 がリリースされましたね.atomic_batch
とか気になります.
はじめに
cassy は Haskell で書かれた Cassandra クライアントです.ClientOptions でも紹介されています.
以降の内容は version 0.4.0.1 に基づいています.
基本
使い方は github の README に書いてありま...と思ったら,古いのか記述どおりの内容だと動きません (2012/12/30 時点).
基本的な使い方は,以下のように Cassandra ノードへのコネクションプールを作成し,runCas で操作を実行します.
import Database.Cassandra.Basic
import Control.Monad.Trans.Class (lift)
import qualified Data.Text.IO as T
main :: IO ()
main = do
pool <- createCassandraPool servers 1 3 300 ksTest
column <- runCas pool $ do
insert cfTest "rowkey_aaa" QUORUM
[ packCol ( TUtf8 "sampleValue"
, encodeCas $ TUtf8 "ほげほげ"
)
]
getCol cfTest "rowkey_aaa" (TUtf8 "samplevalue") QUORUM
case column of
Nothing -> putStrLn "not found"
Just c -> do
let val = decodeCas $ colVal c :: TUtf8
T.putStrLn $ getUtf8 val
servers :: [Server]
servers = [("192.168.xxx.xxx", 9160)]
ksTest :: KeySpace
ksTest = "test_ks"
cfTest :: ColumnFamily
cfTest = "test_cf"
createCassandraPool の第 2,3,4 引数は resource-pool パッケージに関係しており,順にサブプール数,サブプール毎のコネクション数,未使用コネクションを維持する秒数を指定します.
例外処理
insert/get/delete は CassandraException を投げるため,これらに備えておく必要があります.
しかし現時点 (version <= 0.4.0.1
) では,insert だけは cassandra-thrift の例外をそのまま投げるため,CassandraException で catch できません (次のリリースでは修正されています).
import Control.Exception.Lifted (catch)
import Prelude hiding (catch)
main = do
...
runCas pool $
operation `catch` (\(e :: CassandraException) -> エラー処理)
...
Slice Query
KeySelector とか Selector を使います.
main = do
pool <- createCassandraPool servers 1 3 300 ksToDoList
rows <- runCas pool $ getRows ["rowkey_aaa", "rowkey_ccc"]
printRow $ Map.lookup "rowkey_aaa" rows
printRow $ Map.lookup "rowkey_bbb" rows
printRow $ Map.lookup "rowkey_ccc" rows
where
getRows ks = do
getMulti cfTest (Keys ks)
Range { rangeStart = Just (TInt 1)
, rangeEnd = Just (TInt 3)
, rangeOrder = Regular
, rangeLimit = 1024
}
QUORUM
printRow Nothing = putStrLn "not found"
printRow (Just row) = forM_ row $ \c ->
T.putStrLn $ getUtf8 . decodeCas . colVal $ c
Composite Column
少し複雑なデータ構造だと Composite Column を使いたくなりますよね.
"user_id:00001": {
"name": "krdlab",
"e-mail": [
"krdlab@example.net",
"krdlab@example.com"
]
}
例えば上記のようなデータを Composite Column で保存する場合は,以下のようにカラム名に当たる部分をタプルにします.
main = do
...
runCas pool $ do
cols <- exampleInsertAndGet
printColumns cols
where
exampleInsertAndGet = do
insert cfUsers "user_id:00001" QUORUM
[ packCol ((TUtf8 "name" , TInt 0), "krdlab")
, packCol ((TUtf8 "reading", TInt 0), encodeCas . TUtf8 $ "けーあーるでぃーらぼ")
, packCol ((TUtf8 "e-mail" , TInt 0), "krdlab@example.net")
, packCol ((TUtf8 "e-mail" , TInt 1), "krdlab@example.com")
]
get cfUsers "user_id:00001" All QUORUM
printColumns cs = lift $ forM_ cs $ \c -> do
let (k, v) = unpackCol c :: ((TUtf8, TInt), Value)
print k
T.putStrLn $ getUtf8 . decodeCas $ v
これは以下のように Cassandra へ格納されます."!" は cassy が追加しています.
___________________________________________________________________________
"user_id:00001" -> | name:0:! | reading:0:! | e-mail:0:! | e-mail:1:! |
+----------+----------------------+--------------------+--------------------+
| krdlab | けーあーるでぃーらぼ | krdlab@example.net | krdlab@example.com |
---------------------------------------------------------------------------
JSON データ
aeson の FromJSON/ToJSON のインスタンスとして定義しておくと,自動的に serialize/deserialize してくれます.
しかし現状はどうも 1 つの column にシリアライズされたデータを押し込む方式のようですから,この点はデータ設計時に注意が必要です.
{-# LANGUAGE OverloadedStrings, DeriveGeneric #-}
module Main where
import Database.Cassandra.JSON
...
import GHC.Generics (Generic)
import qualified Data.Aeson as A
data User = User
{ name :: Text
, reading :: Text
, emails :: [ByteString]
}
deriving (Show, Generic)
instance A.FromJSON User
instance A.ToJSON User
main :: IO ()
main = do
...
runCas pool $ do
c <- exampleInsertAndGet
lift $ print (c :: Maybe User)
where
exampleInsertAndGet = do
let user = User "krdlab" "けーあーるでぃーらぼ" ["krdlab@example.com"]
insertCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM user
getCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM
寄り道: resource-pool
名前の通り,リソースプールを提供するパッケージです.cassy ではコネクションを管理する目的で使用されています.
管理対象のリソース (Entry) をいくつかのサブプール (LocalPool) に分けて管理しています (Striped Pooling).
↓ sub-pools
[Pool] --+-- [LocalPool] --+-- [Entry] <- 今回はここがコネクション
| |
| ...
| +-- [Entry]
...
+-- [LocalPool] --+-- [Entry]
|
...
+-- [Entry]
この様に分割管理しているのは,マルチスレッド下でのリソース取得における競合の発生を抑えるためのようです.takeResource では threadId の hash 値に基づいて LocalPool が選択されます.
また裏では reaper スレッドが動いており,idle 状態のリソースは createPool で指定された時間を過ぎると destroy されるようになっています.
少し気になった点としては,0.2.1.1 時点では Pool 自体の破棄方法が提供されておらず,long running なアプリで Pool を変更したい時や,そうでなくても終了処理で確保済みのリソースを破棄したい時には困るのではないか?ということです.他の方はどうしているんでしょうか...
おわりに
- cassy を使えば Haskell から一通りの操作 (insert/get/slice-query/delete) は実行可能です.
- Cassandra 用のデータタイプ (CasType a) も定義されています.
- aeson を用いたデータの auto-serialize/deserialize は便利ですが,各フィールドがカラムに収まるような方式もありかなと思っています (persistent パッケージと似た感じするとか).