akishin999の日記

調べた事などを書いて行きます。

Java から Cassandra を使ってみる その5

Windows で Cassandra を動かしてみる
Java から Cassandra を使ってみる その1
Java から Cassandra を使ってみる その2
Java から Cassandra を使ってみる その3
Java から Cassandra を使ってみる その4

の続きです。

尚、この記事の動作確認環境は以下の通りです。

storage-conf.xml に Keyspace を定義してみる

今回は conf/storage-conf.xml に独自の Keyspace を追加し、コマンドプロンプトから使用するクライアントを作成してみたいと思います。


Cassandra が起動している場合は一旦停止し、Cassandra インストールディレクトリの下の conf ディレクトリの下にある、storage-conf.xml をエディタで開きます。

Keyspaces 要素の下に、ここまで使用してきたサンプルの Keyspace である「Keyspace1」の定義があるので、この下に独自の Keyspace を追加してみます。

    <Keyspace Name="Examples">
      <ColumnFamily Name="Entries" CompareWith="BytesType"/>
      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
      <ReplicationFactor>1</ReplicationFactor>
      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
    </Keyspace>


「Examples」という Keyspace と、 「Entries」という ColumnFamily を追加しました。

使ってみる

それでは、以下のようなクラスを書いて実際に追加した Keyspace を使用してみます。
実際には Web アプリケーションから使用した方がより実践的だとは思いますが、簡単のため今回はコマンドラインから使用するコンソールアプリケーションを作成してみました。


まずは Cassandra への登録とデータ取得を行う以下のようなクラスを作成しました。
RDBMS で DAO パターンを使う時の DAO のようなイメージのクラスです。

package example.cassandra;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public class Entries {
    private static final String COLUMN_FAMILY = "Entries";
    private static final String COLUMN_TITLE   = "title";
    private static final String COLUMN_CONTENT = "content";
    
    private ColumnPath titleColumn;
    private ColumnPath contentColumn;
    private ColumnParent columnParent;
    
    private String host;
    private int port;
    private String keySpace;

    private Cassandra.Client client;
    private TTransport transport;
    
    /**
     * コンストラクタ
     * @param host Cassandra のホスト
     * @param port Thrift の待受けポート番号
     * @param keySpace キースペース
     */
    public Entries(final String host, final int port, final String keySpace) {
        this.host = host;
        this.port = port;
        this.keySpace = keySpace;
        
        this.titleColumn = new ColumnPath(COLUMN_FAMILY);
        this.contentColumn = new ColumnPath(COLUMN_FAMILY);
        this.columnParent = new ColumnParent(COLUMN_FAMILY);
        try {
            this.titleColumn.setColumn(COLUMN_TITLE.getBytes("UTF8"));
            this.contentColumn.setColumn(COLUMN_CONTENT.getBytes("UTF8"));
        } catch (UnsupportedEncodingException e) { /* nop */  }
        
        this.transport = new TSocket(this.host, this.port);
        TProtocol protocol = new TBinaryProtocol(transport);
        this.client = new Cassandra.Client(protocol);
    }
    
    /**
     * 接続する。
     * @throws TTransportException
     */
    public void open() throws TTransportException {
        this.transport.open();
    }
    
    /**
     * 接続を切断する。
     */
    public void close() {
        if (this.transport != null && this.transport.isOpen()) {
            this.transport.close();
        }
    }
    
    /**
     * Cassandra に一件分のデータを保存する。
     * @param title
     * @param content
     * @return
     * @throws UnsupportedEncodingException
     * @throws InvalidRequestException
     * @throws UnavailableException
     * @throws TimedOutException
     * @throws TException
     */
    public String save(final String title, final String content) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException {
        if (!this.transport.isOpen()) {
            throw new IllegalStateException();
        }
        // タイムスタンプをキーとして取得
        String key = String.valueOf(System.currentTimeMillis());
        
        // タイトルを保存
        client.insert(this.keySpace, 
                      key, 
                      titleColumn, 
                      title.getBytes("UTF8"), 
                      System.currentTimeMillis(), 
                      ConsistencyLevel.ONE);

        // 内容を保存
        client.insert(this.keySpace, 
                      key, 
                      contentColumn, 
                      content.getBytes("UTF8"), 
                      System.currentTimeMillis(), 
                      ConsistencyLevel.ONE);
        return key;
    }
    
    /**
     * Cassandra から指定したキーの値を取得する。
     * @param keys
     * @return
     * @throws InvalidRequestException
     * @throws UnavailableException
     * @throws TimedOutException
     * @throws TException
     */
    public Map<String, List<ColumnOrSuperColumn>> read(final List<String> keys) throws InvalidRequestException, UnavailableException, TimedOutException, TException {
        if (!this.transport.isOpen()) {
            throw new IllegalStateException();
        }
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);
        
        Map<String, List<ColumnOrSuperColumn>> results = client.multiget_slice(this.keySpace, 
                                                                               keys, 
                                                                               this.columnParent, 
                                                                               slicePredicate, 
                                                                               ConsistencyLevel.ONE);
        return results;
    }
}


