Development Etc.
“Maybe it’s about time I expanded the realm of possibilities around here.” – MacGyver
Iterate over an entire Cassandra column family
Posted by Mark J. Greene on May 5, 2010
I’m in the middle of implementing a Cassandra persistence adapter for ActiveMQ, and one of the requirements is to recover all the messages. This of course means I need to iterate over all the keys in a column family. I’m going to show how to do this in Cassandra using the Hector client. This example code has also been contributed to the Hector project on GitHub, because it’s all about spreading the love!
package example.hector;
import static me.prettyprint.cassandra.utils.StringUtils.bytes;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.CassandraClientPoolFactory;
import me.prettyprint.cassandra.service.Keyspace;
import me.prettyprint.cassandra.service.PoolExhaustedException;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.SlicePredicate;
/**
 * Simple example showing how to iterate over an entire column family using Hector range slices.
 * This works even with the RandomPartitioner, but be aware that in that case you will of course
 * get the rows back in an unordered manner.
 *
 * @author mark j greene
 */
public class IterateExample {

    /** Creates a new example instance; the class holds no state. */
    public IterateExample() {
    }

    /**
     * Prints every row of a column family to standard out, paging through it with range slices.
     *
     * <p>Each page after the first is requested starting at the last key of the previous page.
     * Because a range slice includes its start key, that key is removed from the new page before
     * printing. Paging stops once a page contains no new keys.
     *
     * @param client the borrowed client (not used by this method)
     * @param ks keyspace to read from
     * @param cp column parent naming the column family to iterate
     * @param sp predicate selecting which columns to return for each row
     * @param start start key of the first page ({@code setupAndIterate} passes "" here)
     * @param end end key of the range, applied to every page ({@code setupAndIterate} passes "")
     * @param rowFetchSize maximum number of new rows to fetch per page; must be positive
     * @throws IllegalArgumentException if {@code rowFetchSize} is not positive
     * @throws Exception if a range-slice request fails
     */
    private void iterateEntireColumnFamily(CassandraClient client, Keyspace ks, ColumnParent cp,
            SlicePredicate sp, String start, String end, int rowFetchSize) throws Exception {
        if (rowFetchSize < 1) {
            throw new IllegalArgumentException("rowFetchSize must be positive: " + rowFetchSize);
        }
        // Later pages repeat their inclusive start key, so ask for one extra row to keep making
        // progress. Without this, rowFetchSize == 1 would stop after the very first row.
        int nextPageSize = rowFetchSize < Integer.MAX_VALUE ? rowFetchSize + 1 : rowFetchSize;

        Map<String, List<Column>> page = this.fetchPagedResults(ks, cp, sp, start, end, rowFetchSize);
        while (!page.isEmpty()) {
            String lastKey = null;
            // NOTE(review): assumes the returned map iterates in the order Cassandra returned the
            // rows (e.g. a LinkedHashMap); otherwise lastKey is not the end of the page. Confirm.
            for (Map.Entry<String, List<Column>> row : page.entrySet()) {
                lastKey = row.getKey();
                List<Column> columns = row.getValue();
                // check for tombstones: no live columns came back for this key
                if (columns.isEmpty()) {
                    System.out.println("Column is null for key: " + lastKey);
                } else {
                    // key was found; decode explicitly as UTF-8 rather than the platform default
                    System.out.println(lastKey + " -> " + new String(columns.get(0).getValue(), "UTF-8"));
                }
            }

            System.out.println("This page has run out, getting more results from key: " + lastKey);
            // keep honouring the caller's end key on every page, not just the first
            page = this.fetchPagedResults(ks, cp, sp, lastKey, end, nextPageSize);
            // Remove exactly the key already printed (not blindly the first key), so a row is
            // never skipped if lastKey disappeared between requests.
            if (page.containsKey(lastKey)) {
                System.out.println("Removing inclusive start key: " + lastKey);
                page.remove(lastKey);
            }
        }
    }

    /**
     * Fetches one page of rows via {@link Keyspace#getRangeSlice}.
     *
     * @param ks keyspace to read from
     * @param cp column parent naming the column family
     * @param sp predicate selecting which columns to return for each row
     * @param start first key of the page (inclusive)
     * @param end last key of the range
     * @param rowFetchSize maximum number of rows to return
     * @return the rows keyed by row key, each mapped to its selected columns
     * @throws Exception if the range-slice request fails
     */
    private Map<String, List<Column>> fetchPagedResults(Keyspace ks, ColumnParent cp,
            SlicePredicate sp, String start, String end, int rowFetchSize) throws Exception {
        return ks.getRangeSlice(cp, sp, start, end, rowFetchSize);
    }

    /**
     * Borrows a client for localhost:9160 and prints column "column-name" of every row in the
     * "Standard3" column family of "Keyspace1", five rows per page.
     * The client is always returned to the pool.
     *
     * @throws Exception if borrowing, reading, or releasing the client fails
     */
    public void setupAndIterate() throws Exception {
        CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
        CassandraClient client = pool.borrowClient("localhost", 9160);
        try {
            Keyspace ks = client.getKeyspace("Keyspace1");
            // just look for one column for now; encode its name explicitly as UTF-8
            List<byte[]> colNames = new ArrayList<byte[]>();
            colNames.add("column-name".getBytes("UTF-8"));
            // setup the slice predicate
            SlicePredicate sp = new SlicePredicate();
            sp.setColumn_names(colNames);
            // setup column parent
            ColumnParent cp = new ColumnParent("Standard3");
            this.iterateEntireColumnFamily(client, ks, cp, sp, "", "", 5);
        } finally {
            // pool was already dereferenced above, so guard the client rather than the pool
            if (client != null) {
                pool.releaseClient(client);
            }
        }
    }

    /**
     * Runs the example against a local Cassandra node.
     *
     * @param args ignored
     * @throws Exception if the example fails
     */
    public static void main(String[] args) throws IllegalStateException, PoolExhaustedException, Exception {
        IterateExample dao = new IterateExample();
        dao.setupAndIterate();
    }
}
Advertisement