Archive for August, 2010

Cassandra Java Annotations

August 30, 2010

Overview

Cassandra has a unique column-oriented data model which does not easily map to an entity-based Java model. Furthermore, the Java Thrift client implementation is very low-level and presents the developer with a rather difficult API to work with on a daily basis. This situation is a good candidate for an adapter to shield the business code from mundane plumbing details.

I recently did some intensive Cassandra (version 0.6.5) work to load millions of geographical positions for ships at sea.  Locations were already being stored in MySQL/Innodb using JPA/Hibernate so I already had a ready-made model based on JPA entity beans. After some analysis, I created a mini-framework based on custom annotations and a substantial adapter to encapsulate all the “ugly” Thrift boiler-plate code.  Naturally everything was wired together with Spring.

Implementation

The very first step was to investigate existing Cassandra Java client toolkits. As usual in a startup environment time was at a premium, but I quickly checked out a few key clients. Firstly, I looked at Hector, but its API still exposed too much of the Thrift cruft for my needs. It did have nice features for failover and connection pooling, and I will definitely look at it in more detail in the future. Pelops looked really cool with its Mutators and Selectors, but it too dealt with columns – see the description.  What I was looking for was an object-oriented way to load and query Java beans. Note that this OO entity-like paradigm might not be applicable to other Cassandra data models, e.g. sparse matrices.

And then there was DataNucleus which advertises JPA/JDO implementations for a large variety of non-SQL persistence stores: LDAP, Hadoop HBase, Google App Engine, etc. There was mention of a Cassandra solution, but it wasn’t yet ready for prime time. How they manage to address the massive semantic mismatch between JPA and Cassandra is beyond me – unfortunately I didn’t have time to drill down. Seems fishy – but I’ll definitely check this out in the future. Even though I’m a big fan of using existing frameworks/tools, there are times when “rolling your own” is the best course of action.

The following collaborating classes comprised the framework:

  • CassandraDao – High-level class that understands annotated entity beans
  • ColumnFamily – An adapter for common column family operations – hides the Thrift gore
  • AnnotationManager – Manages the annotated beans
  • TypeMapper – Maps Java data types into bytes and vice versa

Since we already had a JPA-annotated Location bean, my first thought was to reuse this class and simply process the JPA annotations into their equivalent Cassandra concepts. Upon further examination this proved ugly – the semantic mismatch was too great. I certainly did not want to be importing JPA/Hibernate packages into a Cassandra application! Furthermore, many annotations (such as collections) were not applicable and I needed annotations for Cassandra concepts that did not exist in JPA. In “set theoretic” terms, there are JPA-specific features, Cassandra-specific features and an intersection of the two.

The first-pass implementation required only three annotations: Entity, Column and Key. The Entity annotation is a class-level annotation with keyspace and columnFamily attributes; it defines the column family/keyspace that the entity belongs to and its constituent columns. The Column annotation closely corresponds to its JPA equivalent, and the Key annotation specifies the row key. The CassandraDao class corresponds to a single column family and accepts an entity class and a type mapper.
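
For illustration, the annotation declarations themselves are tiny. The sketch below is only an approximation of what the framework used; the retention and target choices are my assumptions (the sample beans later in this post annotate getters, while the annotation summary at the end describes Column and Key as field-level, so both targets are allowed here):

// (each annotation would live in its own source file)
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Entity {
  String keyspace();
  String columnFamily();
}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD})
public @interface Column {
  String name();
}

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD})
public @interface Key {
}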

Two column families were created: a standard column family for ship definitions, and a super column family for ship locations. The Ship CF was a simple collection of ship details keyed by each ship’s MMSI (a unique ID for a ship which is typically engraved on the keel).  The Location CF represented a one-to-many relationship for all the locations of a ship. The row key was the ship’s MMSI, and the super column names were Long millisecond timestamps – one per location. Each super column contained the columns defined in the ShipLocation bean – latitude, longitude, course over ground, speed over ground, etc.  The number of locations for a given ship could possibly range in the millions!

From an implementation perspective, I was rather surprised to find that there are no standard reusable classes to map basic Java data types to bytes. Sure, String has getBytes(), but I had to do some non-trivial, distracting detective work to get doubles, longs, BigInteger, BigDecimal and Dates converted – all the shifting magic, etc. I also made sure to run some performance tests to choose the best alternative!
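
For the curious, here is a minimal sketch of the kind of conversions the type mapper performs; java.nio.ByteBuffer does the shifting for the fixed-width types. The class and method names mirror the framework described above, but the bodies are illustrative rather than the actual implementation:

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

public class DefaultTypeMapper {

  // fixed-width types: ByteBuffer handles the byte shifting
  public byte[] toBytes(long value) {
    return ByteBuffer.allocate(8).putLong(value).array();
  }

  public byte[] toBytes(double value) {
    return ByteBuffer.allocate(8).putDouble(value).array();
  }

  // strings: explicit charset to stay portable across platforms
  public byte[] toBytes(String value) {
    try {
      return value.getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException(e);
    }
  }

  public long toLong(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  public double toDouble(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getDouble();
  }
}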

CassandraDao

The DAO is based on the standard concept of a genericized DAO, of which many versions are floating around.

The initial version of the DAO with basic CRUD functionality is shown below:

public class CassandraDao<T> {
  public CassandraDao(Class<T> clazz, CassandraClient client, TypeMapper mapper)

  // standard column family operations
  public T get(String key)
  public void insert(T entity)

  // super column family operations
  public T getSuperColumn(String key, byte[] superColumnName)
  public List<T> getSuperColumns(String key, List<byte[]> superColumnNames)
  public void insertSuperColumn(String key, T entity)
  public void insertSuperColumns(String key, List<T> entities)
}

Of course more complex batch and range operations that reflect advanced Cassandra API methods are needed.
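
To give a sense of the Thrift plumbing the DAO hides, here is roughly what a single get() amounts to against the raw Cassandra 0.6 API. The calls are from the generated Cassandra.Client stub; the keyspace, column family and key are the ones used in the samples below, and the mapping back onto the annotated bean (via the annotations and the TypeMapper) is elided:

  // client is an org.apache.cassandra.thrift.Cassandra.Client connected elsewhere
  SlicePredicate predicate = new SlicePredicate();
  predicate.setSlice_range(new SliceRange(new byte[0], new byte[0], false, 100));

  List<ColumnOrSuperColumn> results = client.get_slice(
      "Marine",                   // keyspace
      "1975",                     // row key
      new ColumnParent("Ship"),   // column family
      predicate,
      ConsistencyLevel.ONE);

  for (ColumnOrSuperColumn cosc : results) {
    Column column = cosc.getColumn();
    // map column.getName() / column.getValue() back onto the entity via the TypeMapper
  }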

Usage Sample

  import com.google.common.collect.ImmutableList;
  import org.springframework.context.support.ClassPathXmlApplicationContext;
  import org.springframework.context.ApplicationContext;

  // initialization
  ApplicationContext context = new ClassPathXmlApplicationContext("config.xml");
  CassandraDao<Ship> shipDao = (CassandraDao<Ship>)context.getBean("shipDao");
  CassandraDao<ShipLocation> shipLocationDao =
    (CassandraDao<ShipLocation>)context.getBean("shipLocationDao");
  TypeMapper typeMapper = (TypeMapper) context.getBean("typeMapper");

  // get ship
  Ship ship = shipDao.get("1975");

  // insert ship
  Ship ship = new Ship();
  ship.setMmsi(1975); // note: row key - framework insert() converts to required String
  ship.setName("Hokulea");
  shipDao.insert(ship);

  // get ship location (super column)
  byte[] superColumn = typeMapper.toBytes(1283116367653L);
  ShipLocation location = shipLocationDao.getSuperColumn("1975",superColumn);

  // get ship locations (super column)
  ImmutableList<byte[]> superColumns = ImmutableList.of( // Until Java 7, Google rocks!
    typeMapper.toBytes(1283116367653L),
    typeMapper.toBytes(1283116913738L),
    typeMapper.toBytes(1283116977580L));
  List<ShipLocation> locations = shipLocationDao.getSuperColumns("1975",superColumns);

  // insert ship location (super column)
  ShipLocation location = new ShipLocation();
  location.setTimestamp(new Date());
  location.setLat(20.0);
  location.setLon(-90.0);
  shipLocationDao.insertSuperColumn("1975",location);

Java Entity Beans

Ship

@Entity( keyspace="Marine", columnFamily="Ship")
public class Ship {
  private Integer mmsi;
  private String name;
  private Integer length;
  private Integer width;

  @Key
  @Column(name = "mmsi")
  public Integer getMmsi() {return this.mmsi;}
  public void setMmsi(Integer mmsi) {this.mmsi= mmsi;}

  @Column(name = "name")
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
}

ShipLocation

@Entity( keyspace="Marine", columnFamily="ShipLocation")
public class ShipLocation {
  private Integer mmsi;
  private Date timestamp;
  private Double lat;
  private Double lon;

  @Key
  @Column(name = "mmsi")
  public Integer getMmsi() {return this.mmsi;}
  public void setMmsi(Integer mmsi) {this.mmsi= mmsi;}

  @Column(name = "timestamp")
  public Date getTimestamp() {return this.timestamp;}
  public void setTimestamp(Date timestamp) {this.timestamp = timestamp;}

  @Column(name = "lat")
  public Double getLat() {return this.lat;}
  public void setLat(Double lat) {this.lat = lat;}

  @Column(name = "lon")
  public Double getLon() {return this.lon;}
  public void setLon(Double lon) {this.lon = lon;}
}

Spring Configuration

 <bean id="propertyConfigurer">
   <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
   <property name="location" value="classpath:config.properties</value>
 </bean>
 <bean id="shipDao" class="com.andre.cassandra.dao.CassandraDao" scope="prototype" >
   <constructor-arg value="com.andre.cassandra.data.Ship" />
   <constructor-arg ref="cassandraClient" />
   <constructor-arg ref="typeMapper" />
 </bean>
 <bean id="shipLocationDao" scope="prototype" >
   <constructor-arg value="com.andre.cassandra.data.ShipLocation" />
   <constructor-arg ref="cassandraClient" />
   <constructor-arg ref="typeMapper" />
 </bean>

<bean id="cassandraClient" class="com.andre.cassandra.util.CassandraClient" scope="prototype" >
  <constructor-arg value="${cassandra.host}" />
  <constructor-arg value="${cassandra.port}" />
</bean>

<bean id="typeMapper" class="com.andre.cassandra.util.DefaultTypeMapper" scope="prototype" />

Annotation Documentation

Annotations

  • Entity – class-level – defines the keyspace and column family
  • Column – field-level – the column name
  • Key – field-level – the row key

Entity Attributes

  • keyspace (String) – the keyspace
  • columnFamily (String) – the column family

Initial Cassandra Impressions

August 30, 2010

Recently I’ve been doing some intensive work with the popular NoSQL framework Cassandra. In this post I describe some of my first impressions of working with Cassandra Thrift Java stubs and some comparisons with Voldemort – another NoSQL framework that I am familiar with.

Cassandra Issues

Data Model

The Cassandra data model – with its columns and super columns is radically different from the traditional SQL data model. Most of the Cassandra descriptions are example-based, and though rich in details they lack generality. While examples are necessary they are not sufficient. What is missing is some formalism to capture the essential qualities of the model which no example fully captures. I recently came across a very good article about “NoSQL data model” from a “relational purist” that strongly resonates with me – see The Cassandra Data Model – highly recommended!

One day soon, I’ll try to write a new post summarizing some of my thoughts on NoSQL data modeling. In short, as the field matures there is going to be a need to create some types of standards out of the wide variety of implementations. There are distinct NoSQL categories: key/value stores, column-oriented stores, document-oriented stores –  but even within these categories there is much unnecessary overlap.

Regarding Cassandra columns, here’s a bit of clarification that may help. There are essentially two kinds of column families:

  • Those that have a fixed, finite set of columns, where the columns represent the attributes of a single object. Each row has the same columns, and the column names are fixed metadata.
  • Those that have an unbounded set of columns that represent a collection belonging to the key. The confusing part is that the column name is not really metadata – it is actually a value in its own right!

Thrift Client Limitations

Let me be frank – working with Cassandra’s Java Thrift client is a real pain. In part this is due to the auto-generated cross-platform nature of the beast, but there are many pain points that reflect accidental and not inherent complexity. As Cassandra/Thrift matures, I hope more attention will be paid to ameliorating the life of poor programmers.

No class hierarchy for Thrift exceptions

Not deriving your exceptions from a base class is truly a disappointment. Interestingly, neither does Google ProtoBuf! The developer is forced to either catch up to five exceptions for each call, or resort to the ugly catch Exception workaround. How much nicer would it have been to catch one Thrift base exception!

For example, just look at all the exceptions thrown by the get method of Cassandra.Client!

  • org.apache.cassandra.thrift.InvalidRequestException
  • org.apache.cassandra.thrift.UnavailableException
  • org.apache.cassandra.thrift.TimedOutException
  • org.apache.thrift.TException
  • java.io.UnsupportedEncodingException
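
A sketch of what this means at the call site, assuming the 0.6 get signature (and pre Java 7, so no multi-catch). Note that NotFoundException must also be caught even though it is not in the list above, and the UnsupportedEncodingException comes from the getBytes("UTF-8") call used to build the column name:

  // client is an org.apache.cassandra.thrift.Cassandra.Client connected elsewhere
  try {
    ColumnPath path = new ColumnPath("Ship");
    path.setColumn("name".getBytes("UTF-8"));
    ColumnOrSuperColumn result = client.get("Marine", "1975", path, ConsistencyLevel.ONE);
  } catch (UnsupportedEncodingException e) {
    // from building the column name bytes
  } catch (InvalidRequestException e) {
    // malformed request
  } catch (NotFoundException e) {
    // no such row/column
  } catch (UnavailableException e) {
    // not enough live replicas
  } catch (TimedOutException e) {
    // replica did not answer in time
  } catch (TException e) {
    // low-level Thrift transport failure
  }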

No class hierarchy for Column and SuperColumn

The core Thrift concepts Column and SuperColumn lack a base class for “implementation” reasons due to the “cross-platform” limitations of Thrift. Instead there is a ColumnOrSuperColumn class that encapsulates return results where either a Column or SuperColumn could be returned. For example, see get_slice. This leads to horrible, onerous, non-OO switch statements – if is_setColumn() is true then call getColumn(), or if is_setSuperColumn() is true then call getSuperColumn(). Aargh!
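
For instance, unpacking a get_slice result ends up looking something like the following sketch (isSetColumn and isSetSuper_column are the generated Java forms of the is_set checks mentioned above; keyspace, key, columnFamily and predicate are assumed to be set up as in any slice query):

  List<ColumnOrSuperColumn> results = client.get_slice(keyspace, key,
      new ColumnParent(columnFamily), predicate, ConsistencyLevel.ONE);

  for (ColumnOrSuperColumn cosc : results) {
    if (cosc.isSetColumn()) {
      Column column = cosc.getColumn();
      // plain column: a name/value pair
    } else if (cosc.isSetSuper_column()) {
      SuperColumn superColumn = cosc.getSuper_column();
      for (Column subColumn : superColumn.getColumns()) {
        // handle each subcolumn of the super column
      }
    }
  }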

Documentation

Neither Voldemort nor Cassandra provides satisfactory documentation. If you are going to bet your company’s future on one of these products, you definitely have a right to expect better documentation. Interestingly, other open-source NoSQL products such as MongoDB and Riak do have better documentation.

Documentation for Voldemort configuration properties was truly a disaster (at least in version 60.1).  Parameters responsible for key system performance or even basic functionality were either cryptically documented or not documented at all. I counted a total of sixty properties. For the majority we were forced to scour the source code to get some basic understanding. Totally unnecessary! Some examples: client.max.threads, client.max.connections.per.node, client.max.total.connections, client.connection.timeout.ms, client.routing.timeout.ms, client.max.queued.requests, enable.redirect.routing, socket.listen.queue.length, nio.parallel.processing.threshold, max.threads, scheduler.threads, socket.timeout.ms, etc.

Comparison of Cassandra with Voldemort

On the basic level, both Cassandra and Voldemort are sharded key value stores modeled on Dynamo. Cassandra can be regarded as a superset in that it also provides a data model on top of the base K/V store.

Some comparison points with Voldemort:

  • Cluster node failover
  • Quorum policies
  • Read or write optimized?
  • Can nodes be added to the cluster dynamically?
  • Pluggable store engines
  • Dynamically adding column families
  • Hinted Hand-off
  • Read Repair
  • Vector Clocks

Cluster Node Failover

A Voldemort client can specify one or more cluster nodes to connect to. The first node that the client connects to will return to the client a list of all nodes. The client stubs will then account for failover and load balancing. In fact, you can plug in your custom strategies. The third-party Cassandra Java client Hector claims to support node failover.
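
As a point of comparison, the Voldemort client bootstrap looks roughly like the sketch below: one bootstrap URL is enough, and the cluster topology, routing and failover behavior come back from that node. The classes are from the voldemort.client package; the store name and host are illustrative:

  import voldemort.client.ClientConfig;
  import voldemort.client.SocketStoreClientFactory;
  import voldemort.client.StoreClient;
  import voldemort.client.StoreClientFactory;

  // bootstrap from a single node; the factory learns the full cluster topology from it
  ClientConfig config = new ClientConfig().setBootstrapUrls("tcp://node1:6666");
  StoreClientFactory factory = new SocketStoreClientFactory(config);

  // routing and failover are handled inside the client stub
  StoreClient<String, String> client = factory.getStoreClient("ship-locations");
  client.put("1975", "Hokulea");
  String value = client.getValue("1975");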

Read/Write Optimization

Read or write optimized? Cassandra is write-optimized whereas Voldemort reads are faster. Cassandra uses a journaling and compacting model. Writes are instantaneous in that they simply append a log entry to the current log file. Reads are more expensive since they potentially have to look at more than one SSTable file to find the latest version of a key. If you are lucky you will find it cached in memory – otherwise one or more disk accesses will have to be performed. In a way the comparison is not truly apples-to-apples since Voldemort is simply storing blobs, while Cassandra has to deal with its accompanying data model overhead. However, it is curious to see how two basically K/V products end up with such different performance profiles on this vital issue.

Pluggable store engines

Voldemort supports pluggable engines, Cassandra does not. This is a big plus for Voldemort! Out of the box, Voldemort already provides a Berkeley DB and MySQL engine and allows you to easily plug in your own custom engine. Being able to implement your own backing store is an important concern for many shops.  In fact, on my recent project for a large telecom this was a crucial, make-or-break feature that played a large role in selecting Voldemort. We had in-house MySQL expertise and spent inordinate resources writing our own “highly optimized” MySQL engine. By the way, Riak also has pluggable engines – seven in total!

Dynamically adding column families

Neither Voldemort nor Cassandra supports this yet (though Cassandra should soon). In order to add a new “database” or “table” you need to update the configuration file and recycle all servers. Obviously this is not a viable production strategy. Riak does support this with buckets.

Quorum Policies

Quorum policies – Voldemort has a single quorum policy, whereas Cassandra has several Consistency Levels (a write example follows the list):

  • Zero – Ensure nothing. A write happens asynchronously in background
  • Any – Ensure that the write has been written to at least 1 node
  • One – Ensure that the write has been written to at least 1 replica’s commit log and memory table before responding to the client
  • Quorum – Ensure that the write has been written to N / 2 + 1 replicas before responding to the client
  • DCQuorum – As above but takes into account the rack-aware placement strategy
  • All – Ensure that the write is written to all N replicas before responding to the client
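
In the Thrift API the consistency level is simply a per-call argument. Here is a sketch of a quorum write using the 0.6 insert signature, with illustrative keyspace, column and value choices:

  // client is an org.apache.cassandra.thrift.Cassandra.Client
  ColumnPath path = new ColumnPath("Ship");
  path.setColumn("name".getBytes("UTF-8"));

  client.insert(
      "Marine",                      // keyspace
      "1975",                        // row key
      path,
      "Hokulea".getBytes("UTF-8"),   // column value
      System.currentTimeMillis(),    // client-supplied timestamp
      ConsistencyLevel.QUORUM);      // block until N/2 + 1 replicas acknowledge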

Hinted Hand-off

Cassandra and Voldemort both support hinted handoff. Riak also has support.

Cassandra:

If a node which should receive a write is down, Cassandra will write a hint to a live replica node indicating that the write needs to be replayed to the unavailable node. If no live replica nodes exist for this key, and ConsistencyLevel.ANY was specified, the coordinating node will write the hint locally. Cassandra uses hinted handoff as a way to (1) reduce the time required for a temporarily failed node to become consistent again with live ones and (2) provide extreme write availability when consistency is not required.

Voldemort:

Hinted Handoff is extremely useful when dealing with a multiple datacenter environment. However, work remains to make this feasible.

Riak:

Hinted handoff is a technique for dealing with node failure in the Riak cluster in which neighboring nodes temporarily takeover storage operations for the failed node. When the failed node returns to the cluster, the updates received by the neighboring nodes are handed off to it.

Hinted handoff allows Riak to ensure database availability. When a node fails, Riak can continue to handle requests as if the node were still there.

Read Repair

Cassandra Read Repair

Read repair means that when a query is made against a given key, we perform that query against all the replicas of the key. If a low ConsistencyLevel was specified, this is done in the background after returning the data from the closest replica to the client; otherwise, it is done before returning the data.

This means that in almost all cases, at most the first instance of a query will return old data.

Voldemort

There are several methods for reaching consistency with different guarantees and performance tradeoffs.

Two-Phase Commit — This is a locking protocol that involves two rounds of co-ordination between machines. It is perfectly consistent, but not failure tolerant, and very slow.

Paxos-style consensus — This is a protocol for coming to agreement on a value that is more failure tolerant.

Read-repair — The first two approaches prevent permanent inconsistency. This approach involves writing all inconsistent versions, and then at read-time detecting the conflict, and resolving the problems. This involves little co-ordination and is completely failure tolerant, but may require additional application logic to resolve conflicts.

Riak

Read repair occurs when a successful read occurs – that is, the quorum was met – but not all replicas from which the object was requested agreed on the value. There are two possibilities here for the errant nodes:

  1. The node responded with a “not found” for the object, meaning it doesn’t have a copy.
  2. The node responded with a vector clock that is an ancestor of the vector clock of the successful read.

When this situation occurs, Riak will force the errant nodes to update their object values based on the value of the successful read.

Version Conflict Resolution – Vector Clocks

Cassandra

Cassandra departs from the Dynamo paper by omitting vector clocks and moving from partition-based consistent hashing to key ranges, while adding functionality like order-preserving partitioners and range queries.  Source.

Voldemort

Voldemort uses Dynamo-style vector clocks for versioning.
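
In client code this shows up as the Versioned wrapper: a read hands back the value together with its vector clock, and the same clock is passed back on the write so the server can extend it. A minimal sketch, with names from the voldemort.versioning package and the client variable from the failover snippet earlier:

  import voldemort.versioning.Versioned;

  // read returns the value plus the vector clock that produced it
  Versioned<String> versioned = client.get("1975");

  // modify the value but keep the clock; the put extends the clock for this update
  versioned.setObject("Hokulea II");
  client.put("1975", versioned);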

Riak

Riak utilizes vector clocks (short: vclock) to handle version control. Since any node in a Riak cluster is able to handle a request, and not all nodes need to participate, data versioning is required to keep track of a current value. When a value is stored in Riak, it is tagged with a vector clock and establishes the initial version. When it is updated, the client provides the vector clock of the object being modified so that this vector clock can be extended to reflect the update. Riak can then compare vector clocks on different versions of the object and determine certain attributes of the data.

Other

Synchronous/asynchronous writes

For Voldemort, inserts of a key’s replicas are synchronous. Cassandra allows you to choose which policy best suits you. For cross-data center replication, synchronous updates can be extremely slow.

Caching

Cassandra caches data in-memory, periodically flushing to disk. Voldemort does not cache.

Initial Python Impressions

August 12, 2010

This blog is about my initial impressions of Python in the context of an “official Python production project”. I have long used Unix scripts for basic scripting needs, and occasionally used Python (Perl less so) for more substantial tasks but it has always been “unofficial”. My latest gig involved deploying a Python program to listen to incoming AWS SQS messages and dispatch them to a downstream processing engine (business logic, MySQL database).

Though Java has been my bread ‘n butter since its inception, I am firmly in the camp of language non-bigots. I was a coder long before Java, and it is hardly the only show in town. It basically boils down to the best tool for the task at hand. After all, it is all about tools – that’s what launched us Homo sapiens onto our current trajectory towards ultimate civilization.

Python is certainly enticing, and I fully appreciate its appeal. There’s no question, for example, that a Python dictionary is so much more convenient to define than a Java map:

mydict = {}

instead of its Java equivalent of:

Map<String,Integer> map = new HashMap<String,Integer>()

or the upcoming Java 7 syntax improvement with inferred typing:

Map<String,Integer> map = new HashMap<>()

You can also use Google Guava utilities to mitigate this issue for now.
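
For instance, with Guava (which the Cassandra samples in the first post already pull in for ImmutableList), the map boilerplate shrinks to something like:

  import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Maps;
  import java.util.Map;

  Map<String, Integer> map = Maps.newHashMap();                     // type arguments inferred
  Map<String, Integer> fixed = ImmutableMap.of("one", 1, "two", 2); // literal-style construction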

It is obviously so much easier to “whip out” a Python program to execute some basic functionality than a Java equivalent. The crux of the dilemma is: convenience for developers vs. long-term operations concerns.

It basically boils down to two issues (not necessarily unrelated):

  • Type safety
  • Size of team

If you’re one developer or a tight group of like-minded developers, then type safety issues can be mitigated by convention and mind-meld. However, as soon as the team grows and the life cycle of the application is extended (the original developers are no longer involved in maintenance), then problems begin. It’s hard to imagine a dynamically typed language such as Python comparing to Java for a large-scale development team where unrelated developers and hundreds of thousands of lines of code are involved.

For example, without explicit typing, new developers are forced to drill down into the source code to verify method signatures. Typically in the Java world this is handled by Javadoc, IDE magic or mere perusal of source signatures. In Python, you cannot merely look at a method’s source signature for there is none – you have to actually look at the entire method’s code and all its return values (cyclomatic complexity).

An interesting recent article precisely looks at these issues in the migration from Python to Java for Nuxeo’s CMS – see here.

In order to bullet-proof production code, the developer is forced to “play compiler”. To compensate for the lack of a compiler, much of the type-checking should be done by unit tests; these unit tests would not exist in the Java world. These tests are basically accidental complexity – extra cost – and exist only for type safety.  Here the chickens come home to roost – the trade-off between developer ease of use and run-time stability. Senior Python developers have told me that the “safe” way is to check function return values by either using “isinstance()” or checking for specific attributes with “hasattr()”. Whew! This just doesn’t “smell right” to me – too dependent on the whims of individuals. The stuff of nightmares for operations folks trying to discern what went wrong at 3 AM!

One particular place where this can cause run-time production problems is in the rarely executed “except” clause of a try/except (Java’s try/catch). I ran into unpleasant surprises due to Python’s inability to conveniently mix types in a print statement. Where Java easily concatenates distinct types, Python requires you to cast everything to a string with the str() function if you wish to use the “+” operator – with “,” you don’t have to, but the formatting suffers. Whew, a bit of inconsistency I’d say. You’ll never know this is a problem until an error happens.

Another Python cultural issue that strikes me as “strange” is the lack of true multi-threading due to the GIL (Global Interpreter Lock) limitation. This limitation seems to be an arbitrary constraint imposed by the BDFL (Benevolent Dictator for Life). Sure, threading is a non-trivial issue – like any tool it can be used or abused. But to summarily dismiss it and force people to spawn processes strikes me as arbitrary and ultimately retro.

Threading concerns can be divided into two basic types:

  • Threads that access shared resources that need to be synchronized. Care, diligence and discipline need to be exercised.
  • Threads that access external resources that require no synchronization. Goetz et al., in their seminal book Java Concurrency in Practice, call these deferred computations. Since there is no synchronization, programmer complexity is greatly reduced.

It is the latter that is used more often, and thus more important. Forcing users to always spawn processes is unnecessary accidental complexity. For an interesting recent perspective on the subject, see Michele Simionato’s Artima post Threads, processes and concurrency in Python: some thoughts.