次に、上記クラスのクライアントとして、以下のようなコンソールアプリケーションを作成しました。

package example.cassandra;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;

public class ConsoleExample {

    private static List<String> storedKeys = new ArrayList<String>();
    private static Entries entries;
    
    public static void main(String[] args) throws Exception {
        System.out.println("Starting Cassandra Console Example");
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            entries = new Entries("localhost", 9160, "Examples");
            entries.open();
            
            MAINLOOP : {
                while(true) {
                    usage();
                    int selectedNumber = 0;
                    try {
                        selectedNumber = Integer.parseInt(br.readLine().trim());
                    } catch (final NumberFormatException e) {
                        continue;
                    }
                    
                    switch (selectedNumber) {
                    case 1:
                        storeEntry();
                        continue;
                    case 2:
                        showStoredEntry();
                        continue;
                    case 9:
                        break MAINLOOP;
                    default:
                        continue;
                    }
                }
            }
        } finally {
            entries.close();
        }
        System.out.println("Cassandra Console Example End");
    }

    /**
     * メニュー表示。
     */
    private static void usage() {
        System.out.println("1. store entry.");
        System.out.println("2. show stored entry.");
        System.out.println("9. exit.");
        System.out.print("? :");
    }
    
    /**
     * Entry を保存する。
     * @throws Exception
     */
    private static void storeEntry() throws Exception {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        System.out.println("store entry. Type 'quit' to quit.");
        while (true) {
            System.out.print("title? :");
            String title = br.readLine();
            if (title == null || title.equals("quit")) {
                break;
            }
            
            System.out.print("content? :");
            String content = br.readLine();
            if (content == null || content.equals("quit")) {
                break;
            }
            System.out.printf("title:[%s] content:[%s]\n", title, content);
            storedKeys.add(entries.save(title, content));
        }
    }
    
    /**
     * 保存した Entry を表示する。
     * @throws Exception
     */
    private static void showStoredEntry() throws Exception {
        if (storedKeys.size() <= 0) {
            System.out.println("no stored entry.");
            return;
        }
        
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        
        Map<String, List<ColumnOrSuperColumn>> results = entries.read(storedKeys);
        for (Map.Entry<String, List<ColumnOrSuperColumn>> entry : results.entrySet()) {
            String key = entry.getKey();
            System.out.printf("key:[%s]\n", key);
            
            List<ColumnOrSuperColumn> list = entry.getValue();
            for (int i = 0; i < list.size(); i++) {
                ColumnOrSuperColumn result = list.get(i); 
                Column col = result.column;
                System.out.printf("[%d] カラム名:[%s] 値:[%s] タイムスタンプ:[%s]\n",
                        i + 1,
                        new String(col.name, "UTF8"),
                        new String(col.value, "UTF8"),
                        sdf.format(new Date(col.timestamp)));
            }
            System.out.println("--------------------");
        }
    }
}


実行してみると、作成した Keyspace への値の追加・参照が行えてる事が分かるかと思います。
このように、storage-conf.xml に追記することで、任意の Keyspace や ColumnFamily を追加することができます。

参考

StorageConfiguration - Cassandra Wiki
http://wiki.apache.org/cassandra/StorageConfiguration