Development Etc.

“Maybe it’s about time I expanded the realm of possibilities around here.” – MacGyver

Iterate over entire Cassandra column family

I’m in the middle of implementing a Cassandra persistence adapter for ActiveMQ, and one of the requirements is to recover all the messages. This, of course, means I need to iterate over all the keys in a Column Family. I’m going to show how to do this in Cassandra using the Hector client. This example code has also been given to the Hector project on GitHub because it’s all about spreading the love!

package example.hector;

import static me.prettyprint.cassandra.utils.StringUtils.bytes;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.CassandraClientPoolFactory;
import me.prettyprint.cassandra.service.Keyspace;
import me.prettyprint.cassandra.service.PoolExhaustedException;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.SlicePredicate;

/**
 * Simple example showing how to iterate over an entire column family. This will work even with
 * the Random Partitioner, but be aware that you will, of course, get results in an unordered manner.
 * @author mark j greene
 */
public class IterateExample {

public IterateExample() {
}

private void iterateEntireColumnFamily(CassandraClient client, Keyspace ks, ColumnParent cp,
				SlicePredicate sp, String start, String end, int rowFetchSize) throws Exception {

	// get all the keys with a limit of rowFetchSize values returned
	Map<String, List<Column>> results = this.fetchPagedResults(ks, cp, sp, start, end, rowFetchSize);

	Iterator<String> i = results.keySet().iterator();
	while (i.hasNext()) {
		String key = i.next();
		List<Column> c = results.get(key);

		//check for tombstones
		if (c.isEmpty()) {
			System.out.println("Column is null for key: " + key);
		} else {
			//key was found, print to console
			System.out.println(key + " -> " + new String(results.get(key).get(0).getValue()));
		}

		if(!i.hasNext()) {
			System.out.println("This page has run out, getting more results from key: " + key);

			//we've run out of results in this page, get more
			results = this.fetchPagedResults(ks, cp, sp, key, "", rowFetchSize);

			//the start key is inclusive, so drop the key we just processed;
			//removing it by name is safe even if the new page comes back empty
			System.out.println("Removing inclusive start key: " + key);
			results.remove(key);
			Set<String> keySet = results.keySet();

			//if more results were found, replace the iterator
			if(keySet.size() > 0) {
				i = keySet.iterator();
			}
		}
	}
}

private Map<String, List<Column>> fetchPagedResults( Keyspace ks, ColumnParent cp,
		SlicePredicate sp, String start, String end, int rowFetchSize ) throws Exception {

	Map<String, List<Column>> results = ks.getRangeSlice(cp, sp, start, end, rowFetchSize);

	return results;
}

public void setupAndIterate() throws Exception {
	CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
	CassandraClient client = pool.borrowClient("localhost", 9160);

	try {
		Keyspace ks = client.getKeyspace("Keyspace1");

		// just look for one column for now
		List<byte[]> colNames = new ArrayList<byte[]>();
		colNames.add("column-name".getBytes());

		// setup the slice predicate
		SlicePredicate sp = new SlicePredicate();
		sp.setColumn_names(colNames);

		// setup column parent
		ColumnParent cp = new ColumnParent("Standard3");

		this.iterateEntireColumnFamily(client, ks, cp, sp, "", "", 5);
	} finally {
		if (pool != null) {
			pool.releaseClient(client);
		}
	}

}

public static void main(String[] args) throws IllegalStateException, PoolExhaustedException, Exception {
	IterateExample dao = new IterateExample();
	dao.setupAndIterate();
}
}
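
The paging logic is easier to reason about when it is detached from a running cluster. Below is a minimal sketch, not Hector's API: a sorted in-memory TreeMap stands in for the column family, and the hypothetical helper fetchPage mimics a range slice with an inclusive start key. All class and method names here are invented for illustration, and the in-memory map returns keys in key order, which a Random Partitioner cluster would not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class PagingSketch {

    // Hypothetical stand-in for a range slice: returns up to pageSize entries
    // whose keys are >= start (inclusive), in key order.
    static List<Map.Entry<String, String>> fetchPage(NavigableMap<String, String> store,
                                                     String start, int pageSize) {
        List<Map.Entry<String, String>> page = new ArrayList<>();
        for (Map.Entry<String, String> e : store.tailMap(start, true).entrySet()) {
            if (page.size() == pageSize) {
                break;
            }
            page.add(e);
        }
        return page;
    }

    // Walks the whole store page by page and returns every key exactly once.
    static List<String> iterateAll(NavigableMap<String, String> store, int pageSize) {
        if (pageSize < 2) {
            // With an inclusive start key, a page of 1 would only return the start key again.
            throw new IllegalArgumentException("pageSize must be at least 2");
        }
        List<String> seen = new ArrayList<>();
        String start = "";
        boolean firstPage = true;
        while (true) {
            List<Map.Entry<String, String>> page = fetchPage(store, start, pageSize);
            // Every page after the first begins with the key we already processed.
            boolean skipFirst = !firstPage && !page.isEmpty()
                    && page.get(0).getKey().equals(start);
            for (int i = skipFirst ? 1 : 0; i < page.size(); i++) {
                seen.add(page.get(i).getKey());
            }
            if (page.size() < pageSize) {
                return seen; // a short page means we reached the end
            }
            start = page.get(page.size() - 1).getKey();
            firstPage = false;
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        for (int i = 0; i < 12; i++) {
            store.put(String.format("key-%02d", i), "value-" + i);
        }
        System.out.println(iterateAll(store, 5));
    }
}
```

Stopping on a short page, rather than on an empty one, avoids one extra round trip in most cases. The cost is that a column family whose size lands exactly on a page boundary needs one final fetch that returns only the start key.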

Range Slices In Cassandra With Hector

I thought I’d write up a simple tutorial on how to do a simple range slice in Cassandra with Hector for people coming from the relational world.

I’m going to outline the Cassandra version of SELECT col_name_1 FROM table_name LIMIT 100

Step 1) Set up the connection pool and the client

CassandraClientPool pool = CassandraClientPoolFactory.INSTANCE.get();
CassandraClient client = pool.borrowClient("localhost", 9160);

Step 2) Define which Keyspace you want to use. This is equivalent to which DB or Schema.

Keyspace keyspace = client.getKeyspace("Keyspace1");

Step 3) Set up the slice predicate by defining which columns within the row you want returned. This is equivalent to SELECT col_name_1

// just look for one column for now
List<byte[]> colNames = new ArrayList<byte[]>();
colNames.add("column-name".getBytes());

// setup the slice predicate
SlicePredicate sp = new SlicePredicate();
sp.setColumn_names(colNames);

Step 4) Specify which Column Family to use. This is equivalent to a table or FROM table_name in SQL

// setup column parent (CF)
ColumnParent cp = new ColumnParent("Standard3");

Step 5) Execute the request. This is equivalent to executing a SQL query.

// get all the keys with a limit of 100 values returned
Map<String, List<Column>> map = keyspace.getRangeSlice(cp, sp, "", "", 100);

Note that the 3rd and 4th method args are empty strings. When you combine two empty strings for the start and end range, it is like saying SELECT col_name_1 FROM table_name without a WHERE clause.
Also, if you have records that were deleted, the List of Column objects will be empty, but the key will still be returned until it is GC’d by Cassandra.
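
Because deleted rows come back as keys with an empty column list, callers usually want to skip them before using the results. Here is a minimal sketch of that filter. It uses plain java.util types so it runs without Hector (the generic type C stands in for Column), and the helper name liveRows is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TombstoneFilterSketch {

    // Returns only the rows that still have at least one column, preserving input order.
    static <C> Map<String, List<C>> liveRows(Map<String, List<C>> rangeSliceResult) {
        Map<String, List<C>> live = new LinkedHashMap<>();
        for (Map.Entry<String, List<C>> row : rangeSliceResult.entrySet()) {
            List<C> columns = row.getValue();
            if (columns != null && !columns.isEmpty()) {
                live.put(row.getKey(), columns);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        result.put("row-1", Arrays.asList("a"));
        result.put("row-2", new ArrayList<String>()); // deleted row: key present, no columns
        result.put("row-3", Arrays.asList("c"));
        System.out.println(liveRows(result).keySet()); // prints [row-1, row-3]
    }
}
```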

This example was used against a single Cassandra node using the Random Partitioner. Results will be returned to you in an unordered fashion.

Durability With Cassandra

After an RTFM fail, pointed out by Project Chair Jonathan Ellis, about how Cassandra handles durability, I thought I’d share what I learned.

In a single node configuration by default, Cassandra will not immediately fsync() your write to the commit log from buffer cache to disk. I discovered this (thanks to Jon via the users mailing list) when I did an insert and then immediately tried to shut down Cassandra and start it again. What I found was that if I did that within a second or two, Cassandra’s commit log did not have my insert. If I waited around 5-10 seconds before shutting it down, it did. So if you see this behavior, this is what is happening. Now, a single node configuration is not desirable for production, but I think it’s worth blogging about because it’s a common configuration for someone who is just hacking around and evaluating Cassandra for the first time.

Cassandra will let you achieve guaranteed durability for the commit log if you want. You can do this by changing “CommitLogSync” from periodic to batch and setting “CommitLogSyncBatchWindowInMS” to an appropriate number. This will obviously come with performance trade-offs, but at least you get to pick what is right for you. Also, if you are running a multi-node configuration this may not be needed, assuming you replicate your writes and use a consistency level that waits for the replicas, as Cassandra won’t acknowledge the write until that replication has finished. By doing this you virtually eliminate the effects of a hard system crash and can relax the durability of commit log writes for individual nodes.
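
To make the difference between the two modes concrete, here is a minimal sketch of what "fsync before acknowledging" means at the file level. It is an illustration written with Java NIO, not Cassandra's commit log code; the class name, the append helper, and the syncNow flag are all hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class CommitLogSyncSketch {

    // Appends one record. When syncNow is true, the call blocks until the OS reports
    // the bytes are on the storage device (the "batch" style of durability). When it
    // is false, the bytes may sit in the OS buffer cache until a later force() call
    // (the "periodic" style), so a crash in between can lose them.
    static void append(FileChannel log, String record, boolean syncNow) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            log.write(buf);
        }
        if (syncNow) {
            log.force(false); // flush file content to disk, similar in spirit to fsync()
        }
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("commitlog-sketch", ".log");
        try (FileChannel log = FileChannel.open(path,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            append(log, "insert-1", false); // returns before the write is known to be on disk
            append(log, "insert-2", true);  // returns only after force() completes
        }
        System.out.println(Files.readAllLines(path, StandardCharsets.UTF_8));
        Files.delete(path);
    }
}
```

Both records are readable afterwards in this demo because the process exits cleanly. The difference only shows up when the machine loses power between the write and the next sync, which is the window described above.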

Cassandra Durability Wiki: http://wiki.apache.org/cassandra/Durability

Troubleshooting Hung Systems: Spiked CPU

First off I wanted to thank Filip Hanik. He illustrated this technique in a presentation at SpringOne this year in New Orleans. I happened to actually give a similar presentation to my fellow engineers at my current employer.

The goal here is to use the top command in Linux, combined with thread dumps, to figure out which thread and which Java code is consuming CPU resources. Below is a step-by-step walkthrough of how to accomplish this.

Step 1

Execute the top command to determine the process ID (PID) of the JBoss server. It will be listed under “java”.

Step 2

Run the top command to show only the threads associated with the java process.
Execute top -H -p PID
(In this example the PID is 10937)

Step 3

Execute kill -3 PID to obtain a stack trace (in this example the PID is 10937).
Note: You can also utilize the dump stack capability in the JBoss JMX Console under the ServerInfo MBean

Step 4

Take the PID of the thread you want to correlate and convert it to hex using calc on Windows.
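
If you would rather not reach for a calculator, the same conversion is one line of Java. This is a minimal sketch: the class and method names are hypothetical, and 10977 is an assumed thread ID, chosen only because its hex form matches the 2ae1 used in this example. The "0x" prefix matches how HotSpot JVMs print the native ID (nid) in a thread dump.

```java
class ThreadIdHex {

    // Converts the decimal thread ID shown by top into the form used by the
    // "nid" field of a HotSpot thread dump.
    static String toNid(int decimalThreadId) {
        return "0x" + Integer.toHexString(decimalThreadId);
    }

    public static void main(String[] args) {
        System.out.println(toNid(10977)); // prints 0x2ae1
    }
}
```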

Step 5

Open the log file and search for the hex representation (native ID) of the thread ID. In this example it will be “2ae1”.

This effectively tells you the line of code that is responsible for consuming the CPU resources.
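
The search in Step 5 can also be scripted. The sketch below assumes the HotSpot thread dump layout, where each thread entry starts with a quoted thread name on a header line containing nid=0x..., followed by its stack frames and a blank line. The class name, the findThread helper, and the sample dump text (including the com.example.Busy frame) are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadDumpSearch {

    // Returns the lines of the thread entry whose header contains nid=<nid>,
    // from the header line down to the blank line that ends the entry.
    // Assumes the nid is followed by a space, as it is when a state follows it.
    static List<String> findThread(String dump, String nid) {
        List<String> entry = new ArrayList<>();
        boolean inEntry = false;
        for (String line : dump.split("\n")) {
            if (!inEntry && line.startsWith("\"") && line.contains("nid=" + nid + " ")) {
                inEntry = true;
            }
            if (inEntry) {
                if (line.trim().isEmpty()) {
                    break;
                }
                entry.add(line);
            }
        }
        return entry;
    }

    public static void main(String[] args) {
        // Invented sample dump with two threads; only the second matches 0x2ae1.
        String dump =
              "\"worker-1\" daemon prio=10 tid=0x0000000040a1b000 nid=0x2ae0 waiting on condition\n"
            + "   java.lang.Thread.State: TIMED_WAITING (sleeping)\n"
            + "\tat java.lang.Thread.sleep(Native Method)\n"
            + "\n"
            + "\"worker-2\" daemon prio=10 tid=0x0000000040a1c800 nid=0x2ae1 runnable\n"
            + "   java.lang.Thread.State: RUNNABLE\n"
            + "\tat com.example.Busy.spin(Busy.java:42)\n"
            + "\n";
        for (String line : findThread(dump, "0x2ae1")) {
            System.out.println(line);
        }
    }
}
```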

Impulse Buying Redux

I’ve started noticing a very interesting trend. Woot and RedTagCrazy have seemingly figured out that people are very interested in social impulse shopping. Pay a quick visit to RedTagCrazy.com and you will witness one set of people expressing shopper’s remorse, quickly followed by another set of people producing amusing justifications for pushing the “Buy Now” button. In some sense the social aspect is really what makes these sites, and the companies who own them absolutely love it. Think about it: how many times do you walk into a clothing store and find hundreds of people conversing back and forth about how great one particular item is? The whole thing is genius and makes buyers on the fence feel left out if they don’t conform to the herd mentality.

The beauty of this “new” kind of e-commerce is that it’s already been done…well, at least on TV. I mean, let’s not forget about how easy it is to get sucked into QVC at 1 in the morning after you forgot to pay your expanded cable bill. All kidding aside, what I love best is the minimalist approach these sites have taken. It is, in fact, a common trend in most Web 2.0 sites and encourages a specific set of workflows for the user to follow. And when it comes to retailing, you want to make it as simple as possible for someone to open their wallet. Amazon is definitely an early pioneer in this area with their fancy one-click purchasing.

With this trend proving itself, it seems to me that the giant retailers of the world ought to pay attention to this racket. I think it’s inevitable that the Targets, Wal-Marts, etc. will get into this independently or through some sort of collaborative network. If retailers don’t jump on this first, I can easily see other industries utilizing this as a way of getting in front of consumers without overwhelming them.