読者です 読者をやめる 読者になる 読者になる

cassy: Cassandra client for Haskell

最近 Cassandra を利用する機会がありまして,今回はその絡みで調べてみたことです. そういえば最近になって 1.2 がリリースされましたね.atomic_batch とか気になります.

はじめに

cassy は Haskell で書かれた Cassandra クライアントです.ClientOptions でも紹介されています.

以降の内容は version 0.4.0.1 に基づいています.

基本

使い方は github の README に書いてありま...と思ったら,古いのか記述どおりの内容だと動きません (2012/12/30 時点). 基本的な使い方は,以下のように Cassandra ノードへのコネクションプールを作成し,runCas で操作を実行します.

import Database.Cassandra.Basic
import Control.Monad.Trans.Class (lift)
import qualified Data.Text.IO as T

main :: IO ()
main = do
    pool <- createCassandraPool servers 1 3 300 ksTest
    column <- runCas pool $ do
        insert cfTest "rowkey_aaa" QUORUM
            [ packCol ( TUtf8 "sampleValue"
                      , encodeCas $ TUtf8 "ほげほげ"
                      )
            ]
        getCol cfTest "rowkey_aaa" (TUtf8 "samplevalue") QUORUM

    case column of
        Nothing -> putStrLn "not found"
        Just c  -> do
            let val = decodeCas $ colVal c :: TUtf8
            T.putStrLn $ getUtf8 val

servers :: [Server]
servers = [("192.168.xxx.xxx", 9160)]

ksTest :: KeySpace
ksTest = "test_ks"

cfTest :: ColumnFamily
cfTest = "test_cf"

createCassandraPool の第 2,3,4 引数は resource-pool パッケージに関係しており,順にサブプール数,サブプール毎のコネクション数,未使用コネクションを維持する秒数を指定します.

例外処理

insert/get/delete は CassandraException を投げるため,これらに備えておく必要があります. しかし現時点 (version <= 0.4.0.1) では,insert だけは cassandra-thrift の例外をそのまま投げるため,CassandraException で catch できません (次のリリースでは修正されています).

import Control.Exception.Lifted (catch)
import Prelude hiding (catch)

main = do
    ...
    runCas pool $
        operation `catch` (\(e :: CassandraException) -> エラー処理)
    ...

Slice Query

KeySelector とか Selector を使います.

main = do
    pool <- createCassandraPool servers 1 3 300 ksToDoList

    rows <- runCas pool $ getRows ["rowkey_aaa", "rowkey_ccc"]
    printRow $ Map.lookup "rowkey_aaa" rows     -- OK
    printRow $ Map.lookup "rowkey_bbb" rows     -- not found
    printRow $ Map.lookup "rowkey_ccc" rows     -- OK

    where
        getRows ks = do
            getMulti cfTest (Keys ks)
                Range { rangeStart = Just (TInt 1)
                      , rangeEnd   = Just (TInt 3)
                      , rangeOrder = Regular
                      , rangeLimit = 1024
                      }
                QUORUM
        printRow Nothing    = putStrLn "not found"
        printRow (Just row) = forM_ row $ \c ->
            T.putStrLn $ getUtf8 . decodeCas . colVal $ c

Composite Column

少し複雑なデータ構造だと Composite Column を使いたくなりますよね.

"user_id:00001": {
    "name":   "krdlab",
    "e-mail": [
        "krdlab@example.net",
        "krdlab@example.com"
    ]
}

例えば上記のようなデータを Composite Column で保存する場合は,以下のようにカラム名に当たる部分をタプルにします.

main = do
    ...
    runCas pool $ do
        cols <- exampleInsertAndGet
        printColumns cols
    where
        exampleInsertAndGet = do
            insert cfUsers "user_id:00001" QUORUM
                [ packCol ((TUtf8 "name"   , TInt 0), "krdlab")
                , packCol ((TUtf8 "reading", TInt 0), encodeCas . TUtf8 $ "けーあーるでぃーらぼ")
                , packCol ((TUtf8 "e-mail" , TInt 0), "krdlab@example.net")
                , packCol ((TUtf8 "e-mail" , TInt 1), "krdlab@example.com")
                ]
            get cfUsers "user_id:00001" All QUORUM

        printColumns cs = lift $ forM_ cs $ \c -> do
            let (k, v) = unpackCol c :: ((TUtf8, TInt), Value)
            print k
            T.putStrLn $ getUtf8 . decodeCas $ v

これは以下のように Cassandra へ格納されます."!" は cassy が追加しています.

                    ___________________________________________________________________________
"user_id:00001" -> | name:0:! | reading:0:!          | e-mail:0:!         | e-mail:1:!         |
                   +----------+----------------------+--------------------+--------------------+
                   | krdlab   | けーあーるでぃーらぼ | krdlab@example.net | krdlab@example.com |
                    ---------------------------------------------------------------------------

JSON データ

aeson の FromJSON/ToJSON のインスタンスとして定義しておくと,自動的に serialize/deserialize してくれます. しかし現状はどうも 1 つの column にシリアライズされたデータを押し込む方式のようですから,この点はデータ設計時に注意が必要です.

{-# LANGUAGE OverloadedStrings, DeriveGeneric #-}
module Main where

import Database.Cassandra.JSON  -- Basic ではなくこちらを import
...
import GHC.Generics (Generic)
import qualified Data.Aeson as A

data User = User
    { name :: Text
    , reading :: Text
    , emails :: [ByteString]
    }
    deriving (Show, Generic)

instance A.FromJSON User
instance A.ToJSON User

main :: IO ()
main = do
    ...
    runCas pool $ do
        c <- exampleInsertAndGet
        lift $ print (c :: Maybe User)
    where
        exampleInsertAndGet = do
            let user = User "krdlab" "けーあーるでぃーらぼ" ["krdlab@example.com"]
            insertCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM user
            getCol cfUsers "user_id:00001" (TUtf8 "krdlab") QUORUM

寄り道: resource-pool

名前の通り,リソースプールを提供するパッケージです.cassy ではコネクションを管理する目的で使用されています.

管理対象のリソース (Entry) をいくつかのサブプール (LocalPool) に分けて管理しています (Striped Pooling).

             ↓ sub-pools
[Pool] --+-- [LocalPool] --+-- [Entry]  <- 今回はここがコネクション
         |                 |
         |                 ...
         |                 +-- [Entry]
         ...
         +-- [LocalPool] --+-- [Entry]
                           |
                           ...
                           +-- [Entry]

この様に分割管理しているのは,マルチスレッド下でのリソース取得における競合の発生を抑えるためのようです.takeResource では threadId の hash 値に基づいて LocalPool が選択されます.

また裏では reaper スレッドが動いており,idle 状態のリソースは createPool で指定された時間を過ぎると destroy されるようになっています.

少し気になった点としては,0.2.1.1 時点では Pool 自体の破棄方法が提供されておらず,long running なアプリで Pool を変更したい時や,そうでなくても終了処理で確保済みのリソースを破棄したい時には困るのではないか?ということです.他の方はどうしているんでしょうか...

おわりに

  • cassy を使えば Haskell から一通りの操作 (insert/get/slice-query/delete) は実行可能です.
  • Cassandra 用のデータタイプ (CasType a) も定義されています.
  • aeson を用いたデータの auto-serialize/deserialize は便利ですが,各フィールドがカラムに収まるような方式もありかなと思っています (persistent パッケージと似た感じするとか).