Development Etc.

“Maybe it’s about time I expanded the realm of possibilities around here.” – MacGyver

Iterate over an entire Cassandra column family

I’m in the middle of implementing a Cassandra persistence adapter for ActiveMQ, and one of the requirements is to recover all the messages. That means iterating over every key in a column family. Below I show how to do this in Cassandra using the Hector client. I’ve also contributed this example code to the Hector project on GitHub, because it’s all about spreading the love!

package example.hector;

import static me.prettyprint.cassandra.utils.StringUtils.bytes;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.CassandraClientPoolFactory;
import me.prettyprint.cassandra.service.Keyspace;
import me.prettyprint.cassandra.service.PoolExhaustedException;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.SlicePredicate;

/**
 * Simple example showing how to iterate over an entire column family. This works even with the
 * Random Partitioner, but be aware that rows will then come back in no particular order.
 * @author mark j greene
 */
public class IterateExample {

    /** Charset used both to encode the requested column name and to decode column values. */
    private static final String CHARSET = "UTF-8";

    public IterateExample() {
    }

    /**
     * Pages through every row in the key range {@code [start, end]} of a column family and prints
     * the first returned column of each row to standard out.
     *
     * <p>Range-slice start keys are inclusive, so each page after the first is fetched starting at
     * the last key already printed, and that key is dropped from the new page before printing.
     * Iteration stops when a continuation page yields no new rows.
     *
     * @param client unused; kept for signature compatibility
     * @param ks keyspace to read from
     * @param cp column family to read
     * @param sp predicate selecting which columns to return for each row
     * @param start first key of the range (the example passes "" together with end = "" to
     *        cover the whole column family)
     * @param end last key of the range; applied to every page, not just the first
     * @param rowFetchSize maximum number of new rows to print per page; must be at least 1
     * @throws IllegalArgumentException if {@code rowFetchSize < 1}
     * @throws Exception anything thrown by the underlying range-slice call
     */
    private void iterateEntireColumnFamily(CassandraClient client, Keyspace ks, ColumnParent cp,
            SlicePredicate sp, String start, String end, int rowFetchSize) throws Exception {
        if (rowFetchSize < 1) {
            throw new IllegalArgumentException("rowFetchSize must be at least 1, was " + rowFetchSize);
        }

        // Continuation pages also contain the inclusive start key, so request one extra row to
        // still receive up to rowFetchSize new rows. Without this, rowFetchSize == 1 stops after
        // the very first row. The guard avoids int overflow.
        int continuationFetchSize = rowFetchSize < Integer.MAX_VALUE ? rowFetchSize + 1 : rowFetchSize;

        Map<String, List<Column>> page = this.fetchPagedResults(ks, cp, sp, start, end, rowFetchSize);

        // NOTE(review): assumes the returned map preserves the server's key order (as the
        // original code also did), so the last entry is the right place to resume from.
        while (!page.isEmpty()) {
            String lastKey = null;
            for (Map.Entry<String, List<Column>> row : page.entrySet()) {
                lastKey = row.getKey();
                List<Column> columns = row.getValue();

                // a row with none of the requested columns (presumably a tombstoned row)
                if (columns == null || columns.isEmpty()) {
                    System.out.println("Column is null for key: " + lastKey);
                } else {
                    System.out.println(lastKey + " -> " + new String(columns.get(0).getValue(), CHARSET));
                }
            }

            System.out.println("This page has run out, getting more results from key: " + lastKey);

            // we've run out of results in this page, get more (keeping the caller's end bound)
            page = this.fetchPagedResults(ks, cp, sp, lastKey, end, continuationFetchSize);

            // remove the already-printed inclusive start key by equality, not by position
            if (page.containsKey(lastKey)) {
                System.out.println("Removing inclusive start key: " + lastKey);
                page.remove(lastKey);
            }
        }
    }

    /**
     * Fetches one page of rows via {@link Keyspace#getRangeSlice}.
     *
     * @param ks keyspace to read from
     * @param cp column family to read
     * @param sp predicate selecting which columns to return for each row
     * @param start first key of the page (inclusive)
     * @param end last key of the range
     * @param rowFetchSize maximum number of rows to return
     * @return map from row key to the columns selected by {@code sp}
     * @throws Exception anything thrown by the underlying range-slice call
     */
    private Map<String, List<Column>> fetchPagedResults(Keyspace ks, ColumnParent cp,
            SlicePredicate sp, String start, String end, int rowFetchSize) throws Exception {
        return ks.getRangeSlice(cp, sp, start, end, rowFetchSize);
    }

    /**
     * Borrows a client for localhost:9160, then iterates column family "Standard3" in keyspace
     * "Keyspace1" (column "column-name" only) in pages of 5 rows. The client is always returned
     * to the pool.
     *
     * @throws Exception if borrowing the client or iterating fails
     */
    public void setupAndIterate() throws Exception {
        CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
        CassandraClient client = pool.borrowClient("localhost", 9160);

        try {
            Keyspace ks = client.getKeyspace("Keyspace1");

            // just look for one column for now; encode with the same charset used to decode values
            List<byte[]> colNames = new ArrayList<byte[]>();
            colNames.add("column-name".getBytes(CHARSET));

            // setup the slice predicate
            SlicePredicate sp = new SlicePredicate();
            sp.setColumn_names(colNames);

            // setup column parent
            ColumnParent cp = new ColumnParent("Standard3");

            this.iterateEntireColumnFamily(client, ks, cp, sp, "", "", 5);
        } finally {
            // pool is non-null here: it was dereferenced to borrow the client above
            pool.releaseClient(client);
        }
    }

    /** Runs the example against a local Cassandra node. */
    public static void main(String[] args) throws IllegalStateException, PoolExhaustedException, Exception {
        IterateExample dao = new IterateExample();
        dao.setupAndIterate();
    }
}
Advertisement

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. (Log Out / Change)

Twitter picture

You are commenting using your Twitter account. (Log Out / Change)

Facebook photo

You are commenting using your Facebook account. (Log Out / Change)

Connecting to %s

Follow

Get every new post delivered to your Inbox.